diff options
-rw-r--r-- | stdlib/source/lux/control/concurrency/frp.lux | 336 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/promise.lux | 3 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/stm.lux | 8 | ||||
-rw-r--r-- | stdlib/test/test/lux/control/concurrency/frp.lux | 109 |
4 files changed, 264 insertions, 192 deletions
diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux index 8db54f28f..5e5178e4b 100644 --- a/stdlib/source/lux/control/concurrency/frp.lux +++ b/stdlib/source/lux/control/concurrency/frp.lux @@ -1,132 +1,278 @@ (.module: - [lux #* + [lux (#- Source) [control [functor (#+ Functor)] [apply (#+ Apply)] - ["." monad (#+ do Monad)]] - ["." io (#+ IO io)] + ["." monad (#+ do Monad)] + [predicate (#+ Predicate)] + [equivalence (#+ Equivalence)]] + ["." io (#+ IO)] [data + [maybe ("maybe/." Functor<Maybe>)] [collection [list ("list/." Monoid<List>)]]] [type (#+ :share) abstract]] [// - ["." atom (#+ Atom atom)] - ["." promise (#+ Promise)]]) + ["." atom (#+ Atom)] + ["." promise (#+ Promise) ("promise/." Functor<Promise>)]]) -(abstract: #export (Channel a) +(type: #export (Channel a) {#.doc "An asynchronous channel to distribute values."} - (Atom (List (-> a (IO Any)))) + (Promise (Maybe [a (Channel a)]))) - (def: #export (channel _) - (All [a] (-> Any (Channel a))) - (:abstraction (atom (list)))) +(signature: (Resolver a) + (: (IO Bit) + close) + (: (-> a (IO (Maybe (Resolver a)))) + feed)) - (def: #export (listen listener channel) - (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any))) - ## TODO: Simplify when possible. - (do io.Monad<IO> - [_ (atom.update (|>> (#.Cons listener)) - (:representation channel))] - (wrap []))) +(def: (resolver resolve) + (All [a] + (-> (promise.Resolver (Maybe [a (Channel a)])) + (Resolver a))) + (structure + (def: close + (resolve #.None)) + + (def: (feed value) + (do io.Monad<IO> + [#let [[next resolve-next] (:share [a] + {(promise.Resolver (Maybe [a (Channel a)])) + resolve} + {[(Promise (Maybe [a (Channel a)])) + (promise.Resolver (Maybe [a (Channel a)]))] + (promise.promise [])})] + resolved? (resolve (#.Some [value next]))] + (if resolved? + (wrap (#.Some (resolver resolve-next))) + (wrap #.None)))))) - (def: #export (publish channel value) - {#.doc "Publish to a channel."} - (All [a] (-> (Channel a) a (IO Any))) - (do io.Monad<IO> - [listeners (atom.read (:representation channel))] - (monad.map @ (function (_ listener) (listener value)) listeners))) - ) - -(def: #export (filter predicate input) - (All [a] (-> (-> a Bit) (Channel a) (Channel a))) - (let [output (channel [])] - (exec (io.run (listen (function (_ value) - (if (predicate value) - (publish output value) - (io []))) - input)) - output))) +(abstract: #export (Source a) + {} -(def: #export (pipe output input) - {#.doc "Copy/pipe the contents of a channel on to another."} - (All [a] (-> (Channel a) (Channel a) (IO Any))) - (listen (publish output) - input)) + (Atom (Resolver a)) -(def: #export (merge inputs) - {#.doc "Fuse all the elements in a list of channels by piping them onto a new output channel."} - (All [a] (-> (List (Channel a)) (IO (Channel a)))) - (let [output (channel [])] + (def: #export (close source) + (All [a] (-> (Source a) (IO Bit))) (do io.Monad<IO> - [_ (monad.map @ (pipe output) inputs)] - (wrap output)))) + [resolver (atom.read (:representation source)) + closed? (:: resolver close)] + (if closed? + ## I closed the source. + (wrap true) + ## Someone else interacted with the source. + (do @ + [resolver' (atom.read (:representation source))] + (if (is? resolver resolver') + ## Someone else closed the source. + (wrap true) + ## Someone else fed the source while I was closing it. + (close source)))))) -(def: #export (from-promise promise) - (All [a] (-> (Promise a) (Channel a))) - (let [output (channel [])] - (exec (promise.await (publish output) promise) - output))) + (def: #export (feed source value) + (All [a] (-> (Source a) a (IO Bit))) + (loop [source (:representation source)] + (do io.Monad<IO> + [current (atom.read source) + ?next (:: current feed value) + fed? (case ?next + (#.Some next) + (atom.compare-and-swap current next source) -(def: #export (poll time action) - (All [a] (-> Nat (IO a) (Channel a))) - (let [output (channel [])] - (exec (io.run - (loop [_ []] - (do io.Monad<IO> - [value action - _ (publish output value)] - (wrap (promise.await recur (promise.wait time)))))) - output))) + #.None + (wrap false))] + (if fed? + ## I fed the source. + (wrap true) + ## Someone else interacted with the source. + (do @ + [latter (atom.read source)] + (if (is? current latter) + ## Someone else closed the source while I was feeding it. + (wrap false) + ## Someone else fed the source. + (recur source))))))) -(def: #export (periodic time) - (-> Nat (Channel Any)) - (let [output (channel [])] - (exec (io.run - (loop [_ []] - (do io.Monad<IO> - [_ (publish output [])] - (wrap (promise.await recur (promise.wait time)))))) - output))) + (def: source + (All [a] (-> (Resolver a) (Source a))) + (|>> atom.atom :abstraction)) + ) -(def: #export (iterate f init) - (All [a] (-> (-> a (Promise a)) a (Channel a))) - (let [output (channel [])] - (exec (io.run - (loop [zero init] - (do io.Monad<IO> - [_ (publish output zero)] - (wrap (promise.await recur (f zero)))))) - output))) +(def: #export (channel _) + (All [a] (-> Any [(Channel a) (Source a)])) + (let [[promise resolve] (promise.promise [])] + [promise (..source (..resolver resolve))])) + +(def: #export (listen listener channel) + (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any))) + (io.io (exec (: (Promise Any) + (loop [channel channel] + (do promise.Monad<Promise> + [cons channel] + (case cons + (#.Some [head tail]) + (exec (io.run (listener head)) + (recur tail)) + + #.None + (wrap []))))) + []))) (structure: #export _ (Functor Channel) - (def: (map f input) - (let [output (channel [])] - (exec (io.run (listen (|>> f (publish output)) - input)) - output)))) + (def: (map f) + (promise/map + (maybe/map + (function (_ [head tail]) + [(f head) (map f tail)]))))) (structure: #export _ (Apply Channel) (def: functor Functor<Channel>) (def: (apply ff fa) - (let [output (channel [])] - (exec (io.run (listen (function (_ f) - (listen (|>> f (publish output)) - fa)) - ff)) - output)))) + (do promise.Monad<Promise> + [cons-f ff + cons-a fa] + (case [cons-f cons-a] + [(#.Some [head-f tail-f]) (#.Some [head-a tail-a])] + (wrap (#.Some [(head-f head-a) (apply tail-f tail-a)])) + + _ + (wrap #.None))))) (structure: #export _ (Monad Channel) (def: functor Functor<Channel>) (def: (wrap a) - (let [output (channel [])] - (exec (io.run (publish output a)) - output))) + (promise.resolved (#.Some [a (promise.resolved #.None)]))) (def: (join mma) - (let [output (channel [])] - (exec (io.run (listen (listen (publish output)) - mma)) + (let [[output source] (channel [])] + (exec (io.run (..listen (..listen (..feed source)) + mma)) output)))) + +(def: #export (filter pass? channel) + (All [a] (-> (Predicate a) (Channel a) (Channel a))) + (do promise.Monad<Promise> + [cons channel] + (case cons + (#.Some [head tail]) + (let [tail' (filter pass? tail)] + (if (pass? head) + (wrap (#.Some [head tail'])) + tail')) + + #.None + (wrap #.None)))) + +(def: #export (from-promise promise) + (All [a] (-> (Promise a) (Channel a))) + (promise/map (function (_ value) + (#.Some [value (promise.resolved #.None)])) + promise)) + +(def: #export (fold f init channel) + {#.doc "Asynchronous fold over channels."} + (All [a b] + (-> (-> b a (Promise a)) a (Channel b) + (Promise a))) + (do promise.Monad<Promise> + [cons channel] + (case cons + #.None + (wrap init) + + (#.Some [head tail]) + (do @ + [init' (f head init)] + (fold f init' tail))))) + +(def: #export (folds f init channel) + {#.doc "A channel of folds."} + (All [a b] + (-> (-> b a (Promise a)) a (Channel b) + (Channel a))) + (do promise.Monad<Promise> + [cons channel] + (case cons + #.None + (wrap (#.Some [init (wrap #.None)])) + + (#.Some [head tail]) + (do @ + [init' (f head init)] + (folds f init' tail))))) + +(def: #export (poll milli-seconds action) + (All [a] (-> Nat (IO a) (Channel a))) + (let [[output source] (channel [])] + (exec (io.run (loop [_ []] + (do io.Monad<IO> + [value action + _ (..feed source value)] + (promise.await recur (promise.wait milli-seconds))))) + output))) + +(def: #export (periodic milli-seconds) + (-> Nat (Channel Any)) + (poll milli-seconds (io.io []))) + +(def: #export (iterate f init) + (All [a] (-> (-> a (Promise (Maybe a))) a (Channel a))) + (do promise.Monad<Promise> + [?next (f init)] + (case ?next + (#.Some next) + (wrap (#.Some [init (iterate f next)])) + + #.None + (wrap (#.Some [init (wrap #.None)]))))) + +(def: (distinct' equivalence previous channel) + (All [a] (-> (Equivalence a) a (Channel a) (Channel a))) + (do promise.Monad<Promise> + [cons channel] + (case cons + (#.Some [head tail]) + (if (:: equivalence = previous head) + (distinct' equivalence previous tail) + (wrap (#.Some [head (distinct' equivalence head tail)]))) + + #.None + (wrap #.None)))) + +(def: #export (distinct equivalence channel) + (All [a] (-> (Equivalence a) (Channel a) (Channel a))) + (do promise.Monad<Promise> + [cons channel] + (case cons + (#.Some [head tail]) + (wrap (#.Some [head (distinct' equivalence head tail)])) + + #.None + (wrap #.None)))) + +(def: #export (consume channel) + {#.doc "Reads the entirety of a channel's content and returns it as a list."} + (All [a] (-> (Channel a) (Promise (List a)))) + (do promise.Monad<Promise> + [cons channel] + (case cons + (#.Some [head tail]) + (:: @ map (|>> (#.Cons head)) + (consume tail)) + + #.None + (wrap #.Nil)))) + +(def: #export (sequential milli-seconds values) + (All [a] (-> Nat (List a) (Channel a))) + (case values + #.Nil + (promise.resolved #.None) + + (#.Cons head tail) + (promise.resolved (#.Some [head (do promise.Monad<Promise> + [_ (promise.wait milli-seconds)] + (sequential milli-seconds tail))])))) diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux index 2530d6080..31d620b64 100644 --- a/stdlib/source/lux/control/concurrency/promise.lux +++ b/stdlib/source/lux/control/concurrency/promise.lux @@ -101,8 +101,7 @@ (structure: #export _ (Monad Promise) (def: functor Functor<Promise>) - (def: (wrap a) - (..resolved a)) + (def: wrap ..resolved) (def: (join mma) (let [[ma resolve] (promise [])] diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux index f54e16baf..31a62413e 100644 --- a/stdlib/source/lux/control/concurrency/stm.lux +++ b/stdlib/source/lux/control/concurrency/stm.lux @@ -52,18 +52,14 @@ (wrap [])) (write! new-value (:abstraction var))))) - ## TODO: Remove when possible - (def: (helper|follow var) - (All [a] (-> (Var a) (frp.Channel a))) - (frp.channel [])) (def: #export (follow target) {#.doc "Creates a channel that will receive all changes to the value of the given var."} (All [a] (-> (Var a) (IO (frp.Channel a)))) (do io.Monad<IO> - [#let [channel (helper|follow target) + [#let [[channel source] (frp.channel []) target (:representation target)] _ (atom.update (function (_ [value observers]) - [value (#.Cons (frp.publish channel) observers)]) + [value (#.Cons (frp.feed source) observers)]) target)] (wrap channel))) ) diff --git a/stdlib/test/test/lux/control/concurrency/frp.lux b/stdlib/test/test/lux/control/concurrency/frp.lux index 04ddd5986..a906ee54b 100644 --- a/stdlib/test/test/lux/control/concurrency/frp.lux +++ b/stdlib/test/test/lux/control/concurrency/frp.lux @@ -13,110 +13,41 @@ ["." list]]]] lux/test) -(def: (write! values channel) - (All [a] (-> (List a) (Channel a) (IO Any))) - (do io.Monad<IO> - [_ (monad.map @ (frp.publish channel) values)] - (wrap []))) - -(def: (read! channel) - (All [a] (-> (Channel a) (IO (Atom (List a))))) - (do io.Monad<IO> - [#let [output (atom (list))] - _ (frp.listen (function (_ value) - ## TODO: Simplify when possible. - (do @ - [_ (atom.update (|>> (#.Cons value)) output)] - (wrap []))) - channel)] - (wrap output))) - (context: "FRP" (let [(^open "list/.") (list.Equivalence<List> number.Equivalence<Int>)] ($_ seq (wrap (do promise.Monad<Promise> - [#let [values (list +0 +1 +2 +3 +4 +5)] - output (promise.future - (do io.Monad<IO> - [#let [input (: (Channel Int) (frp.channel []))] - output (read! input) - _ (write! values input)] - (wrap output))) - _ (promise.wait 100) - output (promise.future (atom.read output))] - (assert "Can pipe one channel into another." - (list/= values - (list.reverse output))))) - - (wrap (do promise.Monad<Promise> - [output (promise.future - (do io.Monad<IO> - [#let [input (: (Channel Int) (frp.channel [])) - elems (frp.filter i/even? input)] - output (read! elems) - _ (write! (list +0 +1 +2 +3 +4 +5) input)] - (wrap output))) - _ (promise.wait 100) - output (promise.future (atom.read output))] + [output (|> (list +0 +1 +2 +3 +4 +5) + (frp.sequential 0) + (frp.filter i/even?) + frp.consume)] (assert "Can filter a channel's elements." - (list/= (list +0 +2 +4) - (list.reverse output))))) - - (wrap (do promise.Monad<Promise> - [output (promise.future - (do io.Monad<IO> - [#let [left (: (Channel Int) (frp.channel [])) - right (: (Channel Int) (frp.channel []))] - merged (frp.merge (list left right)) - output (read! merged) - _ (write! (list +0 +1 +2 +3 +4 +5) left) - _ (write! (list +0 -1 -2 -3 -4 -5) right)] - (wrap output))) - _ (promise.wait 100) - output (promise.future (atom.read output))] - (assert "Can merge channels." - (list/= (list +0 +1 +2 +3 +4 +5 +0 -1 -2 -3 -4 -5) - (list.reverse output))))) + (list/= (list +0 +2 +4) output)))) (wrap (do promise.Monad<Promise> - [output (promise.future - (do io.Monad<IO> - [#let [inputs (: (Channel Int) (frp.channel [])) - mapped (:: frp.Functor<Channel> map inc inputs)] - output (read! mapped) - _ (write! (list +0 +1 +2 +3 +4 +5) inputs)] - (wrap output))) - _ (promise.wait 100) - output (promise.future (atom.read output))] + [output (|> (list +0 +1 +2 +3 +4 +5) + (frp.sequential 0) + (:: frp.Functor<Channel> map inc) + frp.consume)] (assert "Functor goes over every element in a channel." (list/= (list +1 +2 +3 +4 +5 +6) - (list.reverse output))))) + output)))) (wrap (do promise.Monad<Promise> - [output (promise.future - (do io.Monad<IO> - [#let [>f< (: (Channel (-> Int Int)) (frp.channel [])) - >a< (: (Channel Int) (frp.channel []))] - output (read! (let [(^open ".") frp.Apply<Channel>] - (apply >f< >a<))) - _ (write! (list inc) >f<) - _ (write! (list +12345) >a<)] - (wrap output))) - _ (promise.wait 100) - output (promise.future (atom.read output))] + [output (frp.consume (:: frp.Apply<Channel> apply + (frp.sequential 0 (list inc)) + (frp.sequential 0 (list +12345))))] (assert "Apply works over all channel values." (list/= (list +12346) - (list.reverse output))))) + output)))) (wrap (do promise.Monad<Promise> - [output (promise.future - (read! (do frp.Monad<Channel> - [f (frp.from-promise (promise.delay 100 inc)) - a (frp.from-promise (promise.delay 200 +12345))] - (frp.from-promise (promise.delay 300 (f a)))))) - _ (promise.wait 700) - output (promise.future (atom.read output))] + [output (frp.consume + (do frp.Monad<Channel> + [f (frp.from-promise (promise/wrap inc)) + a (frp.from-promise (promise/wrap +12345))] + (wrap (f a))))] (assert "Valid monad." (list/= (list +12346) - (list.reverse output))))) + output)))) ))) |