(.module: {#.doc "The actor model of concurrency."}
  [lux #*
   [control monad
    ["p" parser]
    ["ex" exception (#+ exception:)]]
   ["." io (#- run) ("io/." Monad<IO>)]
   [data
    ["." product]
    ["e" error]
    [text format]
    [collection
     ["." list ("list/." Monoid<List> Monad<List> Fold<List>)]]]
   ["." macro (#+ with-gensyms Monad<Meta>)
    ["." code]
    ["s" syntax (#+ syntax: Syntax)]
    [syntax
     ["cs" common]
     [common
      ["csr" reader]
      ["csw" writer]]]]
   [type abstract]]
  [//
   ["." atom (#+ Atom atom)]
   ["." promise (#+ Promise promise) ("promise/." Monad<Promise>)]
   ["." task (#+ Task)]])

## Thrown by the message that 'poison' sends to an actor.
(exception: #export poisoned)

## Returned (as a failed task) by generated message functions when the
## message could not be delivered because 'send' reported failure.
(exception: #export (dead {actor-name Text}
                          {message-name Text})
  (ex.report ["Actor" actor-name]
             ["Message" message-name]))

## [Types]
## The three expansions below are purely local abbreviations:
##   <Message>  a function from the current state and the actor to a task yielding the next state.
##   <Obituary> the cause of death, the last state, and the messages that were never processed.
##   <Mailbox>  a linked chain of promises; each resolved link holds a message and the next link.
(with-expansions
  [<Message> (as-is (-> s (Actor s) (Task s)))
   <Obituary> (as-is [Text s (List <Message>)])
   <Mailbox> (as-is (Rec Mailbox (Promise [<Message> Mailbox])))]

  ## Collects every entry that has already been placed in the mailbox chain,
  ## stopping at the first link whose promise is still unresolved.
  (def: (obituary mailbox)
    (All [a] (-> (Rec Mailbox (Promise [a Mailbox])) (List a)))
    (case (promise.poll mailbox)
      (#.Some [head tail])
      (#.Cons head (obituary tail))

      #.None
      #.Nil))

  (abstract: #export (Actor s)
    {#.doc "An actor, defined as all the necessities it requires."}

    ## #mailbox holds the chain link most recently written by 'send'.
    ## #obituary stays unresolved for as long as the actor is alive.
    {#mailbox (Atom <Mailbox>)
     #obituary (Promise <Obituary>)}

    ## TODO: Delete after new-luxc becomes the new standard compiler.
    (def: (actor mailbox obituary)
      (All [s]
        (-> (Atom <Mailbox>)
            (Promise <Obituary>)
            (Actor s)))
      (:abstraction {#mailbox mailbox
                     #obituary obituary}))

    (type: #export (Message s)
      <Message>)

    (type: #export (Obituary s)
      <Obituary>)

    (type: #export (Behavior s)
      {#.doc "An actor's behavior when messages are received."}
      {#handle (-> (Message s) s (Actor s) (Task s))
       #end (-> Text s (Promise Any))})

    (def: #export (spawn behavior init)
      {#.doc "Given a behavior and initial state, spawns an actor and returns it."}
      (All [s] (-> (Behavior s) s (IO (Actor s))))
      (io (let [[handle end] behavior
                self (actor (atom (promise #.None))
                            (promise #.None))
                ## 'process' is bound only to start the message loop; its value is never read.
                process (loop [state init
                               |mailbox| (io.run (atom.read (get@ #mailbox (:representation self))))]
                          (do promise.Monad<Promise>
                            [## Wait for the next message to be placed in the chain.
                             [head tail] |mailbox|
                             ?state' (handle head state self)]
                            (case ?state'
                              ## The handler failed: run the 'end' callback, then publish the
                              ## obituary with the failing message and whatever was still queued.
                              (#e.Error error)
                              (do @
                                [_ (end error state)]
                                (exec (io.run (promise.resolve [error state (#.Cons head (obituary tail))]
                                                               (get@ #obituary (:representation self))))
                                  (wrap [])))

                              ## The handler succeeded: continue with the new state and the next link.
                              (#e.Success state')
                              (recur state' tail))))]
            self)))

    ## An actor is alive for as long as its obituary promise is unresolved.
    (def: #export (alive? actor)
      (All [s] (-> (Actor s) Bit))
      (case (promise.poll (get@ #obituary (:representation actor)))
        #.None
        #1

        _
        #0))

    (def: #export (send message actor)
      {#.doc "Communicate with an actor through message passing."}
      (All [s] (-> (Message s) (Actor s) (IO Bit)))
      (if (alive? actor)
        (let [## The new chain link: the message plus a fresh, unresolved tail.
              entry [message (promise #.None)]]
          (do Monad<IO>
            [|mailbox| (atom.read (get@ #mailbox (:representation actor)))]
            (loop [|mailbox| |mailbox|]
              (case (promise.poll |mailbox|)
                ## Reached an unresolved link: try to claim it.
                #.None
                (do @
                  [resolved? (promise.resolve entry |mailbox|)]
                  (if resolved?
                    ## Claimed it: store the fresh tail in the atom and report success.
                    (do @
                      [_ (atom.write (product.right entry) (get@ #mailbox (:representation actor)))]
                      (wrap #1))
                    ## The link was resolved in the meantime: poll it again.
                    (recur |mailbox|)))

                ## Link already taken: move on to the next one.
                (#.Some [_ |mailbox|'])
                (recur |mailbox|')))))
        (io/wrap #0)))
    ))

## [Values]
## Default handler: simply runs the message against the state and the actor.
(def: (default-handle message state self)
  (All [s] (-> (Message s) s (Actor s) (Task s)))
  (message state self))

## Default end-of-life callback: ignores its inputs and does nothing.
(def: (default-end cause state)
  (All [s] (-> Text s (Promise Any)))
  (promise/wrap []))

(def: #export default-behavior
  (All [s] (Behavior s))
  {#handle default-handle
   #end default-end})

(def: #export (poison actor)
  {#.doc (doc "Kills the actor by sending a message that will kill it upon processing,"
              "but allows the actor to handle previous messages.")}
  (All [s] (-> (Actor s) (IO Bit)))
  (send (function (_ state self)
          (task.throw poisoned []))
        actor))

## [Syntax]
## For each tag, generates:
##   <with>    prepends a [tag name] annotation entry to a set of annotations.
##   <resolve> looks up a definition and reads that annotation back, failing if it is absent.
(do-template [<with> <resolve> <tag> <desc>]
  [(def: #export (<with> name)
     (-> Name cs.Annotations cs.Annotations)
     (|>> (#.Cons [(name-of <tag>)
                   (code.tag name)])))

   (def: #export (<resolve> name)
     (-> Name (Meta Name))
     (do Monad<Meta>
       [[_ annotations _] (macro.find-def name)]
       (case (macro.get-tag-ann (name-of <tag>) annotations)
         (#.Some actor-name)
         (wrap actor-name)

         _
         (macro.fail (format "Definition is not " <desc> ".")))))]

  [with-actor   resolve-actor   #..actor   "an actor"]
  [with-message resolve-message #..message "a message"]
  )

## Parses an actor declaration: either a bare name, or a form with a name
## followed by type-variable names.
(def: actor-decl^
  (Syntax [Text (List Text)])
  (p.either (s.form (p.and s.local-identifier (p.some s.local-identifier)))
            (p.and s.local-identifier (:: p.Monad<Parser> wrap (list)))))

## Naming helpers for the definitions that 'actor:' generates.
## Each one prepends its prefix and "@" to the actor's name.
(do-template [<name> <prefix>]
  [(def: #export <name>
     (-> Text Text)
     (|>> (format <prefix> "@")))]

  [state-name    "State"]
  [behavior-name "Behavior"]
  [new-name      "new"]
  )

## Parsed (handle message state self) clause: the three parameter names plus the body.
(type: HandleC
  [[Text Text Text] Code])

## Parsed (stop cause state) clause: the two parameter names plus the body.
(type: StopC
  [[Text Text] Code])

(type: BehaviorC
  [(Maybe HandleC) (Maybe StopC)])

## Parses the optional 'handle' clause followed by the optional 'stop' clause.
(def: behavior^
  (s.Syntax BehaviorC)
  (let [handle-args ($_ p.and s.local-identifier s.local-identifier s.local-identifier)
        stop-args ($_ p.and s.local-identifier s.local-identifier)]
    (p.and (p.maybe (s.form (p.and (s.form (p.after (s.this (' handle)) handle-args))
                                   s.any)))
           (p.maybe (s.form (p.and (s.form (p.after (s.this (' stop)) stop-args))
                                   s.any))))))

## Expands into four definitions: the state type, the actor type (annotated
## so that 'message:' can resolve it), the behavior, and a constructor.
(syntax: #export (actor: {export csr.export}
                         {[_name _vars] actor-decl^}
                         {annotations (p.default cs.empty-annotations csr.annotations)}
                         state-type
                         {[?handle ?stop] behavior^})
  {#.doc (doc "Defines an actor, with its behavior and internal state."
              (actor: #export Counter
                Nat

                ((stop cause state)
                 (:: promise.Monad<Promise> wrap
                     (log! (if (ex.match? ..poisoned cause)
                             (format "Counter was poisoned: " (%n state))
                             cause)))))

              (actor: #export (Stack a)
                (List a)

                ((handle message state self)
                 (do task.Monad<Task>
                   [#let [_ (log! "BEFORE")]
                    output (message state self)
                    #let [_ (log! "AFTER")]]
                   (wrap output)))))}
  (with-gensyms [g!_ g!init]
    (do @
      [module macro.current-module-name
       #let [g!type (code.local-identifier (state-name _name))
             g!behavior (code.local-identifier (behavior-name _name))
             g!actor (code.local-identifier _name)
             g!new (code.local-identifier (new-name _name))
             g!vars (list/map code.local-identifier _vars)]]
      (wrap (list
             ## The state type.
             (` (type: (~+ (csw.export export)) ((~ g!type) (~+ g!vars))
                  (~ state-type)))
             ## The actor type, tagged with the actor annotation.
             (` (type: (~+ (csw.export export)) ((~ g!actor) (~+ g!vars))
                  (~ (|> annotations
                         (with-actor [module _name])
                         csw.annotations))
                  (..Actor ((~ g!type) (~+ g!vars)))))
             ## The behavior; clauses that were omitted fall back to the defaults.
             (` (def: (~+ (csw.export export)) (~ g!behavior)
                  (All [(~+ g!vars)]
                    (..Behavior ((~ g!type) (~+ g!vars))))
                  {#..handle (~ (case ?handle
                                  #.None
                                  (` (~! ..default-handle))

                                  (#.Some [[messageN stateN selfN] bodyC])
                                  (` (function ((~ g!_)
                                                (~ (code.local-identifier messageN))
                                                (~ (code.local-identifier stateN))
                                                (~ (code.local-identifier selfN)))
                                       (do task.Monad<Task>
                                         []
                                         (~ bodyC))))))
                   #..end (~ (case ?stop
                               #.None
                               (` (~! ..default-end))

                               (#.Some [[causeN stateN] bodyC])
                               (` (function ((~ g!_)
                                             (~ (code.local-identifier causeN))
                                             (~ (code.local-identifier stateN)))
                                    (do promise.Monad<Promise>
                                      []
                                      (~ bodyC))))))}))
             ## The constructor, which spawns an actor with the generated behavior.
             (` (def: (~+ (csw.export export)) ((~ g!new) (~ g!init))
                  (All [(~+ g!vars)]
                    (-> ((~ g!type) (~+ g!vars)) (io.IO ((~ g!actor) (~+ g!vars)))))
                  (..spawn (~ g!behavior) (~ g!init)))))))))

## The parsed header of a 'message:' definition.
(type: Signature
  {#vars (List Text)
   #name Text
   #inputs (List cs.Typed-Input)
   #state Text
   #self Text
   #output Code})

## Parses: an optional tuple of type variables, the message name, any number
## of typed inputs, the state parameter, the self parameter, and the output type.
(def: signature^
  (s.Syntax Signature)
  (s.form ($_ p.and
              (p.default (list) (s.tuple (p.some s.local-identifier)))
              s.local-identifier
              (p.some csr.typed-input)
              s.local-identifier
              s.local-identifier
              s.any)))

## Parses a reference to an actor type: either a bare identifier, or a form
## with an identifier followed by type-variable names.
(def: reference^
  (s.Syntax [Name (List Text)])
  (p.either (s.form (p.and s.identifier (p.some s.local-identifier)))
            (p.and s.identifier (:: p.Monad<Parser> wrap (list)))))

(syntax: #export (message: {export csr.export}
                           {[actor-name actor-vars] reference^}
                           {signature signature^}
                           {annotations (p.default cs.empty-annotations csr.annotations)}
                           body)
  {#.doc (doc "A message can access the actor's state through the state parameter."
              "A message can also access the actor itself through the self parameter."
              "A message's output must be a task containing a 2-tuple with the updated state and a return value."
              "A message may succeed or fail (in case of failure, the actor dies)."

              (message: #export Counter
                (count! [increment Nat] state self Nat)
                (let [state' (n/+ increment state)]
                  (task.return [state' state'])))

              (message: #export (Stack a)
                (push [value a] state self (List a))
                (let [state' (#.Cons value state)]
                  (task.return [state' state']))))}
  (with-gensyms [g!_ g!return g!error g!task g!sent?]
    (do @
      [current-module macro.current-module-name
       actor-name (resolve-actor actor-name)
       #let [message-name [current-module (get@ #name signature)]
             g!type (code.identifier (product.both id state-name actor-name))
             g!message (code.local-identifier (get@ #name signature))
             g!actor-vars (list/map code.local-identifier actor-vars)
             actorC (` ((~ (code.identifier actor-name)) (~+ g!actor-vars)))
             ## Actor variables first, followed by the message's own variables.
             g!all-vars (|> (get@ #vars signature)
                            (list/map code.local-identifier)
                            (list/compose g!actor-vars))
             g!inputsC (|> (get@ #inputs signature) (list/map product.left))
             g!inputsT (|> (get@ #inputs signature) (list/map product.right))
             g!state (|> signature (get@ #state) code.local-identifier)
             g!self (|> signature (get@ #self) code.local-identifier)
             ## One ($ index) reference per actor type-variable.
             g!actor-refs (: (List Code)
                             (if (list.empty? actor-vars)
                               (list)
                               (|> actor-vars
                                   list.size
                                   list.indices
                                   (list/map (|>> code.nat (~) ($) (`))))))
             ## Pairs each leading variable with its ($ index) reference.
             ## Reuses g!actor-refs instead of rebuilding the same list.
             ref-replacements (: (List [Code Code])
                                 (list.zip2 g!all-vars g!actor-refs))
             ## The output type with the paired variables replaced by their references.
             g!outputT (list/fold (function (_ [g!var g!ref] outputT)
                                    (code.replace g!var g!ref outputT))
                                  (get@ #output signature)
                                  ref-replacements)]]
      (wrap (list
             (` (def: (~+ (csw.export export)) ((~ g!message) (~+ g!inputsC) (~ g!self))
                  (~ (|> annotations
                         (with-message actor-name)
                         csw.annotations))
                  (All [(~+ g!all-vars)]
                    (-> (~+ g!inputsT) (~ actorC) (Task (~ (get@ #output signature)))))
                  ## The task handed back to the caller; the function sent to the
                  ## actor resolves it once the body has run.
                  (let [(~ g!task) (task.task (~ g!outputT))]
                    (io.run (do io.Monad<IO>
                              [(~ g!sent?) (..send (function ((~ g!_) (~ g!state) (~ g!self))
                                                     (do promise.Monad<Promise>
                                                       [(~ g!return) (: (Task [((~ g!type) (~+ g!actor-refs))
                                                                               (~ g!outputT)])
                                                                        (do task.Monad<Task>
                                                                          []
                                                                          (~ body)))]
                                                       (case (~ g!return)
                                                         ## Success: hand the value to the caller and
                                                         ## the new state back to the actor loop.
                                                         (#.Right [(~ g!state) (~ g!return)])
                                                         (exec (io.run (promise.resolve (#.Right (~ g!return)) (~ g!task)))
                                                           (task.return (~ g!state)))

                                                         ## Failure: report it to the caller and fail
                                                         ## the handler as well.
                                                         (#.Left (~ g!error))
                                                         (exec (io.run (promise.resolve (#.Left (~ g!error)) (~ g!task)))
                                                           (task.fail (~ g!error))))))
                                                   (~ g!self))]
                              (if (~ g!sent?)
                                ((~' wrap) (~ g!task))
                                ((~' wrap) (task.throw ..dead [(~ (code.text (%name actor-name)))
                                                               (~ (code.text (%name message-name)))])))))))))))))