aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--stdlib/source/lux/control/concurrency/frp.lux111
-rw-r--r--stdlib/source/test/lux/control.lux9
-rw-r--r--stdlib/source/test/lux/control/concurrency/frp.lux18
3 files changed, 81 insertions, 57 deletions
diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux
index a91a4f531..907ffa97d 100644
--- a/stdlib/source/lux/control/concurrency/frp.lux
+++ b/stdlib/source/lux/control/concurrency/frp.lux
@@ -9,15 +9,15 @@
["." monad (#+ Monad do)]
["ex" exception (#+ exception:)]]
[data
- ["." maybe ("#;." functor)]
+ ["." maybe ("#@." functor)]
["." error (#+ Error)]
[collection
- ["." list ("#;." monoid)]]]
+ ["." list ("#@." monoid)]]]
[type (#+ :share)
abstract]]
[//
["." atom (#+ Atom)]
- ["." promise (#+ Promise) ("#;." functor)]])
+ ["." promise (#+ Promise) ("#@." functor)]])
(type: #export (Channel a)
{#.doc "An asynchronous channel to distribute values."}
@@ -31,33 +31,33 @@
(: (-> a (IO (Error Any)))
feed))
-(def: (source resolve)
+(def: (sink resolve)
(All [a]
(-> (promise.Resolver (Maybe [a (Channel a)]))
(Sink a)))
- (let [source (atom.atom resolve)]
+ (let [sink (atom.atom resolve)]
(structure
(def: close
(loop [_ []]
(do io.monad
- [current (atom.read source)
+ [current (atom.read sink)
stopped? (current #.None)]
(if stopped?
- ## I closed the source.
+ ## I closed the sink.
(wrap (ex.return []))
- ## Someone else interacted with the source.
+ ## Someone else interacted with the sink.
(do @
- [latter (atom.read source)]
+ [latter (atom.read sink)]
(if (is? current latter)
- ## Someone else closed the source.
+ ## Someone else closed the sink.
(wrap (ex.throw channel-is-already-closed []))
- ## Someone else fed the source while I was closing it.
+ ## Someone else fed the sink while I was closing it.
(recur [])))))))
(def: (feed value)
(loop [_ []]
(do io.monad
- [current (atom.read source)
+ [current (atom.read sink)
#let [[next resolve-next] (:share [a]
{(promise.Resolver (Maybe [a (Channel a)]))
current}
@@ -66,43 +66,28 @@
(promise.promise [])})]
fed? (current (#.Some [value next]))]
(if fed?
- ## I fed the source.
+ ## I fed the sink.
(do @
- [_ (atom.compare-and-swap current resolve-next source)]
+ [_ (atom.compare-and-swap current resolve-next sink)]
(wrap (ex.return [])))
- ## Someone else interacted with the source.
+ ## Someone else interacted with the sink.
(do @
- [latter (atom.read source)]
+ [latter (atom.read sink)]
(if (is? current latter)
- ## Someone else closed the source while I was feeding it.
+ ## Someone else closed the sink while I was feeding it.
(wrap (ex.throw channel-is-already-closed []))
- ## Someone else fed the source.
+ ## Someone else fed the sink.
(recur []))))))))))
(def: #export (channel _)
(All [a] (-> Any [(Channel a) (Sink a)]))
(let [[promise resolve] (promise.promise [])]
- [promise (..source resolve)]))
-
-(def: #export (listen listener channel)
- (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any)))
- (io (exec (: (Promise Any)
- (loop [channel channel]
- (do promise.monad
- [cons channel]
- (case cons
- (#.Some [head tail])
- (exec (io.run (listener head))
- (recur tail))
-
- #.None
- (wrap [])))))
- [])))
+ [promise (..sink resolve)]))
(structure: #export functor (Functor Channel)
(def: (map f)
- (promise;map
- (maybe;map
+ (promise@map
+ (maybe@map
(function (_ [head tail])
[(f head) (map f tail)])))))
@@ -120,18 +105,54 @@
_
(wrap #.None)))))
+(def: empty Channel (promise.resolved #.None))
+
(structure: #export monad (Monad Channel)
(def: &functor ..functor)
(def: (wrap a)
- (promise.resolved (#.Some [a (promise.resolved #.None)])))
+ (promise.resolved (#.Some [a ..empty])))
(def: (join mma)
- (let [[output source] (channel [])]
- (exec (io.run (..listen (..listen (:: source feed))
- mma))
+ (let [[output sink] (channel [])]
+ (exec (: (Promise Any)
+ (loop [mma mma]
+ (do promise.monad
+ [?mma mma]
+ (case ?mma
+ (#.Some [ma mma'])
+ (do @
+ [_ (loop [ma ma]
+ (do @
+ [?ma ma]
+ (case ?ma
+ (#.Some [a ma'])
+ (exec (io.run (:: sink feed a))
+ (recur ma'))
+
+ #.None
+ (wrap []))))]
+ (recur mma'))
+
+ #.None
+ (wrap (: Any (io.run (:: sink close))))))))
output))))
+(def: #export (listen listener channel)
+ (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any)))
+ (io (exec (: (Promise Any)
+ (loop [channel channel]
+ (do promise.monad
+ [cons channel]
+ (case cons
+ (#.Some [head tail])
+ (exec (io.run (listener head))
+ (recur tail))
+
+ #.None
+ (wrap [])))))
+ [])))
+
(def: #export (filter pass? channel)
(All [a] (-> (Predicate a) (Channel a) (Channel a)))
(do promise.monad
@@ -148,8 +169,8 @@
(def: #export (from-promise promise)
(All [a] (-> (Promise a) (Channel a)))
- (promise;map (function (_ value)
- (#.Some [value (promise.resolved #.None)]))
+ (promise@map (function (_ value)
+ (#.Some [value ..empty]))
promise))
(def: #export (fold f init channel)
@@ -186,11 +207,11 @@
(def: #export (poll milli-seconds action)
(All [a] (-> Nat (IO a) (Channel a)))
- (let [[output source] (channel [])]
+ (let [[output sink] (channel [])]
(exec (io.run (loop [_ []]
(do io.monad
[value action
- _ (:: source feed value)]
+ _ (:: sink feed value)]
(promise.await recur (promise.wait milli-seconds)))))
output)))
@@ -250,7 +271,7 @@
(All [a] (-> Nat (List a) (Channel a)))
(case values
#.Nil
- (promise.resolved #.None)
+ ..empty
(#.Cons head tail)
(promise.resolved (#.Some [head (do promise.monad
diff --git a/stdlib/source/test/lux/control.lux b/stdlib/source/test/lux/control.lux
index 22bd9741c..8bfd57da0 100644
--- a/stdlib/source/test/lux/control.lux
+++ b/stdlib/source/test/lux/control.lux
@@ -25,13 +25,15 @@
($_ _.and
/actor.test
/atom.test
- /frp.test))
+ /frp.test
+ ))
(def: security
Test
($_ _.and
/privacy.test
- /integrity.test))
+ /integrity.test
+ ))
(def: #export test
Test
@@ -48,4 +50,5 @@
/thread.test
/writer.test
..concurrency
- ..security))
+ ..security
+ ))
diff --git a/stdlib/source/test/lux/control/concurrency/frp.lux b/stdlib/source/test/lux/control/concurrency/frp.lux
index ea4d7adad..92e4838a8 100644
--- a/stdlib/source/test/lux/control/concurrency/frp.lux
+++ b/stdlib/source/test/lux/control/concurrency/frp.lux
@@ -5,13 +5,13 @@
[control
["." monad (#+ do)]
[concurrency
- ["." promise ("#;." monad)]
+ ["." promise ("#@." monad)]
["." atom (#+ Atom atom)]]]
[data
[number
["." nat]]
[collection
- ["." list ("#;." functor)]]]
+ ["." list ("#@." functor)]]]
[math
["r" random]]]
{1
@@ -19,7 +19,7 @@
(def: #export test
Test
- (let [(^open "list;.") (list.equivalence nat.equivalence)]
+ (let [(^open "list@.") (list.equivalence nat.equivalence)]
(do r.monad
[inputs (r.list 5 r.nat)
sample r.nat]
@@ -30,7 +30,7 @@
(/.filter n/even?)
/.consume)]
(_.assert "Can filter a channel's elements."
- (list;= (list.filter n/even? inputs)
+ (list@= (list.filter n/even? inputs)
output))))
(wrap (do promise.monad
[output (|> inputs
@@ -38,22 +38,22 @@
(:: /.functor map inc)
/.consume)]
(_.assert "Functor goes over every element in a channel."
- (list;= (list;map inc inputs)
+ (list@= (list@map inc inputs)
output))))
(wrap (do promise.monad
[output (/.consume (:: /.apply apply
(/.sequential 0 (list inc))
(/.sequential 0 (list sample))))]
(_.assert "Apply works over all channel values."
- (list;= (list (inc sample))
+ (list@= (list (inc sample))
output))))
(wrap (do promise.monad
[output (/.consume
(do /.monad
- [f (/.from-promise (promise;wrap inc))
- a (/.from-promise (promise;wrap sample))]
+ [f (/.from-promise (promise@wrap inc))
+ a (/.from-promise (promise@wrap sample))]
(wrap (f a))))]
(_.assert "Valid monad."
- (list;= (list (inc sample))
+ (list@= (list (inc sample))
output))))
))))