(.module: {#.doc "The actor model of concurrency."} lux (lux (control monad ["p" parser] ["ex" exception #+ exception:]) [io #- run "io/" Monad] (data text/format (coll [list "list/" Monoid Monad Fold]) [product]) [macro #+ with-gensyms Monad] (macro [code] ["s" syntax #+ syntax: Syntax] (syntax ["cs" common] (common ["csr" reader] ["csw" writer]))) (type abstract) (lang [type])) (// ["A" atom] ["P" promise "P/" Monad] ["T" task] [stm #+ Monad] [frp])) (exception: #export Poisoned) (exception: #export Killed) (exception: #export Dead) ## [Types] (with-expansions [ (as-is (-> s (Actor s) (T.Task s))) (as-is [Text s (List )])] (abstract: #export (Actor s) {#.doc "An actor, defined as all the necessities it requires."} {#mailbox (stm.Var ) #kill-switch (P.Promise Unit) #obituary (P.Promise )} (type: #export (Message s) ) (type: #export (Obituary s) ) (type: #export (Behavior s) {#.doc "An actor's behavior when messages are received."} {#handle (-> (Message s) s (Actor s) (T.Task s)) #end (-> Text s (P.Promise Unit))}) (def: #export (spawn behavior init) {#.doc "Given a behavior and initial state, spawns an actor and returns it."} (All [s] (-> (Behavior s) s (IO (Actor s)))) (io (let [[handle end] behavior self (: (Actor ($ +0)) (@abstract {#mailbox (stm.var (:! (Message ($ +0)) [])) #kill-switch (: (P.Promise Unit) (P.promise #.None)) #obituary (: (P.Promise (Obituary ($ +0))) (P.promise #.None))})) mailbox-channel (io.run (stm.follow (get@ #mailbox (@repr self)))) |mailbox| (stm.var mailbox-channel) _ (P/map (function [_] (io.run (do Monad [mb (stm.read! |mailbox|)] (frp.close mb)))) (get@ #kill-switch (@repr self))) process (loop [state init messages mailbox-channel] (do P.Monad [?messages+ messages] (case ?messages+ ## No kill-switch so far, so I may proceed... (#.Some [message messages']) (do P.Monad [#let [_ (io.run (stm.write! messages' |mailbox|))] ?state' (handle message state self)] (case ?state' (#.Left error) (do @ [#let [_ (io.run (do Monad [_ (P.resolve [] (get@ #kill-switch (@repr self)))] (frp.close messages')))] _ (end error state) remaining-messages (frp.consume messages')] (wrap [error state (#.Cons message remaining-messages)])) (#.Right state') (recur state' messages'))) ## Otherwise, clean-up and return current state. #.None (do P.Monad [#let [_ (io.run (frp.close messages)) death-message (Killed "")] _ (end death-message state)] (wrap [death-message state (list)])))))] self))) (def: #export (alive? actor) (All [s] (-> (Actor s) Bool)) (case [(P.poll (get@ #kill-switch (@repr actor))) (P.poll (get@ #obituary (@repr actor)))] [#.None #.None] true _ false)) (def: #export (send message actor) {#.doc "Communicate with an actor through message passing."} (All [s] (-> (Message s) (Actor s) (IO Bool))) (if (alive? actor) (do Monad [_ (stm.write! message (get@ #mailbox (@repr actor)))] (wrap true)) (io/wrap false))) (def: #export (kill actor) {#.doc "Immediately kills the given actor (if it is not already dead)."} (All [s] (-> (Actor s) (io.IO Bool))) (if (alive? actor) (|> actor @repr (get@ #kill-switch) (P.resolve [])) (io/wrap false))) )) ## [Values] (def: #export (default-handle message state self) (All [s] (-> (Message s) s (Actor s) (T.Task s))) (message state self)) (def: #export (default-end cause state) (All [s] (-> Text s (P.Promise Unit))) (P/wrap [])) (def: #export default-behavior (All [s] (Behavior s)) {#handle default-handle #end default-end}) (def: #export (poison actor) {#.doc "Kills the actor by sending a message that will kill it upon processing, but allows the actor to handle previous messages."} (All [s] (-> (Actor s) (IO Bool))) (send (function [state self] (T.throw Poisoned "")) actor)) ## [Syntax] (do-template [ ] [(def: #export ( name) (-> Ident cs.Annotations cs.Annotations) (|>> (#.Cons [(ident-for ) (code.tag name)]))) (def: #export ( name) (-> Ident (Meta Ident)) (do Monad [[_ annotations _] (macro.find-def name)] (case (macro.get-tag-ann (ident-for ) annotations) (#.Some actor-name) (wrap actor-name) _ (macro.fail (format "Definition is not " ".")))))] [with-actor resolve-actor #..actor "an actor"] [with-message resolve-message #..message "a message"] ) (def: actor-decl^ (Syntax [Text (List Text)]) (p.either (s.form (p.seq s.local-symbol (p.some s.local-symbol))) (p.seq s.local-symbol (:: p.Monad wrap (list))))) (do-template [ ] [(def: #export (-> Text Text) (|>> (format "@")))] [state-name "State"] [behavior-name "Behavior"] [new-name "new"] ) (type: HandleC [[Text Text Text] Code]) (type: StopC [[Text Text] Code]) (type: BehaviorC [(Maybe HandleC) (Maybe StopC)]) (def: behavior^ (s.Syntax BehaviorC) (let [handle-args ($_ p.seq s.local-symbol s.local-symbol s.local-symbol) stop-args ($_ p.seq s.local-symbol s.local-symbol)] (p.seq (p.maybe (s.form (p.seq (s.form (p.after (s.this (' handle)) handle-args)) s.any))) (p.maybe (s.form (p.seq (s.form (p.after (s.this (' stop)) stop-args)) s.any)))))) (syntax: #export (actor: [export csr.export] [[_name _vars] actor-decl^] [annotations (p.default cs.empty-annotations csr.annotations)] state-type [[?handle ?stop] behavior^]) {#.doc (doc "Defines an actor, with its behavior and internal state." (actor: #export Counter Nat ((stop cause state) (:: P.Monad wrap (log! (if (ex.match? ..Killed cause) (format "Counter was killed: " (%n state)) cause))))) (actor: #export (Stack a) (List a) ((handle message state self) (do T.Monad [#let [_ (log! "BEFORE")] output (message state self) #let [_ (log! "AFTER")]] (wrap output)))))} (with-gensyms [g!init] (do @ [module macro.current-module-name #let [g!type (code.local-symbol (state-name _name)) g!behavior (code.local-symbol (behavior-name _name)) g!actor (code.local-symbol _name) g!new (code.local-symbol (new-name _name)) g!vars (list/map code.local-symbol _vars)]] (wrap (list (` (type: (~+ (csw.export export)) ((~ g!type) (~+ g!vars)) (~ state-type))) (` (type: (~+ (csw.export export)) ((~ g!actor) (~+ g!vars)) (~ (|> annotations (with-actor [module _name]) csw.annotations)) (..Actor ((~ g!type) (~+ g!vars))))) (` (def: (~+ (csw.export export)) (~ g!behavior) (All [(~+ g!vars)] (..Behavior ((~ g!type) (~+ g!vars)))) {#..handle (~ (case ?handle #.None (` ..default-handle) (#.Some [[messageN stateN selfN] bodyC]) (` (function [(~ (code.local-symbol messageN)) (~ (code.local-symbol stateN)) (~ (code.local-symbol selfN))] (do T.Monad [] (~ bodyC)))))) #..end (~ (case ?stop #.None (` ..default-end) (#.Some [[causeN stateN] bodyC]) (` (function [(~ (code.local-symbol causeN)) (~ (code.local-symbol stateN))] (do P.Monad [] (~ bodyC))))))})) (` (def: (~+ (csw.export export)) ((~ g!new) (~@ g!init)) (All [(~+ g!vars)] (-> ((~ g!type) (~+ g!vars)) (io.IO ((~ g!actor) (~+ g!vars))))) (..spawn (~ g!behavior) (~@ g!init)))))) ))) (type: Signature {#vars (List Text) #name Text #inputs (List [Text Code]) #state Text #self Text #output Code}) (def: signature^ (s.Syntax Signature) (s.form ($_ p.seq (p.default (list) (s.tuple (p.some s.local-symbol))) s.local-symbol (p.some csr.typed-input) s.local-symbol s.local-symbol s.any))) (def: reference^ (s.Syntax [Ident (List Text)]) (p.either (s.form (p.seq s.symbol (p.some s.local-symbol))) (p.seq s.symbol (:: p.Monad wrap (list))))) (syntax: #export (message: [export csr.export] [[actor-name actor-vars] reference^] [signature signature^] [annotations (p.default cs.empty-annotations csr.annotations)] body) {#.doc (doc "A message can access the actor's state through the state parameter." "A message can also access the actor itself through the self parameter." "A message's output must be a task containing a 2-tuple with the updated state and a return value." "A message may succeed or fail (in case of failure, the actor dies)." (message: #export Counter (count! [increment Nat] state self Nat) (let [state' (n/+ increment state)] (T.return [state' state']))) (message: #export (Stack a) (push [value a] state self (List a)) (let [state' (#.Cons value state)] (T.return [state' state']))))} (with-gensyms [g!return g!error g!task g!sent?] (do @ [actor-name (resolve-actor actor-name) #let [g!type (code.symbol (product.both id state-name actor-name)) g!message (code.local-symbol (get@ #name signature)) g!actor-vars (list/map code.local-symbol actor-vars) actorC (` ((~ (code.symbol actor-name)) (~+ g!actor-vars))) g!all-vars (|> (get@ #vars signature) (list/map code.local-symbol) (list/compose g!actor-vars)) g!inputsC (|> (get@ #inputs signature) (list/map (|>> product.left code.local-symbol))) g!inputsT (|> (get@ #inputs signature) (list/map product.right)) g!state (|> signature (get@ #state) code.local-symbol) g!self (|> signature (get@ #self) code.local-symbol) g!actor-refs (: (List Code) (if (list.empty? actor-vars) (list) (|> actor-vars list.size n/dec (list.n/range +0) (list/map (|>> code.nat (~) ($) (`)))))) ref-replacements (|> (if (list.empty? actor-vars) (list) (|> actor-vars list.size n/dec (list.n/range +0) (list/map (|>> code.nat (~) ($) (`))))) (: (List Code)) (list.zip2 g!all-vars) (: (List [Code Code]))) g!outputT (list/fold (function [[g!var g!ref] outputT] (code.replace g!var g!ref outputT)) (get@ #output signature) ref-replacements)]] (wrap (list (` (def: (~+ (csw.export export)) ((~ g!message) (~+ g!inputsC) (~ g!self)) (~ (|> annotations (with-message actor-name) csw.annotations)) (All [(~+ g!all-vars)] (-> (~+ g!inputsT) (~ actorC) (T.Task (~ (get@ #output signature))))) (let [(~@ g!task) (T.task (~ g!outputT))] (io.run (do io.Monad [(~@ g!sent?) (..send (function [(~ g!state) (~ g!self)] (do P.Monad [(~@ g!return) (: (T.Task [((~ g!type) (~+ g!actor-refs)) (~ g!outputT)]) (do T.Monad [] (~ body)))] (case (~@ g!return) (#.Right [(~ g!state) (~@ g!return)]) (exec (io.run (P.resolve (#.Right (~@ g!return)) (~@ g!task))) (T.return (~ g!state))) (#.Left (~@ g!error)) (exec (io.run (P.resolve (#.Left (~@ g!error)) (~@ g!task))) (T.fail (~@ g!error)))) )) (~ g!self))] (if (~@ g!sent?) ((~' wrap) (~@ g!task)) ((~' wrap) (T.throw ..Dead "")))))))) )) )))