aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--stdlib/source/lux/concurrency/actor.lux338
-rw-r--r--stdlib/test/test/lux/concurrency/actor.lux39
2 files changed, 232 insertions, 145 deletions
diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux
index 0b9c1032f..91e4f7a4a 100644
--- a/stdlib/source/lux/concurrency/actor.lux
+++ b/stdlib/source/lux/concurrency/actor.lux
@@ -7,7 +7,7 @@
(data text/format
(coll [list "L/" Monoid<List> Monad<List>])
[product])
- [macro #+ with-gensyms]
+ [macro #+ with-gensyms Monad<Lux>]
(macro [code]
["s" syntax #+ syntax: Syntax]
(syntax ["cs" common]
@@ -41,14 +41,22 @@
(type: #export (Behavior s)
{#;doc "An actor's behavior when messages are received."}
- {#step (-> (Message s) s (Actor s) (T;Task s))
+ {#handle (-> (Message s) s (Actor s) (T;Task s))
#end (-> Text s (P;Promise Unit))})
## [Values]
-(def: #export (spawn init behavior)
+(def: #export (default-handle message state self)
+ (All [s] (-> (Message s) s (Actor s) (T;Task s)))
+ (message state self))
+
+(def: #export (default-end cause state)
+ (All [s] (-> Text s (P;Promise Unit)))
+ (P/wrap []))
+
+(def: #export (spawn behavior init)
{#;doc "Given a behavior and initial state, spawns an actor and returns it."}
- (All [s] (-> s (Behavior s) (IO (Actor s))))
- (io (let [[step end] behavior
+ (All [s] (-> (Behavior s) s (IO (Actor s))))
+ (io (let [[handle end] behavior
self (: (Actor ($ +0))
{#mailbox (stm;var (:! (Message ($ +0)) []))
#kill-switch (P;promise Unit)
@@ -69,7 +77,7 @@
(#;Some [message messages'])
(do P;Monad<Promise>
[#let [_ (io;run (stm;write! messages' |mailbox|))]
- ?state' (step message state self)]
+ ?state' (handle message state self)]
(case ?state'
(#;Left error)
(do @
@@ -125,137 +133,209 @@
actor))
## [Syntax]
-(type: Method
- {#name Text
- #vars (List Text)
- #args (List [Text Code])
- #state Text
- #self Text
- #return Code
- #body Code})
-
-(def: method^
- (Syntax Method)
- (s;form (do p;Monad<Parser>
- [vars (p;default (list) (s;tuple (p;some s;local-symbol)))
- [name args state self] (s;form ($_ p;seq
- s;local-symbol
- (p;some csr;typed-input)
- s;local-symbol
- s;local-symbol
- ))
- return s;any
- body s;any]
- (wrap {#name name
- #vars vars
- #args args
- #state state
- #self self
- #return return
- #body body}))))
-
-(def: stop^
- (Syntax [[Text Text] Code])
- (s;form (p;seq (s;tuple (p;seq s;local-symbol
- s;local-symbol))
- s;any)))
+(do-template [<with> <resolve> <tag> <desc>]
+ [(def: (<with> name)
+ (-> Ident cs;Annotations cs;Annotations)
+ (|>. (#;Cons [(ident-for <tag>)
+ (code;tag name)])))
+
+ (def: (<resolve> name)
+ (-> Ident (Lux Ident))
+ (do Monad<Lux>
+ [name (macro;normalize name)
+ [_ annotations _] (macro;find-def name)]
+ (case (macro;get-ident-ann (ident-for <tag>) annotations)
+ (#;Some actor-name)
+ (wrap actor-name)
+
+ _
+ (macro;fail (format "Definition is not " <desc> ".")))))]
+
+ [with-actor resolve-actor #;;actor "an actor"]
+ [with-message resolve-message #;;message "a message"]
+ )
(def: actor-decl^
- (Syntax [(List Text) Text (List [Text Code])])
- (p;seq (p;default (list) (s;tuple (p;some s;local-symbol)))
- (p;either (s;form (p;seq s;local-symbol (p;many csr;typed-input)))
- (p;seq s;local-symbol (:: p;Monad<Parser> wrap (list))))))
+ (Syntax [Text (List Text)])
+ (p;either (s;form (p;seq s;local-symbol (p;some s;local-symbol)))
+ (p;seq s;local-symbol (:: p;Monad<Parser> wrap (list)))))
+
+(do-template [<name> <desc>]
+ [(def: <name>
+ (-> Text Text)
+ (|>. (format <desc> "@")))]
+
+ [state-name "State"]
+ [behavior-name "Behavior"]
+ [new-name "new"]
+ )
+
+(type: HandleC
+ [[Text Text Text] Code])
+
+(type: StopC
+ [[Text Text] Code])
+
+(type: BehaviorC
+ [(Maybe HandleC) (Maybe StopC)])
+
+(def: behavior^
+ (s;Syntax BehaviorC)
+ (let [handle-args ($_ p;seq s;local-symbol s;local-symbol s;local-symbol)
+ stop-args ($_ p;seq s;local-symbol s;local-symbol)]
+ (p;seq (p;opt (s;form (p;seq (s;form (p;after (s;this (' handle)) handle-args))
+ s;any)))
+ (p;opt (s;form (p;seq (s;form (p;after (s;this (' stop)) stop-args))
+ s;any))))))
(syntax: #export (actor: [export csr;export]
- [[_vars _name _args] actor-decl^]
+ [[_name _vars] actor-decl^]
+ [annotations (p;default cs;empty-annotations csr;annotations)]
state-type
- [methods (p;some method^)]
- [?stop (p;opt stop^)])
- {#;doc (doc "Defines an actor, with internal state and methods that can be called on it."
- "A method can access the actor's state through the state parameter."
- "A method can also access the actor itself through the self parameter."
- "A method may succeed or fail (in case of failure, the actor dies). This is handled through the Either type."
- "A method's output must be a promise containing a 2-tuple with the updated state and a return value."
- "All methods are run implicitly within the Promise monad."
-
+ [[?handle ?stop] behavior^])
+ {#;doc (doc "Defines an actor, with its behavior and internal state."
(actor: #export Counter
- Int
+ Nat
- ((count! [increment Int] state self)
- [Int Int]
- (if (i.>= 0 increment)
- (let [state' (i.+ increment state)]
- (T;return [state' [state state']]))
- (T;fail "Cannot add negative numbers!")))
-
- ([cause state]
+ ((stop cause state)
(:: P;Monad<Promise> wrap
(log! (if (ex;match? ;;Killed cause)
- (format "Counter was killed: " (%i state))
- cause))))
- ))}
+ (format "Counter was killed: " (%n state))
+ cause)))))
+
+ (actor: #export (Stack a)
+ (List a)
+
+ ((handle message state self)
+ (do T;Monad<Task>
+ [#let [_ (log! "BEFORE")]
+ output (message state self)
+ #let [_ (log! "AFTER")]]
+ (wrap output)))))}
(with-gensyms [g!message g!self g!state g!init g!error g!return g!output]
- (let [g!state-type (code;symbol ["" (format _name "//Actor:State")])
- g!behavior (code;symbol ["" (format _name "//Actor:Behavior")])
- g!actor (code;symbol ["" _name])
- g!methods (L/map (: (-> Method Code)
- (function [(^slots [#name #vars #args #state #self #return #body])]
- (let [g!method (code;symbol ["" name])
- g!vars (L/map (|>. [""] code;symbol) vars)
- g!var-refs (: (List Code)
- (if (list;empty? vars)
- (list)
- (|> vars list;size n.dec
- (list;n.range +0) (L/map (|>. code;nat (~) ($) (`))))))
- g!args-names (L/map (|>. product;left [""] code;symbol) args)
- g!arg-types (L/map product;right args)
- g!state (code;symbol ["" state])
- g!self (code;symbol ["" self])]
- (` (def: (~@ (csw;export export)) ((~ g!method) (~@ g!args-names) (~ g!self))
- (All [(~@ g!vars)]
- (-> (~@ g!arg-types) (~ g!actor) (T;Task (~ return))))
- (let [(~ g!output) (T;task (~ return))]
- (exec (;;send (function [(~ g!state) (~ g!self)]
- (do P;Monad<Promise>
- [(~ g!return) (: (T;Task [((~ g!state-type) (~@ g!var-refs))
- (~ return)])
- (~ body))]
- (case (~ g!return)
- (#;Right [(~ g!state) (~ g!return)])
- (exec (io;run (P;resolve (#;Right (~ g!return)) (~ g!output)))
- (T;return (~ g!state)))
-
- (#;Left (~ g!error))
- (exec (io;run (P;resolve (#;Left (~ g!error)) (~ g!output)))
- (T;fail (~ g!error))))))
- (~ g!self))
- (~ g!output))))))))
- methods)
- g!new (code;symbol ["" (format "new-" _name)])
- g!vars (L/map (|>. [""] code;symbol) _vars)]
- (wrap (list& (` (type: (~@ (csw;export export)) ((~ g!state-type) (~@ g!vars))
- (~ state-type)))
- (` (type: (~@ (csw;export export)) ((~ g!actor) (~@ g!vars))
- (;;Actor ((~ g!state-type) (~@ g!vars)))))
- (` (def: (~@ (csw;export export)) (~ g!behavior)
- (All [(~@ g!vars)]
- (Behavior ((~ g!state-type) (~@ g!vars))))
- {#step (function [(~' message) (~' state) (~' self)]
- ((~' message) (~' state) (~' self)))
- #end (~ (case ?stop
- (#;Some [[cause state] body])
- (let [g!cause (code;symbol ["" cause])
- g!state (code;symbol ["" state])]
- (` (function [(~ g!cause) (~ g!state)]
- (do P;Monad<Promise>
- []
- (~ body)))))
-
- #;None
- (` (:: P;Monad<Promise> (~' wrap) []))))}))
- (` (def: (~@ (csw;export export)) ((~ g!new) (~ g!init))
- (All [(~@ g!vars)]
- (-> ((~ g!state-type) (~@ g!vars)) (io;IO ((~ g!actor) (~@ g!vars)))))
- (;;spawn (~ g!init) (~ g!behavior))))
- g!methods))
+ (do @
+ [module macro;current-module-name
+ #let [g!type (code;local-symbol (state-name _name))
+ g!behavior (code;local-symbol (behavior-name _name))
+ g!actor (code;local-symbol _name)
+ g!new (code;local-symbol (new-name _name))
+ g!vars (L/map code;local-symbol _vars)]]
+ (wrap (list (` (type: (~@ (csw;export export)) ((~ g!type) (~@ g!vars))
+ (~ state-type)))
+ (` (type: (~@ (csw;export export)) ((~ g!actor) (~@ g!vars))
+ (~ (|> annotations
+ (with-actor [module _name])
+ csw;annotations))
+ (;;Actor ((~ g!type) (~@ g!vars)))))
+ (` (def: (~@ (csw;export export)) (~ g!behavior)
+ (All [(~@ g!vars)]
+ (;;Behavior ((~ g!type) (~@ g!vars))))
+ {#;;handle (~ (case ?handle
+ #;None
+ (` ;;default-handle)
+
+ (#;Some [[messageN stateN selfN] bodyC])
+ (` (function [(~ (code;local-symbol messageN))
+ (~ (code;local-symbol stateN))
+ (~ (code;local-symbol selfN))]
+ (do T;Monad<Task>
+ []
+ (~ bodyC))))))
+ #;;end (~ (case ?stop
+ #;None
+ (` ;;default-handle)
+
+ (#;Some [[causeN stateN] bodyC])
+ (` (function [(~ (code;local-symbol causeN))
+ (~ (code;local-symbol stateN))]
+ (do P;Monad<Promise>
+ []
+ (~ bodyC))))))}))
+ (` (def: (~@ (csw;export export)) ((~ g!new) (~ g!init))
+ (All [(~@ g!vars)]
+ (-> ((~ g!type) (~@ g!vars)) (io;IO ((~ g!actor) (~@ g!vars)))))
+ (;;spawn (~ g!behavior) (~ g!init))))))
+ )))
+
+(type: Signature
+ {#vars (List Text)
+ #name Text
+ #inputs (List [Text Code])
+ #state Text
+ #self Text
+ #output Code})
+
+(def: signature^
+ (s;Syntax Signature)
+ (s;form ($_ p;seq
+ (p;default (list) (s;tuple (p;some s;local-symbol)))
+ s;local-symbol
+ (p;some csr;typed-input)
+ s;local-symbol
+ s;local-symbol
+ s;any)))
+
+(def: reference^
+ (s;Syntax [Ident (List Text)])
+ (p;either (s;form (p;seq s;symbol (p;some s;local-symbol)))
+ (p;seq s;symbol (:: p;Monad<Parser> wrap (list)))))
+
+(syntax: #export (message: [export csr;export] [[actor-name actor-vars] reference^]
+ [signature signature^]
+ [annotations (p;default cs;empty-annotations csr;annotations)]
+ body)
+ {#;doc (doc "A message can access the actor's state through the state parameter."
+ "A message can also access the actor itself through the self parameter."
+ "A message's output must be a task containing a 2-tuple with the updated state and a return value."
+ "A message may succeed or fail (in case of failure, the actor dies)."
+
+ (message: #export Counter
+ (count! [increment Nat] state self Nat)
+ (let [state' (n.+ increment state)]
+ (T;return [state' state'])))
+
+ (message: #export (Stack a)
+ (push [value a] state self (List a))
+ (let [state' (#;Cons value state)]
+ (T;return [state' state']))))}
+ (with-gensyms [g!return g!error g!task]
+ (do @
+ [actor-name (resolve-actor actor-name)
+ #let [g!type (code;symbol (product;both id state-name actor-name))
+ g!message (code;local-symbol (get@ #name signature))
+ g!refs (: (List Code)
+ (if (list;empty? actor-vars)
+ (list)
+ (|> actor-vars list;size n.dec
+ (list;n.range +0) (L/map (|>. code;nat (~) ($) (`))))))
+ g!actor (code;symbol actor-name)
+ g!tvars (|> (get@ #vars signature) (L/append actor-vars) (L/map code;local-symbol))
+ g!inputsC (|> (get@ #inputs signature) (L/map (|>. product;left code;local-symbol)))
+ g!inputsT (|> (get@ #inputs signature) (L/map product;right))
+ g!outputT (get@ #output signature)
+ g!state (|> signature (get@ #state) code;local-symbol)
+ g!self (|> signature (get@ #self) code;local-symbol)]]
+ (wrap (list (` (def: (~@ (csw;export export)) ((~ g!message) (~@ g!inputsC) (~ g!self))
+ (~ (|> annotations
+ (with-message actor-name)
+ csw;annotations))
+ (All [(~@ g!tvars)] (-> (~@ g!inputsT) (~ g!actor) (T;Task (~ g!outputT))))
+ (let [(~ g!task) (T;task (~ g!outputT))]
+ (exec (;;send (function [(~ g!state) (~ g!self)]
+ (do P;Monad<Promise>
+ [(~ g!return) (: (T;Task [((~ g!type) (~@ g!refs))
+ (~ g!outputT)])
+ (~ body))]
+ (case (~ g!return)
+ (#;Right [(~ g!state) (~ g!return)])
+ (exec (io;run (P;resolve (#;Right (~ g!return)) (~ g!task)))
+ (T;return (~ g!state)))
+
+ (#;Left (~ g!error))
+ (exec (io;run (P;resolve (#;Left (~ g!error)) (~ g!task)))
+ (T;fail (~ g!error))))
+ ))
+ (~ g!self))
+ (~ g!task)))))
+ ))
)))
diff --git a/stdlib/test/test/lux/concurrency/actor.lux b/stdlib/test/test/lux/concurrency/actor.lux
index 8ec792baf..a0e51cb09 100644
--- a/stdlib/test/test/lux/concurrency/actor.lux
+++ b/stdlib/test/test/lux/concurrency/actor.lux
@@ -8,46 +8,53 @@
["R" result])
(concurrency ["P" promise "P/" Monad<Promise>]
["T" task]
- ["&" actor #+ actor:]))
+ ["&" actor #+ actor: message:]))
lux/test)
(actor: Counter
Nat
- ((count! state self)
- Nat
- (let [state' (n.inc state)]
- (T;return [state' state'])))
+ ((handle message state self)
+ (do T;Monad<Task>
+ [#let [_ (log! "BEFORE")]
+ output (message state self)
+ #let [_ (log! "AFTER")]]
+ (wrap output)))
- ([cause state]
+ ((stop cause state)
(P/wrap (log! (if (ex;match? &;Killed cause)
(format "Counter was killed: " (%n state))
cause)))))
+(message: #export Counter
+ (count! [increment Nat] state self Nat)
+ (let [state' (n.+ increment state)]
+ (T;return [state' state'])))
+
(context: "Actors"
($_ seq
- (test "Can check where an actor is alive."
+ (test "Can check if an actor is alive."
(io;run (do Monad<IO>
- [counter (new-Counter +0)]
+ [counter (new@Counter +0)]
(wrap (&;alive? counter)))))
(test "Can kill actors."
(io;run (do Monad<IO>
- [counter (new-Counter +0)
+ [counter (new@Counter +0)
killed? (&;kill counter)]
(wrap (and killed?
(not (&;alive? counter)))))))
(test "Can poison actors."
(io;run (do Monad<IO>
- [counter (new-Counter +0)
+ [counter (new@Counter +0)
poisoned? (&;poison counter)]
(wrap (and poisoned?
(not (&;alive? counter)))))))
(test "Cannot kill an already dead actor."
(io;run (do Monad<IO>
- [counter (new-Counter +0)
+ [counter (new@Counter +0)
first-time (&;kill counter)
second-time (&;kill counter)]
(wrap (and first-time
@@ -55,7 +62,7 @@
(test "Cannot poison an already dead actor."
(io;run (do Monad<IO>
- [counter (new-Counter +0)
+ [counter (new@Counter +0)
first-time (&;kill counter)
second-time (&;poison counter)]
(wrap (and first-time
@@ -63,10 +70,10 @@
(do P;Monad<Promise>
[result (do T;Monad<Task>
- [#let [counter (io;run (new-Counter +0))]
- output-1 (count! counter)
- output-2 (count! counter)
- output-3 (count! counter)]
+ [#let [counter (io;run (new@Counter +0))]
+ output-1 (count! +1 counter)
+ output-2 (count! +1 counter)
+ output-3 (count! +1 counter)]
(wrap (and (n.= +1 output-1)
(n.= +2 output-2)
(n.= +3 output-3))))]