diff options
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/lux/control/concurrency/actor.lux | 31 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/atom.lux | 2 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/frp.lux | 17 |
3 files changed, 44 insertions, 6 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux index c5f6ca6c7..aa30efa76 100644 --- a/stdlib/source/lux/control/concurrency/actor.lux +++ b/stdlib/source/lux/control/concurrency/actor.lux @@ -30,7 +30,8 @@ ["." abstract (#+ abstract: :representation :abstraction)]]] [// ["." atom (#+ Atom atom)] - ["." promise (#+ Promise Resolver) ("#@." monad)]]) + ["." promise (#+ Promise Resolver) ("#@." monad)] + ["." frp (#+ Channel)]]) (exception: #export poisoned) (exception: #export dead) @@ -122,6 +123,12 @@ (let [[obituary _] (get@ #obituary (:representation actor))] (promise.poll obituary))) + (def: #export await + (All [s] (-> (Actor s) (Promise (Obituary s)))) + (|>> :representation + (get@ #obituary) + product.left)) + (def: #export (mail! mail actor) {#.doc "Send mail to an actor.."} (All [s] (-> (Mail s) (Actor s) (IO (Try Any)))) @@ -389,3 +396,25 @@ (:coerce ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.abstraction actor-scope)) (~ (get@ #output signature))])))))))) )))))) + +(type: #export Stop + (IO Any)) + +(def: continue! true) +(def: stop! false) + +(def: #export (observe action channel actor) + (All [e s] (-> (-> e Stop (Mail s)) (Channel e) (Actor s) (IO Any))) + (let [signal (: (Atom Bit) + (atom.atom ..continue!)) + stop (: Stop + (atom.write ..stop! signal))] + (frp.subscribe (function (_ event) + (do {! io.monad} + [continue? (atom.read signal)] + (if continue? + (do ! + [outcome (..mail! (action event stop) actor)] + (wrap (try.to-maybe outcome))) + (wrap #.None)))) + channel))) diff --git a/stdlib/source/lux/control/concurrency/atom.lux b/stdlib/source/lux/control/concurrency/atom.lux index 9ebd54bb8..54be96d76 100644 --- a/stdlib/source/lux/control/concurrency/atom.lux +++ b/stdlib/source/lux/control/concurrency/atom.lux @@ -98,4 +98,4 @@ (def: #export (write value atom) (All [a] (-> a (Atom a) (IO Any))) - (update (function.constant value) atom)) + (..update (function.constant value) atom)) diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux index 50c26e769..fdec66a61 100644 --- a/stdlib/source/lux/control/concurrency/frp.lux +++ b/stdlib/source/lux/control/concurrency/frp.lux @@ -110,7 +110,9 @@ _ (wrap #.None))))) -(def: empty Channel (promise.resolved #.None)) +(def: empty + Channel + (promise.resolved #.None)) (structure: #export monad (Monad Channel) @@ -145,15 +147,22 @@ (wrap (: Any (io.run (:: sink close)))))))) output)))) -(def: #export (listen listener channel) - (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any))) +(type: #export (Subscriber a) + (-> a (IO (Maybe Any)))) + +(def: #export (subscribe subscriber channel) + (All [a] (-> (Subscriber a) (Channel a) (IO Any))) (io (exec (: (Promise Any) (loop [channel channel] (do promise.monad [cons channel] (case cons (#.Some [head tail]) - (exec (io.run (listener head)) + (case (io.run (subscriber head)) + (#.Some _) + (recur tail) + + #.None (recur tail)) #.None |