From ea32f6d5500b5affa7f5b11cdc05b48ad4fe7a46 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Tue, 12 Dec 2017 08:28:55 -0400 Subject: - FRP channels are no longer asynchronously-built lists, but rather mediums for spreading data, that can be chained together to form networks for data distribution. --- stdlib/source/lux/concurrency/frp.lux | 375 ++++++++----------------------- stdlib/source/lux/concurrency/stm.lux | 43 +--- stdlib/source/lux/world/net/tcp.jvm.lux | 122 +++++----- stdlib/source/lux/world/net/udp.jvm.lux | 3 +- stdlib/test/test/lux/concurrency/frp.lux | 224 +++++++++--------- stdlib/test/test/lux/concurrency/stm.lux | 64 ++++-- stdlib/test/test/lux/world/net/tcp.lux | 44 ++-- stdlib/test/test/lux/world/net/udp.lux | 13 -- 8 files changed, 326 insertions(+), 562 deletions(-) diff --git a/stdlib/source/lux/concurrency/frp.lux b/stdlib/source/lux/concurrency/frp.lux index 533d2a7e5..e7efcbf3d 100644 --- a/stdlib/source/lux/concurrency/frp.lux +++ b/stdlib/source/lux/concurrency/frp.lux @@ -1,310 +1,127 @@ (.module: lux - (lux (control ["F" functor] - ["A" applicative] - ["M" monad #+ do Monad] - [eq #+ Eq] - ["p" parser]) - [io #- run] - (data (coll [list "L/" Monoid]) - text/format) - [macro] - (macro ["s" syntax #+ syntax: Syntax])) - (// ["&" promise])) + (lux (control [functor #+ Functor] + [applicative #+ Applicative] + [monad #+ do Monad]) + [io #+ IO io] + (data (coll [list "list/" Monoid])) + (type abstract)) + (// [atom #+ Atom atom] + [promise #+ Promise])) ## [Types] -(type: #export (Channel a) - {#.doc "An asynchronous channel of values which may be closed. - - Reading from a channel does not remove the read piece of data, as it can still be accessed if you have an earlier node of the channel."} - (&.Promise (Maybe [a (Channel a)]))) - -## [Syntax] -(syntax: #export (channel) - {#.doc (doc "Makes an uninitialized Channel." - (channel))} - (wrap (list (` (&.promise #.None))))) +(abstract: #export (Channel a) + {#.doc "An asynchronous channel to distribute values."} + (Atom (List (-> a (IO Top)))) + + (def: #export (channel _) + (All [a] (-> Top (Channel a))) + (@abstraction (atom (list)))) + + (def: #export (listen listener (^@representation channel)) + (All [a] (-> (-> a (IO Top)) (Channel a) (IO Unit))) + (atom.update (|>> (#.Cons listener)) channel)) + + (def: #export (publish (^@representation channel) value) + {#.doc "Publish to a channel."} + (All [a] (-> (Channel a) a (IO Unit))) + (do io.Monad + [listeners (atom.read channel) + _ (monad.map @ (function [listener] (listener value)) listeners)] + (wrap []))) + ) ## [Values] -(def: #export (filter p xs) +(def: #export (filter predicate input) (All [a] (-> (-> a Bool) (Channel a) (Channel a))) - (do &.Monad - [?x+xs xs] - (case ?x+xs - #.None (wrap #.None) - (#.Some [x xs']) (if (p x) - (wrap (#.Some [x (filter p xs')])) - (filter p xs'))))) - -(def: #export (write value target) - {#.doc "Write to a channel, so long as it's still open."} - (All [a] (-> a (Channel a) (IO (Maybe (Channel a))))) - (case (&.poll target) - (^template [ ] - - (do Monad - [#let [new-tail (channel)] - done? (&.resolve (#.Some [value new-tail]) )] - (if done? - (wrap (#.Some new-tail)) - (write value )))) - ([#.None target] - [(#.Some (#.Some [_ target'])) target']) - - _ - (:: Monad wrap #.None) - )) - -(def: #export (close target) - (All [a] (-> (Channel a) (IO Bool))) - (case (&.poll target) - (^template [ ] - - (do Monad - [done? (&.resolve #.None )] - (if done? - (wrap true) - (close )))) - ([#.None target] - [(#.Some (#.Some [_ target'])) target']) - - _ - (:: Monad wrap false) - )) - -(def: (pipe' input output) - (All [a] (-> (Channel a) (Channel a) (&.Promise Unit))) - (do &.Monad - [?x+xs input] - (case ?x+xs - #.None (wrap []) - (#.Some [x input']) (case (io.run (write x output)) - #.None - (wrap []) - - (#.Some output') - (pipe' input' output'))))) + (let [output (channel [])] + (exec (io.run (listen (function [value] + (if (predicate value) + (publish output value) + (io []))) + input)) + output))) -(def: #export (pipe input output) +(def: #export (pipe output input) {#.doc "Copy/pipe the contents of a channel on to another."} - (All [a] (-> (Channel a) (Channel a) (&.Promise Unit))) - (do &.Monad - [_ (pipe' input output)] - (exec (io.run (close output)) - (wrap [])))) + (All [a] (-> (Channel a) (Channel a) (IO Unit))) + (listen (publish output) + input)) -(def: #export (merge xss) +(def: #export (merge inputs) {#.doc "Fuse all the elements in a list of channels by piping them onto a new output channel."} - (All [a] (-> (List (Channel a)) (Channel a))) - (let [output (channel)] - (exec (do &.Monad - [_ (M.map @ (function [input] (pipe' input output)) xss)] - (exec (io.run (close output)) - (wrap []))) + (All [a] (-> (List (Channel a)) (IO (Channel a)))) + (let [output (channel [])] + (do io.Monad + [_ (monad.map @ (pipe output) inputs)] + (wrap output)))) + +(def: #export (from-promise promise) + (All [a] (-> (Promise a) (Channel a))) + (let [output (channel [])] + (exec (promise.await (publish output) promise) output))) -(def: #export (fold f init xs) - {#.doc "Asynchronous fold over channels."} - (All [a b] (-> (-> b a (&.Promise a)) a (Channel b) (&.Promise a))) - (do &.Monad - [?x+xs xs] - (case ?x+xs - #.None (wrap init) - (#.Some [x xs']) (do @ - [init' (f x init)] - (fold f init' xs'))))) - -(def: #export (folds f init xs) - {#.doc "A channel of folds."} - (All [a b] (-> (-> b a (&.Promise a)) a (Channel b) (Channel a))) - (do &.Monad - [?x+xs xs] - (case ?x+xs - #.None (wrap (#.Some [init (wrap #.None)])) - (#.Some [x xs']) (do @ - [init' (f x init)] - (folds f init' xs'))))) - -(def: (distinct' eq last-one xs) - (All [a] (-> (Eq a) a (Channel a) (Channel a))) - (let [(^open) eq] - (do &.Monad - [?x+xs xs] - (case ?x+xs - #.None (wrap #.None) - (#.Some [x xs']) (if (= x last-one) - (distinct' eq last-one xs') - (wrap (#.Some [x (distinct' eq x xs')]))))))) - -(def: #export (distinct eq xs) - {#.doc "Multiple consecutive equal values in the input channel will just be single value in the output channel."} - (All [a] (-> (Eq a) (Channel a) (Channel a))) - (let [(^open) eq] - (do &.Monad - [?x+xs xs] - (case ?x+xs - #.None (wrap #.None) - (#.Some [x xs']) (wrap (#.Some [x (distinct' eq x xs')])))))) - -(def: #export (consume xs) - {#.doc "Reads the entirety of a channel's contents and returns them as a list."} - (All [a] (-> (Channel a) (&.Promise (List a)))) - (do &.Monad - [?x+xs' xs] - (case ?x+xs' - #.None - (wrap #.Nil) - - (#.Some [x xs']) - (do @ - [=xs (consume xs')] - (wrap (#.Cons x =xs)))))) - -(def: #export (once p) - (All [a] (-> (&.Promise a) (Channel a))) - (do &.Monad - [x p] - (wrap (#.Some [x (wrap #.None)])))) - (def: #export (poll time action) - (All [a] (-> Nat (IO (Maybe a)) (Channel a))) - (do &.Monad - [?output (&.future action)] - (case ?output - #.None - (wrap #.None) - - (#.Some head) - (do @ - [_ (&.wait time)] - (wrap (#.Some [head (poll time action)])))))) - -(def: #export (periodic time value) - (All [a] (-> Nat a (Channel a))) - (do &.Monad - [] - (wrap (#.Some [value (do @ - [_ (&.wait time)] - (periodic time value))])))) - -(def: #export (sequential time xs) - (All [a] (-> Nat (List a) (Channel a))) - (do &.Monad - [] - (case xs - #.Nil - (wrap #.None) - - (#.Cons x xs') - (wrap (#.Some [x (do @ - [_ (&.wait time)] - (sequential time xs'))]))))) - -(def: #export (cycle time values) - (All [a] (-> Nat (List a) (Channel a))) - (do &.Monad - [] - (case values - #.Nil - (wrap #.None) - - _ - (loop [xs values] - (case xs - #.Nil - (recur values) - - (#.Cons x xs') - (wrap (#.Some [x (do @ - [_ (&.wait time)] - (recur xs'))]))))))) - -## Utils -(def: (tail xs) - (All [a] (-> (List a) (List a))) - (case xs - #.Nil - #.Nil - - (#.Cons _ xs') - xs')) + (All [a] (-> Nat (IO a) (Channel a))) + (let [output (channel [])] + (exec (io.run + (loop [_ []] + (do io.Monad + [value action + _ (publish output value)] + (wrap (promise.await recur (promise.wait time)))))) + output))) -(def: #export (sliding-window max inputs) - (All [a] (-> Nat (Channel a) (Channel (List a)))) - (let [(^open) &.Monad] - (folds (function [input window] - (let [window' (L/compose window (list input))] - (wrap (if (n/<= max (list.size window')) - window' - (tail window'))))) - (list) - inputs))) +(def: #export (periodic time) + (All [a] (-> Nat (Channel a))) + (let [output (channel [])] + (exec (io.run + (loop [_ []] + (do io.Monad + [_ (publish output [])] + (wrap (promise.await recur (promise.wait time)))))) + output))) (def: #export (iterate f init) - (All [a] (-> (-> a (&.Promise (Maybe a))) a (Channel a))) - (do &.Monad - [] - (wrap (#.Some [init (do @ - [?next (f init)] - (case ?next - #.None - (wrap #.None) - - (#.Some init') - (iterate f init')))])))) - -(def: #export (sample time inputs) - (All [a] (-> Nat (Channel a) (Channel a))) - (do &.Monad - [?h+t inputs] - (case ?h+t - #.None - (wrap #.None) - - (#.Some [value inputs']) - (do @ - [_ (&.wait time) - #let [next-inputs (loop [last-resolved-node inputs'] - (case (&.poll last-resolved-node) - (^multi (#.Some (#.Some [_ next-node])) - (&.resolved? next-node)) - (recur next-node) - - _ - last-resolved-node))]] - (wrap (#.Some [value (sample time next-inputs)])))))) + (All [a] (-> (-> a (Promise a)) a (Channel a))) + (let [output (channel [])] + (exec (io.run + (loop [zero init] + (do io.Monad + [_ (publish output zero)] + (wrap (promise.await recur (f zero)))))) + output))) ## [Structures] -(struct: #export _ (F.Functor Channel) - (def: (map f xs) - (:: &.Functor map - (function [?x+xs] - (case ?x+xs - #.None #.None - (#.Some [x xs']) (#.Some [(f x) (map f xs')]))) - xs))) +(struct: #export _ (Functor Channel) + (def: (map f input) + (let [output (channel [])] + (exec (io.run (listen (|>> f (publish output)) + input)) + output)))) -(struct: #export _ (A.Applicative Channel) +(struct: #export _ (Applicative Channel) (def: functor Functor) (def: (wrap a) - (let [(^open) &.Monad] - (wrap (#.Some [a (wrap #.None)])))) + (let [output (channel [])] + (exec (io.run (publish output a)) + output))) (def: (apply ff fa) - (let [fb (channel)] - (exec (let [(^open) Functor] - (map (function [f] (pipe (map f fa) fb)) - ff)) - fb)))) + (let [output (channel [])] + (exec (io.run (listen (function [f] + (listen (|>> f (publish output)) + fa)) + ff)) + output)))) (struct: #export _ (Monad Channel) (def: applicative Applicative) (def: (join mma) - (let [output (channel)] - (exec (let [(^open) Functor] - (map (function [ma] - (pipe ma output)) - mma)) + (let [output (channel [])] + (exec (io.run (listen (listen (publish output)) + mma)) output)))) diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux index 9b1cabe01..cc609a055 100644 --- a/stdlib/source/lux/concurrency/stm.lux +++ b/stdlib/source/lux/concurrency/stm.lux @@ -6,10 +6,7 @@ [io #+ IO io] (data [product] [maybe] - [number "nat/" Codec] - [text] - (coll [list "list/" Functor Fold] - [dict #+ Dict])) + (coll [list "list/" Functor Fold])) (concurrency [atom #+ Atom atom] [promise #+ Promise promise] [frp "frp/" Functor]) @@ -17,12 +14,12 @@ (abstract: #export (Var a) {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} - (Atom [a (Dict Text (-> a (IO Unit)))]) + (Atom [a (List (-> a (IO Unit)))]) (def: #export (var value) {#.doc "Creates a new STM var, with a default value."} (All [a] (-> a (Var a))) - (@abstraction (atom.atom [value (dict.new text.Hash)]))) + (@abstraction (atom.atom [value (list)]))) (def: read!! (All [a] (-> (Var a) a)) @@ -42,43 +39,21 @@ succeeded? (atom.compare-and-swap old [new-value _observers] var)] (if succeeded? (do @ - [_ (|> _observers - dict.values - (monad.map @ (function [f] (f new-value))))] + [_ (monad.map @ (function [f] (f new-value)) _observers)] (wrap [])) (write! new-value (@abstraction var))))) (def: #export (follow (^@representation target)) {#.doc "Creates a channel that will receive all changes to the value of the given var."} (All [a] (-> (Var a) (IO (frp.Channel a)))) - (let [head (: (frp.Channel ($ +0)) (frp.channel)) - ## head (frp.channel) - channel-var (var head) - observer (function [label value] - (case (io.run (|> channel-var read!! (frp.write value))) - #.None - ## By closing the output Channel, the - ## observer becomes obsolete. - (atom.update (function [[value observers]] - [value (dict.remove label observers)]) - target) - - (#.Some tail') - (write! tail' channel-var)))] + (let [channel (: (frp.Channel ($ +0)) (frp.channel [])) + ## channel (frp.channel) + ] (do io.Monad [_ (atom.update (function [[value observers]] - (let [label (nat/encode (list/fold (function [key base] - (case (nat/decode key) - (#.Left _) - base - - (#.Right key-num) - (n/max key-num base))) - +0 - (dict.keys observers)))] - [value (dict.put label (observer label) observers)])) + [value (#.Cons (frp.publish channel) observers)]) target)] - (wrap head)))) + (wrap channel)))) ) (type: (Tx-Frame a) diff --git a/stdlib/source/lux/world/net/tcp.jvm.lux b/stdlib/source/lux/world/net/tcp.jvm.lux index fcd037578..e551e995d 100644 --- a/stdlib/source/lux/world/net/tcp.jvm.lux +++ b/stdlib/source/lux/world/net/tcp.jvm.lux @@ -1,13 +1,13 @@ (.module: lux (lux (control monad) - (concurrency ["P" promise] - ["T" task] + (concurrency [promise #+ Promise promise] + [task #+ Task] [frp]) (data ["e" error]) (type abstract) (world [blob #+ Blob]) - [io] + [io #+ Process] [host]) [//]) @@ -42,90 +42,74 @@ #out OutputStream} (def: #export (read data offset length self) - (let [in (get@ #in (@representation self))] - (P.future - (do (e.ErrorT io.Monad) - [bytes-read (InputStream::read [data (nat-to-int offset) (nat-to-int length)] - in)] - (wrap (int-to-nat bytes-read)))))) + (-> Blob Nat Nat TCP (Task Nat)) + (promise.future + (do io.Monad + [bytes-read (InputStream::read [data (nat-to-int offset) (nat-to-int length)] + (get@ #in (@representation self)))] + (wrap (int-to-nat bytes-read))))) (def: #export (write data offset length self) + (-> Blob Nat Nat TCP (Task Unit)) (let [out (get@ #out (@representation self))] - (P.future - (do (e.ErrorT io.Monad) + (promise.future + (do io.Monad [_ (OutputStream::write [data (nat-to-int offset) (nat-to-int length)] out)] (Flushable::flush [] out))))) (def: #export (close self) + (-> TCP (Task Unit)) (let [(^open) (@representation self)] - (P.future - (do (e.ErrorT io.Monad) + (promise.future + (do io.Monad [_ (AutoCloseable::close [] in) _ (AutoCloseable::close [] out)] (AutoCloseable::close [] socket))))) (def: (tcp-client socket) - (-> Socket (io.IO (e.Error TCP))) - (do (e.ErrorT io.Monad) + (-> Socket (Process TCP)) + (do io.Monad [input (Socket::getInputStream [] socket) output (Socket::getOutputStream [] socket)] (wrap (@abstraction {#socket socket #in input #out output})))) + ) - (def: #export (client address port) - (-> //.Address //.Port (T.Task TCP)) - (P.future - (do (e.ErrorT io.Monad) - [socket (Socket::new [address (nat-to-int port)])] - (tcp-client socket)))) - - (def: (await-server-release client-channel server) - (-> (frp.Channel TCP) ServerSocket (P.Promise Unit)) - (do P.Monad - [outcome client-channel] - (case outcome - ## Channel has been closed. - ## Must close associated server. - #.None - (P.future - (do io.Monad - [_ (AutoCloseable::close [] server)] - (wrap []))) - - ## A client was generated. - ## Nothing to be done... - (#.Some _) - (wrap [])))) +(def: #export (client address port) + (-> //.Address //.Port (Task TCP)) + (promise.future + (do io.Monad + [socket (Socket::new [address (nat-to-int port)])] + (tcp-client socket)))) - (def: #export (server port) - (-> //.Port (T.Task (frp.Channel TCP))) - (P.future - (do (e.ErrorT io.Monad) - [server (ServerSocket::new [(nat-to-int port)]) - #let [output (: (frp.Channel TCP) - (frp.channel)) - _ (: (P.Promise Bool) - (P.future - (loop [tail output] - (do io.Monad - [?client (do (e.ErrorT io.Monad) - [socket (ServerSocket::accept [] server)] - (tcp-client socket))] - (case ?client - (#e.Error error) - (frp.close tail) - - (#e.Success client) - (do @ - [?tail' (frp.write client tail)] - (case ?tail' - #.None - (wrap true) - - (#.Some tail') - (exec (await-server-release tail' server) - (recur tail')))))))))]] - (wrap output)))) - ) +(def: #export (server port) + (-> //.Port (Task [(Promise Unit) + (frp.Channel TCP)])) + (promise.future + (do (e.ErrorT io.Monad) + [server (ServerSocket::new [(nat-to-int port)]) + #let [signal (: (Promise Unit) + (promise #.None)) + _ (promise.await (function [_] + (AutoCloseable::close [] server)) + signal) + output (: (frp.Channel TCP) + (frp.channel [])) + _ (: (Promise Unit) + (promise.future + (loop [_ []] + (do io.Monad + [?client (do (e.ErrorT io.Monad) + [socket (ServerSocket::accept [] server)] + (tcp-client socket))] + (case ?client + (#e.Error error) + (wrap []) + + (#e.Success client) + (do @ + [_ (frp.publish output client)] + (recur [])))))))]] + (wrap [signal output])))) diff --git a/stdlib/source/lux/world/net/udp.jvm.lux b/stdlib/source/lux/world/net/udp.jvm.lux index caec294cd..c0aa54b77 100644 --- a/stdlib/source/lux/world/net/udp.jvm.lux +++ b/stdlib/source/lux/world/net/udp.jvm.lux @@ -3,8 +3,7 @@ (lux (control monad ["ex" exception #+ exception:]) (concurrency ["P" promise] - ["T" task] - [frp]) + ["T" task]) (data ["e" error] [maybe] (coll [array])) diff --git a/stdlib/test/test/lux/concurrency/frp.lux b/stdlib/test/test/lux/concurrency/frp.lux index 717eb0624..b24372781 100644 --- a/stdlib/test/test/lux/concurrency/frp.lux +++ b/stdlib/test/test/lux/concurrency/frp.lux @@ -1,124 +1,116 @@ (.module: lux (lux [io #+ IO io] - (control ["M" monad #+ do Monad]) + (control [monad #+ do Monad]) (data [number] - text/format) - (concurrency ["P" promise #+ "P/" Monad] - ["&" frp])) + text/format + (coll [list])) + (concurrency [promise #+ "promise/" Monad] + [frp #+ Channel] + [atom #+ Atom atom])) lux/test) -(def: (to-channel values) - (-> (List Int) (&.Channel Int)) - (let [_channel (: (&.Channel Int) (&.channel))] - (io.run (do io.Monad - [_ (M.map @ (function [value] (&.write value _channel)) - values) - _ (&.close _channel)] - (wrap _channel))))) +(def: (write! values channel) + (All [a] (-> (List a) (Channel a) (IO Unit))) + (do io.Monad + [_ (monad.map @ (frp.publish channel) values)] + (wrap []))) + +(def: (read! channel) + (All [a] (-> (Channel a) (IO (Atom (List a))))) + (do io.Monad + [#let [output (atom (list))] + _ (frp.listen (function [value] + (atom.update (|>> (#.Cons value)) output)) + channel)] + (wrap output))) (context: "FRP" - ($_ seq - (wrap (do P.Monad - [elems (&.consume (to-channel (list 0 1 2 3 4 5)))] - (assert "Can consume a channel into a list." - (case elems - (^ (list 0 1 2 3 4 5)) - true - - _ - false)))) - - (wrap (do P.Monad - [elems (&.consume (let [input (to-channel (list 0 1 2 3 4 5)) - output (: (&.Channel Int) (&.channel))] - (exec (&.pipe input output) - output)))] - (assert "Can pipe one channel into another." - (case elems - (^ (list 0 1 2 3 4 5)) - true - - _ - false)))) - - (wrap (do P.Monad - [elems (&.consume (&.filter i/even? (to-channel (list 0 1 2 3 4 5))))] - (assert "Can filter a channel's elements." - (case elems - (^ (list 0 2 4)) - true - - _ - false)))) - - (wrap (do P.Monad - [elems (&.consume (&.merge (list (to-channel (list 0 1 2 3 4 5)) - (to-channel (list 0 -1 -2 -3 -4 -5)))))] - (assert "Can merge channels." - (case elems - (^ (list 0 1 2 3 4 5 0 -1 -2 -3 -4 -5)) - true - - _ - false)))) - - (wrap (do P.Monad - [output (&.fold (function [base input] (P/wrap (i/+ input base))) 0 (to-channel (list 0 1 2 3 4 5)))] - (assert "Can fold over a channel." - (i/= 15 output)))) - - (wrap (do P.Monad - [elems (&.consume (&.distinct number.Eq (to-channel (list 0 0 0 1 2 2 3 3 3 3 4 4 4 5 5))))] - (assert "Can avoid immediate repetition in the channel." - (case elems - (^ (list 0 1 2 3 4 5)) - true - - _ - false)))) - - (wrap (do P.Monad - [elems (&.consume (&.once (:: @ wrap 12345)))] - (assert "Can convert a promise into a single-value channel." - (case elems - (^ (list 12345)) - true - - _ - false)))) - - (wrap (do P.Monad - [elems (&.consume (:: &.Functor map i/inc (to-channel (list 0 1 2 3 4 5))))] - (assert "Functor goes over every element in a channel." - (case elems - (^ (list 1 2 3 4 5 6)) - true - - _ - false)))) - - (wrap (do P.Monad - [elems (&.consume (let [(^open) &.Applicative] - (apply (wrap i/inc) (wrap 12345))))] - (assert "Applicative works over all channel values." - (case elems - (^ (list 12346)) - true - - _ - false)))) - - (wrap (do P.Monad - [elems (&.consume (do &.Monad - [f (wrap i/inc) - a (wrap 12345)] - (wrap (f a))))] - (assert "Monad works over all channel values." - (case elems - (^ (list 12346)) - true - - _ - false)))) - )) + (let [(^open "list/") (list.Eq number.Eq)] + ($_ seq + (wrap (do promise.Monad + [#let [values (list 0 1 2 3 4 5)] + output (promise.future + (do io.Monad + [#let [input (: (Channel Int) (frp.channel []))] + output (read! input) + _ (write! values input)] + (wrap output))) + _ (promise.wait +100) + output (promise.future (atom.read output))] + (assert "Can pipe one channel into another." + (list/= values + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [input (: (Channel Int) (frp.channel [])) + elems (frp.filter i/even? input)] + output (read! elems) + _ (write! (list 0 1 2 3 4 5) input)] + (wrap output))) + _ (promise.wait +100) + output (promise.future (atom.read output))] + (assert "Can filter a channel's elements." + (list/= (list 0 2 4) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [left (: (Channel Int) (frp.channel [])) + right (: (Channel Int) (frp.channel []))] + merged (frp.merge (list left right)) + output (read! merged) + _ (write! (list 0 1 2 3 4 5) left) + _ (write! (list 0 -1 -2 -3 -4 -5) right)] + (wrap output))) + _ (promise.wait +100) + output (promise.future (atom.read output))] + (assert "Can merge channels." + (list/= (list 0 1 2 3 4 5 0 -1 -2 -3 -4 -5) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [inputs (: (Channel Int) (frp.channel [])) + mapped (:: frp.Functor map i/inc inputs)] + output (read! mapped) + _ (write! (list 0 1 2 3 4 5) inputs)] + (wrap output))) + _ (promise.wait +100) + output (promise.future (atom.read output))] + (assert "Functor goes over every element in a channel." + (list/= (list 1 2 3 4 5 6) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [>f< (: (Channel (-> Int Int)) (frp.channel [])) + >a< (: (Channel Int) (frp.channel []))] + output (read! (let [(^open) frp.Applicative] + (apply >f< >a<))) + _ (write! (list i/inc) >f<) + _ (write! (list 12345) >a<)] + (wrap output))) + _ (promise.wait +100) + output (promise.future (atom.read output))] + (assert "Applicative works over all channel values." + (list/= (list 12346) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (read! (do frp.Monad + [f (frp.from-promise (promise.delay +100 i/inc)) + a (frp.from-promise (promise.delay +200 12345))] + (frp.from-promise (promise.delay +300 (f a)))))) + _ (promise.wait +600) + output (promise.future (atom.read output))] + (assert "Valid monad." + (list/= (list 12346) + (list.reverse output))))) + ))) diff --git a/stdlib/test/test/lux/concurrency/stm.lux b/stdlib/test/test/lux/concurrency/stm.lux index d2e299c50..1ca5482bf 100644 --- a/stdlib/test/test/lux/concurrency/stm.lux +++ b/stdlib/test/test/lux/concurrency/stm.lux @@ -1,43 +1,59 @@ (.module: lux - (lux [io] + (lux [io #+ IO] (control ["M" monad #+ do Monad]) (data [number] [maybe] (coll [list "" Functor "List/" Fold]) text/format) - (concurrency ["&" stm] - [promise]) + (concurrency [atom #+ Atom atom] + ["&" stm] + [promise] + [frp #+ Channel]) ["r" math/random]) lux/test) +(def: (read! channel) + (All [a] (-> (Channel a) (IO (Atom (List a))))) + (do io.Monad + [#let [output (atom (list))] + _ (frp.listen (function [value] + (atom.update (|>> (#.Cons value)) output)) + channel)] + (wrap output))) + (def: iterations/processes Int 100) (context: "STM" ($_ seq + (wrap (do promise.Monad + [output (&.commit (&.read (&.var 0)))] + (assert "Can read STM vars." + (i/= 0 output)))) + (wrap (do promise.Monad + [#let [_var (&.var 0)] + output (&.commit (do &.Monad + [_ (&.write 5 _var)] + (&.read _var)))] + (assert "Can write STM vars." + (i/= 5 output)))) + (wrap (do promise.Monad + [#let [_var (&.var 5)] + output (&.commit (do &.Monad + [_ (&.update (i/* 3) _var)] + (&.read _var)))] + (assert "Can update STM vars." + (i/= 15 output)))) (wrap (do promise.Monad [#let [_var (&.var 0) - changes (io.run (&.follow _var))] - output1 (&.commit (&.read _var)) - output2 (&.commit (do &.Monad - [_ (&.write 5 _var)] - (&.read _var))) - output3 (&.commit (do &.Monad - [temp (&.read _var) - _ (&.update (i/* 3) _var)] - (&.read _var))) - ?c1+changes' changes - #let [[c1 changes'] (maybe.default [-1 changes] ?c1+changes')] - ?c2+changes' changes' - #let [[c2 changes'] (maybe.default [-1 changes] ?c2+changes')]] - (assert "Can read STM vars. - Can write STM vars. - Can update STM vars. - Can follow all the changes to STM vars." - (and (i/= 0 output1) - (i/= 5 output2) - (i/= 15 output3) - (and (i/= 5 c1) (i/= 15 c2)))))) + changes (io.run (read! (io.run (&.follow _var))))] + _ (&.commit (&.write 5 _var)) + _ (&.commit (&.update (i/* 3) _var)) + changes (promise.future (atom.read changes))] + (assert "Can follow all the changes to STM vars." + (:: (list.Eq number.Eq) = + (list 5 15) + (list.reverse changes))))) (wrap (let [_concurrency-var (&.var 0)] (do promise.Monad [_ (M.seq @ diff --git a/stdlib/test/test/lux/world/net/tcp.lux b/stdlib/test/test/lux/world/net/tcp.lux index 785b1a66b..8d40897d7 100644 --- a/stdlib/test/test/lux/world/net/tcp.lux +++ b/stdlib/test/test/lux/world/net/tcp.lux @@ -5,7 +5,7 @@ ["ex" exception #+ exception:]) (concurrency ["P" promise] ["T" task] - [frp]) + [frp "frp/" Functor]) (data ["E" error] [text] text/format) @@ -24,45 +24,39 @@ (|>> (n/% +1000) (n/+ +8000))))) -(exception: Empty-Channel) - -(def: (head channel) - (All [a] (-> (frp.Channel a) (T.Task a))) - (do P.Monad - [head+tail channel] - (case head+tail - (#.Some [head tail]) - (wrap (ex.return head)) - - #.None - (wrap (ex.throw Empty-Channel ""))))) - (context: "TCP networking." (do @ [port ..port size (|> r.nat (:: @ map (|>> (n/% +100) (n/max +10)))) from (_blob.blob size) to (_blob.blob size) - #let [temp (blob.create size)]] + #let [temp-from (blob.create size) + temp-to (blob.create size)]] ($_ seq (wrap (do P.Monad [result (do T.Monad - [server (@.server port) + [[server-close server] (@.server port) + #let [from-worked? (: (T.Task Bool) + (P.promise #.Nil)) + _ (frp/map (function [socket] + (do @ + [bytes-from (@.read temp-from +0 size socket) + #let [_ (io.run (P.resolve (#E.Success (and (n/= size bytes-from) + (:: blob.Eq = from temp-from))) + from-worked?))]] + (@.write to +0 size socket))) + server)] + client (@.client localhost port) - #################### _ (@.write from +0 size client) - socket (head server) - bytes-from (@.read temp +0 size socket) - #let [from-worked? (and (n/= size bytes-from) - (:: blob.Eq = from temp))] + from-worked? from-worked? #################### - _ (@.write to +0 size socket) - bytes-to (@.read temp +0 size client) + bytes-to (@.read temp-to +0 size client) #let [to-worked? (and (n/= size bytes-to) - (:: blob.Eq = to temp))] + (:: blob.Eq = to temp-to))] #################### _ (@.close client) - _ (T.from-promise (P.future (frp.close server)))] + _ (T.from-promise (P.future (P.resolve [] server-close)))] (wrap (and from-worked? to-worked?)))] (assert "Can communicate between client and server." diff --git a/stdlib/test/test/lux/world/net/udp.lux b/stdlib/test/test/lux/world/net/udp.lux index aa600e0b5..4cb268a4f 100644 --- a/stdlib/test/test/lux/world/net/udp.lux +++ b/stdlib/test/test/lux/world/net/udp.lux @@ -24,19 +24,6 @@ (|>> (n/% +1000) (n/+ +8000))))) -(exception: Empty-Channel) - -(def: (head channel) - (All [a] (-> (frp.Channel a) (T.Task a))) - (do P.Monad - [head+tail channel] - (case head+tail - (#.Some [head tail]) - (wrap (ex.return head)) - - #.None - (wrap (ex.throw Empty-Channel ""))))) - (context: "UDP networking." (do @ [port ..port -- cgit v1.2.3