diff options
Diffstat (limited to 'stdlib/source/library/lux/control/concurrency/actor.lux')
-rw-r--r-- | stdlib/source/library/lux/control/concurrency/actor.lux | 91 |
1 files changed, 46 insertions, 45 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/actor.lux b/stdlib/source/library/lux/control/concurrency/actor.lux index 355a7885e..b2b619735 100644 --- a/stdlib/source/library/lux/control/concurrency/actor.lux +++ b/stdlib/source/library/lux/control/concurrency/actor.lux @@ -1,4 +1,5 @@ -(.module: {#.doc "The actor model of concurrency."} +(.module: + {#.doc "The actor model of concurrency."} [library [lux #* ["." debug] @@ -34,27 +35,27 @@ ["." abstract (#+ abstract: :representation :abstraction)]]]] [// ["." atom (#+ Atom atom)] - ["." promise (#+ Promise Resolver) ("#\." monad)] + ["." async (#+ Async Resolver) ("#\." monad)] ["." frp (#+ Channel)]]) (exception: #export poisoned) (exception: #export dead) (with_expansions - [<Mail> (as_is (-> s (Actor s) (Promise (Try s)))) + [<Mail> (as_is (-> s (Actor s) (Async (Try s)))) <Obituary> (as_is [Text s (List <Mail>)]) <Mailbox> (as_is (Rec Mailbox - [(Promise [<Mail> Mailbox]) + [(Async [<Mail> Mailbox]) (Resolver [<Mail> Mailbox])]))] (def: (pending [read write]) (All [a] (-> (Rec Mailbox - [(Promise [a Mailbox]) + [(Async [a Mailbox]) (Resolver [a Mailbox])]) (IO (List a)))) (do {! io.monad} - [current (promise.poll read)] + [current (async.poll read)] (case current (#.Some [head tail]) (\ ! map (|>> (#.Cons head)) @@ -64,7 +65,7 @@ (wrap #.Nil)))) (abstract: #export (Actor s) - {#obituary [(Promise <Obituary>) + {#obituary [(Async <Obituary>) (Resolver <Obituary>)] #mailbox (Atom <Mailbox>)} @@ -81,7 +82,7 @@ (type: #export (Behavior o s) {#.doc (doc "An actor's behavior when mail is received and when a fatal error occurs.")} {#on_init (-> o s) - #on_mail (-> (Mail s) s (Actor s) (Promise (Try s)))}) + #on_mail (-> (Mail s) s (Actor s) (Async (Try s)))}) (def: #export (spawn! behavior init) {#.doc (doc "Given a behavior and initial state, spawns an actor and returns it.")} @@ -92,11 +93,11 @@ behavior (Actor s) - (:abstraction {#obituary (promise.promise []) - #mailbox (atom (promise.promise []))})) + (:abstraction {#obituary (async.async []) + #mailbox (atom (async.async []))})) process (loop [state (on_init init) [|mailbox| _] (io.run (atom.read (get@ #mailbox (:representation self))))] - (do {! promise.monad} + (do {! async.monad} [[head tail] |mailbox| ?state' (on_mail head state self)] (case ?state' @@ -116,7 +117,7 @@ (All [s] (-> (Actor s) (IO Bit))) (let [[obituary _] (get@ #obituary (:representation actor))] (|> obituary - promise.poll + async.poll (\ io.functor map (|>> (case> #.None bit.yes @@ -127,11 +128,11 @@ (def: #export (obituary actor) (All [s] (-> (Actor s) (IO (Maybe (Obituary s))))) (let [[obituary _] (get@ #obituary (:representation actor))] - (promise.poll obituary))) + (async.poll obituary))) (def: #export await {#.doc (doc "Await for an actor to end working.")} - (All [s] (-> (Actor s) (Promise (Obituary s)))) + (All [s] (-> (Actor s) (Async (Obituary s)))) (|>> :representation (get@ #obituary) product.left)) @@ -142,12 +143,12 @@ (do {! io.monad} [alive? (..alive? actor)] (if alive? - (let [entry [mail (promise.promise [])]] + (let [entry [mail (async.async [])]] (do ! [|mailbox|&resolve (atom.read (get@ #mailbox (:representation actor)))] (loop [[|mailbox| resolve] |mailbox|&resolve] (do ! - [|mailbox| (promise.poll |mailbox|)] + [|mailbox| (async.poll |mailbox|)] (case |mailbox| #.None (do ! @@ -164,39 +165,39 @@ (type: #export (Message s o) {#.doc (doc "A two-way message sent to an actor, expecting a reply.")} - (-> s (Actor s) (Promise (Try [s o])))) + (-> s (Actor s) (Async (Try [s o])))) (def: (mail message) - (All [s o] (-> (Message s o) [(Promise (Try o)) (Mail s)])) - (let [[promise resolve] (:sharing [s o] - (Message s o) - message - - [(Promise (Try o)) - (Resolver (Try o))] - (promise.promise []))] - [promise + (All [s o] (-> (Message s o) [(Async (Try o)) (Mail s)])) + (let [[async resolve] (:sharing [s o] + (Message s o) + message + + [(Async (Try o)) + (Resolver (Try o))] + (async.async []))] + [async (function (_ state self) - (do {! promise.monad} + (do {! async.monad} [outcome (message state self)] (case outcome (#try.Success [state' return]) (exec (io.run (resolve (#try.Success return))) - (promise.resolved (#try.Success state'))) + (async.resolved (#try.Success state'))) (#try.Failure error) (exec (io.run (resolve (#try.Failure error))) - (promise.resolved (#try.Failure error))))))])) + (async.resolved (#try.Failure error))))))])) (def: #export (tell! message actor) {#.doc (doc "Communicate with an actor through message-passing.")} - (All [s o] (-> (Message s o) (Actor s) (Promise (Try o)))) - (let [[promise mail] (..mail message)] - (do promise.monad - [outcome (promise.future (..mail! mail actor))] + (All [s o] (-> (Message s o) (Actor s) (Async (Try o)))) + (let [[async mail] (..mail message)] + (do async.monad + [outcome (async.future (..mail! mail actor))] (case outcome (#try.Success) - promise + async (#try.Failure error) (wrap (#try.Failure error)))))) @@ -204,7 +205,7 @@ ) (def: (default_on_mail mail state self) - (All [s] (-> (Mail s) s (Actor s) (Promise (Try s)))) + (All [s] (-> (Mail s) s (Actor s) (Async (Try s)))) (mail state self)) (def: #export default @@ -218,7 +219,7 @@ "but allows the actor to handle previous mail.")} (All [s] (-> (Actor s) (IO (Try Any)))) (..mail! (function (_ state self) - (promise.resolved (exception.throw ..poisoned []))) + (async.resolved (exception.throw ..poisoned []))) actor)) (def: actor_decl^ @@ -261,7 +262,7 @@ (List a) ((on_mail mail state self) - (do (try.with promise.monad) + (do (try.with async.monad) [#let [_ (debug.log! "BEFORE")] output (mail state self) #let [_ (debug.log! "AFTER")]] @@ -270,7 +271,7 @@ (message: #export (push {value a} state self) (List a) (let [state' (#.Cons value state)] - (promise.resolved (#try.Success [state' state']))))) + (async.resolved (#try.Success [state' state']))))) (actor: #export Counter Nat @@ -278,11 +279,11 @@ (message: #export (count! {increment Nat} state self) Any (let [state' (n.+ increment state)] - (promise.resolved (#try.Success [state' state'])))) + (async.resolved (#try.Success [state' state'])))) (message: #export (read! state self) Nat - (promise.resolved (#try.Success [state state])))))] + (async.resolved (#try.Success [state state])))))] (syntax: #export (actor: {export |export|.parser} {[name vars] actor_decl^} @@ -353,7 +354,7 @@ body) {#.doc (doc "A message can access the actor's state through the state parameter." "A message can also access the actor itself through the self parameter." - "A message's output must be a promise containing a 2-tuple with the updated state and a return value." + "A message's output must be an async containing a 2-tuple with the updated state and a return value." "A message may succeed or fail (in case of failure, the actor dies)." <examples>)} @@ -378,10 +379,10 @@ (let [(~ g!state) (:as (~ (get@ #abstract.representation actor_scope)) (~ g!state))] (|> (~ body) - (: ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.representation actor_scope)) - (~ output_type)]))) - (:as ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.abstraction actor_scope)) - (~ output_type)])))))))) + (: ((~! async.Async) ((~! try.Try) [(~ (get@ #abstract.representation actor_scope)) + (~ output_type)]))) + (:as ((~! async.Async) ((~! try.Try) [(~ (get@ #abstract.abstraction actor_scope)) + (~ output_type)])))))))) )))))) (type: #export Stop |