aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEduardo Julian2017-08-03 19:44:43 -0400
committerEduardo Julian2017-08-03 19:44:43 -0400
commit0c3c53a714019a543e70e0751387b5dfccc3b872 (patch)
tree5cd19f0e5d5d2aa52e2899cd3f9f501dffd9b281
parentae8306fe81376eefb7416a1d5c6b8d2ed3cd8f6c (diff)
- Some refactoring and small bug fixes.
Diffstat (limited to '')
-rw-r--r--stdlib/source/lux/concurrency/actor.lux229
1 files changed, 121 insertions, 108 deletions
diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux
index dd34a7ae8..94dc81e48 100644
--- a/stdlib/source/lux/concurrency/actor.lux
+++ b/stdlib/source/lux/concurrency/actor.lux
@@ -13,7 +13,8 @@
(syntax ["cs" common]
(common ["csr" reader]
["csw" writer])))
- [type])
+ [type]
+ (type model))
(.. ["A" atom]
["P" promise "P/" Monad<Promise>]
["T" task]
@@ -22,27 +23,102 @@
(exception: #export Poisoned)
(exception: #export Killed)
+(exception: #export Dead)
## [Types]
(with-expansions
[<Message> (as-is (-> s (Actor s) (T;Task s)))
<Obituary> (as-is [Text s (List <Message>)])]
- (type: #export (Actor s)
+ (model: #export (Actor s)
{#;doc "An actor, defined as all the necessities it requires."}
{#mailbox (stm;Var <Message>)
#kill-switch (P;Promise Unit)
- #obituary (P;Promise <Obituary>)})
-
- (type: #export (Message s)
- <Message>)
-
- (type: #export (Obituary s)
- <Obituary>))
-
-(type: #export (Behavior s)
- {#;doc "An actor's behavior when messages are received."}
- {#handle (-> (Message s) s (Actor s) (T;Task s))
- #end (-> Text s (P;Promise Unit))})
+ #obituary (P;Promise <Obituary>)}
+
+ (type: #export (Message s)
+ <Message>)
+
+ (type: #export (Obituary s)
+ <Obituary>)
+
+ (type: #export (Behavior s)
+ {#;doc "An actor's behavior when messages are received."}
+ {#handle (-> (Message s) s (Actor s) (T;Task s))
+ #end (-> Text s (P;Promise Unit))})
+
+ (def: #export (spawn behavior init)
+ {#;doc "Given a behavior and initial state, spawns an actor and returns it."}
+ (All [s] (-> (Behavior s) s (IO (Actor s))))
+ (io (let [[handle end] behavior
+ self (: (Actor ($ +0))
+ (@model {#mailbox (stm;var (:! (Message ($ +0)) []))
+ #kill-switch (P;promise Unit)
+ #obituary (P;promise (Obituary ($ +0)))}))
+ mailbox-channel (io;run (stm;follow (get@ #mailbox (@repr self))))
+ |mailbox| (stm;var mailbox-channel)
+ _ (P/map (function [_]
+ (io;run (do Monad<IO>
+ [mb (stm;read! |mailbox|)]
+ (frp;close mb))))
+ (get@ #kill-switch (@repr self)))
+ process (loop [state init
+ messages mailbox-channel]
+ (do P;Monad<Promise>
+ [?messages+ messages]
+ (case ?messages+
+ ## No kill-switch so far, so I may proceed...
+ (#;Some [message messages'])
+ (do P;Monad<Promise>
+ [#let [_ (io;run (stm;write! messages' |mailbox|))]
+ ?state' (handle message state self)]
+ (case ?state'
+ (#;Left error)
+ (do @
+ [#let [_ (io;run (do Monad<IO>
+ [_ (P;resolve [] (get@ #kill-switch (@repr self)))]
+ (frp;close messages')))]
+ _ (end error state)
+ remaining-messages (frp;consume messages')]
+ (wrap [error state (#;Cons message remaining-messages)]))
+
+ (#;Right state')
+ (recur state' messages')))
+
+ ## Otherwise, clean-up and return current state.
+ #;None
+ (do P;Monad<Promise>
+ [#let [_ (io;run (frp;close messages))
+ death-message (Killed "")]
+ _ (end death-message state)]
+ (wrap [death-message state (list)])))))]
+ self)))
+
+ (def: #export (alive? actor)
+ (All [s] (-> (Actor s) Bool))
+ (case [(P;poll (get@ #kill-switch (@repr actor)))
+ (P;poll (get@ #obituary (@repr actor)))]
+ [#;None #;None]
+ true
+
+ _
+ false))
+
+ (def: #export (send message actor)
+ {#;doc "Communicate with an actor through message passing."}
+ (All [s] (-> (Message s) (Actor s) (IO Bool)))
+ (if (alive? actor)
+ (do Monad<IO>
+ [_ (stm;write! message (get@ #mailbox (@repr actor)))]
+ (wrap true))
+ (io/wrap false)))
+
+ (def: #export (kill actor)
+ {#;doc "Immediately kills the given actor (if it is not already dead)."}
+ (All [s] (-> (Actor s) (io;IO Bool)))
+ (if (alive? actor)
+ (|> actor @repr (get@ #kill-switch) (P;resolve []))
+ (io/wrap false)))
+ ))
## [Values]
(def: #export (default-handle message state self)
@@ -53,76 +129,10 @@
(All [s] (-> Text s (P;Promise Unit)))
(P/wrap []))
-(def: #export (spawn behavior init)
- {#;doc "Given a behavior and initial state, spawns an actor and returns it."}
- (All [s] (-> (Behavior s) s (IO (Actor s))))
- (io (let [[handle end] behavior
- self (: (Actor ($ +0))
- {#mailbox (stm;var (:! (Message ($ +0)) []))
- #kill-switch (P;promise Unit)
- #obituary (P;promise (Obituary ($ +0)))})
- mailbox-channel (io;run (stm;follow (get@ #mailbox self)))
- |mailbox| (stm;var mailbox-channel)
- _ (P/map (function [_]
- (io;run (do Monad<IO>
- [mb (stm;read! |mailbox|)]
- (frp;close mb))))
- (get@ #kill-switch self))
- process (loop [state init
- messages mailbox-channel]
- (do P;Monad<Promise>
- [?messages+ messages]
- (case ?messages+
- ## No kill-switch so far, so I may proceed...
- (#;Some [message messages'])
- (do P;Monad<Promise>
- [#let [_ (io;run (stm;write! messages' |mailbox|))]
- ?state' (handle message state self)]
- (case ?state'
- (#;Left error)
- (do @
- [#let [_ (io;run (P;resolve [] (get@ #kill-switch self)))
- _ (io;run (frp;close messages'))]
- _ (end error state)
- remaining-messages (frp;consume messages')]
- (wrap [error state (#;Cons message remaining-messages)]))
-
- (#;Right state')
- (recur state' messages')))
-
- ## Otherwise, clean-up and return current state.
- #;None
- (do P;Monad<Promise>
- [#let [_ (io;run (frp;close messages))
- death-message (Killed "")]
- _ (end death-message state)]
- (wrap [death-message state (list)])))))]
- self)))
-
-(def: #export (alive? actor)
- (All [s] (-> (Actor s) Bool))
- (case [(P;poll (get@ #kill-switch actor))
- (P;poll (get@ #obituary actor))]
- [#;None #;None]
- true
-
- _
- false))
-
-(def: #export (send message actor)
- {#;doc "Communicate with an actor through message passing."}
- (All [s] (-> (Message s) (Actor s) (IO Bool)))
- (if (alive? actor)
- (exec (io;run (stm;write! message (get@ #mailbox actor)))
- (io/wrap true))
- (io/wrap false)))
-
-(def: #export (kill actor)
- {#;doc "Immediately kills the given actor (if it is not already dead)."}
- (All [s] (-> (Actor s) (io;IO Bool)))
- (if (alive? actor)
- (|> actor (get@ #kill-switch) (P;resolve []))
- (io/wrap false)))
+(def: #export default-behavior
+ (All [s] (Behavior s))
+ {#handle default-handle
+ #end default-end})
(def: #export (poison actor)
{#;doc "Kills the actor by sending a message that will kill it upon processing,
@@ -134,12 +144,12 @@
## [Syntax]
(do-template [<with> <resolve> <tag> <desc>]
- [(def: (<with> name)
+ [(def: #hidden (<with> name)
(-> Ident cs;Annotations cs;Annotations)
(|>. (#;Cons [(ident-for <tag>)
(code;tag name)])))
- (def: (<resolve> name)
+ (def: #hidden (<resolve> name)
(-> Ident (Lux Ident))
(do Monad<Lux>
[name (macro;normalize name)
@@ -161,7 +171,7 @@
(p;seq s;local-symbol (:: p;Monad<Parser> wrap (list)))))
(do-template [<name> <desc>]
- [(def: <name>
+ [(def: #hidden <name>
(-> Text Text)
(|>. (format <desc> "@")))]
@@ -202,7 +212,7 @@
(log! (if (ex;match? ;;Killed cause)
(format "Counter was killed: " (%n state))
cause)))))
-
+
(actor: #export (Stack a)
(List a)
@@ -243,7 +253,7 @@
(~ bodyC))))))
#;;end (~ (case ?stop
#;None
- (` ;;default-handle)
+ (` ;;default-end)
(#;Some [[causeN stateN] bodyC])
(` (function [(~ (code;local-symbol causeN))
@@ -298,7 +308,7 @@
(push [value a] state self (List a))
(let [state' (#;Cons value state)]
(T;return [state' state']))))}
- (with-gensyms [g!return g!error g!task]
+ (with-gensyms [g!return g!error g!task g!sent?]
(do @
[actor-name (resolve-actor actor-name)
#let [g!type (code;symbol (product;both id state-name actor-name))
@@ -321,23 +331,26 @@
csw;annotations))
(All [(~@ g!tvars)] (-> (~@ g!inputsT) (~ g!actor) (T;Task (~ g!outputT))))
(let [(~ g!task) (T;task (~ g!outputT))]
- (exec (;;send (function [(~ g!state) (~ g!self)]
- (do P;Monad<Promise>
- [(~ g!return) (: (T;Task [((~ g!type) (~@ g!refs))
- (~ g!outputT)])
- (do T;Monad<Task>
- []
- (~ body)))]
- (case (~ g!return)
- (#;Right [(~ g!state) (~ g!return)])
- (exec (io;run (P;resolve (#;Right (~ g!return)) (~ g!task)))
- (T;return (~ g!state)))
-
- (#;Left (~ g!error))
- (exec (io;run (P;resolve (#;Left (~ g!error)) (~ g!task)))
- (T;fail (~ g!error))))
- ))
- (~ g!self))
- (~ g!task)))))
+ (io;run (do io;Monad<IO>
+ [(~ g!sent?) (;;send (function [(~ g!state) (~ g!self)]
+ (do P;Monad<Promise>
+ [(~ g!return) (: (T;Task [((~ g!type) (~@ g!refs))
+ (~ g!outputT)])
+ (do T;Monad<Task>
+ []
+ (~ body)))]
+ (case (~ g!return)
+ (#;Right [(~ g!state) (~ g!return)])
+ (exec (io;run (P;resolve (#;Right (~ g!return)) (~ g!task)))
+ (T;return (~ g!state)))
+
+ (#;Left (~ g!error))
+ (exec (io;run (P;resolve (#;Left (~ g!error)) (~ g!task)))
+ (T;fail (~ g!error))))
+ ))
+ (~ g!self))]
+ (if (~ g!sent?)
+ ((~' wrap) (~ g!task))
+ ((~' wrap) (T;throw ;;Dead ""))))))))
))
)))