aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--stdlib/source/lux/control/concurrency/actor.lux31
-rw-r--r--stdlib/source/lux/control/concurrency/atom.lux2
-rw-r--r--stdlib/source/lux/control/concurrency/frp.lux17
3 files changed, 44 insertions, 6 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux
index c5f6ca6c7..aa30efa76 100644
--- a/stdlib/source/lux/control/concurrency/actor.lux
+++ b/stdlib/source/lux/control/concurrency/actor.lux
@@ -30,7 +30,8 @@
["." abstract (#+ abstract: :representation :abstraction)]]]
[//
["." atom (#+ Atom atom)]
- ["." promise (#+ Promise Resolver) ("#@." monad)]])
+ ["." promise (#+ Promise Resolver) ("#@." monad)]
+ ["." frp (#+ Channel)]])
(exception: #export poisoned)
(exception: #export dead)
@@ -122,6 +123,12 @@
(let [[obituary _] (get@ #obituary (:representation actor))]
(promise.poll obituary)))
+ (def: #export await
+ (All [s] (-> (Actor s) (Promise (Obituary s))))
+ (|>> :representation
+ (get@ #obituary)
+ product.left))
+
(def: #export (mail! mail actor)
{#.doc "Send mail to an actor.."}
(All [s] (-> (Mail s) (Actor s) (IO (Try Any))))
@@ -389,3 +396,25 @@
(:coerce ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.abstraction actor-scope))
(~ (get@ #output signature))]))))))))
))))))
+
+(type: #export Stop
+ (IO Any))
+
+(def: continue! true)
+(def: stop! false)
+
+(def: #export (observe action channel actor)
+ (All [e s] (-> (-> e Stop (Mail s)) (Channel e) (Actor s) (IO Any)))
+ (let [signal (: (Atom Bit)
+ (atom.atom ..continue!))
+ stop (: Stop
+ (atom.write ..stop! signal))]
+ (frp.subscribe (function (_ event)
+ (do {! io.monad}
+ [continue? (atom.read signal)]
+ (if continue?
+ (do !
+ [outcome (..mail! (action event stop) actor)]
+ (wrap (try.to-maybe outcome)))
+ (wrap #.None))))
+ channel)))
diff --git a/stdlib/source/lux/control/concurrency/atom.lux b/stdlib/source/lux/control/concurrency/atom.lux
index 9ebd54bb8..54be96d76 100644
--- a/stdlib/source/lux/control/concurrency/atom.lux
+++ b/stdlib/source/lux/control/concurrency/atom.lux
@@ -98,4 +98,4 @@
(def: #export (write value atom)
(All [a] (-> a (Atom a) (IO Any)))
- (update (function.constant value) atom))
+ (..update (function.constant value) atom))
diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux
index 50c26e769..fdec66a61 100644
--- a/stdlib/source/lux/control/concurrency/frp.lux
+++ b/stdlib/source/lux/control/concurrency/frp.lux
@@ -110,7 +110,9 @@
_
(wrap #.None)))))
-(def: empty Channel (promise.resolved #.None))
+(def: empty
+ Channel
+ (promise.resolved #.None))
(structure: #export monad
(Monad Channel)
@@ -145,15 +147,22 @@
(wrap (: Any (io.run (:: sink close))))))))
output))))
-(def: #export (listen listener channel)
- (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any)))
+(type: #export (Subscriber a)
+ (-> a (IO (Maybe Any))))
+
+(def: #export (subscribe subscriber channel)
+ (All [a] (-> (Subscriber a) (Channel a) (IO Any)))
(io (exec (: (Promise Any)
(loop [channel channel]
(do promise.monad
[cons channel]
(case cons
(#.Some [head tail])
- (exec (io.run (listener head))
+ (case (io.run (subscriber head))
+ (#.Some _)
+ (recur tail)
+
+ #.None
(recur tail))
#.None