aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--stdlib/source/lux/control/concurrency/frp.lux122
-rw-r--r--stdlib/source/lux/control/concurrency/stm.lux2
2 files changed, 49 insertions, 75 deletions
diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux
index 5e5178e4b..18b385a65 100644
--- a/stdlib/source/lux/control/concurrency/frp.lux
+++ b/stdlib/source/lux/control/concurrency/frp.lux
@@ -21,88 +21,62 @@
{#.doc "An asynchronous channel to distribute values."}
(Promise (Maybe [a (Channel a)])))
-(signature: (Resolver a)
+(signature: #export (Source a)
(: (IO Bit)
close)
- (: (-> a (IO (Maybe (Resolver a))))
+ (: (-> a (IO Bit))
feed))
-(def: (resolver resolve)
+(def: (source resolve)
(All [a]
(-> (promise.Resolver (Maybe [a (Channel a)]))
- (Resolver a)))
- (structure
- (def: close
- (resolve #.None))
-
- (def: (feed value)
- (do io.Monad<IO>
- [#let [[next resolve-next] (:share [a]
- {(promise.Resolver (Maybe [a (Channel a)]))
- resolve}
- {[(Promise (Maybe [a (Channel a)]))
- (promise.Resolver (Maybe [a (Channel a)]))]
- (promise.promise [])})]
- resolved? (resolve (#.Some [value next]))]
- (if resolved?
- (wrap (#.Some (resolver resolve-next)))
- (wrap #.None))))))
-
-(abstract: #export (Source a)
- {}
-
- (Atom (Resolver a))
-
- (def: #export (close source)
- (All [a] (-> (Source a) (IO Bit)))
- (do io.Monad<IO>
- [resolver (atom.read (:representation source))
- closed? (:: resolver close)]
- (if closed?
- ## I closed the source.
- (wrap true)
- ## Someone else interacted with the source.
- (do @
- [resolver' (atom.read (:representation source))]
- (if (is? resolver resolver')
- ## Someone else closed the source.
- (wrap true)
- ## Someone else fed the source while I was closing it.
- (close source))))))
-
- (def: #export (feed source value)
- (All [a] (-> (Source a) a (IO Bit)))
- (loop [source (:representation source)]
- (do io.Monad<IO>
- [current (atom.read source)
- ?next (:: current feed value)
- fed? (case ?next
- (#.Some next)
- (atom.compare-and-swap current next source)
-
- #.None
- (wrap false))]
- (if fed?
- ## I fed the source.
- (wrap true)
- ## Someone else interacted with the source.
- (do @
- [latter (atom.read source)]
- (if (is? current latter)
- ## Someone else closed the source while I was feeding it.
- (wrap false)
- ## Someone else fed the source.
- (recur source)))))))
-
- (def: source
- (All [a] (-> (Resolver a) (Source a)))
- (|>> atom.atom :abstraction))
- )
+ (Source a)))
+ (let [source (atom.atom resolve)]
+ (structure
+ (def: close
+ (loop [_ []]
+ (do io.Monad<IO>
+ [current (atom.read source)
+ stopped? (current #.None)]
+ (if stopped?
+ ## I closed the source.
+ (wrap true)
+ ## Someone else interacted with the source.
+ (do @
+ [latter (atom.read source)]
+ (if (is? current latter)
+ ## Someone else closed the source.
+ (wrap true)
+ ## Someone else fed the source while I was closing it.
+ (recur [])))))))
+
+ (def: (feed value)
+ (loop [_ []]
+ (do io.Monad<IO>
+ [current (atom.read source)
+ #let [[next resolve-next] (:share [a]
+ {(promise.Resolver (Maybe [a (Channel a)]))
+ current}
+ {[(Promise (Maybe [a (Channel a)]))
+ (promise.Resolver (Maybe [a (Channel a)]))]
+ (promise.promise [])})]
+ fed? (current (#.Some [value next]))]
+ (if fed?
+ ## I fed the source.
+ (atom.compare-and-swap current resolve-next source)
+ ## Someone else interacted with the source.
+ (do @
+ [latter (atom.read source)]
+ (if (is? current latter)
+ ## Someone else closed the source while I was feeding it.
+ (wrap false)
+ ## Someone else fed the source.
+ (recur []))))))))))
(def: #export (channel _)
(All [a] (-> Any [(Channel a) (Source a)]))
(let [[promise resolve] (promise.promise [])]
- [promise (..source (..resolver resolve))]))
+ [promise (..source resolve)]))
(def: #export (listen listener channel)
(All [a] (-> (-> a (IO Any)) (Channel a) (IO Any)))
@@ -148,7 +122,7 @@
(def: (join mma)
(let [[output source] (channel [])]
- (exec (io.run (..listen (..listen (..feed source))
+ (exec (io.run (..listen (..listen (:: source feed))
mma))
output))))
@@ -210,7 +184,7 @@
(exec (io.run (loop [_ []]
(do io.Monad<IO>
[value action
- _ (..feed source value)]
+ _ (:: source feed value)]
(promise.await recur (promise.wait milli-seconds)))))
output)))
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux
index 31a62413e..34122abd4 100644
--- a/stdlib/source/lux/control/concurrency/stm.lux
+++ b/stdlib/source/lux/control/concurrency/stm.lux
@@ -59,7 +59,7 @@
[#let [[channel source] (frp.channel [])
target (:representation target)]
_ (atom.update (function (_ [value observers])
- [value (#.Cons (frp.feed source) observers)])
+ [value (#.Cons (:: source feed) observers)])
target)]
(wrap channel)))
)