From ea32f6d5500b5affa7f5b11cdc05b48ad4fe7a46 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Tue, 12 Dec 2017 08:28:55 -0400 Subject: - FRP channels are no longer asynchronously-built lists, but rather mediums for spreading data, that can be chained together to form networks for data distribution. --- stdlib/test/test/lux/concurrency/frp.lux | 224 +++++++++++++++---------------- stdlib/test/test/lux/concurrency/stm.lux | 64 +++++---- stdlib/test/test/lux/world/net/tcp.lux | 44 +++--- stdlib/test/test/lux/world/net/udp.lux | 13 -- 4 files changed, 167 insertions(+), 178 deletions(-) (limited to 'stdlib/test') diff --git a/stdlib/test/test/lux/concurrency/frp.lux b/stdlib/test/test/lux/concurrency/frp.lux index 717eb0624..b24372781 100644 --- a/stdlib/test/test/lux/concurrency/frp.lux +++ b/stdlib/test/test/lux/concurrency/frp.lux @@ -1,124 +1,116 @@ (.module: lux (lux [io #+ IO io] - (control ["M" monad #+ do Monad]) + (control [monad #+ do Monad]) (data [number] - text/format) - (concurrency ["P" promise #+ "P/" Monad] - ["&" frp])) + text/format + (coll [list])) + (concurrency [promise #+ "promise/" Monad] + [frp #+ Channel] + [atom #+ Atom atom])) lux/test) -(def: (to-channel values) - (-> (List Int) (&.Channel Int)) - (let [_channel (: (&.Channel Int) (&.channel))] - (io.run (do io.Monad - [_ (M.map @ (function [value] (&.write value _channel)) - values) - _ (&.close _channel)] - (wrap _channel))))) +(def: (write! values channel) + (All [a] (-> (List a) (Channel a) (IO Unit))) + (do io.Monad + [_ (monad.map @ (frp.publish channel) values)] + (wrap []))) + +(def: (read! channel) + (All [a] (-> (Channel a) (IO (Atom (List a))))) + (do io.Monad + [#let [output (atom (list))] + _ (frp.listen (function [value] + (atom.update (|>> (#.Cons value)) output)) + channel)] + (wrap output))) (context: "FRP" - ($_ seq - (wrap (do P.Monad - [elems (&.consume (to-channel (list 0 1 2 3 4 5)))] - (assert "Can consume a channel into a list." - (case elems - (^ (list 0 1 2 3 4 5)) - true - - _ - false)))) - - (wrap (do P.Monad - [elems (&.consume (let [input (to-channel (list 0 1 2 3 4 5)) - output (: (&.Channel Int) (&.channel))] - (exec (&.pipe input output) - output)))] - (assert "Can pipe one channel into another." - (case elems - (^ (list 0 1 2 3 4 5)) - true - - _ - false)))) - - (wrap (do P.Monad - [elems (&.consume (&.filter i/even? (to-channel (list 0 1 2 3 4 5))))] - (assert "Can filter a channel's elements." - (case elems - (^ (list 0 2 4)) - true - - _ - false)))) - - (wrap (do P.Monad - [elems (&.consume (&.merge (list (to-channel (list 0 1 2 3 4 5)) - (to-channel (list 0 -1 -2 -3 -4 -5)))))] - (assert "Can merge channels." - (case elems - (^ (list 0 1 2 3 4 5 0 -1 -2 -3 -4 -5)) - true - - _ - false)))) - - (wrap (do P.Monad - [output (&.fold (function [base input] (P/wrap (i/+ input base))) 0 (to-channel (list 0 1 2 3 4 5)))] - (assert "Can fold over a channel." - (i/= 15 output)))) - - (wrap (do P.Monad - [elems (&.consume (&.distinct number.Eq (to-channel (list 0 0 0 1 2 2 3 3 3 3 4 4 4 5 5))))] - (assert "Can avoid immediate repetition in the channel." - (case elems - (^ (list 0 1 2 3 4 5)) - true - - _ - false)))) - - (wrap (do P.Monad - [elems (&.consume (&.once (:: @ wrap 12345)))] - (assert "Can convert a promise into a single-value channel." - (case elems - (^ (list 12345)) - true - - _ - false)))) - - (wrap (do P.Monad - [elems (&.consume (:: &.Functor map i/inc (to-channel (list 0 1 2 3 4 5))))] - (assert "Functor goes over every element in a channel." - (case elems - (^ (list 1 2 3 4 5 6)) - true - - _ - false)))) - - (wrap (do P.Monad - [elems (&.consume (let [(^open) &.Applicative] - (apply (wrap i/inc) (wrap 12345))))] - (assert "Applicative works over all channel values." - (case elems - (^ (list 12346)) - true - - _ - false)))) - - (wrap (do P.Monad - [elems (&.consume (do &.Monad - [f (wrap i/inc) - a (wrap 12345)] - (wrap (f a))))] - (assert "Monad works over all channel values." - (case elems - (^ (list 12346)) - true - - _ - false)))) - )) + (let [(^open "list/") (list.Eq number.Eq)] + ($_ seq + (wrap (do promise.Monad + [#let [values (list 0 1 2 3 4 5)] + output (promise.future + (do io.Monad + [#let [input (: (Channel Int) (frp.channel []))] + output (read! input) + _ (write! values input)] + (wrap output))) + _ (promise.wait +100) + output (promise.future (atom.read output))] + (assert "Can pipe one channel into another." + (list/= values + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [input (: (Channel Int) (frp.channel [])) + elems (frp.filter i/even? input)] + output (read! elems) + _ (write! (list 0 1 2 3 4 5) input)] + (wrap output))) + _ (promise.wait +100) + output (promise.future (atom.read output))] + (assert "Can filter a channel's elements." + (list/= (list 0 2 4) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [left (: (Channel Int) (frp.channel [])) + right (: (Channel Int) (frp.channel []))] + merged (frp.merge (list left right)) + output (read! merged) + _ (write! (list 0 1 2 3 4 5) left) + _ (write! (list 0 -1 -2 -3 -4 -5) right)] + (wrap output))) + _ (promise.wait +100) + output (promise.future (atom.read output))] + (assert "Can merge channels." + (list/= (list 0 1 2 3 4 5 0 -1 -2 -3 -4 -5) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [inputs (: (Channel Int) (frp.channel [])) + mapped (:: frp.Functor map i/inc inputs)] + output (read! mapped) + _ (write! (list 0 1 2 3 4 5) inputs)] + (wrap output))) + _ (promise.wait +100) + output (promise.future (atom.read output))] + (assert "Functor goes over every element in a channel." + (list/= (list 1 2 3 4 5 6) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [>f< (: (Channel (-> Int Int)) (frp.channel [])) + >a< (: (Channel Int) (frp.channel []))] + output (read! (let [(^open) frp.Applicative] + (apply >f< >a<))) + _ (write! (list i/inc) >f<) + _ (write! (list 12345) >a<)] + (wrap output))) + _ (promise.wait +100) + output (promise.future (atom.read output))] + (assert "Applicative works over all channel values." + (list/= (list 12346) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (read! (do frp.Monad + [f (frp.from-promise (promise.delay +100 i/inc)) + a (frp.from-promise (promise.delay +200 12345))] + (frp.from-promise (promise.delay +300 (f a)))))) + _ (promise.wait +600) + output (promise.future (atom.read output))] + (assert "Valid monad." + (list/= (list 12346) + (list.reverse output))))) + ))) diff --git a/stdlib/test/test/lux/concurrency/stm.lux b/stdlib/test/test/lux/concurrency/stm.lux index d2e299c50..1ca5482bf 100644 --- a/stdlib/test/test/lux/concurrency/stm.lux +++ b/stdlib/test/test/lux/concurrency/stm.lux @@ -1,43 +1,59 @@ (.module: lux - (lux [io] + (lux [io #+ IO] (control ["M" monad #+ do Monad]) (data [number] [maybe] (coll [list "" Functor "List/" Fold]) text/format) - (concurrency ["&" stm] - [promise]) + (concurrency [atom #+ Atom atom] + ["&" stm] + [promise] + [frp #+ Channel]) ["r" math/random]) lux/test) +(def: (read! channel) + (All [a] (-> (Channel a) (IO (Atom (List a))))) + (do io.Monad + [#let [output (atom (list))] + _ (frp.listen (function [value] + (atom.update (|>> (#.Cons value)) output)) + channel)] + (wrap output))) + (def: iterations/processes Int 100) (context: "STM" ($_ seq + (wrap (do promise.Monad + [output (&.commit (&.read (&.var 0)))] + (assert "Can read STM vars." + (i/= 0 output)))) + (wrap (do promise.Monad + [#let [_var (&.var 0)] + output (&.commit (do &.Monad + [_ (&.write 5 _var)] + (&.read _var)))] + (assert "Can write STM vars." + (i/= 5 output)))) + (wrap (do promise.Monad + [#let [_var (&.var 5)] + output (&.commit (do &.Monad + [_ (&.update (i/* 3) _var)] + (&.read _var)))] + (assert "Can update STM vars." + (i/= 15 output)))) (wrap (do promise.Monad [#let [_var (&.var 0) - changes (io.run (&.follow _var))] - output1 (&.commit (&.read _var)) - output2 (&.commit (do &.Monad - [_ (&.write 5 _var)] - (&.read _var))) - output3 (&.commit (do &.Monad - [temp (&.read _var) - _ (&.update (i/* 3) _var)] - (&.read _var))) - ?c1+changes' changes - #let [[c1 changes'] (maybe.default [-1 changes] ?c1+changes')] - ?c2+changes' changes' - #let [[c2 changes'] (maybe.default [-1 changes] ?c2+changes')]] - (assert "Can read STM vars. - Can write STM vars. - Can update STM vars. - Can follow all the changes to STM vars." - (and (i/= 0 output1) - (i/= 5 output2) - (i/= 15 output3) - (and (i/= 5 c1) (i/= 15 c2)))))) + changes (io.run (read! (io.run (&.follow _var))))] + _ (&.commit (&.write 5 _var)) + _ (&.commit (&.update (i/* 3) _var)) + changes (promise.future (atom.read changes))] + (assert "Can follow all the changes to STM vars." + (:: (list.Eq number.Eq) = + (list 5 15) + (list.reverse changes))))) (wrap (let [_concurrency-var (&.var 0)] (do promise.Monad [_ (M.seq @ diff --git a/stdlib/test/test/lux/world/net/tcp.lux b/stdlib/test/test/lux/world/net/tcp.lux index 785b1a66b..8d40897d7 100644 --- a/stdlib/test/test/lux/world/net/tcp.lux +++ b/stdlib/test/test/lux/world/net/tcp.lux @@ -5,7 +5,7 @@ ["ex" exception #+ exception:]) (concurrency ["P" promise] ["T" task] - [frp]) + [frp "frp/" Functor]) (data ["E" error] [text] text/format) @@ -24,45 +24,39 @@ (|>> (n/% +1000) (n/+ +8000))))) -(exception: Empty-Channel) - -(def: (head channel) - (All [a] (-> (frp.Channel a) (T.Task a))) - (do P.Monad - [head+tail channel] - (case head+tail - (#.Some [head tail]) - (wrap (ex.return head)) - - #.None - (wrap (ex.throw Empty-Channel ""))))) - (context: "TCP networking." (do @ [port ..port size (|> r.nat (:: @ map (|>> (n/% +100) (n/max +10)))) from (_blob.blob size) to (_blob.blob size) - #let [temp (blob.create size)]] + #let [temp-from (blob.create size) + temp-to (blob.create size)]] ($_ seq (wrap (do P.Monad [result (do T.Monad - [server (@.server port) + [[server-close server] (@.server port) + #let [from-worked? (: (T.Task Bool) + (P.promise #.Nil)) + _ (frp/map (function [socket] + (do @ + [bytes-from (@.read temp-from +0 size socket) + #let [_ (io.run (P.resolve (#E.Success (and (n/= size bytes-from) + (:: blob.Eq = from temp-from))) + from-worked?))]] + (@.write to +0 size socket))) + server)] + client (@.client localhost port) - #################### _ (@.write from +0 size client) - socket (head server) - bytes-from (@.read temp +0 size socket) - #let [from-worked? (and (n/= size bytes-from) - (:: blob.Eq = from temp))] + from-worked? from-worked? #################### - _ (@.write to +0 size socket) - bytes-to (@.read temp +0 size client) + bytes-to (@.read temp-to +0 size client) #let [to-worked? (and (n/= size bytes-to) - (:: blob.Eq = to temp))] + (:: blob.Eq = to temp-to))] #################### _ (@.close client) - _ (T.from-promise (P.future (frp.close server)))] + _ (T.from-promise (P.future (P.resolve [] server-close)))] (wrap (and from-worked? to-worked?)))] (assert "Can communicate between client and server." diff --git a/stdlib/test/test/lux/world/net/udp.lux b/stdlib/test/test/lux/world/net/udp.lux index aa600e0b5..4cb268a4f 100644 --- a/stdlib/test/test/lux/world/net/udp.lux +++ b/stdlib/test/test/lux/world/net/udp.lux @@ -24,19 +24,6 @@ (|>> (n/% +1000) (n/+ +8000))))) -(exception: Empty-Channel) - -(def: (head channel) - (All [a] (-> (frp.Channel a) (T.Task a))) - (do P.Monad - [head+tail channel] - (case head+tail - (#.Some [head tail]) - (wrap (ex.return head)) - - #.None - (wrap (ex.throw Empty-Channel ""))))) - (context: "UDP networking." (do @ [port ..port -- cgit v1.2.3