aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--stdlib/source/lux/control/concurrency/frp.lux336
-rw-r--r--stdlib/source/lux/control/concurrency/promise.lux3
-rw-r--r--stdlib/source/lux/control/concurrency/stm.lux8
3 files changed, 244 insertions, 103 deletions
diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux
index 8db54f28f..5e5178e4b 100644
--- a/stdlib/source/lux/control/concurrency/frp.lux
+++ b/stdlib/source/lux/control/concurrency/frp.lux
@@ -1,132 +1,278 @@
(.module:
- [lux #*
+ [lux (#- Source)
[control
[functor (#+ Functor)]
[apply (#+ Apply)]
- ["." monad (#+ do Monad)]]
- ["." io (#+ IO io)]
+ ["." monad (#+ do Monad)]
+ [predicate (#+ Predicate)]
+ [equivalence (#+ Equivalence)]]
+ ["." io (#+ IO)]
[data
+ [maybe ("maybe/." Functor<Maybe>)]
[collection
[list ("list/." Monoid<List>)]]]
[type (#+ :share)
abstract]]
[//
- ["." atom (#+ Atom atom)]
- ["." promise (#+ Promise)]])
+ ["." atom (#+ Atom)]
+ ["." promise (#+ Promise) ("promise/." Functor<Promise>)]])
-(abstract: #export (Channel a)
+(type: #export (Channel a)
{#.doc "An asynchronous channel to distribute values."}
- (Atom (List (-> a (IO Any))))
+ (Promise (Maybe [a (Channel a)])))
- (def: #export (channel _)
- (All [a] (-> Any (Channel a)))
- (:abstraction (atom (list))))
+(signature: (Resolver a)
+ (: (IO Bit)
+ close)
+ (: (-> a (IO (Maybe (Resolver a))))
+ feed))
- (def: #export (listen listener channel)
- (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any)))
- ## TODO: Simplify when possible.
- (do io.Monad<IO>
- [_ (atom.update (|>> (#.Cons listener))
- (:representation channel))]
- (wrap [])))
+(def: (resolver resolve)
+ (All [a]
+ (-> (promise.Resolver (Maybe [a (Channel a)]))
+ (Resolver a)))
+ (structure
+ (def: close
+ (resolve #.None))
+
+ (def: (feed value)
+ (do io.Monad<IO>
+ [#let [[next resolve-next] (:share [a]
+ {(promise.Resolver (Maybe [a (Channel a)]))
+ resolve}
+ {[(Promise (Maybe [a (Channel a)]))
+ (promise.Resolver (Maybe [a (Channel a)]))]
+ (promise.promise [])})]
+ resolved? (resolve (#.Some [value next]))]
+ (if resolved?
+ (wrap (#.Some (resolver resolve-next)))
+ (wrap #.None))))))
- (def: #export (publish channel value)
- {#.doc "Publish to a channel."}
- (All [a] (-> (Channel a) a (IO Any)))
- (do io.Monad<IO>
- [listeners (atom.read (:representation channel))]
- (monad.map @ (function (_ listener) (listener value)) listeners)))
- )
-
-(def: #export (filter predicate input)
- (All [a] (-> (-> a Bit) (Channel a) (Channel a)))
- (let [output (channel [])]
- (exec (io.run (listen (function (_ value)
- (if (predicate value)
- (publish output value)
- (io [])))
- input))
- output)))
+(abstract: #export (Source a)
+ {}
-(def: #export (pipe output input)
- {#.doc "Copy/pipe the contents of a channel on to another."}
- (All [a] (-> (Channel a) (Channel a) (IO Any)))
- (listen (publish output)
- input))
+ (Atom (Resolver a))
-(def: #export (merge inputs)
- {#.doc "Fuse all the elements in a list of channels by piping them onto a new output channel."}
- (All [a] (-> (List (Channel a)) (IO (Channel a))))
- (let [output (channel [])]
+ (def: #export (close source)
+ (All [a] (-> (Source a) (IO Bit)))
(do io.Monad<IO>
- [_ (monad.map @ (pipe output) inputs)]
- (wrap output))))
+ [resolver (atom.read (:representation source))
+ closed? (:: resolver close)]
+ (if closed?
+ ## I closed the source.
+ (wrap true)
+ ## Someone else interacted with the source.
+ (do @
+ [resolver' (atom.read (:representation source))]
+ (if (is? resolver resolver')
+ ## Someone else closed the source.
+ (wrap true)
+ ## Someone else fed the source while I was closing it.
+ (close source))))))
-(def: #export (from-promise promise)
- (All [a] (-> (Promise a) (Channel a)))
- (let [output (channel [])]
- (exec (promise.await (publish output) promise)
- output)))
+ (def: #export (feed source value)
+ (All [a] (-> (Source a) a (IO Bit)))
+ (loop [source (:representation source)]
+ (do io.Monad<IO>
+ [current (atom.read source)
+ ?next (:: current feed value)
+ fed? (case ?next
+ (#.Some next)
+ (atom.compare-and-swap current next source)
-(def: #export (poll time action)
- (All [a] (-> Nat (IO a) (Channel a)))
- (let [output (channel [])]
- (exec (io.run
- (loop [_ []]
- (do io.Monad<IO>
- [value action
- _ (publish output value)]
- (wrap (promise.await recur (promise.wait time))))))
- output)))
+ #.None
+ (wrap false))]
+ (if fed?
+ ## I fed the source.
+ (wrap true)
+ ## Someone else interacted with the source.
+ (do @
+ [latter (atom.read source)]
+ (if (is? current latter)
+ ## Someone else closed the source while I was feeding it.
+ (wrap false)
+ ## Someone else fed the source.
+ (recur source)))))))
-(def: #export (periodic time)
- (-> Nat (Channel Any))
- (let [output (channel [])]
- (exec (io.run
- (loop [_ []]
- (do io.Monad<IO>
- [_ (publish output [])]
- (wrap (promise.await recur (promise.wait time))))))
- output)))
+ (def: source
+ (All [a] (-> (Resolver a) (Source a)))
+ (|>> atom.atom :abstraction))
+ )
-(def: #export (iterate f init)
- (All [a] (-> (-> a (Promise a)) a (Channel a)))
- (let [output (channel [])]
- (exec (io.run
- (loop [zero init]
- (do io.Monad<IO>
- [_ (publish output zero)]
- (wrap (promise.await recur (f zero))))))
- output)))
+(def: #export (channel _)
+ (All [a] (-> Any [(Channel a) (Source a)]))
+ (let [[promise resolve] (promise.promise [])]
+ [promise (..source (..resolver resolve))]))
+
+(def: #export (listen listener channel)
+ (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any)))
+ (io.io (exec (: (Promise Any)
+ (loop [channel channel]
+ (do promise.Monad<Promise>
+ [cons channel]
+ (case cons
+ (#.Some [head tail])
+ (exec (io.run (listener head))
+ (recur tail))
+
+ #.None
+ (wrap [])))))
+ [])))
(structure: #export _ (Functor Channel)
- (def: (map f input)
- (let [output (channel [])]
- (exec (io.run (listen (|>> f (publish output))
- input))
- output))))
+ (def: (map f)
+ (promise/map
+ (maybe/map
+ (function (_ [head tail])
+ [(f head) (map f tail)])))))
(structure: #export _ (Apply Channel)
(def: functor Functor<Channel>)
(def: (apply ff fa)
- (let [output (channel [])]
- (exec (io.run (listen (function (_ f)
- (listen (|>> f (publish output))
- fa))
- ff))
- output))))
+ (do promise.Monad<Promise>
+ [cons-f ff
+ cons-a fa]
+ (case [cons-f cons-a]
+ [(#.Some [head-f tail-f]) (#.Some [head-a tail-a])]
+ (wrap (#.Some [(head-f head-a) (apply tail-f tail-a)]))
+
+ _
+ (wrap #.None)))))
(structure: #export _ (Monad Channel)
(def: functor Functor<Channel>)
(def: (wrap a)
- (let [output (channel [])]
- (exec (io.run (publish output a))
- output)))
+ (promise.resolved (#.Some [a (promise.resolved #.None)])))
(def: (join mma)
- (let [output (channel [])]
- (exec (io.run (listen (listen (publish output))
- mma))
+ (let [[output source] (channel [])]
+ (exec (io.run (..listen (..listen (..feed source))
+ mma))
output))))
+
+(def: #export (filter pass? channel)
+ (All [a] (-> (Predicate a) (Channel a) (Channel a)))
+ (do promise.Monad<Promise>
+ [cons channel]
+ (case cons
+ (#.Some [head tail])
+ (let [tail' (filter pass? tail)]
+ (if (pass? head)
+ (wrap (#.Some [head tail']))
+ tail'))
+
+ #.None
+ (wrap #.None))))
+
+(def: #export (from-promise promise)
+ (All [a] (-> (Promise a) (Channel a)))
+ (promise/map (function (_ value)
+ (#.Some [value (promise.resolved #.None)]))
+ promise))
+
+(def: #export (fold f init channel)
+ {#.doc "Asynchronous fold over channels."}
+ (All [a b]
+ (-> (-> b a (Promise a)) a (Channel b)
+ (Promise a)))
+ (do promise.Monad<Promise>
+ [cons channel]
+ (case cons
+ #.None
+ (wrap init)
+
+ (#.Some [head tail])
+ (do @
+ [init' (f head init)]
+ (fold f init' tail)))))
+
+(def: #export (folds f init channel)
+ {#.doc "A channel of folds."}
+ (All [a b]
+ (-> (-> b a (Promise a)) a (Channel b)
+ (Channel a)))
+ (do promise.Monad<Promise>
+ [cons channel]
+ (case cons
+ #.None
+ (wrap (#.Some [init (wrap #.None)]))
+
+ (#.Some [head tail])
+ (do @
+ [init' (f head init)]
+ (folds f init' tail)))))
+
+(def: #export (poll milli-seconds action)
+ (All [a] (-> Nat (IO a) (Channel a)))
+ (let [[output source] (channel [])]
+ (exec (io.run (loop [_ []]
+ (do io.Monad<IO>
+ [value action
+ _ (..feed source value)]
+ (promise.await recur (promise.wait milli-seconds)))))
+ output)))
+
+(def: #export (periodic milli-seconds)
+ (-> Nat (Channel Any))
+ (poll milli-seconds (io.io [])))
+
+(def: #export (iterate f init)
+ (All [a] (-> (-> a (Promise (Maybe a))) a (Channel a)))
+ (do promise.Monad<Promise>
+ [?next (f init)]
+ (case ?next
+ (#.Some next)
+ (wrap (#.Some [init (iterate f next)]))
+
+ #.None
+ (wrap (#.Some [init (wrap #.None)])))))
+
+(def: (distinct' equivalence previous channel)
+ (All [a] (-> (Equivalence a) a (Channel a) (Channel a)))
+ (do promise.Monad<Promise>
+ [cons channel]
+ (case cons
+ (#.Some [head tail])
+ (if (:: equivalence = previous head)
+ (distinct' equivalence previous tail)
+ (wrap (#.Some [head (distinct' equivalence head tail)])))
+
+ #.None
+ (wrap #.None))))
+
+(def: #export (distinct equivalence channel)
+ (All [a] (-> (Equivalence a) (Channel a) (Channel a)))
+ (do promise.Monad<Promise>
+ [cons channel]
+ (case cons
+ (#.Some [head tail])
+ (wrap (#.Some [head (distinct' equivalence head tail)]))
+
+ #.None
+ (wrap #.None))))
+
+(def: #export (consume channel)
+ {#.doc "Reads the entirety of a channel's content and returns it as a list."}
+ (All [a] (-> (Channel a) (Promise (List a))))
+ (do promise.Monad<Promise>
+ [cons channel]
+ (case cons
+ (#.Some [head tail])
+ (:: @ map (|>> (#.Cons head))
+ (consume tail))
+
+ #.None
+ (wrap #.Nil))))
+
+(def: #export (sequential milli-seconds values)
+ (All [a] (-> Nat (List a) (Channel a)))
+ (case values
+ #.Nil
+ (promise.resolved #.None)
+
+ (#.Cons head tail)
+ (promise.resolved (#.Some [head (do promise.Monad<Promise>
+ [_ (promise.wait milli-seconds)]
+ (sequential milli-seconds tail))]))))
diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux
index 2530d6080..31d620b64 100644
--- a/stdlib/source/lux/control/concurrency/promise.lux
+++ b/stdlib/source/lux/control/concurrency/promise.lux
@@ -101,8 +101,7 @@
(structure: #export _ (Monad Promise)
(def: functor Functor<Promise>)
- (def: (wrap a)
- (..resolved a))
+ (def: wrap ..resolved)
(def: (join mma)
(let [[ma resolve] (promise [])]
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux
index f54e16baf..31a62413e 100644
--- a/stdlib/source/lux/control/concurrency/stm.lux
+++ b/stdlib/source/lux/control/concurrency/stm.lux
@@ -52,18 +52,14 @@
(wrap []))
(write! new-value (:abstraction var)))))
- ## TODO: Remove when possible
- (def: (helper|follow var)
- (All [a] (-> (Var a) (frp.Channel a)))
- (frp.channel []))
(def: #export (follow target)
{#.doc "Creates a channel that will receive all changes to the value of the given var."}
(All [a] (-> (Var a) (IO (frp.Channel a))))
(do io.Monad<IO>
- [#let [channel (helper|follow target)
+ [#let [[channel source] (frp.channel [])
target (:representation target)]
_ (atom.update (function (_ [value observers])
- [value (#.Cons (frp.publish channel) observers)])
+ [value (#.Cons (frp.feed source) observers)])
target)]
(wrap channel)))
)