(.module: {#.doc "The actor model of concurrency."} lux (lux (control monad ["p" parser] ["ex" exception #+ exception:]) [io #- run "io/" Monad] (data [product] ["e" error] text/format (coll [list "list/" Monoid Monad Fold])) [macro #+ with-gensyms Monad] (macro [code] ["s" syntax #+ syntax: Syntax] (syntax ["cs" common] (common ["csr" reader] ["csw" writer]))) (type abstract)) (// [atom #+ Atom atom] [promise #+ Promise promise "promise/" Monad] [task #+ Task])) (exception: #export Poisoned) (exception: #export Dead) ## [Types] (with-expansions [ (as-is (-> s (Actor s) (Task s))) (as-is [Text s (List )]) (as-is (Rec Mailbox (Promise [ Mailbox])))] (def: (obituary mailbox) (All [a] (-> (Rec Mailbox (Promise [a Mailbox])) (List a))) (case (promise.poll mailbox) (#.Some [head tail]) (#.Cons head (obituary tail)) #.None #.Nil)) (abstract: #export (Actor s) {#.doc "An actor, defined as all the necessities it requires."} {#mailbox (Atom ) #obituary (Promise )} ## TODO: Delete after new-luxc becomes the new standard compiler. (def: (actor mailbox obituary) (All [s] (-> (Atom ) (Promise ) (Actor s))) (@abstraction {#mailbox mailbox #obituary obituary})) (type: #export (Message s) ) (type: #export (Obituary s) ) (type: #export (Behavior s) {#.doc "An actor's behavior when messages are received."} {#handle (-> (Message s) s (Actor s) (Task s)) #end (-> Text s (Promise Unit))}) (def: #export (spawn behavior init) {#.doc "Given a behavior and initial state, spawns an actor and returns it."} (All [s] (-> (Behavior s) s (IO (Actor s)))) (io (let [[handle end] behavior self (actor (atom (promise #.None)) (promise #.None)) process (loop [state init |mailbox| (io.run (atom.read (get@ #mailbox (@representation self))))] (do promise.Monad [[head tail] |mailbox| ?state' (handle head state self)] (case ?state' (#e.Error error) (do @ [_ (end error state)] (exec (io.run (promise.resolve [error state (#.Cons head (obituary tail))] (get@ #obituary (@representation self)))) (wrap []))) (#e.Success state') (recur state' tail))))] self))) (def: #export (alive? actor) (All [s] (-> (Actor s) Bool)) (case (promise.poll (get@ #obituary (@representation actor))) #.None true _ false)) (def: #export (send message actor) {#.doc "Communicate with an actor through message passing."} (All [s] (-> (Message s) (Actor s) (IO Bool))) (if (alive? actor) (let [entry [message (promise #.None)]] (do Monad [|mailbox| (atom.read (get@ #mailbox (@representation actor)))] (loop [|mailbox| |mailbox|] (case (promise.poll |mailbox|) #.None (do @ [resolved? (promise.resolve entry |mailbox|)] (if resolved? (do @ [_ (atom.write (product.right entry) (get@ #mailbox (@representation actor)))] (wrap true)) (recur |mailbox|))) (#.Some [_ |mailbox|']) (recur |mailbox|'))))) (io/wrap false))) )) ## [Values] (def: (default-handle message state self) (All [s] (-> (Message s) s (Actor s) (Task s))) (message state self)) (def: (default-end cause state) (All [s] (-> Text s (Promise Unit))) (promise/wrap [])) (def: #export default-behavior (All [s] (Behavior s)) {#handle default-handle #end default-end}) (def: #export (poison actor) {#.doc "Kills the actor by sending a message that will kill it upon processing, but allows the actor to handle previous messages."} (All [s] (-> (Actor s) (IO Bool))) (send (function [state self] (task.throw Poisoned "")) actor)) ## [Syntax] (do-template [ ] [(def: #export ( name) (-> Ident cs.Annotations cs.Annotations) (|>> (#.Cons [(ident-for ) (code.tag name)]))) (def: #export ( name) (-> Ident (Meta Ident)) (do Monad [[_ annotations _] (macro.find-def name)] (case (macro.get-tag-ann (ident-for ) annotations) (#.Some actor-name) (wrap actor-name) _ (macro.fail (format "Definition is not " ".")))))] [with-actor resolve-actor #..actor "an actor"] [with-message resolve-message #..message "a message"] ) (def: actor-decl^ (Syntax [Text (List Text)]) (p.either (s.form (p.seq s.local-symbol (p.some s.local-symbol))) (p.seq s.local-symbol (:: p.Monad wrap (list))))) (do-template [ ] [(def: #export (-> Text Text) (|>> (format "@")))] [state-name "State"] [behavior-name "Behavior"] [new-name "new"] ) (type: HandleC [[Text Text Text] Code]) (type: StopC [[Text Text] Code]) (type: BehaviorC [(Maybe HandleC) (Maybe StopC)]) (def: behavior^ (s.Syntax BehaviorC) (let [handle-args ($_ p.seq s.local-symbol s.local-symbol s.local-symbol) stop-args ($_ p.seq s.local-symbol s.local-symbol)] (p.seq (p.maybe (s.form (p.seq (s.form (p.after (s.this (' handle)) handle-args)) s.any))) (p.maybe (s.form (p.seq (s.form (p.after (s.this (' stop)) stop-args)) s.any)))))) (syntax: #export (actor: [export csr.export] [[_name _vars] actor-decl^] [annotations (p.default cs.empty-annotations csr.annotations)] state-type [[?handle ?stop] behavior^]) {#.doc (doc "Defines an actor, with its behavior and internal state." (actor: #export Counter Nat ((stop cause state) (:: promise.Monad wrap (log! (if (ex.match? ..Poisoned cause) (format "Counter was poisoned: " (%n state)) cause))))) (actor: #export (Stack a) (List a) ((handle message state self) (do task.Monad [#let [_ (log! "BEFORE")] output (message state self) #let [_ (log! "AFTER")]] (wrap output)))))} (with-gensyms [g!init] (do @ [module macro.current-module-name #let [g!type (code.local-symbol (state-name _name)) g!behavior (code.local-symbol (behavior-name _name)) g!actor (code.local-symbol _name) g!new (code.local-symbol (new-name _name)) g!vars (list/map code.local-symbol _vars)]] (wrap (list (` (type: (~+ (csw.export export)) ((~ g!type) (~+ g!vars)) (~ state-type))) (` (type: (~+ (csw.export export)) ((~ g!actor) (~+ g!vars)) (~ (|> annotations (with-actor [module _name]) csw.annotations)) (..Actor ((~ g!type) (~+ g!vars))))) (` (def: (~+ (csw.export export)) (~ g!behavior) (All [(~+ g!vars)] (..Behavior ((~ g!type) (~+ g!vars)))) {#..handle (~ (case ?handle #.None (` (~! ..default-handle)) (#.Some [[messageN stateN selfN] bodyC]) (` (function [(~ (code.local-symbol messageN)) (~ (code.local-symbol stateN)) (~ (code.local-symbol selfN))] (do task.Monad [] (~ bodyC)))))) #..end (~ (case ?stop #.None (` (~! ..default-end)) (#.Some [[causeN stateN] bodyC]) (` (function [(~ (code.local-symbol causeN)) (~ (code.local-symbol stateN))] (do promise.Monad [] (~ bodyC))))))})) (` (def: (~+ (csw.export export)) ((~ g!new) (~ g!init)) (All [(~+ g!vars)] (-> ((~ g!type) (~+ g!vars)) (io.IO ((~ g!actor) (~+ g!vars))))) (..spawn (~ g!behavior) (~ g!init)))))) ))) (type: Signature {#vars (List Text) #name Text #inputs (List [Text Code]) #state Text #self Text #output Code}) (def: signature^ (s.Syntax Signature) (s.form ($_ p.seq (p.default (list) (s.tuple (p.some s.local-symbol))) s.local-symbol (p.some csr.typed-input) s.local-symbol s.local-symbol s.any))) (def: reference^ (s.Syntax [Ident (List Text)]) (p.either (s.form (p.seq s.symbol (p.some s.local-symbol))) (p.seq s.symbol (:: p.Monad wrap (list))))) (syntax: #export (message: [export csr.export] [[actor-name actor-vars] reference^] [signature signature^] [annotations (p.default cs.empty-annotations csr.annotations)] body) {#.doc (doc "A message can access the actor's state through the state parameter." "A message can also access the actor itself through the self parameter." "A message's output must be a task containing a 2-tuple with the updated state and a return value." "A message may succeed or fail (in case of failure, the actor dies)." (message: #export Counter (count! [increment Nat] state self Nat) (let [state' (n/+ increment state)] (task.return [state' state']))) (message: #export (Stack a) (push [value a] state self (List a)) (let [state' (#.Cons value state)] (task.return [state' state']))))} (with-gensyms [g!return g!error g!task g!sent?] (do @ [current-module macro.current-module-name actor-name (resolve-actor actor-name) #let [message-name [current-module (get@ #name signature)] g!type (code.symbol (product.both id state-name actor-name)) g!message (code.local-symbol (get@ #name signature)) g!actor-vars (list/map code.local-symbol actor-vars) actorC (` ((~ (code.symbol actor-name)) (~+ g!actor-vars))) g!all-vars (|> (get@ #vars signature) (list/map code.local-symbol) (list/compose g!actor-vars)) g!inputsC (|> (get@ #inputs signature) (list/map (|>> product.left code.local-symbol))) g!inputsT (|> (get@ #inputs signature) (list/map product.right)) g!state (|> signature (get@ #state) code.local-symbol) g!self (|> signature (get@ #self) code.local-symbol) g!actor-refs (: (List Code) (if (list.empty? actor-vars) (list) (|> actor-vars list.size n/dec (list.n/range +0) (list/map (|>> code.nat (~) ($) (`)))))) ref-replacements (|> (if (list.empty? actor-vars) (list) (|> actor-vars list.size n/dec (list.n/range +0) (list/map (|>> code.nat (~) ($) (`))))) (: (List Code)) (list.zip2 g!all-vars) (: (List [Code Code]))) g!outputT (list/fold (function [[g!var g!ref] outputT] (code.replace g!var g!ref outputT)) (get@ #output signature) ref-replacements)]] (wrap (list (` (def: (~+ (csw.export export)) ((~ g!message) (~+ g!inputsC) (~ g!self)) (~ (|> annotations (with-message actor-name) csw.annotations)) (All [(~+ g!all-vars)] (-> (~+ g!inputsT) (~ actorC) (Task (~ (get@ #output signature))))) (let [(~ g!task) (task.task (~ g!outputT))] (io.run (do io.Monad [(~ g!sent?) (..send (function [(~ g!state) (~ g!self)] (do promise.Monad [(~ g!return) (: (Task [((~ g!type) (~+ g!actor-refs)) (~ g!outputT)]) (do task.Monad [] (~ body)))] (case (~ g!return) (#.Right [(~ g!state) (~ g!return)]) (exec (io.run (promise.resolve (#.Right (~ g!return)) (~ g!task))) (task.return (~ g!state))) (#.Left (~ g!error)) (exec (io.run (promise.resolve (#.Left (~ g!error)) (~ g!task))) (task.fail (~ g!error)))) )) (~ g!self))] (if (~ g!sent?) ((~' wrap) (~ g!task)) ((~' wrap) (<| (task.throw ..Dead) (~ (code.text (format " Actor: " (%ident actor-name) "\n" "Message: " (%ident message-name) "\n"))))))))))) )) )))