aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/concurrency/actor.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/concurrency/actor.lux')
-rw-r--r--stdlib/source/lux/concurrency/actor.lux302
1 files changed, 151 insertions, 151 deletions
diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux
index abda284c0..75bbf15d2 100644
--- a/stdlib/source/lux/concurrency/actor.lux
+++ b/stdlib/source/lux/concurrency/actor.lux
@@ -1,4 +1,4 @@
-(;module: {#;doc "The actor model of concurrency."}
+(.module: {#.doc "The actor model of concurrency."}
lux
(lux (control monad
["p" parser]
@@ -27,13 +27,13 @@
## [Types]
(with-expansions
- [<Message> (as-is (-> s (Actor s) (T;Task s)))
+ [<Message> (as-is (-> s (Actor s) (T.Task s)))
<Obituary> (as-is [Text s (List <Message>)])]
(opaque: #export (Actor s)
- {#;doc "An actor, defined as all the necessities it requires."}
- {#mailbox (stm;Var <Message>)
- #kill-switch (P;Promise Unit)
- #obituary (P;Promise <Obituary>)}
+ {#.doc "An actor, defined as all the necessities it requires."}
+ {#mailbox (stm.Var <Message>)
+ #kill-switch (P.Promise Unit)
+ #obituary (P.Promise <Obituary>)}
(type: #export (Message s)
<Message>)
@@ -42,52 +42,52 @@
<Obituary>)
(type: #export (Behavior s)
- {#;doc "An actor's behavior when messages are received."}
- {#handle (-> (Message s) s (Actor s) (T;Task s))
- #end (-> Text s (P;Promise Unit))})
+ {#.doc "An actor's behavior when messages are received."}
+ {#handle (-> (Message s) s (Actor s) (T.Task s))
+ #end (-> Text s (P.Promise Unit))})
(def: #export (spawn behavior init)
- {#;doc "Given a behavior and initial state, spawns an actor and returns it."}
+ {#.doc "Given a behavior and initial state, spawns an actor and returns it."}
(All [s] (-> (Behavior s) s (IO (Actor s))))
(io (let [[handle end] behavior
self (: (Actor ($ +0))
- (@opaque {#mailbox (stm;var (:! (Message ($ +0)) []))
- #kill-switch (P;promise Unit)
- #obituary (P;promise (Obituary ($ +0)))}))
- mailbox-channel (io;run (stm;follow (get@ #mailbox (@repr self))))
- |mailbox| (stm;var mailbox-channel)
+ (@opaque {#mailbox (stm.var (:! (Message ($ +0)) []))
+ #kill-switch (P.promise Unit)
+ #obituary (P.promise (Obituary ($ +0)))}))
+ mailbox-channel (io.run (stm.follow (get@ #mailbox (@repr self))))
+ |mailbox| (stm.var mailbox-channel)
_ (P/map (function [_]
- (io;run (do Monad<IO>
- [mb (stm;read! |mailbox|)]
- (frp;close mb))))
+ (io.run (do Monad<IO>
+ [mb (stm.read! |mailbox|)]
+ (frp.close mb))))
(get@ #kill-switch (@repr self)))
process (loop [state init
messages mailbox-channel]
- (do P;Monad<Promise>
+ (do P.Monad<Promise>
[?messages+ messages]
(case ?messages+
## No kill-switch so far, so I may proceed...
- (#;Some [message messages'])
- (do P;Monad<Promise>
- [#let [_ (io;run (stm;write! messages' |mailbox|))]
+ (#.Some [message messages'])
+ (do P.Monad<Promise>
+ [#let [_ (io.run (stm.write! messages' |mailbox|))]
?state' (handle message state self)]
(case ?state'
- (#;Left error)
+ (#.Left error)
(do @
- [#let [_ (io;run (do Monad<IO>
- [_ (P;resolve [] (get@ #kill-switch (@repr self)))]
- (frp;close messages')))]
+ [#let [_ (io.run (do Monad<IO>
+ [_ (P.resolve [] (get@ #kill-switch (@repr self)))]
+ (frp.close messages')))]
_ (end error state)
- remaining-messages (frp;consume messages')]
- (wrap [error state (#;Cons message remaining-messages)]))
+ remaining-messages (frp.consume messages')]
+ (wrap [error state (#.Cons message remaining-messages)]))
- (#;Right state')
+ (#.Right state')
(recur state' messages')))
## Otherwise, clean-up and return current state.
- #;None
- (do P;Monad<Promise>
- [#let [_ (io;run (frp;close messages))
+ #.None
+ (do P.Monad<Promise>
+ [#let [_ (io.run (frp.close messages))
death-message (Killed "")]
_ (end death-message state)]
(wrap [death-message state (list)])))))]
@@ -95,38 +95,38 @@
(def: #export (alive? actor)
(All [s] (-> (Actor s) Bool))
- (case [(P;poll (get@ #kill-switch (@repr actor)))
- (P;poll (get@ #obituary (@repr actor)))]
- [#;None #;None]
+ (case [(P.poll (get@ #kill-switch (@repr actor)))
+ (P.poll (get@ #obituary (@repr actor)))]
+ [#.None #.None]
true
_
false))
(def: #export (send message actor)
- {#;doc "Communicate with an actor through message passing."}
+ {#.doc "Communicate with an actor through message passing."}
(All [s] (-> (Message s) (Actor s) (IO Bool)))
(if (alive? actor)
(do Monad<IO>
- [_ (stm;write! message (get@ #mailbox (@repr actor)))]
+ [_ (stm.write! message (get@ #mailbox (@repr actor)))]
(wrap true))
(io/wrap false)))
(def: #export (kill actor)
- {#;doc "Immediately kills the given actor (if it is not already dead)."}
- (All [s] (-> (Actor s) (io;IO Bool)))
+ {#.doc "Immediately kills the given actor (if it is not already dead)."}
+ (All [s] (-> (Actor s) (io.IO Bool)))
(if (alive? actor)
- (|> actor @repr (get@ #kill-switch) (P;resolve []))
+ (|> actor @repr (get@ #kill-switch) (P.resolve []))
(io/wrap false)))
))
## [Values]
(def: #export (default-handle message state self)
- (All [s] (-> (Message s) s (Actor s) (T;Task s)))
+ (All [s] (-> (Message s) s (Actor s) (T.Task s)))
(message state self))
(def: #export (default-end cause state)
- (All [s] (-> Text s (P;Promise Unit)))
+ (All [s] (-> Text s (P.Promise Unit)))
(P/wrap []))
(def: #export default-behavior
@@ -135,39 +135,39 @@
#end default-end})
(def: #export (poison actor)
- {#;doc "Kills the actor by sending a message that will kill it upon processing,
+ {#.doc "Kills the actor by sending a message that will kill it upon processing,
but allows the actor to handle previous messages."}
(All [s] (-> (Actor s) (IO Bool)))
(send (function [state self]
- (T;throw Poisoned ""))
+ (T.throw Poisoned ""))
actor))
## [Syntax]
(do-template [<with> <resolve> <tag> <desc>]
[(def: #hidden (<with> name)
- (-> Ident cs;Annotations cs;Annotations)
- (|>> (#;Cons [(ident-for <tag>)
- (code;tag name)])))
+ (-> Ident cs.Annotations cs.Annotations)
+ (|>> (#.Cons [(ident-for <tag>)
+ (code.tag name)])))
(def: #hidden (<resolve> name)
(-> Ident (Meta Ident))
(do Monad<Meta>
- [[_ annotations _] (macro;find-def name)]
- (case (macro;get-tag-ann (ident-for <tag>) annotations)
- (#;Some actor-name)
+ [[_ annotations _] (macro.find-def name)]
+ (case (macro.get-tag-ann (ident-for <tag>) annotations)
+ (#.Some actor-name)
(wrap actor-name)
_
- (macro;fail (format "Definition is not " <desc> ".")))))]
+ (macro.fail (format "Definition is not " <desc> ".")))))]
- [with-actor resolve-actor #;;actor "an actor"]
- [with-message resolve-message #;;message "a message"]
+ [with-actor resolve-actor #..actor "an actor"]
+ [with-message resolve-message #..message "a message"]
)
(def: actor-decl^
(Syntax [Text (List Text)])
- (p;either (s;form (p;seq s;local-symbol (p;some s;local-symbol)))
- (p;seq s;local-symbol (:: p;Monad<Parser> wrap (list)))))
+ (p.either (s.form (p.seq s.local-symbol (p.some s.local-symbol)))
+ (p.seq s.local-symbol (:: p.Monad<Parser> wrap (list)))))
(do-template [<name> <desc>]
[(def: #hidden <name>
@@ -189,26 +189,26 @@
[(Maybe HandleC) (Maybe StopC)])
(def: behavior^
- (s;Syntax BehaviorC)
- (let [handle-args ($_ p;seq s;local-symbol s;local-symbol s;local-symbol)
- stop-args ($_ p;seq s;local-symbol s;local-symbol)]
- (p;seq (p;maybe (s;form (p;seq (s;form (p;after (s;this (' handle)) handle-args))
- s;any)))
- (p;maybe (s;form (p;seq (s;form (p;after (s;this (' stop)) stop-args))
- s;any))))))
-
-(syntax: #export (actor: [export csr;export]
+ (s.Syntax BehaviorC)
+ (let [handle-args ($_ p.seq s.local-symbol s.local-symbol s.local-symbol)
+ stop-args ($_ p.seq s.local-symbol s.local-symbol)]
+ (p.seq (p.maybe (s.form (p.seq (s.form (p.after (s.this (' handle)) handle-args))
+ s.any)))
+ (p.maybe (s.form (p.seq (s.form (p.after (s.this (' stop)) stop-args))
+ s.any))))))
+
+(syntax: #export (actor: [export csr.export]
[[_name _vars] actor-decl^]
- [annotations (p;default cs;empty-annotations csr;annotations)]
+ [annotations (p.default cs.empty-annotations csr.annotations)]
state-type
[[?handle ?stop] behavior^])
- {#;doc (doc "Defines an actor, with its behavior and internal state."
+ {#.doc (doc "Defines an actor, with its behavior and internal state."
(actor: #export Counter
Nat
((stop cause state)
- (:: P;Monad<Promise> wrap
- (log! (if (ex;match? ;;Killed cause)
+ (:: P.Monad<Promise> wrap
+ (log! (if (ex.match? ..Killed cause)
(format "Counter was killed: " (%n state))
cause)))))
@@ -216,54 +216,54 @@
(List a)
((handle message state self)
- (do T;Monad<Task>
+ (do T.Monad<Task>
[#let [_ (log! "BEFORE")]
output (message state self)
#let [_ (log! "AFTER")]]
(wrap output)))))}
(with-gensyms [g!message g!self g!state g!init g!error g!return g!output]
(do @
- [module macro;current-module-name
- #let [g!type (code;local-symbol (state-name _name))
- g!behavior (code;local-symbol (behavior-name _name))
- g!actor (code;local-symbol _name)
- g!new (code;local-symbol (new-name _name))
- g!vars (list/map code;local-symbol _vars)]]
- (wrap (list (` (type: (~@ (csw;export export)) ((~ g!type) (~@ g!vars))
+ [module macro.current-module-name
+ #let [g!type (code.local-symbol (state-name _name))
+ g!behavior (code.local-symbol (behavior-name _name))
+ g!actor (code.local-symbol _name)
+ g!new (code.local-symbol (new-name _name))
+ g!vars (list/map code.local-symbol _vars)]]
+ (wrap (list (` (type: (~@ (csw.export export)) ((~ g!type) (~@ g!vars))
(~ state-type)))
- (` (type: (~@ (csw;export export)) ((~ g!actor) (~@ g!vars))
+ (` (type: (~@ (csw.export export)) ((~ g!actor) (~@ g!vars))
(~ (|> annotations
(with-actor [module _name])
- csw;annotations))
- (;;Actor ((~ g!type) (~@ g!vars)))))
- (` (def: (~@ (csw;export export)) (~ g!behavior)
+ csw.annotations))
+ (..Actor ((~ g!type) (~@ g!vars)))))
+ (` (def: (~@ (csw.export export)) (~ g!behavior)
(All [(~@ g!vars)]
- (;;Behavior ((~ g!type) (~@ g!vars))))
- {#;;handle (~ (case ?handle
- #;None
- (` ;;default-handle)
-
- (#;Some [[messageN stateN selfN] bodyC])
- (` (function [(~ (code;local-symbol messageN))
- (~ (code;local-symbol stateN))
- (~ (code;local-symbol selfN))]
- (do T;Monad<Task>
+ (..Behavior ((~ g!type) (~@ g!vars))))
+ {#..handle (~ (case ?handle
+ #.None
+ (` ..default-handle)
+
+ (#.Some [[messageN stateN selfN] bodyC])
+ (` (function [(~ (code.local-symbol messageN))
+ (~ (code.local-symbol stateN))
+ (~ (code.local-symbol selfN))]
+ (do T.Monad<Task>
[]
(~ bodyC))))))
- #;;end (~ (case ?stop
- #;None
- (` ;;default-end)
-
- (#;Some [[causeN stateN] bodyC])
- (` (function [(~ (code;local-symbol causeN))
- (~ (code;local-symbol stateN))]
- (do P;Monad<Promise>
+ #..end (~ (case ?stop
+ #.None
+ (` ..default-end)
+
+ (#.Some [[causeN stateN] bodyC])
+ (` (function [(~ (code.local-symbol causeN))
+ (~ (code.local-symbol stateN))]
+ (do P.Monad<Promise>
[]
(~ bodyC))))))}))
- (` (def: (~@ (csw;export export)) ((~ g!new) (~ g!init))
+ (` (def: (~@ (csw.export export)) ((~ g!new) (~ g!init))
(All [(~@ g!vars)]
- (-> ((~ g!type) (~@ g!vars)) (io;IO ((~ g!actor) (~@ g!vars)))))
- (;;spawn (~ g!behavior) (~ g!init))))))
+ (-> ((~ g!type) (~@ g!vars)) (io.IO ((~ g!actor) (~@ g!vars)))))
+ (..spawn (~ g!behavior) (~ g!init))))))
)))
(type: Signature
@@ -275,25 +275,25 @@
#output Code})
(def: signature^
- (s;Syntax Signature)
- (s;form ($_ p;seq
- (p;default (list) (s;tuple (p;some s;local-symbol)))
- s;local-symbol
- (p;some csr;typed-input)
- s;local-symbol
- s;local-symbol
- s;any)))
+ (s.Syntax Signature)
+ (s.form ($_ p.seq
+ (p.default (list) (s.tuple (p.some s.local-symbol)))
+ s.local-symbol
+ (p.some csr.typed-input)
+ s.local-symbol
+ s.local-symbol
+ s.any)))
(def: reference^
- (s;Syntax [Ident (List Text)])
- (p;either (s;form (p;seq s;symbol (p;some s;local-symbol)))
- (p;seq s;symbol (:: p;Monad<Parser> wrap (list)))))
+ (s.Syntax [Ident (List Text)])
+ (p.either (s.form (p.seq s.symbol (p.some s.local-symbol)))
+ (p.seq s.symbol (:: p.Monad<Parser> wrap (list)))))
-(syntax: #export (message: [export csr;export] [[actor-name actor-vars] reference^]
+(syntax: #export (message: [export csr.export] [[actor-name actor-vars] reference^]
[signature signature^]
- [annotations (p;default cs;empty-annotations csr;annotations)]
+ [annotations (p.default cs.empty-annotations csr.annotations)]
body)
- {#;doc (doc "A message can access the actor's state through the state parameter."
+ {#.doc (doc "A message can access the actor's state through the state parameter."
"A message can also access the actor itself through the self parameter."
"A message's output must be a task containing a 2-tuple with the updated state and a return value."
"A message may succeed or fail (in case of failure, the actor dies)."
@@ -301,66 +301,66 @@
(message: #export Counter
(count! [increment Nat] state self Nat)
(let [state' (n/+ increment state)]
- (T;return [state' state'])))
+ (T.return [state' state'])))
(message: #export (Stack a)
(push [value a] state self (List a))
- (let [state' (#;Cons value state)]
- (T;return [state' state']))))}
+ (let [state' (#.Cons value state)]
+ (T.return [state' state']))))}
(with-gensyms [g!return g!error g!task g!sent?]
(do @
[actor-name (resolve-actor actor-name)
- #let [g!type (code;symbol (product;both id state-name actor-name))
- g!message (code;local-symbol (get@ #name signature))
- g!actor-vars (list/map code;local-symbol actor-vars)
- g!actor (` ((~ (code;symbol actor-name)) (~@ g!actor-vars)))
- g!all-vars (|> (get@ #vars signature) (list/map code;local-symbol) (list/compose g!actor-vars))
- g!inputsC (|> (get@ #inputs signature) (list/map (|>> product;left code;local-symbol)))
- g!inputsT (|> (get@ #inputs signature) (list/map product;right))
- g!state (|> signature (get@ #state) code;local-symbol)
- g!self (|> signature (get@ #self) code;local-symbol)
+ #let [g!type (code.symbol (product.both id state-name actor-name))
+ g!message (code.local-symbol (get@ #name signature))
+ g!actor-vars (list/map code.local-symbol actor-vars)
+ g!actor (` ((~ (code.symbol actor-name)) (~@ g!actor-vars)))
+ g!all-vars (|> (get@ #vars signature) (list/map code.local-symbol) (list/compose g!actor-vars))
+ g!inputsC (|> (get@ #inputs signature) (list/map (|>> product.left code.local-symbol)))
+ g!inputsT (|> (get@ #inputs signature) (list/map product.right))
+ g!state (|> signature (get@ #state) code.local-symbol)
+ g!self (|> signature (get@ #self) code.local-symbol)
g!actor-refs (: (List Code)
- (if (list;empty? actor-vars)
+ (if (list.empty? actor-vars)
(list)
- (|> actor-vars list;size n/dec
- (list;n/range +0) (list/map (|>> code;nat (~) ($) (`))))))
- ref-replacements (|> (if (list;empty? actor-vars)
+ (|> actor-vars list.size n/dec
+ (list.n/range +0) (list/map (|>> code.nat (~) ($) (`))))))
+ ref-replacements (|> (if (list.empty? actor-vars)
(list)
- (|> actor-vars list;size n/dec
- (list;n/range +0) (list/map (|>> code;nat (~) ($) (`)))))
+ (|> actor-vars list.size n/dec
+ (list.n/range +0) (list/map (|>> code.nat (~) ($) (`)))))
(: (List Code))
- (list;zip2 g!all-vars)
+ (list.zip2 g!all-vars)
(: (List [Code Code])))
g!outputT (list/fold (function [[g!var g!ref] outputT]
- (code;replace g!var g!ref outputT))
+ (code.replace g!var g!ref outputT))
(get@ #output signature)
ref-replacements)]]
- (wrap (list (` (def: (~@ (csw;export export)) ((~ g!message) (~@ g!inputsC) (~ g!self))
+ (wrap (list (` (def: (~@ (csw.export export)) ((~ g!message) (~@ g!inputsC) (~ g!self))
(~ (|> annotations
(with-message actor-name)
- csw;annotations))
- (All [(~@ g!all-vars)] (-> (~@ g!inputsT) (~ g!actor) (T;Task (~ (get@ #output signature)))))
- (let [(~ g!task) (T;task (~ g!outputT))]
- (io;run (do io;Monad<IO>
- [(~ g!sent?) (;;send (function [(~ g!state) (~ g!self)]
- (do P;Monad<Promise>
- [(~ g!return) (: (T;Task [((~ g!type) (~@ g!actor-refs))
+ csw.annotations))
+ (All [(~@ g!all-vars)] (-> (~@ g!inputsT) (~ g!actor) (T.Task (~ (get@ #output signature)))))
+ (let [(~ g!task) (T.task (~ g!outputT))]
+ (io.run (do io.Monad<IO>
+ [(~ g!sent?) (..send (function [(~ g!state) (~ g!self)]
+ (do P.Monad<Promise>
+ [(~ g!return) (: (T.Task [((~ g!type) (~@ g!actor-refs))
(~ g!outputT)])
- (do T;Monad<Task>
+ (do T.Monad<Task>
[]
(~ body)))]
(case (~ g!return)
- (#;Right [(~ g!state) (~ g!return)])
- (exec (io;run (P;resolve (#;Right (~ g!return)) (~ g!task)))
- (T;return (~ g!state)))
+ (#.Right [(~ g!state) (~ g!return)])
+ (exec (io.run (P.resolve (#.Right (~ g!return)) (~ g!task)))
+ (T.return (~ g!state)))
- (#;Left (~ g!error))
- (exec (io;run (P;resolve (#;Left (~ g!error)) (~ g!task)))
- (T;fail (~ g!error))))
+ (#.Left (~ g!error))
+ (exec (io.run (P.resolve (#.Left (~ g!error)) (~ g!task)))
+ (T.fail (~ g!error))))
))
(~ g!self))]
(if (~ g!sent?)
((~' wrap) (~ g!task))
- ((~' wrap) (T;throw ;;Dead ""))))))))
+ ((~' wrap) (T.throw ..Dead ""))))))))
))
)))