diff options
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/library/lux/control/concurrency/frp.lux | 106 |
1 files changed, 55 insertions, 51 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/frp.lux b/stdlib/source/library/lux/control/concurrency/frp.lux index 32cc8118a..97ed9bf6c 100644 --- a/stdlib/source/library/lux/control/concurrency/frp.lux +++ b/stdlib/source/library/lux/control/concurrency/frp.lux @@ -1,6 +1,6 @@ (.module: [library - [lux #* + [lux (#- list) [abstract [equivalence (#+ Equivalence)] [functor (#+ Functor)] @@ -124,50 +124,53 @@ (def: (join mma) (let [[output sink] (channel [])] - (exec (: (Async Any) - (loop [mma mma] - (do {! async.monad} - [?mma mma] - (case ?mma - (#.Some [ma mma']) - (do ! - [_ (loop [ma ma] - (do ! - [?ma ma] - (case ?ma - (#.Some [a ma']) - (exec (io.run! (\ sink feed a)) - (recur ma')) - - #.None - (in []))))] - (recur mma')) - - #.None - (in (: Any (io.run! (\ sink close)))))))) + (exec + (: (Async Any) + (loop [mma mma] + (do {! async.monad} + [?mma mma] + (case ?mma + (#.Some [ma mma']) + (do ! + [_ (loop [ma ma] + (do ! + [?ma ma] + (case ?ma + (#.Some [a ma']) + (exec + (io.run! (\ sink feed a)) + (recur ma')) + + #.None + (in []))))] + (recur mma')) + + #.None + (in (: Any (io.run! (\ sink close)))))))) output)))) (type: .public (Subscriber a) {#.doc (example "A function that can receive every value fed into a channel.")} (-> a (IO (Maybe Any)))) -(def: .public (subscribe subscriber channel) +(def: .public (subscribe! subscriber channel) (All [a] (-> (Subscriber a) (Channel a) (IO Any))) - (io (exec (: (Async Any) - (loop [channel channel] - (do async.monad - [item channel] - (case item - (#.Some [head tail]) - (case (io.run! (subscriber head)) - (#.Some _) - (recur tail) - - #.None - (in [])) - - #.None - (in []))))) + (io (exec + (: (Async Any) + (loop [channel channel] + (do async.monad + [item channel] + (case item + (#.Some [head tail]) + (case (io.run! (subscriber head)) + (#.Some _) + (recur tail) + + #.None + (in [])) + + #.None + (in []))))) []))) (def: .public (only pass? channel) @@ -228,11 +231,12 @@ (All [a] (-> Nat (IO a) [(Channel a) (Sink a)])) (let [[output sink] (channel [])] - (exec (io.run! (loop [_ []] - (do io.monad - [value action - _ (\ sink feed value)] - (async.upon! recur (async.delay milli_seconds))))) + (exec + (io.run! (loop [_ []] + (do io.monad + [value action + _ (\ sink feed value)] + (async.upon! recur (async.delay milli_seconds))))) [output sink]))) (def: .public (periodic milli_seconds) @@ -267,21 +271,21 @@ (All [a] (-> (Equivalence a) (Channel a) (Channel a))) (do async.monad [item channel] - (case item - (#.Some [head tail]) - (in (#.Some [head (distinct' equivalence head tail)])) - - #.None - (in #.None)))) + (in (case item + (#.Some [head tail]) + (#.Some [head (distinct' equivalence head tail)]) + + #.None + #.None)))) -(def: .public (consume channel) +(def: .public (list channel) (All [a] (-> (Channel a) (Async (List a)))) (do {! async.monad} [item channel] (case item (#.Some [head tail]) (\ ! map (|>> (#.Item head)) - (consume tail)) + (list tail)) #.None (in #.End)))) |