diff options
Diffstat (limited to 'stdlib')
-rw-r--r-- | stdlib/source/lux/concurrency/actor.lux | 338 | ||||
-rw-r--r-- | stdlib/test/test/lux/concurrency/actor.lux | 39 |
2 files changed, 232 insertions, 145 deletions
diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux index 0b9c1032f..91e4f7a4a 100644 --- a/stdlib/source/lux/concurrency/actor.lux +++ b/stdlib/source/lux/concurrency/actor.lux @@ -7,7 +7,7 @@ (data text/format (coll [list "L/" Monoid<List> Monad<List>]) [product]) - [macro #+ with-gensyms] + [macro #+ with-gensyms Monad<Lux>] (macro [code] ["s" syntax #+ syntax: Syntax] (syntax ["cs" common] @@ -41,14 +41,22 @@ (type: #export (Behavior s) {#;doc "An actor's behavior when messages are received."} - {#step (-> (Message s) s (Actor s) (T;Task s)) + {#handle (-> (Message s) s (Actor s) (T;Task s)) #end (-> Text s (P;Promise Unit))}) ## [Values] -(def: #export (spawn init behavior) +(def: #export (default-handle message state self) + (All [s] (-> (Message s) s (Actor s) (T;Task s))) + (message state self)) + +(def: #export (default-end cause state) + (All [s] (-> Text s (P;Promise Unit))) + (P/wrap [])) + +(def: #export (spawn behavior init) {#;doc "Given a behavior and initial state, spawns an actor and returns it."} - (All [s] (-> s (Behavior s) (IO (Actor s)))) - (io (let [[step end] behavior + (All [s] (-> (Behavior s) s (IO (Actor s)))) + (io (let [[handle end] behavior self (: (Actor ($ +0)) {#mailbox (stm;var (:! (Message ($ +0)) [])) #kill-switch (P;promise Unit) @@ -69,7 +77,7 @@ (#;Some [message messages']) (do P;Monad<Promise> [#let [_ (io;run (stm;write! messages' |mailbox|))] - ?state' (step message state self)] + ?state' (handle message state self)] (case ?state' (#;Left error) (do @ @@ -125,137 +133,209 @@ actor)) ## [Syntax] -(type: Method - {#name Text - #vars (List Text) - #args (List [Text Code]) - #state Text - #self Text - #return Code - #body Code}) - -(def: method^ - (Syntax Method) - (s;form (do p;Monad<Parser> - [vars (p;default (list) (s;tuple (p;some s;local-symbol))) - [name args state self] (s;form ($_ p;seq - s;local-symbol - (p;some csr;typed-input) - s;local-symbol - s;local-symbol - )) - return s;any - body s;any] - (wrap {#name name - #vars vars - #args args - #state state - #self self - #return return - #body body})))) - -(def: stop^ - (Syntax [[Text Text] Code]) - (s;form (p;seq (s;tuple (p;seq s;local-symbol - s;local-symbol)) - s;any))) +(do-template [<with> <resolve> <tag> <desc>] + [(def: (<with> name) + (-> Ident cs;Annotations cs;Annotations) + (|>. (#;Cons [(ident-for <tag>) + (code;tag name)]))) + + (def: (<resolve> name) + (-> Ident (Lux Ident)) + (do Monad<Lux> + [name (macro;normalize name) + [_ annotations _] (macro;find-def name)] + (case (macro;get-ident-ann (ident-for <tag>) annotations) + (#;Some actor-name) + (wrap actor-name) + + _ + (macro;fail (format "Definition is not " <desc> ".")))))] + + [with-actor resolve-actor #;;actor "an actor"] + [with-message resolve-message #;;message "a message"] + ) (def: actor-decl^ - (Syntax [(List Text) Text (List [Text Code])]) - (p;seq (p;default (list) (s;tuple (p;some s;local-symbol))) - (p;either (s;form (p;seq s;local-symbol (p;many csr;typed-input))) - (p;seq s;local-symbol (:: p;Monad<Parser> wrap (list)))))) + (Syntax [Text (List Text)]) + (p;either (s;form (p;seq s;local-symbol (p;some s;local-symbol))) + (p;seq s;local-symbol (:: p;Monad<Parser> wrap (list))))) + +(do-template [<name> <desc>] + [(def: <name> + (-> Text Text) + (|>. (format <desc> "@")))] + + [state-name "State"] + [behavior-name "Behavior"] + [new-name "new"] + ) + +(type: HandleC + [[Text Text Text] Code]) + +(type: StopC + [[Text Text] Code]) + +(type: BehaviorC + [(Maybe HandleC) (Maybe StopC)]) + +(def: behavior^ + (s;Syntax BehaviorC) + (let [handle-args ($_ p;seq s;local-symbol s;local-symbol s;local-symbol) + stop-args ($_ p;seq s;local-symbol s;local-symbol)] + (p;seq (p;opt (s;form (p;seq (s;form (p;after (s;this (' handle)) handle-args)) + s;any))) + (p;opt (s;form (p;seq (s;form (p;after (s;this (' stop)) stop-args)) + s;any)))))) (syntax: #export (actor: [export csr;export] - [[_vars _name _args] actor-decl^] + [[_name _vars] actor-decl^] + [annotations (p;default cs;empty-annotations csr;annotations)] state-type - [methods (p;some method^)] - [?stop (p;opt stop^)]) - {#;doc (doc "Defines an actor, with internal state and methods that can be called on it." - "A method can access the actor's state through the state parameter." - "A method can also access the actor itself through the self parameter." - "A method may succeed or fail (in case of failure, the actor dies). This is handled through the Either type." - "A method's output must be a promise containing a 2-tuple with the updated state and a return value." - "All methods are run implicitly within the Promise monad." - + [[?handle ?stop] behavior^]) + {#;doc (doc "Defines an actor, with its behavior and internal state." (actor: #export Counter - Int + Nat - ((count! [increment Int] state self) - [Int Int] - (if (i.>= 0 increment) - (let [state' (i.+ increment state)] - (T;return [state' [state state']])) - (T;fail "Cannot add negative numbers!"))) - - ([cause state] + ((stop cause state) (:: P;Monad<Promise> wrap (log! (if (ex;match? ;;Killed cause) - (format "Counter was killed: " (%i state)) - cause)))) - ))} + (format "Counter was killed: " (%n state)) + cause))))) + + (actor: #export (Stack a) + (List a) + + ((handle message state self) + (do T;Monad<Task> + [#let [_ (log! "BEFORE")] + output (message state self) + #let [_ (log! "AFTER")]] + (wrap output)))))} (with-gensyms [g!message g!self g!state g!init g!error g!return g!output] - (let [g!state-type (code;symbol ["" (format _name "//Actor:State")]) - g!behavior (code;symbol ["" (format _name "//Actor:Behavior")]) - g!actor (code;symbol ["" _name]) - g!methods (L/map (: (-> Method Code) - (function [(^slots [#name #vars #args #state #self #return #body])] - (let [g!method (code;symbol ["" name]) - g!vars (L/map (|>. [""] code;symbol) vars) - g!var-refs (: (List Code) - (if (list;empty? vars) - (list) - (|> vars list;size n.dec - (list;n.range +0) (L/map (|>. code;nat (~) ($) (`)))))) - g!args-names (L/map (|>. product;left [""] code;symbol) args) - g!arg-types (L/map product;right args) - g!state (code;symbol ["" state]) - g!self (code;symbol ["" self])] - (` (def: (~@ (csw;export export)) ((~ g!method) (~@ g!args-names) (~ g!self)) - (All [(~@ g!vars)] - (-> (~@ g!arg-types) (~ g!actor) (T;Task (~ return)))) - (let [(~ g!output) (T;task (~ return))] - (exec (;;send (function [(~ g!state) (~ g!self)] - (do P;Monad<Promise> - [(~ g!return) (: (T;Task [((~ g!state-type) (~@ g!var-refs)) - (~ return)]) - (~ body))] - (case (~ g!return) - (#;Right [(~ g!state) (~ g!return)]) - (exec (io;run (P;resolve (#;Right (~ g!return)) (~ g!output))) - (T;return (~ g!state))) - - (#;Left (~ g!error)) - (exec (io;run (P;resolve (#;Left (~ g!error)) (~ g!output))) - (T;fail (~ g!error)))))) - (~ g!self)) - (~ g!output)))))))) - methods) - g!new (code;symbol ["" (format "new-" _name)]) - g!vars (L/map (|>. [""] code;symbol) _vars)] - (wrap (list& (` (type: (~@ (csw;export export)) ((~ g!state-type) (~@ g!vars)) - (~ state-type))) - (` (type: (~@ (csw;export export)) ((~ g!actor) (~@ g!vars)) - (;;Actor ((~ g!state-type) (~@ g!vars))))) - (` (def: (~@ (csw;export export)) (~ g!behavior) - (All [(~@ g!vars)] - (Behavior ((~ g!state-type) (~@ g!vars)))) - {#step (function [(~' message) (~' state) (~' self)] - ((~' message) (~' state) (~' self))) - #end (~ (case ?stop - (#;Some [[cause state] body]) - (let [g!cause (code;symbol ["" cause]) - g!state (code;symbol ["" state])] - (` (function [(~ g!cause) (~ g!state)] - (do P;Monad<Promise> - [] - (~ body))))) - - #;None - (` (:: P;Monad<Promise> (~' wrap) []))))})) - (` (def: (~@ (csw;export export)) ((~ g!new) (~ g!init)) - (All [(~@ g!vars)] - (-> ((~ g!state-type) (~@ g!vars)) (io;IO ((~ g!actor) (~@ g!vars))))) - (;;spawn (~ g!init) (~ g!behavior)))) - g!methods)) + (do @ + [module macro;current-module-name + #let [g!type (code;local-symbol (state-name _name)) + g!behavior (code;local-symbol (behavior-name _name)) + g!actor (code;local-symbol _name) + g!new (code;local-symbol (new-name _name)) + g!vars (L/map code;local-symbol _vars)]] + (wrap (list (` (type: (~@ (csw;export export)) ((~ g!type) (~@ g!vars)) + (~ state-type))) + (` (type: (~@ (csw;export export)) ((~ g!actor) (~@ g!vars)) + (~ (|> annotations + (with-actor [module _name]) + csw;annotations)) + (;;Actor ((~ g!type) (~@ g!vars))))) + (` (def: (~@ (csw;export export)) (~ g!behavior) + (All [(~@ g!vars)] + (;;Behavior ((~ g!type) (~@ g!vars)))) + {#;;handle (~ (case ?handle + #;None + (` ;;default-handle) + + (#;Some [[messageN stateN selfN] bodyC]) + (` (function [(~ (code;local-symbol messageN)) + (~ (code;local-symbol stateN)) + (~ (code;local-symbol selfN))] + (do T;Monad<Task> + [] + (~ bodyC)))))) + #;;end (~ (case ?stop + #;None + (` ;;default-handle) + + (#;Some [[causeN stateN] bodyC]) + (` (function [(~ (code;local-symbol causeN)) + (~ (code;local-symbol stateN))] + (do P;Monad<Promise> + [] + (~ bodyC))))))})) + (` (def: (~@ (csw;export export)) ((~ g!new) (~ g!init)) + (All [(~@ g!vars)] + (-> ((~ g!type) (~@ g!vars)) (io;IO ((~ g!actor) (~@ g!vars))))) + (;;spawn (~ g!behavior) (~ g!init)))))) + ))) + +(type: Signature + {#vars (List Text) + #name Text + #inputs (List [Text Code]) + #state Text + #self Text + #output Code}) + +(def: signature^ + (s;Syntax Signature) + (s;form ($_ p;seq + (p;default (list) (s;tuple (p;some s;local-symbol))) + s;local-symbol + (p;some csr;typed-input) + s;local-symbol + s;local-symbol + s;any))) + +(def: reference^ + (s;Syntax [Ident (List Text)]) + (p;either (s;form (p;seq s;symbol (p;some s;local-symbol))) + (p;seq s;symbol (:: p;Monad<Parser> wrap (list))))) + +(syntax: #export (message: [export csr;export] [[actor-name actor-vars] reference^] + [signature signature^] + [annotations (p;default cs;empty-annotations csr;annotations)] + body) + {#;doc (doc "A message can access the actor's state through the state parameter." + "A message can also access the actor itself through the self parameter." + "A message's output must be a task containing a 2-tuple with the updated state and a return value." + "A message may succeed or fail (in case of failure, the actor dies)." + + (message: #export Counter + (count! [increment Nat] state self Nat) + (let [state' (n.+ increment state)] + (T;return [state' state']))) + + (message: #export (Stack a) + (push [value a] state self (List a)) + (let [state' (#;Cons value state)] + (T;return [state' state']))))} + (with-gensyms [g!return g!error g!task] + (do @ + [actor-name (resolve-actor actor-name) + #let [g!type (code;symbol (product;both id state-name actor-name)) + g!message (code;local-symbol (get@ #name signature)) + g!refs (: (List Code) + (if (list;empty? actor-vars) + (list) + (|> actor-vars list;size n.dec + (list;n.range +0) (L/map (|>. code;nat (~) ($) (`)))))) + g!actor (code;symbol actor-name) + g!tvars (|> (get@ #vars signature) (L/append actor-vars) (L/map code;local-symbol)) + g!inputsC (|> (get@ #inputs signature) (L/map (|>. product;left code;local-symbol))) + g!inputsT (|> (get@ #inputs signature) (L/map product;right)) + g!outputT (get@ #output signature) + g!state (|> signature (get@ #state) code;local-symbol) + g!self (|> signature (get@ #self) code;local-symbol)]] + (wrap (list (` (def: (~@ (csw;export export)) ((~ g!message) (~@ g!inputsC) (~ g!self)) + (~ (|> annotations + (with-message actor-name) + csw;annotations)) + (All [(~@ g!tvars)] (-> (~@ g!inputsT) (~ g!actor) (T;Task (~ g!outputT)))) + (let [(~ g!task) (T;task (~ g!outputT))] + (exec (;;send (function [(~ g!state) (~ g!self)] + (do P;Monad<Promise> + [(~ g!return) (: (T;Task [((~ g!type) (~@ g!refs)) + (~ g!outputT)]) + (~ body))] + (case (~ g!return) + (#;Right [(~ g!state) (~ g!return)]) + (exec (io;run (P;resolve (#;Right (~ g!return)) (~ g!task))) + (T;return (~ g!state))) + + (#;Left (~ g!error)) + (exec (io;run (P;resolve (#;Left (~ g!error)) (~ g!task))) + (T;fail (~ g!error)))) + )) + (~ g!self)) + (~ g!task))))) + )) ))) diff --git a/stdlib/test/test/lux/concurrency/actor.lux b/stdlib/test/test/lux/concurrency/actor.lux index 8ec792baf..a0e51cb09 100644 --- a/stdlib/test/test/lux/concurrency/actor.lux +++ b/stdlib/test/test/lux/concurrency/actor.lux @@ -8,46 +8,53 @@ ["R" result]) (concurrency ["P" promise "P/" Monad<Promise>] ["T" task] - ["&" actor #+ actor:])) + ["&" actor #+ actor: message:])) lux/test) (actor: Counter Nat - ((count! state self) - Nat - (let [state' (n.inc state)] - (T;return [state' state']))) + ((handle message state self) + (do T;Monad<Task> + [#let [_ (log! "BEFORE")] + output (message state self) + #let [_ (log! "AFTER")]] + (wrap output))) - ([cause state] + ((stop cause state) (P/wrap (log! (if (ex;match? &;Killed cause) (format "Counter was killed: " (%n state)) cause))))) +(message: #export Counter + (count! [increment Nat] state self Nat) + (let [state' (n.+ increment state)] + (T;return [state' state']))) + (context: "Actors" ($_ seq - (test "Can check where an actor is alive." + (test "Can check if an actor is alive." (io;run (do Monad<IO> - [counter (new-Counter +0)] + [counter (new@Counter +0)] (wrap (&;alive? counter))))) (test "Can kill actors." (io;run (do Monad<IO> - [counter (new-Counter +0) + [counter (new@Counter +0) killed? (&;kill counter)] (wrap (and killed? (not (&;alive? counter))))))) (test "Can poison actors." (io;run (do Monad<IO> - [counter (new-Counter +0) + [counter (new@Counter +0) poisoned? (&;poison counter)] (wrap (and poisoned? (not (&;alive? counter))))))) (test "Cannot kill an already dead actor." (io;run (do Monad<IO> - [counter (new-Counter +0) + [counter (new@Counter +0) first-time (&;kill counter) second-time (&;kill counter)] (wrap (and first-time @@ -55,7 +62,7 @@ (test "Cannot poison an already dead actor." (io;run (do Monad<IO> - [counter (new-Counter +0) + [counter (new@Counter +0) first-time (&;kill counter) second-time (&;poison counter)] (wrap (and first-time @@ -63,10 +70,10 @@ (do P;Monad<Promise> [result (do T;Monad<Task> - [#let [counter (io;run (new-Counter +0))] - output-1 (count! counter) - output-2 (count! counter) - output-3 (count! counter)] + [#let [counter (io;run (new@Counter +0))] + output-1 (count! +1 counter) + output-2 (count! +1 counter) + output-3 (count! +1 counter)] (wrap (and (n.= +1 output-1) (n.= +2 output-2) (n.= +3 output-3))))] |