aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEduardo Julian2017-12-12 08:28:55 -0400
committerEduardo Julian2017-12-12 08:28:55 -0400
commitea32f6d5500b5affa7f5b11cdc05b48ad4fe7a46 (patch)
treec1f8a884fe7f3b0b3d5acc29389f2003a371a394
parent859c7485cd0e9ebe8d456ed58238bdec849bd6e1 (diff)
- FRP channels are no longer asynchronously-built lists, but rather mediums for spreading data, that can be chained together to form networks for data distribution.
-rw-r--r--stdlib/source/lux/concurrency/frp.lux375
-rw-r--r--stdlib/source/lux/concurrency/stm.lux43
-rw-r--r--stdlib/source/lux/world/net/tcp.jvm.lux122
-rw-r--r--stdlib/source/lux/world/net/udp.jvm.lux3
-rw-r--r--stdlib/test/test/lux/concurrency/frp.lux224
-rw-r--r--stdlib/test/test/lux/concurrency/stm.lux64
-rw-r--r--stdlib/test/test/lux/world/net/tcp.lux44
-rw-r--r--stdlib/test/test/lux/world/net/udp.lux13
8 files changed, 326 insertions, 562 deletions
diff --git a/stdlib/source/lux/concurrency/frp.lux b/stdlib/source/lux/concurrency/frp.lux
index 533d2a7e5..e7efcbf3d 100644
--- a/stdlib/source/lux/concurrency/frp.lux
+++ b/stdlib/source/lux/concurrency/frp.lux
@@ -1,310 +1,127 @@
(.module:
lux
- (lux (control ["F" functor]
- ["A" applicative]
- ["M" monad #+ do Monad]
- [eq #+ Eq]
- ["p" parser])
- [io #- run]
- (data (coll [list "L/" Monoid<List>])
- text/format)
- [macro]
- (macro ["s" syntax #+ syntax: Syntax]))
- (// ["&" promise]))
+ (lux (control [functor #+ Functor]
+ [applicative #+ Applicative]
+ [monad #+ do Monad])
+ [io #+ IO io]
+ (data (coll [list "list/" Monoid<List>]))
+ (type abstract))
+ (// [atom #+ Atom atom]
+ [promise #+ Promise]))
## [Types]
-(type: #export (Channel a)
- {#.doc "An asynchronous channel of values which may be closed.
-
- Reading from a channel does not remove the read piece of data, as it can still be accessed if you have an earlier node of the channel."}
- (&.Promise (Maybe [a (Channel a)])))
-
-## [Syntax]
-(syntax: #export (channel)
- {#.doc (doc "Makes an uninitialized Channel."
- (channel))}
- (wrap (list (` (&.promise #.None)))))
+(abstract: #export (Channel a)
+ {#.doc "An asynchronous channel to distribute values."}
+ (Atom (List (-> a (IO Top))))
+
+ (def: #export (channel _)
+ (All [a] (-> Top (Channel a)))
+ (@abstraction (atom (list))))
+
+ (def: #export (listen listener (^@representation channel))
+ (All [a] (-> (-> a (IO Top)) (Channel a) (IO Unit)))
+ (atom.update (|>> (#.Cons listener)) channel))
+
+ (def: #export (publish (^@representation channel) value)
+ {#.doc "Publish to a channel."}
+ (All [a] (-> (Channel a) a (IO Unit)))
+ (do io.Monad<IO>
+ [listeners (atom.read channel)
+ _ (monad.map @ (function [listener] (listener value)) listeners)]
+ (wrap [])))
+ )
## [Values]
-(def: #export (filter p xs)
+(def: #export (filter predicate input)
(All [a] (-> (-> a Bool) (Channel a) (Channel a)))
- (do &.Monad<Promise>
- [?x+xs xs]
- (case ?x+xs
- #.None (wrap #.None)
- (#.Some [x xs']) (if (p x)
- (wrap (#.Some [x (filter p xs')]))
- (filter p xs')))))
-
-(def: #export (write value target)
- {#.doc "Write to a channel, so long as it's still open."}
- (All [a] (-> a (Channel a) (IO (Maybe (Channel a)))))
- (case (&.poll target)
- (^template [<case> <channel-to-write>]
- <case>
- (do Monad<IO>
- [#let [new-tail (channel)]
- done? (&.resolve (#.Some [value new-tail]) <channel-to-write>)]
- (if done?
- (wrap (#.Some new-tail))
- (write value <channel-to-write>))))
- ([#.None target]
- [(#.Some (#.Some [_ target'])) target'])
-
- _
- (:: Monad<IO> wrap #.None)
- ))
-
-(def: #export (close target)
- (All [a] (-> (Channel a) (IO Bool)))
- (case (&.poll target)
- (^template [<case> <channel-to-write>]
- <case>
- (do Monad<IO>
- [done? (&.resolve #.None <channel-to-write>)]
- (if done?
- (wrap true)
- (close <channel-to-write>))))
- ([#.None target]
- [(#.Some (#.Some [_ target'])) target'])
-
- _
- (:: Monad<IO> wrap false)
- ))
-
-(def: (pipe' input output)
- (All [a] (-> (Channel a) (Channel a) (&.Promise Unit)))
- (do &.Monad<Promise>
- [?x+xs input]
- (case ?x+xs
- #.None (wrap [])
- (#.Some [x input']) (case (io.run (write x output))
- #.None
- (wrap [])
-
- (#.Some output')
- (pipe' input' output')))))
+ (let [output (channel [])]
+ (exec (io.run (listen (function [value]
+ (if (predicate value)
+ (publish output value)
+ (io [])))
+ input))
+ output)))
-(def: #export (pipe input output)
+(def: #export (pipe output input)
{#.doc "Copy/pipe the contents of a channel on to another."}
- (All [a] (-> (Channel a) (Channel a) (&.Promise Unit)))
- (do &.Monad<Promise>
- [_ (pipe' input output)]
- (exec (io.run (close output))
- (wrap []))))
+ (All [a] (-> (Channel a) (Channel a) (IO Unit)))
+ (listen (publish output)
+ input))
-(def: #export (merge xss)
+(def: #export (merge inputs)
{#.doc "Fuse all the elements in a list of channels by piping them onto a new output channel."}
- (All [a] (-> (List (Channel a)) (Channel a)))
- (let [output (channel)]
- (exec (do &.Monad<Promise>
- [_ (M.map @ (function [input] (pipe' input output)) xss)]
- (exec (io.run (close output))
- (wrap [])))
+ (All [a] (-> (List (Channel a)) (IO (Channel a))))
+ (let [output (channel [])]
+ (do io.Monad<IO>
+ [_ (monad.map @ (pipe output) inputs)]
+ (wrap output))))
+
+(def: #export (from-promise promise)
+ (All [a] (-> (Promise a) (Channel a)))
+ (let [output (channel [])]
+ (exec (promise.await (publish output) promise)
output)))
-(def: #export (fold f init xs)
- {#.doc "Asynchronous fold over channels."}
- (All [a b] (-> (-> b a (&.Promise a)) a (Channel b) (&.Promise a)))
- (do &.Monad<Promise>
- [?x+xs xs]
- (case ?x+xs
- #.None (wrap init)
- (#.Some [x xs']) (do @
- [init' (f x init)]
- (fold f init' xs')))))
-
-(def: #export (folds f init xs)
- {#.doc "A channel of folds."}
- (All [a b] (-> (-> b a (&.Promise a)) a (Channel b) (Channel a)))
- (do &.Monad<Promise>
- [?x+xs xs]
- (case ?x+xs
- #.None (wrap (#.Some [init (wrap #.None)]))
- (#.Some [x xs']) (do @
- [init' (f x init)]
- (folds f init' xs')))))
-
-(def: (distinct' eq last-one xs)
- (All [a] (-> (Eq a) a (Channel a) (Channel a)))
- (let [(^open) eq]
- (do &.Monad<Promise>
- [?x+xs xs]
- (case ?x+xs
- #.None (wrap #.None)
- (#.Some [x xs']) (if (= x last-one)
- (distinct' eq last-one xs')
- (wrap (#.Some [x (distinct' eq x xs')])))))))
-
-(def: #export (distinct eq xs)
- {#.doc "Multiple consecutive equal values in the input channel will just be single value in the output channel."}
- (All [a] (-> (Eq a) (Channel a) (Channel a)))
- (let [(^open) eq]
- (do &.Monad<Promise>
- [?x+xs xs]
- (case ?x+xs
- #.None (wrap #.None)
- (#.Some [x xs']) (wrap (#.Some [x (distinct' eq x xs')]))))))
-
-(def: #export (consume xs)
- {#.doc "Reads the entirety of a channel's contents and returns them as a list."}
- (All [a] (-> (Channel a) (&.Promise (List a))))
- (do &.Monad<Promise>
- [?x+xs' xs]
- (case ?x+xs'
- #.None
- (wrap #.Nil)
-
- (#.Some [x xs'])
- (do @
- [=xs (consume xs')]
- (wrap (#.Cons x =xs))))))
-
-(def: #export (once p)
- (All [a] (-> (&.Promise a) (Channel a)))
- (do &.Monad<Promise>
- [x p]
- (wrap (#.Some [x (wrap #.None)]))))
-
(def: #export (poll time action)
- (All [a] (-> Nat (IO (Maybe a)) (Channel a)))
- (do &.Monad<Promise>
- [?output (&.future action)]
- (case ?output
- #.None
- (wrap #.None)
-
- (#.Some head)
- (do @
- [_ (&.wait time)]
- (wrap (#.Some [head (poll time action)]))))))
-
-(def: #export (periodic time value)
- (All [a] (-> Nat a (Channel a)))
- (do &.Monad<Promise>
- []
- (wrap (#.Some [value (do @
- [_ (&.wait time)]
- (periodic time value))]))))
-
-(def: #export (sequential time xs)
- (All [a] (-> Nat (List a) (Channel a)))
- (do &.Monad<Promise>
- []
- (case xs
- #.Nil
- (wrap #.None)
-
- (#.Cons x xs')
- (wrap (#.Some [x (do @
- [_ (&.wait time)]
- (sequential time xs'))])))))
-
-(def: #export (cycle time values)
- (All [a] (-> Nat (List a) (Channel a)))
- (do &.Monad<Promise>
- []
- (case values
- #.Nil
- (wrap #.None)
-
- _
- (loop [xs values]
- (case xs
- #.Nil
- (recur values)
-
- (#.Cons x xs')
- (wrap (#.Some [x (do @
- [_ (&.wait time)]
- (recur xs'))])))))))
-
-## Utils
-(def: (tail xs)
- (All [a] (-> (List a) (List a)))
- (case xs
- #.Nil
- #.Nil
-
- (#.Cons _ xs')
- xs'))
+ (All [a] (-> Nat (IO a) (Channel a)))
+ (let [output (channel [])]
+ (exec (io.run
+ (loop [_ []]
+ (do io.Monad<IO>
+ [value action
+ _ (publish output value)]
+ (wrap (promise.await recur (promise.wait time))))))
+ output)))
-(def: #export (sliding-window max inputs)
- (All [a] (-> Nat (Channel a) (Channel (List a))))
- (let [(^open) &.Monad<Promise>]
- (folds (function [input window]
- (let [window' (L/compose window (list input))]
- (wrap (if (n/<= max (list.size window'))
- window'
- (tail window')))))
- (list)
- inputs)))
+(def: #export (periodic time)
+ (All [a] (-> Nat (Channel a)))
+ (let [output (channel [])]
+ (exec (io.run
+ (loop [_ []]
+ (do io.Monad<IO>
+ [_ (publish output [])]
+ (wrap (promise.await recur (promise.wait time))))))
+ output)))
(def: #export (iterate f init)
- (All [a] (-> (-> a (&.Promise (Maybe a))) a (Channel a)))
- (do &.Monad<Promise>
- []
- (wrap (#.Some [init (do @
- [?next (f init)]
- (case ?next
- #.None
- (wrap #.None)
-
- (#.Some init')
- (iterate f init')))]))))
-
-(def: #export (sample time inputs)
- (All [a] (-> Nat (Channel a) (Channel a)))
- (do &.Monad<Promise>
- [?h+t inputs]
- (case ?h+t
- #.None
- (wrap #.None)
-
- (#.Some [value inputs'])
- (do @
- [_ (&.wait time)
- #let [next-inputs (loop [last-resolved-node inputs']
- (case (&.poll last-resolved-node)
- (^multi (#.Some (#.Some [_ next-node]))
- (&.resolved? next-node))
- (recur next-node)
-
- _
- last-resolved-node))]]
- (wrap (#.Some [value (sample time next-inputs)]))))))
+ (All [a] (-> (-> a (Promise a)) a (Channel a)))
+ (let [output (channel [])]
+ (exec (io.run
+ (loop [zero init]
+ (do io.Monad<IO>
+ [_ (publish output zero)]
+ (wrap (promise.await recur (f zero))))))
+ output)))
## [Structures]
-(struct: #export _ (F.Functor Channel)
- (def: (map f xs)
- (:: &.Functor<Promise> map
- (function [?x+xs]
- (case ?x+xs
- #.None #.None
- (#.Some [x xs']) (#.Some [(f x) (map f xs')])))
- xs)))
+(struct: #export _ (Functor Channel)
+ (def: (map f input)
+ (let [output (channel [])]
+ (exec (io.run (listen (|>> f (publish output))
+ input))
+ output))))
-(struct: #export _ (A.Applicative Channel)
+(struct: #export _ (Applicative Channel)
(def: functor Functor<Channel>)
(def: (wrap a)
- (let [(^open) &.Monad<Promise>]
- (wrap (#.Some [a (wrap #.None)]))))
+ (let [output (channel [])]
+ (exec (io.run (publish output a))
+ output)))
(def: (apply ff fa)
- (let [fb (channel)]
- (exec (let [(^open) Functor<Channel>]
- (map (function [f] (pipe (map f fa) fb))
- ff))
- fb))))
+ (let [output (channel [])]
+ (exec (io.run (listen (function [f]
+ (listen (|>> f (publish output))
+ fa))
+ ff))
+ output))))
(struct: #export _ (Monad Channel)
(def: applicative Applicative<Channel>)
(def: (join mma)
- (let [output (channel)]
- (exec (let [(^open) Functor<Channel>]
- (map (function [ma]
- (pipe ma output))
- mma))
+ (let [output (channel [])]
+ (exec (io.run (listen (listen (publish output))
+ mma))
output))))
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux
index 9b1cabe01..cc609a055 100644
--- a/stdlib/source/lux/concurrency/stm.lux
+++ b/stdlib/source/lux/concurrency/stm.lux
@@ -6,10 +6,7 @@
[io #+ IO io]
(data [product]
[maybe]
- [number "nat/" Codec<Text,Nat>]
- [text]
- (coll [list "list/" Functor<List> Fold<List>]
- [dict #+ Dict]))
+ (coll [list "list/" Functor<List> Fold<List>]))
(concurrency [atom #+ Atom atom]
[promise #+ Promise promise]
[frp "frp/" Functor<Channel>])
@@ -17,12 +14,12 @@
(abstract: #export (Var a)
{#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."}
- (Atom [a (Dict Text (-> a (IO Unit)))])
+ (Atom [a (List (-> a (IO Unit)))])
(def: #export (var value)
{#.doc "Creates a new STM var, with a default value."}
(All [a] (-> a (Var a)))
- (@abstraction (atom.atom [value (dict.new text.Hash<Text>)])))
+ (@abstraction (atom.atom [value (list)])))
(def: read!!
(All [a] (-> (Var a) a))
@@ -42,43 +39,21 @@
succeeded? (atom.compare-and-swap old [new-value _observers] var)]
(if succeeded?
(do @
- [_ (|> _observers
- dict.values
- (monad.map @ (function [f] (f new-value))))]
+ [_ (monad.map @ (function [f] (f new-value)) _observers)]
(wrap []))
(write! new-value (@abstraction var)))))
(def: #export (follow (^@representation target))
{#.doc "Creates a channel that will receive all changes to the value of the given var."}
(All [a] (-> (Var a) (IO (frp.Channel a))))
- (let [head (: (frp.Channel ($ +0)) (frp.channel))
- ## head (frp.channel)
- channel-var (var head)
- observer (function [label value]
- (case (io.run (|> channel-var read!! (frp.write value)))
- #.None
- ## By closing the output Channel, the
- ## observer becomes obsolete.
- (atom.update (function [[value observers]]
- [value (dict.remove label observers)])
- target)
-
- (#.Some tail')
- (write! tail' channel-var)))]
+ (let [channel (: (frp.Channel ($ +0)) (frp.channel []))
+ ## channel (frp.channel)
+ ]
(do io.Monad<IO>
[_ (atom.update (function [[value observers]]
- (let [label (nat/encode (list/fold (function [key base]
- (case (nat/decode key)
- (#.Left _)
- base
-
- (#.Right key-num)
- (n/max key-num base)))
- +0
- (dict.keys observers)))]
- [value (dict.put label (observer label) observers)]))
+ [value (#.Cons (frp.publish channel) observers)])
target)]
- (wrap head))))
+ (wrap channel))))
)
(type: (Tx-Frame a)
diff --git a/stdlib/source/lux/world/net/tcp.jvm.lux b/stdlib/source/lux/world/net/tcp.jvm.lux
index fcd037578..e551e995d 100644
--- a/stdlib/source/lux/world/net/tcp.jvm.lux
+++ b/stdlib/source/lux/world/net/tcp.jvm.lux
@@ -1,13 +1,13 @@
(.module:
lux
(lux (control monad)
- (concurrency ["P" promise]
- ["T" task]
+ (concurrency [promise #+ Promise promise]
+ [task #+ Task]
[frp])
(data ["e" error])
(type abstract)
(world [blob #+ Blob])
- [io]
+ [io #+ Process]
[host])
[//])
@@ -42,90 +42,74 @@
#out OutputStream}
(def: #export (read data offset length self)
- (let [in (get@ #in (@representation self))]
- (P.future
- (do (e.ErrorT io.Monad<IO>)
- [bytes-read (InputStream::read [data (nat-to-int offset) (nat-to-int length)]
- in)]
- (wrap (int-to-nat bytes-read))))))
+ (-> Blob Nat Nat TCP (Task Nat))
+ (promise.future
+ (do io.Monad<Process>
+ [bytes-read (InputStream::read [data (nat-to-int offset) (nat-to-int length)]
+ (get@ #in (@representation self)))]
+ (wrap (int-to-nat bytes-read)))))
(def: #export (write data offset length self)
+ (-> Blob Nat Nat TCP (Task Unit))
(let [out (get@ #out (@representation self))]
- (P.future
- (do (e.ErrorT io.Monad<IO>)
+ (promise.future
+ (do io.Monad<Process>
[_ (OutputStream::write [data (nat-to-int offset) (nat-to-int length)]
out)]
(Flushable::flush [] out)))))
(def: #export (close self)
+ (-> TCP (Task Unit))
(let [(^open) (@representation self)]
- (P.future
- (do (e.ErrorT io.Monad<IO>)
+ (promise.future
+ (do io.Monad<Process>
[_ (AutoCloseable::close [] in)
_ (AutoCloseable::close [] out)]
(AutoCloseable::close [] socket)))))
(def: (tcp-client socket)
- (-> Socket (io.IO (e.Error TCP)))
- (do (e.ErrorT io.Monad<IO>)
+ (-> Socket (Process TCP))
+ (do io.Monad<Process>
[input (Socket::getInputStream [] socket)
output (Socket::getOutputStream [] socket)]
(wrap (@abstraction {#socket socket
#in input
#out output}))))
+ )
- (def: #export (client address port)
- (-> //.Address //.Port (T.Task TCP))
- (P.future
- (do (e.ErrorT io.Monad<IO>)
- [socket (Socket::new [address (nat-to-int port)])]
- (tcp-client socket))))
-
- (def: (await-server-release client-channel server)
- (-> (frp.Channel TCP) ServerSocket (P.Promise Unit))
- (do P.Monad<Promise>
- [outcome client-channel]
- (case outcome
- ## Channel has been closed.
- ## Must close associated server.
- #.None
- (P.future
- (do io.Monad<IO>
- [_ (AutoCloseable::close [] server)]
- (wrap [])))
-
- ## A client was generated.
- ## Nothing to be done...
- (#.Some _)
- (wrap []))))
+(def: #export (client address port)
+ (-> //.Address //.Port (Task TCP))
+ (promise.future
+ (do io.Monad<Process>
+ [socket (Socket::new [address (nat-to-int port)])]
+ (tcp-client socket))))
- (def: #export (server port)
- (-> //.Port (T.Task (frp.Channel TCP)))
- (P.future
- (do (e.ErrorT io.Monad<IO>)
- [server (ServerSocket::new [(nat-to-int port)])
- #let [output (: (frp.Channel TCP)
- (frp.channel))
- _ (: (P.Promise Bool)
- (P.future
- (loop [tail output]
- (do io.Monad<IO>
- [?client (do (e.ErrorT io.Monad<IO>)
- [socket (ServerSocket::accept [] server)]
- (tcp-client socket))]
- (case ?client
- (#e.Error error)
- (frp.close tail)
-
- (#e.Success client)
- (do @
- [?tail' (frp.write client tail)]
- (case ?tail'
- #.None
- (wrap true)
-
- (#.Some tail')
- (exec (await-server-release tail' server)
- (recur tail')))))))))]]
- (wrap output))))
- )
+(def: #export (server port)
+ (-> //.Port (Task [(Promise Unit)
+ (frp.Channel TCP)]))
+ (promise.future
+ (do (e.ErrorT io.Monad<IO>)
+ [server (ServerSocket::new [(nat-to-int port)])
+ #let [signal (: (Promise Unit)
+ (promise #.None))
+ _ (promise.await (function [_]
+ (AutoCloseable::close [] server))
+ signal)
+ output (: (frp.Channel TCP)
+ (frp.channel []))
+ _ (: (Promise Unit)
+ (promise.future
+ (loop [_ []]
+ (do io.Monad<IO>
+ [?client (do (e.ErrorT io.Monad<IO>)
+ [socket (ServerSocket::accept [] server)]
+ (tcp-client socket))]
+ (case ?client
+ (#e.Error error)
+ (wrap [])
+
+ (#e.Success client)
+ (do @
+ [_ (frp.publish output client)]
+ (recur [])))))))]]
+ (wrap [signal output]))))
diff --git a/stdlib/source/lux/world/net/udp.jvm.lux b/stdlib/source/lux/world/net/udp.jvm.lux
index caec294cd..c0aa54b77 100644
--- a/stdlib/source/lux/world/net/udp.jvm.lux
+++ b/stdlib/source/lux/world/net/udp.jvm.lux
@@ -3,8 +3,7 @@
(lux (control monad
["ex" exception #+ exception:])
(concurrency ["P" promise]
- ["T" task]
- [frp])
+ ["T" task])
(data ["e" error]
[maybe]
(coll [array]))
diff --git a/stdlib/test/test/lux/concurrency/frp.lux b/stdlib/test/test/lux/concurrency/frp.lux
index 717eb0624..b24372781 100644
--- a/stdlib/test/test/lux/concurrency/frp.lux
+++ b/stdlib/test/test/lux/concurrency/frp.lux
@@ -1,124 +1,116 @@
(.module:
lux
(lux [io #+ IO io]
- (control ["M" monad #+ do Monad])
+ (control [monad #+ do Monad])
(data [number]
- text/format)
- (concurrency ["P" promise #+ "P/" Monad<Promise>]
- ["&" frp]))
+ text/format
+ (coll [list]))
+ (concurrency [promise #+ "promise/" Monad<Promise>]
+ [frp #+ Channel]
+ [atom #+ Atom atom]))
lux/test)
-(def: (to-channel values)
- (-> (List Int) (&.Channel Int))
- (let [_channel (: (&.Channel Int) (&.channel))]
- (io.run (do io.Monad<IO>
- [_ (M.map @ (function [value] (&.write value _channel))
- values)
- _ (&.close _channel)]
- (wrap _channel)))))
+(def: (write! values channel)
+ (All [a] (-> (List a) (Channel a) (IO Unit)))
+ (do io.Monad<IO>
+ [_ (monad.map @ (frp.publish channel) values)]
+ (wrap [])))
+
+(def: (read! channel)
+ (All [a] (-> (Channel a) (IO (Atom (List a)))))
+ (do io.Monad<IO>
+ [#let [output (atom (list))]
+ _ (frp.listen (function [value]
+ (atom.update (|>> (#.Cons value)) output))
+ channel)]
+ (wrap output)))
(context: "FRP"
- ($_ seq
- (wrap (do P.Monad<Promise>
- [elems (&.consume (to-channel (list 0 1 2 3 4 5)))]
- (assert "Can consume a channel into a list."
- (case elems
- (^ (list 0 1 2 3 4 5))
- true
-
- _
- false))))
-
- (wrap (do P.Monad<Promise>
- [elems (&.consume (let [input (to-channel (list 0 1 2 3 4 5))
- output (: (&.Channel Int) (&.channel))]
- (exec (&.pipe input output)
- output)))]
- (assert "Can pipe one channel into another."
- (case elems
- (^ (list 0 1 2 3 4 5))
- true
-
- _
- false))))
-
- (wrap (do P.Monad<Promise>
- [elems (&.consume (&.filter i/even? (to-channel (list 0 1 2 3 4 5))))]
- (assert "Can filter a channel's elements."
- (case elems
- (^ (list 0 2 4))
- true
-
- _
- false))))
-
- (wrap (do P.Monad<Promise>
- [elems (&.consume (&.merge (list (to-channel (list 0 1 2 3 4 5))
- (to-channel (list 0 -1 -2 -3 -4 -5)))))]
- (assert "Can merge channels."
- (case elems
- (^ (list 0 1 2 3 4 5 0 -1 -2 -3 -4 -5))
- true
-
- _
- false))))
-
- (wrap (do P.Monad<Promise>
- [output (&.fold (function [base input] (P/wrap (i/+ input base))) 0 (to-channel (list 0 1 2 3 4 5)))]
- (assert "Can fold over a channel."
- (i/= 15 output))))
-
- (wrap (do P.Monad<Promise>
- [elems (&.consume (&.distinct number.Eq<Int> (to-channel (list 0 0 0 1 2 2 3 3 3 3 4 4 4 5 5))))]
- (assert "Can avoid immediate repetition in the channel."
- (case elems
- (^ (list 0 1 2 3 4 5))
- true
-
- _
- false))))
-
- (wrap (do P.Monad<Promise>
- [elems (&.consume (&.once (:: @ wrap 12345)))]
- (assert "Can convert a promise into a single-value channel."
- (case elems
- (^ (list 12345))
- true
-
- _
- false))))
-
- (wrap (do P.Monad<Promise>
- [elems (&.consume (:: &.Functor<Channel> map i/inc (to-channel (list 0 1 2 3 4 5))))]
- (assert "Functor goes over every element in a channel."
- (case elems
- (^ (list 1 2 3 4 5 6))
- true
-
- _
- false))))
-
- (wrap (do P.Monad<Promise>
- [elems (&.consume (let [(^open) &.Applicative<Channel>]
- (apply (wrap i/inc) (wrap 12345))))]
- (assert "Applicative works over all channel values."
- (case elems
- (^ (list 12346))
- true
-
- _
- false))))
-
- (wrap (do P.Monad<Promise>
- [elems (&.consume (do &.Monad<Channel>
- [f (wrap i/inc)
- a (wrap 12345)]
- (wrap (f a))))]
- (assert "Monad works over all channel values."
- (case elems
- (^ (list 12346))
- true
-
- _
- false))))
- ))
+ (let [(^open "list/") (list.Eq<List> number.Eq<Int>)]
+ ($_ seq
+ (wrap (do promise.Monad<Promise>
+ [#let [values (list 0 1 2 3 4 5)]
+ output (promise.future
+ (do io.Monad<IO>
+ [#let [input (: (Channel Int) (frp.channel []))]
+ output (read! input)
+ _ (write! values input)]
+ (wrap output)))
+ _ (promise.wait +100)
+ output (promise.future (atom.read output))]
+ (assert "Can pipe one channel into another."
+ (list/= values
+ (list.reverse output)))))
+
+ (wrap (do promise.Monad<Promise>
+ [output (promise.future
+ (do io.Monad<IO>
+ [#let [input (: (Channel Int) (frp.channel []))
+ elems (frp.filter i/even? input)]
+ output (read! elems)
+ _ (write! (list 0 1 2 3 4 5) input)]
+ (wrap output)))
+ _ (promise.wait +100)
+ output (promise.future (atom.read output))]
+ (assert "Can filter a channel's elements."
+ (list/= (list 0 2 4)
+ (list.reverse output)))))
+
+ (wrap (do promise.Monad<Promise>
+ [output (promise.future
+ (do io.Monad<IO>
+ [#let [left (: (Channel Int) (frp.channel []))
+ right (: (Channel Int) (frp.channel []))]
+ merged (frp.merge (list left right))
+ output (read! merged)
+ _ (write! (list 0 1 2 3 4 5) left)
+ _ (write! (list 0 -1 -2 -3 -4 -5) right)]
+ (wrap output)))
+ _ (promise.wait +100)
+ output (promise.future (atom.read output))]
+ (assert "Can merge channels."
+ (list/= (list 0 1 2 3 4 5 0 -1 -2 -3 -4 -5)
+ (list.reverse output)))))
+
+ (wrap (do promise.Monad<Promise>
+ [output (promise.future
+ (do io.Monad<IO>
+ [#let [inputs (: (Channel Int) (frp.channel []))
+ mapped (:: frp.Functor<Channel> map i/inc inputs)]
+ output (read! mapped)
+ _ (write! (list 0 1 2 3 4 5) inputs)]
+ (wrap output)))
+ _ (promise.wait +100)
+ output (promise.future (atom.read output))]
+ (assert "Functor goes over every element in a channel."
+ (list/= (list 1 2 3 4 5 6)
+ (list.reverse output)))))
+
+ (wrap (do promise.Monad<Promise>
+ [output (promise.future
+ (do io.Monad<IO>
+ [#let [>f< (: (Channel (-> Int Int)) (frp.channel []))
+ >a< (: (Channel Int) (frp.channel []))]
+ output (read! (let [(^open) frp.Applicative<Channel>]
+ (apply >f< >a<)))
+ _ (write! (list i/inc) >f<)
+ _ (write! (list 12345) >a<)]
+ (wrap output)))
+ _ (promise.wait +100)
+ output (promise.future (atom.read output))]
+ (assert "Applicative works over all channel values."
+ (list/= (list 12346)
+ (list.reverse output)))))
+
+ (wrap (do promise.Monad<Promise>
+ [output (promise.future
+ (read! (do frp.Monad<Channel>
+ [f (frp.from-promise (promise.delay +100 i/inc))
+ a (frp.from-promise (promise.delay +200 12345))]
+ (frp.from-promise (promise.delay +300 (f a))))))
+ _ (promise.wait +600)
+ output (promise.future (atom.read output))]
+ (assert "Valid monad."
+ (list/= (list 12346)
+ (list.reverse output)))))
+ )))
diff --git a/stdlib/test/test/lux/concurrency/stm.lux b/stdlib/test/test/lux/concurrency/stm.lux
index d2e299c50..1ca5482bf 100644
--- a/stdlib/test/test/lux/concurrency/stm.lux
+++ b/stdlib/test/test/lux/concurrency/stm.lux
@@ -1,43 +1,59 @@
(.module:
lux
- (lux [io]
+ (lux [io #+ IO]
(control ["M" monad #+ do Monad])
(data [number]
[maybe]
(coll [list "" Functor<List> "List/" Fold<List>])
text/format)
- (concurrency ["&" stm]
- [promise])
+ (concurrency [atom #+ Atom atom]
+ ["&" stm]
+ [promise]
+ [frp #+ Channel])
["r" math/random])
lux/test)
+(def: (read! channel)
+ (All [a] (-> (Channel a) (IO (Atom (List a)))))
+ (do io.Monad<IO>
+ [#let [output (atom (list))]
+ _ (frp.listen (function [value]
+ (atom.update (|>> (#.Cons value)) output))
+ channel)]
+ (wrap output)))
+
(def: iterations/processes Int 100)
(context: "STM"
($_ seq
(wrap (do promise.Monad<Promise>
+ [output (&.commit (&.read (&.var 0)))]
+ (assert "Can read STM vars."
+ (i/= 0 output))))
+ (wrap (do promise.Monad<Promise>
+ [#let [_var (&.var 0)]
+ output (&.commit (do &.Monad<STM>
+ [_ (&.write 5 _var)]
+ (&.read _var)))]
+ (assert "Can write STM vars."
+ (i/= 5 output))))
+ (wrap (do promise.Monad<Promise>
+ [#let [_var (&.var 5)]
+ output (&.commit (do &.Monad<STM>
+ [_ (&.update (i/* 3) _var)]
+ (&.read _var)))]
+ (assert "Can update STM vars."
+ (i/= 15 output))))
+ (wrap (do promise.Monad<Promise>
[#let [_var (&.var 0)
- changes (io.run (&.follow _var))]
- output1 (&.commit (&.read _var))
- output2 (&.commit (do &.Monad<STM>
- [_ (&.write 5 _var)]
- (&.read _var)))
- output3 (&.commit (do &.Monad<STM>
- [temp (&.read _var)
- _ (&.update (i/* 3) _var)]
- (&.read _var)))
- ?c1+changes' changes
- #let [[c1 changes'] (maybe.default [-1 changes] ?c1+changes')]
- ?c2+changes' changes'
- #let [[c2 changes'] (maybe.default [-1 changes] ?c2+changes')]]
- (assert "Can read STM vars.
- Can write STM vars.
- Can update STM vars.
- Can follow all the changes to STM vars."
- (and (i/= 0 output1)
- (i/= 5 output2)
- (i/= 15 output3)
- (and (i/= 5 c1) (i/= 15 c2))))))
+ changes (io.run (read! (io.run (&.follow _var))))]
+ _ (&.commit (&.write 5 _var))
+ _ (&.commit (&.update (i/* 3) _var))
+ changes (promise.future (atom.read changes))]
+ (assert "Can follow all the changes to STM vars."
+ (:: (list.Eq<List> number.Eq<Int>) =
+ (list 5 15)
+ (list.reverse changes)))))
(wrap (let [_concurrency-var (&.var 0)]
(do promise.Monad<Promise>
[_ (M.seq @
diff --git a/stdlib/test/test/lux/world/net/tcp.lux b/stdlib/test/test/lux/world/net/tcp.lux
index 785b1a66b..8d40897d7 100644
--- a/stdlib/test/test/lux/world/net/tcp.lux
+++ b/stdlib/test/test/lux/world/net/tcp.lux
@@ -5,7 +5,7 @@
["ex" exception #+ exception:])
(concurrency ["P" promise]
["T" task]
- [frp])
+ [frp "frp/" Functor<Channel>])
(data ["E" error]
[text]
text/format)
@@ -24,45 +24,39 @@
(|>> (n/% +1000)
(n/+ +8000)))))
-(exception: Empty-Channel)
-
-(def: (head channel)
- (All [a] (-> (frp.Channel a) (T.Task a)))
- (do P.Monad<Promise>
- [head+tail channel]
- (case head+tail
- (#.Some [head tail])
- (wrap (ex.return head))
-
- #.None
- (wrap (ex.throw Empty-Channel "")))))
-
(context: "TCP networking."
(do @
[port ..port
size (|> r.nat (:: @ map (|>> (n/% +100) (n/max +10))))
from (_blob.blob size)
to (_blob.blob size)
- #let [temp (blob.create size)]]
+ #let [temp-from (blob.create size)
+ temp-to (blob.create size)]]
($_ seq
(wrap (do P.Monad<Promise>
[result (do T.Monad<Task>
- [server (@.server port)
+ [[server-close server] (@.server port)
+ #let [from-worked? (: (T.Task Bool)
+ (P.promise #.Nil))
+ _ (frp/map (function [socket]
+ (do @
+ [bytes-from (@.read temp-from +0 size socket)
+ #let [_ (io.run (P.resolve (#E.Success (and (n/= size bytes-from)
+ (:: blob.Eq<Blob> = from temp-from)))
+ from-worked?))]]
+ (@.write to +0 size socket)))
+ server)]
+
client (@.client localhost port)
- ####################
_ (@.write from +0 size client)
- socket (head server)
- bytes-from (@.read temp +0 size socket)
- #let [from-worked? (and (n/= size bytes-from)
- (:: blob.Eq<Blob> = from temp))]
+ from-worked? from-worked?
####################
- _ (@.write to +0 size socket)
- bytes-to (@.read temp +0 size client)
+ bytes-to (@.read temp-to +0 size client)
#let [to-worked? (and (n/= size bytes-to)
- (:: blob.Eq<Blob> = to temp))]
+ (:: blob.Eq<Blob> = to temp-to))]
####################
_ (@.close client)
- _ (T.from-promise (P.future (frp.close server)))]
+ _ (T.from-promise (P.future (P.resolve [] server-close)))]
(wrap (and from-worked?
to-worked?)))]
(assert "Can communicate between client and server."
diff --git a/stdlib/test/test/lux/world/net/udp.lux b/stdlib/test/test/lux/world/net/udp.lux
index aa600e0b5..4cb268a4f 100644
--- a/stdlib/test/test/lux/world/net/udp.lux
+++ b/stdlib/test/test/lux/world/net/udp.lux
@@ -24,19 +24,6 @@
(|>> (n/% +1000)
(n/+ +8000)))))
-(exception: Empty-Channel)
-
-(def: (head channel)
- (All [a] (-> (frp.Channel a) (T.Task a)))
- (do P.Monad<Promise>
- [head+tail channel]
- (case head+tail
- (#.Some [head tail])
- (wrap (ex.return head))
-
- #.None
- (wrap (ex.throw Empty-Channel "")))))
-
(context: "UDP networking."
(do @
[port ..port