aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/concurrency/actor.lux
diff options
context:
space:
mode:
authorEduardo Julian2017-01-04 19:23:27 -0400
committerEduardo Julian2017-01-04 19:23:27 -0400
commitdf88c658e85f072679085b16a95120ab5cdfc078 (patch)
tree3189529180690b6073149bc58fab0d6cbb2ea75e /stdlib/source/lux/concurrency/actor.lux
parentcc5f798e1ab7e636d38a6f85c30c146ca7963b07 (diff)
- Updated the documentation of most modules left.
- Minor refactorings.
Diffstat (limited to 'stdlib/source/lux/concurrency/actor.lux')
-rw-r--r--stdlib/source/lux/concurrency/actor.lux138
1 files changed, 72 insertions, 66 deletions
diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux
index cea170b2c..650065a0e 100644
--- a/stdlib/source/lux/concurrency/actor.lux
+++ b/stdlib/source/lux/concurrency/actor.lux
@@ -3,7 +3,7 @@
## If a copy of the MPL was not distributed with this file,
## You can obtain one at http://mozilla.org/MPL/2.0/.
-(;module:
+(;module: {#;doc "The actor model of concurrency."}
lux
(lux (control monad)
(codata [io #- run]
@@ -18,32 +18,35 @@
["s" syntax #+ syntax: Syntax]
(syntax [common]))
[type])
- (.. [promise #+ Monad<Promise>]
+ (.. ["P" promise #+ Monad<Promise>]
[stm #+ Monad<STM>]
[frp]))
## [Types]
(type: #export (Actor s m)
+ {#;doc "An actor, defined as all the necessities it requires."}
{#mailbox (stm;Var m)
- #kill-signal (promise;Promise Unit)
- #obituary (promise;Promise [(Maybe Text) s (List m)])})
+ #kill-signal (P;Promise Unit)
+ #obituary (P;Promise [(Maybe Text) s (List m)])})
-(type: #export (Proc s m)
- {#step (-> (Actor s m) (-> m s (promise;Promise (Error s))))
- #end (-> (Maybe Text) s (promise;Promise Unit))})
+(type: #export (Behavior s m)
+ {#;doc "An actor's behavior when messages are received."}
+ {#step (-> (Actor s m) (-> m s (P;Promise (Error s))))
+ #end (-> (Maybe Text) s (P;Promise Unit))})
## [Values]
-(def: #export (spawn init [proc on-death])
- {#;doc "Given a procedure and initial state, launches an actor and returns it."}
- (All [s m] (-> s (Proc s m) (IO (Actor s m))))
- (io (let [mailbox (stm;var (:! ($ +1) []))
- kill-signal (promise;promise Unit)
- obituary (promise;promise [(Maybe Text) ($ +0) (List ($ +1))])
+(def: #export (spawn init behavior)
+ {#;doc "Given a behavior and initial state, spawns an actor and returns it."}
+ (All [s m] (-> s (Behavior s m) (IO (Actor s m))))
+ (io (let [[step on-death] behavior
+ mailbox (stm;var (:! ($ +1) []))
+ kill-signal (P;promise Unit)
+ obituary (P;promise [(Maybe Text) ($ +0) (List ($ +1))])
self {#mailbox mailbox
#kill-signal kill-signal
#obituary obituary}
mailbox-chan (io;run (stm;follow "\tmailbox\t" mailbox))
- proc (proc self)
+ step (step self)
|mailbox| (stm;var mailbox-chan)
_ (:: Monad<Promise> map
(lambda [_]
@@ -60,11 +63,11 @@
(#;Some [message messages'])
(do Monad<Promise>
[#let [_ (io;run (stm;write! messages' |mailbox|))]
- ?state' (proc message state)]
+ ?state' (step message state)]
(case ?state'
(#;Left error)
(do @
- [#let [_ (io;run (promise;resolve [] kill-signal))
+ [#let [_ (io;run (P;resolve [] kill-signal))
_ (io;run (frp;close messages'))
death-message (#;Some error)]
_ (on-death death-message state)
@@ -86,12 +89,12 @@
(def: #export poison
{#;doc "Immediately kills the given actor (if it's not already dead)."}
(All [s m] (-> (Actor s m) (io;IO Bool)))
- (|>. (get@ #kill-signal) (promise;resolve [])))
+ (|>. (get@ #kill-signal) (P;resolve [])))
(def: #export (alive? actor)
(All [s m] (-> (Actor s m) Bool))
- (case [(promise;poll (get@ #kill-signal actor))
- (promise;poll (get@ #obituary actor))]
+ (case [(P;poll (get@ #kill-signal actor))
+ (P;poll (get@ #obituary actor))]
[#;None #;None]
true
@@ -99,18 +102,20 @@
false))
(def: #export (send message actor)
- (All [s m] (-> m (Actor s m) (promise;Promise Bool)))
+ {#;doc "Communicate with an actor through message passing."}
+ (All [s m] (-> m (Actor s m) (P;Promise Bool)))
(if (alive? actor)
(exec (io;run (stm;write! message (get@ #mailbox actor)))
(:: Monad<Promise> wrap true))
(:: Monad<Promise> wrap false)))
-(def: #export (keep-alive init proc)
- {#;doc "Given initial-state and a procedure, launches and actor that will reboot if it dies of errors.
- However, it can still be killed."}
- (All [s m] (-> s (Proc s m) (IO (Actor s m))))
+(def: #export (keep-alive init behavior)
+ {#;doc "Given initial-state and a behavior, spawns an actor that will reboot if it dies of errors.
+
+ However, if it is killed, it won't re-spawn."}
+ (All [s m] (-> s (Behavior s m) (IO (Actor s m))))
(io (let [ka-actor (: (Actor (Actor ($ +0) ($ +1)) ($ +1))
- (io;run (spawn (io;run (spawn init proc))
+ (io;run (spawn (io;run (spawn init behavior))
{#step (lambda [*self* message server]
(do Monad<Promise>
[was-sent? (send message server)]
@@ -118,17 +123,19 @@
(wrap (#;Right server))
(do @
[[?cause state unprocessed-messages] (get@ #obituary server)]
- (exec (log! (format "ACTOR DIED:\n" (default "" ?cause) "\n RESTARTING"))
+ (exec (log! (format "ACTOR DIED:" "\n"
+ (default "" ?cause) "\n"
+ "RESTARTING" "\n"))
(do @
- [#let [new-server (io;run (spawn state proc))
+ [#let [new-server (io;run (spawn state behavior))
mailbox (get@ #mailbox new-server)]
- _ (promise;future (mapM io;Monad<IO> ((flip stm;write!) mailbox) (#;Cons message unprocessed-messages)))]
+ _ (P;future (mapM io;Monad<IO> ((flip stm;write!) mailbox) (#;Cons message unprocessed-messages)))]
(wrap (#;Right new-server))))
))))
#end (lambda [_ server] (exec (io;run (poison server))
(:: Monad<Promise> wrap [])))})))]
- (update@ #obituary (: (-> (promise;Promise [(Maybe Text) (Actor ($ +0) ($ +1)) (List ($ +1))])
- (promise;Promise [(Maybe Text) ($ +0) (List ($ +1))]))
+ (update@ #obituary (: (-> (P;Promise [(Maybe Text) (Actor ($ +0) ($ +1)) (List ($ +1))])
+ (P;Promise [(Maybe Text) ($ +0) (List ($ +1))]))
(lambda [process]
(do Monad<Promise>
[[_ server unprocessed-messages-0] process
@@ -193,23 +200,22 @@
state-type
[methods (s;many method^)]
[?stop (s;opt stop^)])
- {#;doc (doc "Allows defining an actor, with a set of methods that can be called on it."
- "The methods can return promisehronous outputs."
- "The methods can access the actor's state through the *state* variable."
- "The methods can also access the actor itself through the *self* variable."
+ {#;doc (doc "Allows defining an actor, with a pice of state and a set of methods that can be called on it."
+ "A method can access the actor's state through the *state* variable."
+ "A method can also access the actor itself through the *self* variable."
+ "A method may succeed or fail (in case of failure, the actor dies). This is handled through the Either type."
+ "A method's output must be a promise containing a 2-tuple with the updated state and a return value."
+ "All methods are run implicitly within the Promise monad."
(actor: #export Adder
Int
- (method: (count! {to-add Int})
+ (method: (count! [to-add Int])
[Int Int]
- (if (>= 0 to-add)
- (do Monad<Promise>
- [#let [new-state (i.+ to-add *state*)]]
+ (if (i.>= 0 to-add)
+ (let [new-state (i.+ to-add *state*)]
(wrap (#;Right [new-state [*state* new-state]])))
- (do Monad<Promise>
- []
- (wrap (#;Left "Can't add negative numbers!")))))
+ (wrap (#;Left "Can't add negative numbers!"))))
))}
(with-gensyms [g!message g!error g!return g!error g!output]
(let [g!state-name (ast;symbol ["" (format _name "//STATE")])
@@ -217,62 +223,62 @@
g!self (ast;symbol ["" "*self*"])
g!state (ast;symbol ["" "*state*"])
g!cause (ast;symbol ["" "*cause*"])
- g!stop-body (default (` (:: promise;Monad<Promise> (~' wrap) [])) ?stop)
+ g!stop-body (default (` (:: P;Monad<Promise> (~' wrap) [])) ?stop)
protocol (List/map (lambda [(^slots [#name #vars #args #return #body])]
- (` ((~ (ast;tag ["" name])) [(~@ (List/map product;right args))] (promise;Promise (~ return)))))
+ (` ((~ (ast;tag ["" name])) [(~@ (List/map product;right args))] (P;Promise (~ return)))))
methods)
protocol-pm (List/map (: (-> Method [AST AST])
(lambda [(^slots [#name #vars #args #return #body])]
(let [arg-names (|> (list;size args) (list;n.range +1) (List/map (|>. Nat/encode [""] ast;symbol)))
- body-func (` (: (-> (~ g!state-name) (~@ (List/map product;right args)) (promise;Promise (Error [(~ g!state-name) (~ return)])))
+ body-func (` (: (-> (~ g!state-name) (~@ (List/map product;right args)) (P;Promise (Error [(~ g!state-name) (~ return)])))
(lambda (~ (ast;symbol ["" _name])) [(~ g!state) (~@ (List/map (|>. product;left [""] ast;symbol) args))]
- (do promise;Monad<Promise>
+ (do P;Monad<Promise>
[]
(~ body)))))]
[(` [[(~@ arg-names)] (~ g!return)])
- (` (do promise;Monad<Promise>
+ (` (do P;Monad<Promise>
[(~ g!output) ((~ body-func) (~ g!state) (~@ arg-names))]
(case (~ g!output)
(#;Right [(~ g!state) (~ g!output)])
- (exec (io;run (promise;resolve (~ g!output) (~ g!return)))
+ (exec (io;run (P;resolve (~ g!output) (~ g!return)))
((~' wrap) (#;Right (~ g!state))))
(#;Left (~ g!error))
((~' wrap) (#;Left (~ g!error))))
))])))
methods)
- g!proc (` {#step (lambda [(~ g!self) (~ g!message) (~ g!state)]
- (case (~ g!message)
- (~@ (if (n.= +1 (list;size protocol-pm))
- (List/join (List/map (lambda [[pattern clause]]
- (list pattern clause))
- protocol-pm))
- (List/join (List/map (lambda [[method [pattern clause]]]
- (list (` ((~ (ast;tag ["" (get@ #name method)])) (~ pattern)))
- clause))
- (list;zip2 methods protocol-pm)))))
- ))
- #end (lambda [(~ g!cause) (~ g!state)]
- (do promise;Monad<Promise>
- []
- (~ g!stop-body)))})
+ g!behavior (` {#step (lambda [(~ g!self) (~ g!message) (~ g!state)]
+ (case (~ g!message)
+ (~@ (if (n.= +1 (list;size protocol-pm))
+ (List/join (List/map (lambda [[pattern clause]]
+ (list pattern clause))
+ protocol-pm))
+ (List/join (List/map (lambda [[method [pattern clause]]]
+ (list (` ((~ (ast;tag ["" (get@ #name method)])) (~ pattern)))
+ clause))
+ (list;zip2 methods protocol-pm)))))
+ ))
+ #end (lambda [(~ g!cause) (~ g!state)]
+ (do P;Monad<Promise>
+ []
+ (~ g!stop-body)))})
g!actor-name (ast;symbol ["" _name])
g!methods (List/map (: (-> Method AST)
(lambda [(^slots [#name #vars #args #return #body])]
(let [arg-names (|> (list;size args) (list;n.range +1) (List/map (|>. Nat/encode [""] ast;symbol)))
type (` (-> (~@ (List/map product;right args))
(~ g!actor-name)
- (promise;Promise (~ return))))]
+ (P;Promise (~ return))))]
(` (def: (~@ (common;gen-export-level _ex-lev)) ((~ (ast;symbol ["" name])) (~@ arg-names) (~ g!self))
(~ type)
- (let [(~ g!output) (promise;promise (~ return))]
+ (let [(~ g!output) (P;promise (~ return))]
(exec (send ((~ (ast;tag ["" name])) [[(~@ arg-names)] (~ g!output)]) (~ g!self))
(~ g!output))))))))
methods)]
(wrap (list& (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!state-name) (~ state-type)))
(` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!protocol-name) (~@ protocol)))
(` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!actor-name) (Actor (~ g!state-name) (~ g!protocol-name))))
- (` (def: (~@ (common;gen-export-level _ex-lev)) (~@ (actor-def-decl decl (` (Proc (~ g!state-name) (~ g!protocol-name)))))
- (~ g!proc)))
+ (` (def: (~@ (common;gen-export-level _ex-lev)) (~@ (actor-def-decl decl (` (Behavior (~ g!state-name) (~ g!protocol-name)))))
+ (~ g!behavior)))
g!methods))
)))