(;module: {#;doc "The actor model of concurrency."} lux (lux (control monad) [io #- run] function (data [error #- fail] text/format (coll [list "List/" Monoid Monad]) [product] [number "Nat/" Codec]) [compiler #+ with-gensyms] (macro [ast] ["s" syntax #+ syntax: Syntax] (syntax [common])) [type]) (.. ["P" promise #+ Monad] [stm #+ Monad] [frp])) ## [Types] (type: #export (Actor s m) {#;doc "An actor, defined as all the necessities it requires."} {#mailbox (stm;Var m) #kill-signal (P;Promise Unit) #obituary (P;Promise [(Maybe Text) s (List m)])}) (type: #export (Behavior s m) {#;doc "An actor's behavior when messages are received."} {#step (-> (Actor s m) (-> m s (P;Promise (Error s)))) #end (-> (Maybe Text) s (P;Promise Unit))}) ## [Values] (def: #export (spawn init behavior) {#;doc "Given a behavior and initial state, spawns an actor and returns it."} (All [s m] (-> s (Behavior s m) (IO (Actor s m)))) (io (let [[step on-death] behavior mailbox (stm;var (:! ($ +1) [])) kill-signal (P;promise Unit) obituary (P;promise [(Maybe Text) ($ +0) (List ($ +1))]) self {#mailbox mailbox #kill-signal kill-signal #obituary obituary} mailbox-chan (io;run (stm;follow mailbox)) step (step self) |mailbox| (stm;var mailbox-chan) _ (:: Monad map (function [_] (io;run (do Monad [mb (stm;read! |mailbox|)] (frp;close mb)))) kill-signal) process (loop [state init messages mailbox-chan] (do Monad [?messages+ messages] (case ?messages+ ## No kill-signal so far, so I may proceed... (#;Some [message messages']) (do Monad [#let [_ (io;run (stm;write! messages' |mailbox|))] ?state' (step message state)] (case ?state' (#;Left error) (do @ [#let [_ (io;run (P;resolve [] kill-signal)) _ (io;run (frp;close messages')) death-message (#;Some error)] _ (on-death death-message state) remaining-messages (frp;consume messages')] (wrap [death-message state (#;Cons message remaining-messages)])) (#;Right state') (recur state' messages'))) ## Otherwise, clean-up and return current state. #;None (do Monad [#let [_ (io;run (frp;close messages)) death-message #;None] _ (on-death death-message state)] (wrap [death-message state (list)])))))] self))) (def: #export poison {#;doc "Immediately kills the given actor (if it's not already dead)."} (All [s m] (-> (Actor s m) (io;IO Bool))) (|>. (get@ #kill-signal) (P;resolve []))) (def: #export (alive? actor) (All [s m] (-> (Actor s m) Bool)) (case [(P;poll (get@ #kill-signal actor)) (P;poll (get@ #obituary actor))] [#;None #;None] true _ false)) (def: #export (send message actor) {#;doc "Communicate with an actor through message passing."} (All [s m] (-> m (Actor s m) (P;Promise Bool))) (if (alive? actor) (exec (io;run (stm;write! message (get@ #mailbox actor))) (:: Monad wrap true)) (:: Monad wrap false))) (def: #export (keep-alive init behavior) {#;doc "Given initial-state and a behavior, spawns an actor that will reboot if it dies of errors. However, if it is killed, it won't re-spawn."} (All [s m] (-> s (Behavior s m) (IO (Actor s m)))) (io (let [ka-actor (: (Actor (Actor ($ +0) ($ +1)) ($ +1)) (io;run (spawn (io;run (spawn init behavior)) {#step (function [*self* message server] (do Monad [was-sent? (send message server)] (if was-sent? (wrap (#;Right server)) (do @ [[?cause state unprocessed-messages] (get@ #obituary server)] (exec (log! (format "ACTOR DIED:" "\n" (default "" ?cause) "\n" "RESTARTING" "\n")) (do @ [#let [new-server (io;run (spawn state behavior)) mailbox (get@ #mailbox new-server)] _ (P;future (mapM io;Monad ((flip stm;write!) mailbox) (#;Cons message unprocessed-messages)))] (wrap (#;Right new-server)))) )))) #end (function [_ server] (exec (io;run (poison server)) (:: Monad wrap [])))})))] (update@ #obituary (: (-> (P;Promise [(Maybe Text) (Actor ($ +0) ($ +1)) (List ($ +1))]) (P;Promise [(Maybe Text) ($ +0) (List ($ +1))])) (function [process] (do Monad [[_ server unprocessed-messages-0] process [cause state unprocessed-messages-1] (get@ #obituary server)] (wrap [cause state (List/append unprocessed-messages-0 unprocessed-messages-1)])))) ka-actor)))) ## [Syntax] (type: Method {#name Text #vars (List Text) #args (List [Text AST]) #return AST #body AST}) (def: method^ (Syntax Method) (s;form (do s;Monad [_ (s;this! (' method:)) vars (s;default (list) (s;tuple (s;some s;local-symbol))) [name args] (s;form ($_ s;seq s;local-symbol (s;many common;typed-arg) )) return s;any body s;any] (wrap {#name name #vars vars #args args #return return #body body})))) (def: stop^ (Syntax AST) (s;form (do s;Monad [_ (s;this! (' stop:))] s;any))) (def: actor-decl^ (Syntax [(List Text) Text (List [Text AST])]) (s;seq (s;default (list) (s;tuple (s;some s;local-symbol))) (s;either (s;form (s;seq s;local-symbol (s;many common;typed-arg))) (s;seq s;local-symbol (:: s;Monad wrap (list)))))) (def: (actor-def-decl [_vars _name _args] return-type) (-> [(List Text) Text (List [Text AST])] AST (List AST)) (let [decl (` ((~ (ast;symbol ["" (format _name "//new")])) (~@ (List/map (|>. product;left [""] ast;symbol) _args)))) base-type (` (-> (~@ (List/map product;right _args)) (~ return-type))) type (case _vars #;Nil base-type _ (` (All [(~@ (List/map (|>. [""] ast;symbol) _vars))] (~ base-type))))] (list decl type))) (syntax: #export (actor: [_ex-lev common;export-level] [(^@ decl [_vars _name _args]) actor-decl^] state-type [methods (s;many method^)] [?stop (s;opt stop^)]) {#;doc (doc "Allows defining an actor, with a pice of state and a set of methods that can be called on it." "A method can access the actor's state through the *state* variable." "A method can also access the actor itself through the *self* variable." "A method may succeed or fail (in case of failure, the actor dies). This is handled through the Either type." "A method's output must be a promise containing a 2-tuple with the updated state and a return value." "All methods are run implicitly within the Promise monad." (actor: #export Adder Int (method: (count! [to-add Int]) [Int Int] (if (i.>= 0 to-add) (let [new-state (i.+ to-add *state*)] (wrap (#;Right [new-state [*state* new-state]]))) (wrap (#;Left "Can't add negative numbers!")))) ))} (with-gensyms [g!message g!error g!return g!error g!output] (let [g!state-name (ast;symbol ["" (format _name "//STATE")]) g!protocol-name (ast;symbol ["" (format _name "//PROTOCOL")]) g!self (ast;symbol ["" "*self*"]) g!state (ast;symbol ["" "*state*"]) g!cause (ast;symbol ["" "*cause*"]) g!stop-body (default (` (:: P;Monad (~' wrap) [])) ?stop) protocol (List/map (function [(^slots [#name #vars #args #return #body])] (` ((~ (ast;tag ["" name])) [(~@ (List/map product;right args))] (P;Promise (~ return))))) methods) protocol-pm (List/map (: (-> Method [AST AST]) (function [(^slots [#name #vars #args #return #body])] (let [arg-names (|> (list;size args) (list;n.range +1) (List/map (|>. Nat/encode [""] ast;symbol))) body-func (` (: (-> (~ g!state-name) (~@ (List/map product;right args)) (P;Promise (Error [(~ g!state-name) (~ return)]))) (function (~ (ast;symbol ["" _name])) [(~ g!state) (~@ (List/map (|>. product;left [""] ast;symbol) args))] (do P;Monad [] (~ body)))))] [(` [[(~@ arg-names)] (~ g!return)]) (` (do P;Monad [(~ g!output) ((~ body-func) (~ g!state) (~@ arg-names))] (case (~ g!output) (#;Right [(~ g!state) (~ g!output)]) (exec (io;run (P;resolve (~ g!output) (~ g!return))) ((~' wrap) (#;Right (~ g!state)))) (#;Left (~ g!error)) ((~' wrap) (#;Left (~ g!error)))) ))]))) methods) g!behavior (` {#step (function [(~ g!self) (~ g!message) (~ g!state)] (case (~ g!message) (~@ (if (n.= +1 (list;size protocol-pm)) (List/join (List/map (function [[pattern clause]] (list pattern clause)) protocol-pm)) (List/join (List/map (function [[method [pattern clause]]] (list (` ((~ (ast;tag ["" (get@ #name method)])) (~ pattern))) clause)) (list;zip2 methods protocol-pm))))) )) #end (function [(~ g!cause) (~ g!state)] (do P;Monad [] (~ g!stop-body)))}) g!actor-name (ast;symbol ["" _name]) g!methods (List/map (: (-> Method AST) (function [(^slots [#name #vars #args #return #body])] (let [arg-names (|> (list;size args) (list;n.range +1) (List/map (|>. Nat/encode [""] ast;symbol))) type (` (-> (~@ (List/map product;right args)) (~ g!actor-name) (P;Promise (~ return))))] (` (def: (~@ (common;gen-export-level _ex-lev)) ((~ (ast;symbol ["" name])) (~@ arg-names) (~ g!self)) (~ type) (let [(~ g!output) (P;promise (~ return))] (exec (send ((~ (ast;tag ["" name])) [[(~@ arg-names)] (~ g!output)]) (~ g!self)) (~ g!output)))))))) methods)] (wrap (list& (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!state-name) (~ state-type))) (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!protocol-name) (~@ protocol))) (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!actor-name) (Actor (~ g!state-name) (~ g!protocol-name)))) (` (def: (~@ (common;gen-export-level _ex-lev)) (~@ (actor-def-decl decl (` (Behavior (~ g!state-name) (~ g!protocol-name))))) (~ g!behavior))) g!methods)) )))