aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/concurrency/actor.lux
diff options
context:
space:
mode:
authorEduardo Julian2017-06-24 23:58:46 -0400
committerEduardo Julian2017-06-24 23:58:46 -0400
commitfed9d0eb94a8808fe119f39fddf882754cd58788 (patch)
treee4ae053453913c63b80f2fe2b120b67405df42bb /stdlib/source/lux/concurrency/actor.lux
parent6b04fc4a718b7eb40e72fdb02a8fa4f7cf4ea64a (diff)
- Re-designed actors so that their messages are now functions with access to the actor's state, and to the actor itself.
- When creating channels and promises, the type is now mandatory. - Minor refactorings.
Diffstat (limited to '')
-rw-r--r--stdlib/source/lux/concurrency/actor.lux353
1 files changed, 166 insertions, 187 deletions
diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux
index 5f75dc912..0b9c1032f 100644
--- a/stdlib/source/lux/concurrency/actor.lux
+++ b/stdlib/source/lux/concurrency/actor.lux
@@ -1,14 +1,12 @@
(;module: {#;doc "The actor model of concurrency."}
lux
(lux (control monad
- ["p" parser])
- [io #- run]
- function
- (data ["R" result]
- text/format
- (coll [list "List/" Monoid<List> Monad<List>])
- [product]
- [number "Nat/" Codec<Text,Nat>])
+ ["p" parser]
+ ["ex" exception #+ exception:])
+ [io #- run "io/" Monad<IO>]
+ (data text/format
+ (coll [list "L/" Monoid<List> Monad<List>])
+ [product])
[macro #+ with-gensyms]
(macro [code]
["s" syntax #+ syntax: Syntax]
@@ -16,82 +14,86 @@
(common ["csr" reader]
["csw" writer])))
[type])
- (.. ["P" promise #+ Monad<Promise>]
+ (.. ["A" atom]
+ ["P" promise "P/" Monad<Promise>]
+ ["T" task]
[stm #+ Monad<STM>]
[frp]))
+(exception: #export Poisoned)
+(exception: #export Killed)
+
## [Types]
-(type: #export (Actor s m)
- {#;doc "An actor, defined as all the necessities it requires."}
- {#mailbox (stm;Var m)
- #kill-signal (P;Promise Unit)
- #obituary (P;Promise [(Maybe Text) s (List m)])})
+(with-expansions
+ [<Message> (as-is (-> s (Actor s) (T;Task s)))
+ <Obituary> (as-is [Text s (List <Message>)])]
+ (type: #export (Actor s)
+ {#;doc "An actor, defined as all the necessities it requires."}
+ {#mailbox (stm;Var <Message>)
+ #kill-switch (P;Promise Unit)
+ #obituary (P;Promise <Obituary>)})
+
+ (type: #export (Message s)
+ <Message>)
+
+ (type: #export (Obituary s)
+ <Obituary>))
-(type: #export (Behavior s m)
+(type: #export (Behavior s)
{#;doc "An actor's behavior when messages are received."}
- {#step (-> (Actor s m) (-> m s (P;Promise (R;Result s))))
- #end (-> (Maybe Text) s (P;Promise Unit))})
+ {#step (-> (Message s) s (Actor s) (T;Task s))
+ #end (-> Text s (P;Promise Unit))})
## [Values]
(def: #export (spawn init behavior)
{#;doc "Given a behavior and initial state, spawns an actor and returns it."}
- (All [s m] (-> s (Behavior s m) (IO (Actor s m))))
- (io (let [[step on-death] behavior
- mailbox (stm;var (:! ($ +1) []))
- kill-signal (P;promise Unit)
- obituary (P;promise [(Maybe Text) ($ +0) (List ($ +1))])
- self {#mailbox mailbox
- #kill-signal kill-signal
- #obituary obituary}
- mailbox-chan (io;run (stm;follow mailbox))
- step (step self)
+ (All [s] (-> s (Behavior s) (IO (Actor s))))
+ (io (let [[step end] behavior
+ self (: (Actor ($ +0))
+ {#mailbox (stm;var (:! (Message ($ +0)) []))
+ #kill-switch (P;promise Unit)
+ #obituary (P;promise (Obituary ($ +0)))})
+ mailbox-chan (io;run (stm;follow (get@ #mailbox self)))
|mailbox| (stm;var mailbox-chan)
- _ (:: Monad<Promise> map
- (function [_]
- (io;run (do Monad<IO>
- [mb (stm;read! |mailbox|)]
- (frp;close mb))))
- kill-signal)
+ _ (P/map (function [_]
+ (io;run (do Monad<IO>
+ [mb (stm;read! |mailbox|)]
+ (frp;close mb))))
+ (get@ #kill-switch self))
process (loop [state init
messages mailbox-chan]
- (do Monad<Promise>
+ (do P;Monad<Promise>
[?messages+ messages]
(case ?messages+
- ## No kill-signal so far, so I may proceed...
+ ## No kill-switch so far, so I may proceed...
(#;Some [message messages'])
- (do Monad<Promise>
+ (do P;Monad<Promise>
[#let [_ (io;run (stm;write! messages' |mailbox|))]
- ?state' (step message state)]
+ ?state' (step message state self)]
(case ?state'
(#;Left error)
(do @
- [#let [_ (io;run (P;resolve [] kill-signal))
- _ (io;run (frp;close messages'))
- death-message (#;Some error)]
- _ (on-death death-message state)
+ [#let [_ (io;run (P;resolve [] (get@ #kill-switch self)))
+ _ (io;run (frp;close messages'))]
+ _ (end error state)
remaining-messages (frp;consume messages')]
- (wrap [death-message state (#;Cons message remaining-messages)]))
+ (wrap [error state (#;Cons message remaining-messages)]))
(#;Right state')
(recur state' messages')))
## Otherwise, clean-up and return current state.
#;None
- (do Monad<Promise>
+ (do P;Monad<Promise>
[#let [_ (io;run (frp;close messages))
- death-message #;None]
- _ (on-death death-message state)]
+ death-message (Killed "")]
+ _ (end death-message state)]
(wrap [death-message state (list)])))))]
self)))
-(def: #export poison
- {#;doc "Immediately kills the given actor (if it's not already dead)."}
- (All [s m] (-> (Actor s m) (io;IO Bool)))
- (|>. (get@ #kill-signal) (P;resolve [])))
-
(def: #export (alive? actor)
- (All [s m] (-> (Actor s m) Bool))
- (case [(P;poll (get@ #kill-signal actor))
+ (All [s] (-> (Actor s) Bool))
+ (case [(P;poll (get@ #kill-switch actor))
(P;poll (get@ #obituary actor))]
[#;None #;None]
true
@@ -101,76 +103,62 @@
(def: #export (send message actor)
{#;doc "Communicate with an actor through message passing."}
- (All [s m] (-> m (Actor s m) (P;Promise Bool)))
+ (All [s] (-> (Message s) (Actor s) (IO Bool)))
(if (alive? actor)
(exec (io;run (stm;write! message (get@ #mailbox actor)))
- (:: Monad<Promise> wrap true))
- (:: Monad<Promise> wrap false)))
+ (io/wrap true))
+ (io/wrap false)))
-(def: #export (keep-alive init behavior)
- {#;doc "Given initial-state and a behavior, spawns an actor that will reboot if it dies of errors.
+(def: #export (kill actor)
+ {#;doc "Immediately kills the given actor (if it is not already dead)."}
+ (All [s] (-> (Actor s) (io;IO Bool)))
+ (if (alive? actor)
+ (|> actor (get@ #kill-switch) (P;resolve []))
+ (io/wrap false)))
- However, if it is killed, it will not re-spawn."}
- (All [s m] (-> s (Behavior s m) (IO (Actor s m))))
- (io (let [ka-actor (: (Actor (Actor ($ +0) ($ +1)) ($ +1))
- (io;run (spawn (io;run (spawn init behavior))
- {#step (function [*self* message server]
- (do Monad<Promise>
- [was-sent? (send message server)]
- (if was-sent?
- (wrap (#;Right server))
- (do @
- [[?cause state unprocessed-messages] (get@ #obituary server)]
- (exec (log! (format "ACTOR DIED:" "\n"
- (default "" ?cause) "\n"
- "RESTARTING" "\n"))
- (do @
- [#let [new-server (io;run (spawn state behavior))
- mailbox (get@ #mailbox new-server)]
- _ (P;future (mapM io;Monad<IO> ((flip stm;write!) mailbox) (#;Cons message unprocessed-messages)))]
- (wrap (#;Right new-server))))
- ))))
- #end (function [_ server] (exec (io;run (poison server))
- (:: Monad<Promise> wrap [])))})))]
- (update@ #obituary (: (-> (P;Promise [(Maybe Text) (Actor ($ +0) ($ +1)) (List ($ +1))])
- (P;Promise [(Maybe Text) ($ +0) (List ($ +1))]))
- (function [process]
- (do Monad<Promise>
- [[_ server unprocessed-messages-0] process
- [cause state unprocessed-messages-1] (get@ #obituary server)]
- (wrap [cause state (List/append unprocessed-messages-0 unprocessed-messages-1)]))))
- ka-actor))))
+(def: #export (poison actor)
+ {#;doc "Kills the actor by sending a message that will kill it upon processing,
+ but allows the actor to handle previous messages."}
+ (All [s] (-> (Actor s) (IO Bool)))
+ (send (function [state self]
+ (T;throw Poisoned ""))
+ actor))
## [Syntax]
(type: Method
{#name Text
#vars (List Text)
#args (List [Text Code])
+ #state Text
+ #self Text
#return Code
#body Code})
(def: method^
(Syntax Method)
(s;form (do p;Monad<Parser>
- [_ (s;this (' method:))
- vars (p;default (list) (s;tuple (p;some s;local-symbol)))
- [name args] (s;form ($_ p;seq
- s;local-symbol
- (p;many csr;typed-input)
- ))
+ [vars (p;default (list) (s;tuple (p;some s;local-symbol)))
+ [name args state self] (s;form ($_ p;seq
+ s;local-symbol
+ (p;some csr;typed-input)
+ s;local-symbol
+ s;local-symbol
+ ))
return s;any
body s;any]
(wrap {#name name
#vars vars
#args args
+ #state state
+ #self self
#return return
#body body}))))
(def: stop^
- (Syntax Code)
- (s;form (do p;Monad<Parser>
- [_ (s;this (' stop:))]
- s;any)))
+ (Syntax [[Text Text] Code])
+ (s;form (p;seq (s;tuple (p;seq s;local-symbol
+ s;local-symbol))
+ s;any)))
(def: actor-decl^
(Syntax [(List Text) Text (List [Text Code])])
@@ -178,105 +166,96 @@
(p;either (s;form (p;seq s;local-symbol (p;many csr;typed-input)))
(p;seq s;local-symbol (:: p;Monad<Parser> wrap (list))))))
-(def: (actor-def-decl [_vars _name _args] return-type)
- (-> [(List Text) Text (List [Text Code])] Code (List Code))
- (let [decl (` ((~ (code;symbol ["" (format _name "//new")])) (~@ (List/map (|>. product;left [""] code;symbol) _args))))
- base-type (` (-> (~@ (List/map product;right _args))
- (~ return-type)))
- type (case _vars
- #;Nil
- base-type
-
- _
- (` (All [(~@ (List/map (|>. [""] code;symbol) _vars))]
- (~ base-type))))]
- (list decl
- type)))
-
-(syntax: #export (actor: [_ex-lev csr;export]
- [(^@ decl [_vars _name _args]) actor-decl^]
+(syntax: #export (actor: [export csr;export]
+ [[_vars _name _args] actor-decl^]
state-type
- [methods (p;many method^)]
+ [methods (p;some method^)]
[?stop (p;opt stop^)])
- {#;doc (doc "Allows defining an actor, with a pice of state and a set of methods that can be called on it."
- "A method can access the actor's state through the *state* variable."
- "A method can also access the actor itself through the *self* variable."
+ {#;doc (doc "Defines an actor, with internal state and methods that can be called on it."
+ "A method can access the actor's state through the state parameter."
+ "A method can also access the actor itself through the self parameter."
"A method may succeed or fail (in case of failure, the actor dies). This is handled through the Either type."
"A method's output must be a promise containing a 2-tuple with the updated state and a return value."
"All methods are run implicitly within the Promise monad."
- (actor: #export Adder
+ (actor: #export Counter
Int
- (method: (count! [to-add Int])
- [Int Int]
- (if (i.>= 0 to-add)
- (let [new-state (i.+ to-add *state*)]
- (wrap (#;Right [new-state [*state* new-state]])))
- (wrap (#;Left "Cannot add negative numbers!"))))
+ ((count! [increment Int] state self)
+ [Int Int]
+ (if (i.>= 0 increment)
+ (let [state' (i.+ increment state)]
+ (T;return [state' [state state']]))
+ (T;fail "Cannot add negative numbers!")))
+
+ ([cause state]
+ (:: P;Monad<Promise> wrap
+ (log! (if (ex;match? ;;Killed cause)
+ (format "Counter was killed: " (%i state))
+ cause))))
))}
- (with-gensyms [g!message g!error g!return g!error g!output]
- (let [g!state-name (code;symbol ["" (format _name "//STATE")])
- g!protocol-name (code;symbol ["" (format _name "//PROTOCOL")])
- g!self (code;symbol ["" "*self*"])
- g!state (code;symbol ["" "*state*"])
- g!cause (code;symbol ["" "*cause*"])
- g!stop-body (default (` (:: P;Monad<Promise> (~' wrap) [])) ?stop)
- protocol (List/map (function [(^slots [#name #vars #args #return #body])]
- (` ((~ (code;tag ["" name])) [(~@ (List/map product;right args))] (P;Promise (~ return)))))
- methods)
- protocol-pm (List/map (: (-> Method [Code Code])
- (function [(^slots [#name #vars #args #return #body])]
- (let [arg-names (|> (list;size args) (list;n.range +1) (List/map (|>. Nat/encode [""] code;symbol)))
- body-func (` (: (-> (~ g!state-name) (~@ (List/map product;right args)) (P;Promise (R;Result [(~ g!state-name) (~ return)])))
- (function (~ (code;symbol ["" _name])) [(~ g!state) (~@ (List/map (|>. product;left [""] code;symbol) args))]
- (do P;Monad<Promise>
- []
- (~ body)))))]
- [(` [[(~@ arg-names)] (~ g!return)])
- (` (do P;Monad<Promise>
- [(~ g!output) ((~ body-func) (~ g!state) (~@ arg-names))]
- (case (~ g!output)
- (#;Right [(~ g!state) (~ g!output)])
- (exec (io;run (P;resolve (~ g!output) (~ g!return)))
- ((~' wrap) (#;Right (~ g!state))))
+ (with-gensyms [g!message g!self g!state g!init g!error g!return g!output]
+ (let [g!state-type (code;symbol ["" (format _name "//Actor:State")])
+ g!behavior (code;symbol ["" (format _name "//Actor:Behavior")])
+ g!actor (code;symbol ["" _name])
+ g!methods (L/map (: (-> Method Code)
+ (function [(^slots [#name #vars #args #state #self #return #body])]
+ (let [g!method (code;symbol ["" name])
+ g!vars (L/map (|>. [""] code;symbol) vars)
+ g!var-refs (: (List Code)
+ (if (list;empty? vars)
+ (list)
+ (|> vars list;size n.dec
+ (list;n.range +0) (L/map (|>. code;nat (~) ($) (`))))))
+ g!args-names (L/map (|>. product;left [""] code;symbol) args)
+ g!arg-types (L/map product;right args)
+ g!state (code;symbol ["" state])
+ g!self (code;symbol ["" self])]
+ (` (def: (~@ (csw;export export)) ((~ g!method) (~@ g!args-names) (~ g!self))
+ (All [(~@ g!vars)]
+ (-> (~@ g!arg-types) (~ g!actor) (T;Task (~ return))))
+ (let [(~ g!output) (T;task (~ return))]
+ (exec (;;send (function [(~ g!state) (~ g!self)]
+ (do P;Monad<Promise>
+ [(~ g!return) (: (T;Task [((~ g!state-type) (~@ g!var-refs))
+ (~ return)])
+ (~ body))]
+ (case (~ g!return)
+ (#;Right [(~ g!state) (~ g!return)])
+ (exec (io;run (P;resolve (#;Right (~ g!return)) (~ g!output)))
+ (T;return (~ g!state)))
+
+ (#;Left (~ g!error))
+ (exec (io;run (P;resolve (#;Left (~ g!error)) (~ g!output)))
+ (T;fail (~ g!error))))))
+ (~ g!self))
+ (~ g!output))))))))
+ methods)
+ g!new (code;symbol ["" (format "new-" _name)])
+ g!vars (L/map (|>. [""] code;symbol) _vars)]
+ (wrap (list& (` (type: (~@ (csw;export export)) ((~ g!state-type) (~@ g!vars))
+ (~ state-type)))
+ (` (type: (~@ (csw;export export)) ((~ g!actor) (~@ g!vars))
+ (;;Actor ((~ g!state-type) (~@ g!vars)))))
+ (` (def: (~@ (csw;export export)) (~ g!behavior)
+ (All [(~@ g!vars)]
+ (Behavior ((~ g!state-type) (~@ g!vars))))
+ {#step (function [(~' message) (~' state) (~' self)]
+ ((~' message) (~' state) (~' self)))
+ #end (~ (case ?stop
+ (#;Some [[cause state] body])
+ (let [g!cause (code;symbol ["" cause])
+ g!state (code;symbol ["" state])]
+ (` (function [(~ g!cause) (~ g!state)]
+ (do P;Monad<Promise>
+ []
+ (~ body)))))
- (#;Left (~ g!error))
- ((~' wrap) (#;Left (~ g!error))))
- ))])))
- methods)
- g!behavior (` {#step (function [(~ g!self) (~ g!message) (~ g!state)]
- (case (~ g!message)
- (~@ (if (n.= +1 (list;size protocol-pm))
- (List/join (List/map (function [[pattern clause]]
- (list pattern clause))
- protocol-pm))
- (List/join (List/map (function [[method [pattern clause]]]
- (list (` ((~ (code;tag ["" (get@ #name method)])) (~ pattern)))
- clause))
- (list;zip2 methods protocol-pm)))))
- ))
- #end (function [(~ g!cause) (~ g!state)]
- (do P;Monad<Promise>
- []
- (~ g!stop-body)))})
- g!actor-name (code;symbol ["" _name])
- g!methods (List/map (: (-> Method Code)
- (function [(^slots [#name #vars #args #return #body])]
- (let [arg-names (|> (list;size args) (list;n.range +1) (List/map (|>. Nat/encode [""] code;symbol)))
- type (` (-> (~@ (List/map product;right args))
- (~ g!actor-name)
- (P;Promise (~ return))))]
- (` (def: (~@ (csw;export _ex-lev)) ((~ (code;symbol ["" name])) (~@ arg-names) (~ g!self))
- (~ type)
- (let [(~ g!output) (P;promise (~ return))]
- (exec (send ((~ (code;tag ["" name])) [[(~@ arg-names)] (~ g!output)]) (~ g!self))
- (~ g!output))))))))
- methods)]
- (wrap (list& (` (type: (~@ (csw;export _ex-lev)) (~ g!state-name) (~ state-type)))
- (` (type: (~@ (csw;export _ex-lev)) (~ g!protocol-name) (~@ protocol)))
- (` (type: (~@ (csw;export _ex-lev)) (~ g!actor-name) (Actor (~ g!state-name) (~ g!protocol-name))))
- (` (def: (~@ (csw;export _ex-lev)) (~@ (actor-def-decl decl (` (Behavior (~ g!state-name) (~ g!protocol-name)))))
- (~ g!behavior)))
+ #;None
+ (` (:: P;Monad<Promise> (~' wrap) []))))}))
+ (` (def: (~@ (csw;export export)) ((~ g!new) (~ g!init))
+ (All [(~@ g!vars)]
+ (-> ((~ g!state-type) (~@ g!vars)) (io;IO ((~ g!actor) (~@ g!vars)))))
+ (;;spawn (~ g!init) (~ g!behavior))))
g!methods))
)))