aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/library/lux/control/concurrency/actor.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/library/lux/control/concurrency/actor.lux')
-rw-r--r--stdlib/source/library/lux/control/concurrency/actor.lux91
1 files changed, 46 insertions, 45 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/actor.lux b/stdlib/source/library/lux/control/concurrency/actor.lux
index 355a7885e..b2b619735 100644
--- a/stdlib/source/library/lux/control/concurrency/actor.lux
+++ b/stdlib/source/library/lux/control/concurrency/actor.lux
@@ -1,4 +1,5 @@
-(.module: {#.doc "The actor model of concurrency."}
+(.module:
+ {#.doc "The actor model of concurrency."}
[library
[lux #*
["." debug]
@@ -34,27 +35,27 @@
["." abstract (#+ abstract: :representation :abstraction)]]]]
[//
["." atom (#+ Atom atom)]
- ["." promise (#+ Promise Resolver) ("#\." monad)]
+ ["." async (#+ Async Resolver) ("#\." monad)]
["." frp (#+ Channel)]])
(exception: #export poisoned)
(exception: #export dead)
(with_expansions
- [<Mail> (as_is (-> s (Actor s) (Promise (Try s))))
+ [<Mail> (as_is (-> s (Actor s) (Async (Try s))))
<Obituary> (as_is [Text s (List <Mail>)])
<Mailbox> (as_is (Rec Mailbox
- [(Promise [<Mail> Mailbox])
+ [(Async [<Mail> Mailbox])
(Resolver [<Mail> Mailbox])]))]
(def: (pending [read write])
(All [a]
(-> (Rec Mailbox
- [(Promise [a Mailbox])
+ [(Async [a Mailbox])
(Resolver [a Mailbox])])
(IO (List a))))
(do {! io.monad}
- [current (promise.poll read)]
+ [current (async.poll read)]
(case current
(#.Some [head tail])
(\ ! map (|>> (#.Cons head))
@@ -64,7 +65,7 @@
(wrap #.Nil))))
(abstract: #export (Actor s)
- {#obituary [(Promise <Obituary>)
+ {#obituary [(Async <Obituary>)
(Resolver <Obituary>)]
#mailbox (Atom <Mailbox>)}
@@ -81,7 +82,7 @@
(type: #export (Behavior o s)
{#.doc (doc "An actor's behavior when mail is received and when a fatal error occurs.")}
{#on_init (-> o s)
- #on_mail (-> (Mail s) s (Actor s) (Promise (Try s)))})
+ #on_mail (-> (Mail s) s (Actor s) (Async (Try s)))})
(def: #export (spawn! behavior init)
{#.doc (doc "Given a behavior and initial state, spawns an actor and returns it.")}
@@ -92,11 +93,11 @@
behavior
(Actor s)
- (:abstraction {#obituary (promise.promise [])
- #mailbox (atom (promise.promise []))}))
+ (:abstraction {#obituary (async.async [])
+ #mailbox (atom (async.async []))}))
process (loop [state (on_init init)
[|mailbox| _] (io.run (atom.read (get@ #mailbox (:representation self))))]
- (do {! promise.monad}
+ (do {! async.monad}
[[head tail] |mailbox|
?state' (on_mail head state self)]
(case ?state'
@@ -116,7 +117,7 @@
(All [s] (-> (Actor s) (IO Bit)))
(let [[obituary _] (get@ #obituary (:representation actor))]
(|> obituary
- promise.poll
+ async.poll
(\ io.functor map
(|>> (case> #.None
bit.yes
@@ -127,11 +128,11 @@
(def: #export (obituary actor)
(All [s] (-> (Actor s) (IO (Maybe (Obituary s)))))
(let [[obituary _] (get@ #obituary (:representation actor))]
- (promise.poll obituary)))
+ (async.poll obituary)))
(def: #export await
{#.doc (doc "Await for an actor to end working.")}
- (All [s] (-> (Actor s) (Promise (Obituary s))))
+ (All [s] (-> (Actor s) (Async (Obituary s))))
(|>> :representation
(get@ #obituary)
product.left))
@@ -142,12 +143,12 @@
(do {! io.monad}
[alive? (..alive? actor)]
(if alive?
- (let [entry [mail (promise.promise [])]]
+ (let [entry [mail (async.async [])]]
(do !
[|mailbox|&resolve (atom.read (get@ #mailbox (:representation actor)))]
(loop [[|mailbox| resolve] |mailbox|&resolve]
(do !
- [|mailbox| (promise.poll |mailbox|)]
+ [|mailbox| (async.poll |mailbox|)]
(case |mailbox|
#.None
(do !
@@ -164,39 +165,39 @@
(type: #export (Message s o)
{#.doc (doc "A two-way message sent to an actor, expecting a reply.")}
- (-> s (Actor s) (Promise (Try [s o]))))
+ (-> s (Actor s) (Async (Try [s o]))))
(def: (mail message)
- (All [s o] (-> (Message s o) [(Promise (Try o)) (Mail s)]))
- (let [[promise resolve] (:sharing [s o]
- (Message s o)
- message
-
- [(Promise (Try o))
- (Resolver (Try o))]
- (promise.promise []))]
- [promise
+ (All [s o] (-> (Message s o) [(Async (Try o)) (Mail s)]))
+ (let [[async resolve] (:sharing [s o]
+ (Message s o)
+ message
+
+ [(Async (Try o))
+ (Resolver (Try o))]
+ (async.async []))]
+ [async
(function (_ state self)
- (do {! promise.monad}
+ (do {! async.monad}
[outcome (message state self)]
(case outcome
(#try.Success [state' return])
(exec (io.run (resolve (#try.Success return)))
- (promise.resolved (#try.Success state')))
+ (async.resolved (#try.Success state')))
(#try.Failure error)
(exec (io.run (resolve (#try.Failure error)))
- (promise.resolved (#try.Failure error))))))]))
+ (async.resolved (#try.Failure error))))))]))
(def: #export (tell! message actor)
{#.doc (doc "Communicate with an actor through message-passing.")}
- (All [s o] (-> (Message s o) (Actor s) (Promise (Try o))))
- (let [[promise mail] (..mail message)]
- (do promise.monad
- [outcome (promise.future (..mail! mail actor))]
+ (All [s o] (-> (Message s o) (Actor s) (Async (Try o))))
+ (let [[async mail] (..mail message)]
+ (do async.monad
+ [outcome (async.future (..mail! mail actor))]
(case outcome
(#try.Success)
- promise
+ async
(#try.Failure error)
(wrap (#try.Failure error))))))
@@ -204,7 +205,7 @@
)
(def: (default_on_mail mail state self)
- (All [s] (-> (Mail s) s (Actor s) (Promise (Try s))))
+ (All [s] (-> (Mail s) s (Actor s) (Async (Try s))))
(mail state self))
(def: #export default
@@ -218,7 +219,7 @@
"but allows the actor to handle previous mail.")}
(All [s] (-> (Actor s) (IO (Try Any))))
(..mail! (function (_ state self)
- (promise.resolved (exception.throw ..poisoned [])))
+ (async.resolved (exception.throw ..poisoned [])))
actor))
(def: actor_decl^
@@ -261,7 +262,7 @@
(List a)
((on_mail mail state self)
- (do (try.with promise.monad)
+ (do (try.with async.monad)
[#let [_ (debug.log! "BEFORE")]
output (mail state self)
#let [_ (debug.log! "AFTER")]]
@@ -270,7 +271,7 @@
(message: #export (push {value a} state self)
(List a)
(let [state' (#.Cons value state)]
- (promise.resolved (#try.Success [state' state'])))))
+ (async.resolved (#try.Success [state' state'])))))
(actor: #export Counter
Nat
@@ -278,11 +279,11 @@
(message: #export (count! {increment Nat} state self)
Any
(let [state' (n.+ increment state)]
- (promise.resolved (#try.Success [state' state']))))
+ (async.resolved (#try.Success [state' state']))))
(message: #export (read! state self)
Nat
- (promise.resolved (#try.Success [state state])))))]
+ (async.resolved (#try.Success [state state])))))]
(syntax: #export (actor:
{export |export|.parser}
{[name vars] actor_decl^}
@@ -353,7 +354,7 @@
body)
{#.doc (doc "A message can access the actor's state through the state parameter."
"A message can also access the actor itself through the self parameter."
- "A message's output must be a promise containing a 2-tuple with the updated state and a return value."
+ "A message's output must be an async containing a 2-tuple with the updated state and a return value."
"A message may succeed or fail (in case of failure, the actor dies)."
<examples>)}
@@ -378,10 +379,10 @@
(let [(~ g!state) (:as (~ (get@ #abstract.representation actor_scope))
(~ g!state))]
(|> (~ body)
- (: ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.representation actor_scope))
- (~ output_type)])))
- (:as ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.abstraction actor_scope))
- (~ output_type)]))))))))
+ (: ((~! async.Async) ((~! try.Try) [(~ (get@ #abstract.representation actor_scope))
+ (~ output_type)])))
+ (:as ((~! async.Async) ((~! try.Try) [(~ (get@ #abstract.abstraction actor_scope))
+ (~ output_type)]))))))))
))))))
(type: #export Stop