aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/library/lux/control/concurrency/actor.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/library/lux/control/concurrency/actor.lux')
-rw-r--r--stdlib/source/library/lux/control/concurrency/actor.lux239
1 files changed, 89 insertions, 150 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/actor.lux b/stdlib/source/library/lux/control/concurrency/actor.lux
index 1557a9f89..e51be0b98 100644
--- a/stdlib/source/library/lux/control/concurrency/actor.lux
+++ b/stdlib/source/library/lux/control/concurrency/actor.lux
@@ -1,5 +1,4 @@
(.module:
- {#.doc "The actor model of concurrency."}
[library
[lux #*
["." debug]
@@ -64,27 +63,23 @@
(in #.End))))
(abstract: .public (Actor s)
- {#.doc (example "An entity that can react to messages (mail) sent to it concurrently.")}
+ {}
{#obituary [(Async <Obituary>)
(Resolver <Obituary>)]
#mailbox (Atom <Mailbox>)}
(type: .public (Mail s)
- {#.doc (example "A one-way message sent to an actor, without expecting a reply.")}
<Mail>)
(type: .public (Obituary s)
- {#.doc (example "Details on the death of an actor.")}
<Obituary>)
(type: .public (Behavior o s)
- {#.doc (example "An actor's behavior when mail is received and when a fatal error occurs.")}
{#on_init (-> o s)
#on_mail (-> (Mail s) s (Actor s) (Async (Try s)))})
(def: .public (spawn! behavior init)
- {#.doc (example "Given a behavior and initial state, spawns an actor and returns it.")}
(All [o s] (-> (Behavior o s) o (IO (Actor s))))
(io (let [[on_init on_mail] behavior
self (:sharing [o s]
@@ -130,14 +125,12 @@
(async.value obituary)))
(def: .public obituary
- {#.doc (example "Await for an actor to stop working.")}
(All [s] (-> (Actor s) (Async (Obituary s))))
(|>> :representation
(get@ #obituary)
product.left))
(def: .public (mail! mail actor)
- {#.doc (example "Send mail to an actor.")}
(All [s] (-> (Mail s) (Actor s) (IO (Try Any))))
(do {! io.monad}
[alive? (..alive? actor)]
@@ -163,7 +156,6 @@
(in (exception.except ..dead [])))))
(type: .public (Message s o)
- {#.doc (example "A two-way message sent to an actor, expecting a reply.")}
(-> s (Actor s) (Async (Try [s o]))))
(def: (mail message)
@@ -191,7 +183,6 @@
(async.resolved (#try.Failure error))))))]))
(def: .public (tell! message actor)
- {#.doc (example "Communicate with an actor through message-passing.")}
(All [s o] (-> (Message s o) (Actor s) (Async (Try o))))
(let [[async mail] (..mail message)]
(do async.monad
@@ -210,14 +201,11 @@
(mail state self))
(def: .public default
- {#.doc (example "Default actor behavior.")}
(All [s] (Behavior s s))
{#on_init function.identity
#on_mail ..default_on_mail})
(def: .public (poison! actor)
- {#.doc (example "Kills the actor by sending mail that will kill it upon processing,"
- "but allows the actor to handle previous mail.")}
(All [s] (-> (Actor s) (IO (Try Any))))
(..mail! (function (_ state self)
(async.resolved (exception.except ..poisoned [])))
@@ -270,151 +258,102 @@
(<>.and <code>.any private)
(<>.and (<>\in (` .private)) private))))
-(with_expansions [<examples> (as_is (actor: .public (stack a)
- {}
-
- (List a)
-
- ((on_mail mail state self)
- (do (try.with async.monad)
- [.let [_ (debug.log! "BEFORE")]
- output (mail state self)
- .let [_ (debug.log! "AFTER")]]
- (in output)))
-
- (message: .public (push {value a} state self)
- (List a)
- (let [state' (#.Item value state)]
- (async.resolved (#try.Success [state' state'])))))
-
- (actor: .public counter
- {}
-
- Nat
-
- (message: .public (count! {increment Nat} state self)
- Any
- (let [state' (n.+ increment state)]
- (async.resolved (#try.Success [state' state']))))
-
- (message: .public (read! state self)
- Nat
- (async.resolved (#try.Success [state state])))))]
- (syntax: .public (actor: [[export_policy [name vars] annotations state_type [?on_mail messages]] ..actorP])
- {#.doc (example "Defines a named actor, with its behavior and internal state."
- "Messages for the actor must be defined after the on_mail handler."
- <examples>)}
- (with_identifiers [g!_]
- (do meta.monad
- [g!type (macro.identifier (format name "_abstract_type"))
- .let [g!actor (code.local_identifier name)
- g!vars (list\map code.local_identifier vars)]]
- (in (list (` ((~! abstract:) (~ export_policy) ((~ g!type) (~+ g!vars))
- {}
-
- (~ state_type)
-
- (def: (~ export_policy) (~ g!actor)
- (All [(~+ g!vars)]
- (..Behavior (~ state_type) ((~ g!type) (~+ g!vars))))
- {#..on_init (|>> ((~! abstract.:abstraction) (~ g!type)))
- #..on_mail (~ (..on_mail g!_ ?on_mail))})
-
- (~+ messages))))))))
-
- (syntax: .public (actor [[state_type init] (<code>.record (<>.and <code>.any <code>.any))
- [?on_mail messages] behavior^])
- {#.doc (example "Defines an anonymous actor, with its behavior and internal state."
- "Messages for the actor must be defined after the on_mail handler."
- (actor {Nat
- 123}
- ((on_mail message state self)
- (message (inc state) self))))}
- (with_identifiers [g!_]
- (in (list (` (: ((~! io.IO) (..Actor (~ state_type)))
- (..spawn! (: (..Behavior (~ state_type) (~ state_type))
- {#..on_init (|>>)
- #..on_mail (~ (..on_mail g!_ ?on_mail))})
- (: (~ state_type)
- (~ init)))))))))
-
- (type: Signature
- {#vars (List Text)
- #name Text
- #inputs (List |input|.Input)
- #state Text
- #self Text})
-
- (def: signature^
- (Parser Signature)
- (<code>.form ($_ <>.and
- (<>.else (list) (<code>.tuple (<>.some <code>.local_identifier)))
- <code>.local_identifier
- (<>.some |input|.parser)
- <code>.local_identifier
- <code>.local_identifier)))
-
- (def: reference^
- (Parser [Name (List Text)])
- (<>.either (<code>.form (<>.and <code>.identifier (<>.some <code>.local_identifier)))
- (<>.and <code>.identifier (\ <>.monad in (list)))))
-
- (def: messageP
- (Parser [Code Signature |annotations|.Annotations Code Code])
- (let [private ($_ <>.and
- ..signature^
- (<>.else |annotations|.empty |annotations|.parser)
- <code>.any
- <code>.any)]
- ($_ <>.either
- (<>.and <code>.any private)
- (<>.and (<>\in (` .private)) private))))
-
- (syntax: .public (message: [[export_policy signature annotations output_type body] ..messageP])
- {#.doc (example "A message can access the actor's state through the state parameter."
- "A message can also access the actor itself through the self parameter."
- "A message's output must be an async containing a 2-tuple with the updated state and a return value."
- "A message may succeed or fail (in case of failure, the actor dies)."
-
- <examples>)}
- (with_identifiers [g!_ g!return]
- (do meta.monad
- [actor_scope abstract.current
- .let [g!type (code.local_identifier (get@ #abstract.name actor_scope))
- g!message (code.local_identifier (get@ #name signature))
- g!actor_vars (get@ #abstract.type_vars actor_scope)
- g!all_vars (|> signature (get@ #vars) (list\map code.local_identifier) (list\compose g!actor_vars))
- g!inputsC (|> signature (get@ #inputs) (list\map product.left))
- g!inputsT (|> signature (get@ #inputs) (list\map product.right))
- g!state (|> signature (get@ #state) code.local_identifier)
- g!self (|> signature (get@ #self) code.local_identifier)]]
- (in (list (` (def: (~ export_policy) ((~ g!message) (~+ g!inputsC))
- (~ (|annotations|.format annotations))
- (All [(~+ g!all_vars)]
- (-> (~+ g!inputsT)
- (..Message (~ (get@ #abstract.abstraction actor_scope))
- (~ output_type))))
- (function ((~ g!_) (~ g!state) (~ g!self))
- (let [(~ g!state) (:as (~ (get@ #abstract.representation actor_scope))
- (~ g!state))]
- (|> (~ body)
- (: ((~! async.Async) ((~! try.Try) [(~ (get@ #abstract.representation actor_scope))
- (~ output_type)])))
- (:as ((~! async.Async) ((~! try.Try) [(~ (get@ #abstract.abstraction actor_scope))
- (~ output_type)]))))))))
- ))))))
+(syntax: .public (actor: [[export_policy [name vars] annotations state_type [?on_mail messages]] ..actorP])
+ (with_identifiers [g!_]
+ (do meta.monad
+ [g!type (macro.identifier (format name "_abstract_type"))
+ .let [g!actor (code.local_identifier name)
+ g!vars (list\map code.local_identifier vars)]]
+ (in (list (` ((~! abstract:) (~ export_policy) ((~ g!type) (~+ g!vars))
+ {}
+
+ (~ state_type)
+
+ (def: (~ export_policy) (~ g!actor)
+ (All [(~+ g!vars)]
+ (..Behavior (~ state_type) ((~ g!type) (~+ g!vars))))
+ {#..on_init (|>> ((~! abstract.:abstraction) (~ g!type)))
+ #..on_mail (~ (..on_mail g!_ ?on_mail))})
+
+ (~+ messages))))))))
+
+(syntax: .public (actor [[state_type init] (<code>.record (<>.and <code>.any <code>.any))
+ [?on_mail messages] behavior^])
+ (with_identifiers [g!_]
+ (in (list (` (: ((~! io.IO) (..Actor (~ state_type)))
+ (..spawn! (: (..Behavior (~ state_type) (~ state_type))
+ {#..on_init (|>>)
+ #..on_mail (~ (..on_mail g!_ ?on_mail))})
+ (: (~ state_type)
+ (~ init)))))))))
+
+(type: Signature
+ {#vars (List Text)
+ #name Text
+ #inputs (List |input|.Input)
+ #state Text
+ #self Text})
+
+(def: signature^
+ (Parser Signature)
+ (<code>.form ($_ <>.and
+ (<>.else (list) (<code>.tuple (<>.some <code>.local_identifier)))
+ <code>.local_identifier
+ (<>.some |input|.parser)
+ <code>.local_identifier
+ <code>.local_identifier)))
+
+(def: reference^
+ (Parser [Name (List Text)])
+ (<>.either (<code>.form (<>.and <code>.identifier (<>.some <code>.local_identifier)))
+ (<>.and <code>.identifier (\ <>.monad in (list)))))
+
+(def: messageP
+ (Parser [Code Signature |annotations|.Annotations Code Code])
+ (let [private ($_ <>.and
+ ..signature^
+ (<>.else |annotations|.empty |annotations|.parser)
+ <code>.any
+ <code>.any)]
+ ($_ <>.either
+ (<>.and <code>.any private)
+ (<>.and (<>\in (` .private)) private))))
+
+(syntax: .public (message: [[export_policy signature annotations output_type body] ..messageP])
+ (with_identifiers [g!_ g!return]
+ (do meta.monad
+ [actor_scope abstract.current
+ .let [g!type (code.local_identifier (get@ #abstract.name actor_scope))
+ g!message (code.local_identifier (get@ #name signature))
+ g!actor_vars (get@ #abstract.type_vars actor_scope)
+ g!all_vars (|> signature (get@ #vars) (list\map code.local_identifier) (list\compose g!actor_vars))
+ g!inputsC (|> signature (get@ #inputs) (list\map product.left))
+ g!inputsT (|> signature (get@ #inputs) (list\map product.right))
+ g!state (|> signature (get@ #state) code.local_identifier)
+ g!self (|> signature (get@ #self) code.local_identifier)]]
+ (in (list (` (def: (~ export_policy) ((~ g!message) (~+ g!inputsC))
+ (~ (|annotations|.format annotations))
+ (All [(~+ g!all_vars)]
+ (-> (~+ g!inputsT)
+ (..Message (~ (get@ #abstract.abstraction actor_scope))
+ (~ output_type))))
+ (function ((~ g!_) (~ g!state) (~ g!self))
+ (let [(~ g!state) (:as (~ (get@ #abstract.representation actor_scope))
+ (~ g!state))]
+ (|> (~ body)
+ (: ((~! async.Async) ((~! try.Try) [(~ (get@ #abstract.representation actor_scope))
+ (~ output_type)])))
+ (:as ((~! async.Async) ((~! try.Try) [(~ (get@ #abstract.abstraction actor_scope))
+ (~ output_type)]))))))))
+ )))))
(type: .public Stop
- {#.doc (example "A signal to stop an actor from observing a channel.")}
(IO Any))
(def: continue! true)
(def: stop! false)
(def: .public (observe! action channel actor)
- {#.doc (example "Use an actor to observe a channel by transforming each datum"
- "flowing through the channel into mail the actor can process."
- "Can stop observing the channel by executing the Stop value.")}
(All [e s] (-> (-> e Stop (Mail s)) (Channel e) (Actor s) (IO Any)))
(let [signal (: (Atom Bit)
(atom.atom ..continue!))