aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/library/lux/control/concurrency/frp.lux
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--stdlib/source/library/lux/control/concurrency/frp.lux106
1 files changed, 55 insertions, 51 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/frp.lux b/stdlib/source/library/lux/control/concurrency/frp.lux
index 32cc8118a..97ed9bf6c 100644
--- a/stdlib/source/library/lux/control/concurrency/frp.lux
+++ b/stdlib/source/library/lux/control/concurrency/frp.lux
@@ -1,6 +1,6 @@
(.module:
[library
- [lux #*
+ [lux (#- list)
[abstract
[equivalence (#+ Equivalence)]
[functor (#+ Functor)]
@@ -124,50 +124,53 @@
(def: (join mma)
(let [[output sink] (channel [])]
- (exec (: (Async Any)
- (loop [mma mma]
- (do {! async.monad}
- [?mma mma]
- (case ?mma
- (#.Some [ma mma'])
- (do !
- [_ (loop [ma ma]
- (do !
- [?ma ma]
- (case ?ma
- (#.Some [a ma'])
- (exec (io.run! (\ sink feed a))
- (recur ma'))
-
- #.None
- (in []))))]
- (recur mma'))
-
- #.None
- (in (: Any (io.run! (\ sink close))))))))
+ (exec
+ (: (Async Any)
+ (loop [mma mma]
+ (do {! async.monad}
+ [?mma mma]
+ (case ?mma
+ (#.Some [ma mma'])
+ (do !
+ [_ (loop [ma ma]
+ (do !
+ [?ma ma]
+ (case ?ma
+ (#.Some [a ma'])
+ (exec
+ (io.run! (\ sink feed a))
+ (recur ma'))
+
+ #.None
+ (in []))))]
+ (recur mma'))
+
+ #.None
+ (in (: Any (io.run! (\ sink close))))))))
output))))
(type: .public (Subscriber a)
{#.doc (example "A function that can receive every value fed into a channel.")}
(-> a (IO (Maybe Any))))
-(def: .public (subscribe subscriber channel)
+(def: .public (subscribe! subscriber channel)
(All [a] (-> (Subscriber a) (Channel a) (IO Any)))
- (io (exec (: (Async Any)
- (loop [channel channel]
- (do async.monad
- [item channel]
- (case item
- (#.Some [head tail])
- (case (io.run! (subscriber head))
- (#.Some _)
- (recur tail)
-
- #.None
- (in []))
-
- #.None
- (in [])))))
+ (io (exec
+ (: (Async Any)
+ (loop [channel channel]
+ (do async.monad
+ [item channel]
+ (case item
+ (#.Some [head tail])
+ (case (io.run! (subscriber head))
+ (#.Some _)
+ (recur tail)
+
+ #.None
+ (in []))
+
+ #.None
+ (in [])))))
[])))
(def: .public (only pass? channel)
@@ -228,11 +231,12 @@
(All [a]
(-> Nat (IO a) [(Channel a) (Sink a)]))
(let [[output sink] (channel [])]
- (exec (io.run! (loop [_ []]
- (do io.monad
- [value action
- _ (\ sink feed value)]
- (async.upon! recur (async.delay milli_seconds)))))
+ (exec
+ (io.run! (loop [_ []]
+ (do io.monad
+ [value action
+ _ (\ sink feed value)]
+ (async.upon! recur (async.delay milli_seconds)))))
[output sink])))
(def: .public (periodic milli_seconds)
@@ -267,21 +271,21 @@
(All [a] (-> (Equivalence a) (Channel a) (Channel a)))
(do async.monad
[item channel]
- (case item
- (#.Some [head tail])
- (in (#.Some [head (distinct' equivalence head tail)]))
-
- #.None
- (in #.None))))
+ (in (case item
+ (#.Some [head tail])
+ (#.Some [head (distinct' equivalence head tail)])
+
+ #.None
+ #.None))))
-(def: .public (consume channel)
+(def: .public (list channel)
(All [a] (-> (Channel a) (Async (List a))))
(do {! async.monad}
[item channel]
(case item
(#.Some [head tail])
(\ ! map (|>> (#.Item head))
- (consume tail))
+ (list tail))
#.None
(in #.End))))