From 0e32b3711890f6d8ab25e76ea7810ccd9445ef07 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Sun, 16 Dec 2018 00:16:30 -0400 Subject: Improved how the "Source" type works. --- stdlib/source/lux/control/concurrency/frp.lux | 122 ++++++++++---------------- stdlib/source/lux/control/concurrency/stm.lux | 2 +- 2 files changed, 49 insertions(+), 75 deletions(-) diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux index 5e5178e4b..18b385a65 100644 --- a/stdlib/source/lux/control/concurrency/frp.lux +++ b/stdlib/source/lux/control/concurrency/frp.lux @@ -21,88 +21,62 @@ {#.doc "An asynchronous channel to distribute values."} (Promise (Maybe [a (Channel a)]))) -(signature: (Resolver a) +(signature: #export (Source a) (: (IO Bit) close) - (: (-> a (IO (Maybe (Resolver a)))) + (: (-> a (IO Bit)) feed)) -(def: (resolver resolve) +(def: (source resolve) (All [a] (-> (promise.Resolver (Maybe [a (Channel a)])) - (Resolver a))) - (structure - (def: close - (resolve #.None)) - - (def: (feed value) - (do io.Monad - [#let [[next resolve-next] (:share [a] - {(promise.Resolver (Maybe [a (Channel a)])) - resolve} - {[(Promise (Maybe [a (Channel a)])) - (promise.Resolver (Maybe [a (Channel a)]))] - (promise.promise [])})] - resolved? (resolve (#.Some [value next]))] - (if resolved? - (wrap (#.Some (resolver resolve-next))) - (wrap #.None)))))) - -(abstract: #export (Source a) - {} - - (Atom (Resolver a)) - - (def: #export (close source) - (All [a] (-> (Source a) (IO Bit))) - (do io.Monad - [resolver (atom.read (:representation source)) - closed? (:: resolver close)] - (if closed? - ## I closed the source. - (wrap true) - ## Someone else interacted with the source. - (do @ - [resolver' (atom.read (:representation source))] - (if (is? resolver resolver') - ## Someone else closed the source. - (wrap true) - ## Someone else fed the source while I was closing it. - (close source)))))) - - (def: #export (feed source value) - (All [a] (-> (Source a) a (IO Bit))) - (loop [source (:representation source)] - (do io.Monad - [current (atom.read source) - ?next (:: current feed value) - fed? (case ?next - (#.Some next) - (atom.compare-and-swap current next source) - - #.None - (wrap false))] - (if fed? - ## I fed the source. - (wrap true) - ## Someone else interacted with the source. - (do @ - [latter (atom.read source)] - (if (is? current latter) - ## Someone else closed the source while I was feeding it. - (wrap false) - ## Someone else fed the source. - (recur source))))))) - - (def: source - (All [a] (-> (Resolver a) (Source a))) - (|>> atom.atom :abstraction)) - ) + (Source a))) + (let [source (atom.atom resolve)] + (structure + (def: close + (loop [_ []] + (do io.Monad + [current (atom.read source) + stopped? (current #.None)] + (if stopped? + ## I closed the source. + (wrap true) + ## Someone else interacted with the source. + (do @ + [latter (atom.read source)] + (if (is? current latter) + ## Someone else closed the source. + (wrap true) + ## Someone else fed the source while I was closing it. + (recur []))))))) + + (def: (feed value) + (loop [_ []] + (do io.Monad + [current (atom.read source) + #let [[next resolve-next] (:share [a] + {(promise.Resolver (Maybe [a (Channel a)])) + current} + {[(Promise (Maybe [a (Channel a)])) + (promise.Resolver (Maybe [a (Channel a)]))] + (promise.promise [])})] + fed? (current (#.Some [value next]))] + (if fed? + ## I fed the source. + (atom.compare-and-swap current resolve-next source) + ## Someone else interacted with the source. + (do @ + [latter (atom.read source)] + (if (is? current latter) + ## Someone else closed the source while I was feeding it. + (wrap false) + ## Someone else fed the source. + (recur [])))))))))) (def: #export (channel _) (All [a] (-> Any [(Channel a) (Source a)])) (let [[promise resolve] (promise.promise [])] - [promise (..source (..resolver resolve))])) + [promise (..source resolve)])) (def: #export (listen listener channel) (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any))) @@ -148,7 +122,7 @@ (def: (join mma) (let [[output source] (channel [])] - (exec (io.run (..listen (..listen (..feed source)) + (exec (io.run (..listen (..listen (:: source feed)) mma)) output)))) @@ -210,7 +184,7 @@ (exec (io.run (loop [_ []] (do io.Monad [value action - _ (..feed source value)] + _ (:: source feed value)] (promise.await recur (promise.wait milli-seconds))))) output))) diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux index 31a62413e..34122abd4 100644 --- a/stdlib/source/lux/control/concurrency/stm.lux +++ b/stdlib/source/lux/control/concurrency/stm.lux @@ -59,7 +59,7 @@ [#let [[channel source] (frp.channel []) target (:representation target)] _ (atom.update (function (_ [value observers]) - [value (#.Cons (frp.feed source) observers)]) + [value (#.Cons (:: source feed) observers)]) target)] (wrap channel))) ) -- cgit v1.2.3