diff options
Diffstat (limited to 'stdlib/source/library/lux/control/concurrency/actor.lux')
-rw-r--r-- | stdlib/source/library/lux/control/concurrency/actor.lux | 239 |
1 files changed, 89 insertions, 150 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/actor.lux b/stdlib/source/library/lux/control/concurrency/actor.lux index 1557a9f89..e51be0b98 100644 --- a/stdlib/source/library/lux/control/concurrency/actor.lux +++ b/stdlib/source/library/lux/control/concurrency/actor.lux @@ -1,5 +1,4 @@ (.module: - {#.doc "The actor model of concurrency."} [library [lux #* ["." debug] @@ -64,27 +63,23 @@ (in #.End)))) (abstract: .public (Actor s) - {#.doc (example "An entity that can react to messages (mail) sent to it concurrently.")} + {} {#obituary [(Async <Obituary>) (Resolver <Obituary>)] #mailbox (Atom <Mailbox>)} (type: .public (Mail s) - {#.doc (example "A one-way message sent to an actor, without expecting a reply.")} <Mail>) (type: .public (Obituary s) - {#.doc (example "Details on the death of an actor.")} <Obituary>) (type: .public (Behavior o s) - {#.doc (example "An actor's behavior when mail is received and when a fatal error occurs.")} {#on_init (-> o s) #on_mail (-> (Mail s) s (Actor s) (Async (Try s)))}) (def: .public (spawn! behavior init) - {#.doc (example "Given a behavior and initial state, spawns an actor and returns it.")} (All [o s] (-> (Behavior o s) o (IO (Actor s)))) (io (let [[on_init on_mail] behavior self (:sharing [o s] @@ -130,14 +125,12 @@ (async.value obituary))) (def: .public obituary - {#.doc (example "Await for an actor to stop working.")} (All [s] (-> (Actor s) (Async (Obituary s)))) (|>> :representation (get@ #obituary) product.left)) (def: .public (mail! mail actor) - {#.doc (example "Send mail to an actor.")} (All [s] (-> (Mail s) (Actor s) (IO (Try Any)))) (do {! io.monad} [alive? (..alive? actor)] @@ -163,7 +156,6 @@ (in (exception.except ..dead []))))) (type: .public (Message s o) - {#.doc (example "A two-way message sent to an actor, expecting a reply.")} (-> s (Actor s) (Async (Try [s o])))) (def: (mail message) @@ -191,7 +183,6 @@ (async.resolved (#try.Failure error))))))])) (def: .public (tell! message actor) - {#.doc (example "Communicate with an actor through message-passing.")} (All [s o] (-> (Message s o) (Actor s) (Async (Try o)))) (let [[async mail] (..mail message)] (do async.monad @@ -210,14 +201,11 @@ (mail state self)) (def: .public default - {#.doc (example "Default actor behavior.")} (All [s] (Behavior s s)) {#on_init function.identity #on_mail ..default_on_mail}) (def: .public (poison! actor) - {#.doc (example "Kills the actor by sending mail that will kill it upon processing," - "but allows the actor to handle previous mail.")} (All [s] (-> (Actor s) (IO (Try Any)))) (..mail! (function (_ state self) (async.resolved (exception.except ..poisoned []))) @@ -270,151 +258,102 @@ (<>.and <code>.any private) (<>.and (<>\in (` .private)) private)))) -(with_expansions [<examples> (as_is (actor: .public (stack a) - {} - - (List a) - - ((on_mail mail state self) - (do (try.with async.monad) - [.let [_ (debug.log! "BEFORE")] - output (mail state self) - .let [_ (debug.log! "AFTER")]] - (in output))) - - (message: .public (push {value a} state self) - (List a) - (let [state' (#.Item value state)] - (async.resolved (#try.Success [state' state']))))) - - (actor: .public counter - {} - - Nat - - (message: .public (count! {increment Nat} state self) - Any - (let [state' (n.+ increment state)] - (async.resolved (#try.Success [state' state'])))) - - (message: .public (read! state self) - Nat - (async.resolved (#try.Success [state state])))))] - (syntax: .public (actor: [[export_policy [name vars] annotations state_type [?on_mail messages]] ..actorP]) - {#.doc (example "Defines a named actor, with its behavior and internal state." - "Messages for the actor must be defined after the on_mail handler." - <examples>)} - (with_identifiers [g!_] - (do meta.monad - [g!type (macro.identifier (format name "_abstract_type")) - .let [g!actor (code.local_identifier name) - g!vars (list\map code.local_identifier vars)]] - (in (list (` ((~! abstract:) (~ export_policy) ((~ g!type) (~+ g!vars)) - {} - - (~ state_type) - - (def: (~ export_policy) (~ g!actor) - (All [(~+ g!vars)] - (..Behavior (~ state_type) ((~ g!type) (~+ g!vars)))) - {#..on_init (|>> ((~! abstract.:abstraction) (~ g!type))) - #..on_mail (~ (..on_mail g!_ ?on_mail))}) - - (~+ messages)))))))) - - (syntax: .public (actor [[state_type init] (<code>.record (<>.and <code>.any <code>.any)) - [?on_mail messages] behavior^]) - {#.doc (example "Defines an anonymous actor, with its behavior and internal state." - "Messages for the actor must be defined after the on_mail handler." - (actor {Nat - 123} - ((on_mail message state self) - (message (inc state) self))))} - (with_identifiers [g!_] - (in (list (` (: ((~! io.IO) (..Actor (~ state_type))) - (..spawn! (: (..Behavior (~ state_type) (~ state_type)) - {#..on_init (|>>) - #..on_mail (~ (..on_mail g!_ ?on_mail))}) - (: (~ state_type) - (~ init))))))))) - - (type: Signature - {#vars (List Text) - #name Text - #inputs (List |input|.Input) - #state Text - #self Text}) - - (def: signature^ - (Parser Signature) - (<code>.form ($_ <>.and - (<>.else (list) (<code>.tuple (<>.some <code>.local_identifier))) - <code>.local_identifier - (<>.some |input|.parser) - <code>.local_identifier - <code>.local_identifier))) - - (def: reference^ - (Parser [Name (List Text)]) - (<>.either (<code>.form (<>.and <code>.identifier (<>.some <code>.local_identifier))) - (<>.and <code>.identifier (\ <>.monad in (list))))) - - (def: messageP - (Parser [Code Signature |annotations|.Annotations Code Code]) - (let [private ($_ <>.and - ..signature^ - (<>.else |annotations|.empty |annotations|.parser) - <code>.any - <code>.any)] - ($_ <>.either - (<>.and <code>.any private) - (<>.and (<>\in (` .private)) private)))) - - (syntax: .public (message: [[export_policy signature annotations output_type body] ..messageP]) - {#.doc (example "A message can access the actor's state through the state parameter." - "A message can also access the actor itself through the self parameter." - "A message's output must be an async containing a 2-tuple with the updated state and a return value." - "A message may succeed or fail (in case of failure, the actor dies)." - - <examples>)} - (with_identifiers [g!_ g!return] - (do meta.monad - [actor_scope abstract.current - .let [g!type (code.local_identifier (get@ #abstract.name actor_scope)) - g!message (code.local_identifier (get@ #name signature)) - g!actor_vars (get@ #abstract.type_vars actor_scope) - g!all_vars (|> signature (get@ #vars) (list\map code.local_identifier) (list\compose g!actor_vars)) - g!inputsC (|> signature (get@ #inputs) (list\map product.left)) - g!inputsT (|> signature (get@ #inputs) (list\map product.right)) - g!state (|> signature (get@ #state) code.local_identifier) - g!self (|> signature (get@ #self) code.local_identifier)]] - (in (list (` (def: (~ export_policy) ((~ g!message) (~+ g!inputsC)) - (~ (|annotations|.format annotations)) - (All [(~+ g!all_vars)] - (-> (~+ g!inputsT) - (..Message (~ (get@ #abstract.abstraction actor_scope)) - (~ output_type)))) - (function ((~ g!_) (~ g!state) (~ g!self)) - (let [(~ g!state) (:as (~ (get@ #abstract.representation actor_scope)) - (~ g!state))] - (|> (~ body) - (: ((~! async.Async) ((~! try.Try) [(~ (get@ #abstract.representation actor_scope)) - (~ output_type)]))) - (:as ((~! async.Async) ((~! try.Try) [(~ (get@ #abstract.abstraction actor_scope)) - (~ output_type)])))))))) - )))))) +(syntax: .public (actor: [[export_policy [name vars] annotations state_type [?on_mail messages]] ..actorP]) + (with_identifiers [g!_] + (do meta.monad + [g!type (macro.identifier (format name "_abstract_type")) + .let [g!actor (code.local_identifier name) + g!vars (list\map code.local_identifier vars)]] + (in (list (` ((~! abstract:) (~ export_policy) ((~ g!type) (~+ g!vars)) + {} + + (~ state_type) + + (def: (~ export_policy) (~ g!actor) + (All [(~+ g!vars)] + (..Behavior (~ state_type) ((~ g!type) (~+ g!vars)))) + {#..on_init (|>> ((~! abstract.:abstraction) (~ g!type))) + #..on_mail (~ (..on_mail g!_ ?on_mail))}) + + (~+ messages)))))))) + +(syntax: .public (actor [[state_type init] (<code>.record (<>.and <code>.any <code>.any)) + [?on_mail messages] behavior^]) + (with_identifiers [g!_] + (in (list (` (: ((~! io.IO) (..Actor (~ state_type))) + (..spawn! (: (..Behavior (~ state_type) (~ state_type)) + {#..on_init (|>>) + #..on_mail (~ (..on_mail g!_ ?on_mail))}) + (: (~ state_type) + (~ init))))))))) + +(type: Signature + {#vars (List Text) + #name Text + #inputs (List |input|.Input) + #state Text + #self Text}) + +(def: signature^ + (Parser Signature) + (<code>.form ($_ <>.and + (<>.else (list) (<code>.tuple (<>.some <code>.local_identifier))) + <code>.local_identifier + (<>.some |input|.parser) + <code>.local_identifier + <code>.local_identifier))) + +(def: reference^ + (Parser [Name (List Text)]) + (<>.either (<code>.form (<>.and <code>.identifier (<>.some <code>.local_identifier))) + (<>.and <code>.identifier (\ <>.monad in (list))))) + +(def: messageP + (Parser [Code Signature |annotations|.Annotations Code Code]) + (let [private ($_ <>.and + ..signature^ + (<>.else |annotations|.empty |annotations|.parser) + <code>.any + <code>.any)] + ($_ <>.either + (<>.and <code>.any private) + (<>.and (<>\in (` .private)) private)))) + +(syntax: .public (message: [[export_policy signature annotations output_type body] ..messageP]) + (with_identifiers [g!_ g!return] + (do meta.monad + [actor_scope abstract.current + .let [g!type (code.local_identifier (get@ #abstract.name actor_scope)) + g!message (code.local_identifier (get@ #name signature)) + g!actor_vars (get@ #abstract.type_vars actor_scope) + g!all_vars (|> signature (get@ #vars) (list\map code.local_identifier) (list\compose g!actor_vars)) + g!inputsC (|> signature (get@ #inputs) (list\map product.left)) + g!inputsT (|> signature (get@ #inputs) (list\map product.right)) + g!state (|> signature (get@ #state) code.local_identifier) + g!self (|> signature (get@ #self) code.local_identifier)]] + (in (list (` (def: (~ export_policy) ((~ g!message) (~+ g!inputsC)) + (~ (|annotations|.format annotations)) + (All [(~+ g!all_vars)] + (-> (~+ g!inputsT) + (..Message (~ (get@ #abstract.abstraction actor_scope)) + (~ output_type)))) + (function ((~ g!_) (~ g!state) (~ g!self)) + (let [(~ g!state) (:as (~ (get@ #abstract.representation actor_scope)) + (~ g!state))] + (|> (~ body) + (: ((~! async.Async) ((~! try.Try) [(~ (get@ #abstract.representation actor_scope)) + (~ output_type)]))) + (:as ((~! async.Async) ((~! try.Try) [(~ (get@ #abstract.abstraction actor_scope)) + (~ output_type)])))))))) + ))))) (type: .public Stop - {#.doc (example "A signal to stop an actor from observing a channel.")} (IO Any)) (def: continue! true) (def: stop! false) (def: .public (observe! action channel actor) - {#.doc (example "Use an actor to observe a channel by transforming each datum" - "flowing through the channel into mail the actor can process." - "Can stop observing the channel by executing the Stop value.")} (All [e s] (-> (-> e Stop (Mail s)) (Channel e) (Actor s) (IO Any))) (let [signal (: (Atom Bit) (atom.atom ..continue!)) |