diff options
author | Eduardo Julian | 2017-01-04 19:23:27 -0400 |
---|---|---|
committer | Eduardo Julian | 2017-01-04 19:23:27 -0400 |
commit | df88c658e85f072679085b16a95120ab5cdfc078 (patch) | |
tree | 3189529180690b6073149bc58fab0d6cbb2ea75e /stdlib/source/lux/concurrency/actor.lux | |
parent | cc5f798e1ab7e636d38a6f85c30c146ca7963b07 (diff) |
- Updated the documentation of most modules left.
- Minor refactorings.
Diffstat (limited to 'stdlib/source/lux/concurrency/actor.lux')
-rw-r--r-- | stdlib/source/lux/concurrency/actor.lux | 138 |
1 files changed, 72 insertions, 66 deletions
diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux index cea170b2c..650065a0e 100644 --- a/stdlib/source/lux/concurrency/actor.lux +++ b/stdlib/source/lux/concurrency/actor.lux @@ -3,7 +3,7 @@ ## If a copy of the MPL was not distributed with this file, ## You can obtain one at http://mozilla.org/MPL/2.0/. -(;module: +(;module: {#;doc "The actor model of concurrency."} lux (lux (control monad) (codata [io #- run] @@ -18,32 +18,35 @@ ["s" syntax #+ syntax: Syntax] (syntax [common])) [type]) - (.. [promise #+ Monad<Promise>] + (.. ["P" promise #+ Monad<Promise>] [stm #+ Monad<STM>] [frp])) ## [Types] (type: #export (Actor s m) + {#;doc "An actor, defined as all the necessities it requires."} {#mailbox (stm;Var m) - #kill-signal (promise;Promise Unit) - #obituary (promise;Promise [(Maybe Text) s (List m)])}) + #kill-signal (P;Promise Unit) + #obituary (P;Promise [(Maybe Text) s (List m)])}) -(type: #export (Proc s m) - {#step (-> (Actor s m) (-> m s (promise;Promise (Error s)))) - #end (-> (Maybe Text) s (promise;Promise Unit))}) +(type: #export (Behavior s m) + {#;doc "An actor's behavior when messages are received."} + {#step (-> (Actor s m) (-> m s (P;Promise (Error s)))) + #end (-> (Maybe Text) s (P;Promise Unit))}) ## [Values] -(def: #export (spawn init [proc on-death]) - {#;doc "Given a procedure and initial state, launches an actor and returns it."} - (All [s m] (-> s (Proc s m) (IO (Actor s m)))) - (io (let [mailbox (stm;var (:! ($ +1) [])) - kill-signal (promise;promise Unit) - obituary (promise;promise [(Maybe Text) ($ +0) (List ($ +1))]) +(def: #export (spawn init behavior) + {#;doc "Given a behavior and initial state, spawns an actor and returns it."} + (All [s m] (-> s (Behavior s m) (IO (Actor s m)))) + (io (let [[step on-death] behavior + mailbox (stm;var (:! ($ +1) [])) + kill-signal (P;promise Unit) + obituary (P;promise [(Maybe Text) ($ +0) (List ($ +1))]) self {#mailbox mailbox #kill-signal kill-signal #obituary obituary} mailbox-chan (io;run (stm;follow "\tmailbox\t" mailbox)) - proc (proc self) + step (step self) |mailbox| (stm;var mailbox-chan) _ (:: Monad<Promise> map (lambda [_] @@ -60,11 +63,11 @@ (#;Some [message messages']) (do Monad<Promise> [#let [_ (io;run (stm;write! messages' |mailbox|))] - ?state' (proc message state)] + ?state' (step message state)] (case ?state' (#;Left error) (do @ - [#let [_ (io;run (promise;resolve [] kill-signal)) + [#let [_ (io;run (P;resolve [] kill-signal)) _ (io;run (frp;close messages')) death-message (#;Some error)] _ (on-death death-message state) @@ -86,12 +89,12 @@ (def: #export poison {#;doc "Immediately kills the given actor (if it's not already dead)."} (All [s m] (-> (Actor s m) (io;IO Bool))) - (|>. (get@ #kill-signal) (promise;resolve []))) + (|>. (get@ #kill-signal) (P;resolve []))) (def: #export (alive? actor) (All [s m] (-> (Actor s m) Bool)) - (case [(promise;poll (get@ #kill-signal actor)) - (promise;poll (get@ #obituary actor))] + (case [(P;poll (get@ #kill-signal actor)) + (P;poll (get@ #obituary actor))] [#;None #;None] true @@ -99,18 +102,20 @@ false)) (def: #export (send message actor) - (All [s m] (-> m (Actor s m) (promise;Promise Bool))) + {#;doc "Communicate with an actor through message passing."} + (All [s m] (-> m (Actor s m) (P;Promise Bool))) (if (alive? actor) (exec (io;run (stm;write! message (get@ #mailbox actor))) (:: Monad<Promise> wrap true)) (:: Monad<Promise> wrap false))) -(def: #export (keep-alive init proc) - {#;doc "Given initial-state and a procedure, launches and actor that will reboot if it dies of errors. - However, it can still be killed."} - (All [s m] (-> s (Proc s m) (IO (Actor s m)))) +(def: #export (keep-alive init behavior) + {#;doc "Given initial-state and a behavior, spawns an actor that will reboot if it dies of errors. + + However, if it is killed, it won't re-spawn."} + (All [s m] (-> s (Behavior s m) (IO (Actor s m)))) (io (let [ka-actor (: (Actor (Actor ($ +0) ($ +1)) ($ +1)) - (io;run (spawn (io;run (spawn init proc)) + (io;run (spawn (io;run (spawn init behavior)) {#step (lambda [*self* message server] (do Monad<Promise> [was-sent? (send message server)] @@ -118,17 +123,19 @@ (wrap (#;Right server)) (do @ [[?cause state unprocessed-messages] (get@ #obituary server)] - (exec (log! (format "ACTOR DIED:\n" (default "" ?cause) "\n RESTARTING")) + (exec (log! (format "ACTOR DIED:" "\n" + (default "" ?cause) "\n" + "RESTARTING" "\n")) (do @ - [#let [new-server (io;run (spawn state proc)) + [#let [new-server (io;run (spawn state behavior)) mailbox (get@ #mailbox new-server)] - _ (promise;future (mapM io;Monad<IO> ((flip stm;write!) mailbox) (#;Cons message unprocessed-messages)))] + _ (P;future (mapM io;Monad<IO> ((flip stm;write!) mailbox) (#;Cons message unprocessed-messages)))] (wrap (#;Right new-server)))) )))) #end (lambda [_ server] (exec (io;run (poison server)) (:: Monad<Promise> wrap [])))})))] - (update@ #obituary (: (-> (promise;Promise [(Maybe Text) (Actor ($ +0) ($ +1)) (List ($ +1))]) - (promise;Promise [(Maybe Text) ($ +0) (List ($ +1))])) + (update@ #obituary (: (-> (P;Promise [(Maybe Text) (Actor ($ +0) ($ +1)) (List ($ +1))]) + (P;Promise [(Maybe Text) ($ +0) (List ($ +1))])) (lambda [process] (do Monad<Promise> [[_ server unprocessed-messages-0] process @@ -193,23 +200,22 @@ state-type [methods (s;many method^)] [?stop (s;opt stop^)]) - {#;doc (doc "Allows defining an actor, with a set of methods that can be called on it." - "The methods can return promisehronous outputs." - "The methods can access the actor's state through the *state* variable." - "The methods can also access the actor itself through the *self* variable." + {#;doc (doc "Allows defining an actor, with a pice of state and a set of methods that can be called on it." + "A method can access the actor's state through the *state* variable." + "A method can also access the actor itself through the *self* variable." + "A method may succeed or fail (in case of failure, the actor dies). This is handled through the Either type." + "A method's output must be a promise containing a 2-tuple with the updated state and a return value." + "All methods are run implicitly within the Promise monad." (actor: #export Adder Int - (method: (count! {to-add Int}) + (method: (count! [to-add Int]) [Int Int] - (if (>= 0 to-add) - (do Monad<Promise> - [#let [new-state (i.+ to-add *state*)]] + (if (i.>= 0 to-add) + (let [new-state (i.+ to-add *state*)] (wrap (#;Right [new-state [*state* new-state]]))) - (do Monad<Promise> - [] - (wrap (#;Left "Can't add negative numbers!"))))) + (wrap (#;Left "Can't add negative numbers!")))) ))} (with-gensyms [g!message g!error g!return g!error g!output] (let [g!state-name (ast;symbol ["" (format _name "//STATE")]) @@ -217,62 +223,62 @@ g!self (ast;symbol ["" "*self*"]) g!state (ast;symbol ["" "*state*"]) g!cause (ast;symbol ["" "*cause*"]) - g!stop-body (default (` (:: promise;Monad<Promise> (~' wrap) [])) ?stop) + g!stop-body (default (` (:: P;Monad<Promise> (~' wrap) [])) ?stop) protocol (List/map (lambda [(^slots [#name #vars #args #return #body])] - (` ((~ (ast;tag ["" name])) [(~@ (List/map product;right args))] (promise;Promise (~ return))))) + (` ((~ (ast;tag ["" name])) [(~@ (List/map product;right args))] (P;Promise (~ return))))) methods) protocol-pm (List/map (: (-> Method [AST AST]) (lambda [(^slots [#name #vars #args #return #body])] (let [arg-names (|> (list;size args) (list;n.range +1) (List/map (|>. Nat/encode [""] ast;symbol))) - body-func (` (: (-> (~ g!state-name) (~@ (List/map product;right args)) (promise;Promise (Error [(~ g!state-name) (~ return)]))) + body-func (` (: (-> (~ g!state-name) (~@ (List/map product;right args)) (P;Promise (Error [(~ g!state-name) (~ return)]))) (lambda (~ (ast;symbol ["" _name])) [(~ g!state) (~@ (List/map (|>. product;left [""] ast;symbol) args))] - (do promise;Monad<Promise> + (do P;Monad<Promise> [] (~ body)))))] [(` [[(~@ arg-names)] (~ g!return)]) - (` (do promise;Monad<Promise> + (` (do P;Monad<Promise> [(~ g!output) ((~ body-func) (~ g!state) (~@ arg-names))] (case (~ g!output) (#;Right [(~ g!state) (~ g!output)]) - (exec (io;run (promise;resolve (~ g!output) (~ g!return))) + (exec (io;run (P;resolve (~ g!output) (~ g!return))) ((~' wrap) (#;Right (~ g!state)))) (#;Left (~ g!error)) ((~' wrap) (#;Left (~ g!error)))) ))]))) methods) - g!proc (` {#step (lambda [(~ g!self) (~ g!message) (~ g!state)] - (case (~ g!message) - (~@ (if (n.= +1 (list;size protocol-pm)) - (List/join (List/map (lambda [[pattern clause]] - (list pattern clause)) - protocol-pm)) - (List/join (List/map (lambda [[method [pattern clause]]] - (list (` ((~ (ast;tag ["" (get@ #name method)])) (~ pattern))) - clause)) - (list;zip2 methods protocol-pm))))) - )) - #end (lambda [(~ g!cause) (~ g!state)] - (do promise;Monad<Promise> - [] - (~ g!stop-body)))}) + g!behavior (` {#step (lambda [(~ g!self) (~ g!message) (~ g!state)] + (case (~ g!message) + (~@ (if (n.= +1 (list;size protocol-pm)) + (List/join (List/map (lambda [[pattern clause]] + (list pattern clause)) + protocol-pm)) + (List/join (List/map (lambda [[method [pattern clause]]] + (list (` ((~ (ast;tag ["" (get@ #name method)])) (~ pattern))) + clause)) + (list;zip2 methods protocol-pm))))) + )) + #end (lambda [(~ g!cause) (~ g!state)] + (do P;Monad<Promise> + [] + (~ g!stop-body)))}) g!actor-name (ast;symbol ["" _name]) g!methods (List/map (: (-> Method AST) (lambda [(^slots [#name #vars #args #return #body])] (let [arg-names (|> (list;size args) (list;n.range +1) (List/map (|>. Nat/encode [""] ast;symbol))) type (` (-> (~@ (List/map product;right args)) (~ g!actor-name) - (promise;Promise (~ return))))] + (P;Promise (~ return))))] (` (def: (~@ (common;gen-export-level _ex-lev)) ((~ (ast;symbol ["" name])) (~@ arg-names) (~ g!self)) (~ type) - (let [(~ g!output) (promise;promise (~ return))] + (let [(~ g!output) (P;promise (~ return))] (exec (send ((~ (ast;tag ["" name])) [[(~@ arg-names)] (~ g!output)]) (~ g!self)) (~ g!output)))))))) methods)] (wrap (list& (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!state-name) (~ state-type))) (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!protocol-name) (~@ protocol))) (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!actor-name) (Actor (~ g!state-name) (~ g!protocol-name)))) - (` (def: (~@ (common;gen-export-level _ex-lev)) (~@ (actor-def-decl decl (` (Proc (~ g!state-name) (~ g!protocol-name))))) - (~ g!proc))) + (` (def: (~@ (common;gen-export-level _ex-lev)) (~@ (actor-def-decl decl (` (Behavior (~ g!state-name) (~ g!protocol-name))))) + (~ g!behavior))) g!methods)) ))) |