From f6e280bd4ab41d12083c0eef2c823ad3962d6a04 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Sat, 15 Dec 2018 12:39:15 -0400 Subject: Moved the "lux/concurrency" modules under "lux/control". --- stdlib/test/test/lux/concurrency/actor.lux | 75 ----------- stdlib/test/test/lux/concurrency/atom.lux | 34 ----- stdlib/test/test/lux/concurrency/frp.lux | 122 ------------------ stdlib/test/test/lux/concurrency/promise.lux | 72 ----------- stdlib/test/test/lux/concurrency/semaphore.lux | 143 --------------------- stdlib/test/test/lux/concurrency/stm.lux | 77 ----------- stdlib/test/test/lux/control/concurrency/actor.lux | 75 +++++++++++ stdlib/test/test/lux/control/concurrency/atom.lux | 34 +++++ stdlib/test/test/lux/control/concurrency/frp.lux | 122 ++++++++++++++++++ .../test/test/lux/control/concurrency/promise.lux | 72 +++++++++++ .../test/lux/control/concurrency/semaphore.lux | 143 +++++++++++++++++++++ stdlib/test/test/lux/control/concurrency/stm.lux | 77 +++++++++++ 12 files changed, 523 insertions(+), 523 deletions(-) delete mode 100644 stdlib/test/test/lux/concurrency/actor.lux delete mode 100644 stdlib/test/test/lux/concurrency/atom.lux delete mode 100644 stdlib/test/test/lux/concurrency/frp.lux delete mode 100644 stdlib/test/test/lux/concurrency/promise.lux delete mode 100644 stdlib/test/test/lux/concurrency/semaphore.lux delete mode 100644 stdlib/test/test/lux/concurrency/stm.lux create mode 100644 stdlib/test/test/lux/control/concurrency/actor.lux create mode 100644 stdlib/test/test/lux/control/concurrency/atom.lux create mode 100644 stdlib/test/test/lux/control/concurrency/frp.lux create mode 100644 stdlib/test/test/lux/control/concurrency/promise.lux create mode 100644 stdlib/test/test/lux/control/concurrency/semaphore.lux create mode 100644 stdlib/test/test/lux/control/concurrency/stm.lux (limited to 'stdlib/test') diff --git a/stdlib/test/test/lux/concurrency/actor.lux b/stdlib/test/test/lux/concurrency/actor.lux deleted file mode 100644 index a43845380..000000000 --- a/stdlib/test/test/lux/concurrency/actor.lux +++ /dev/null @@ -1,75 +0,0 @@ -(.module: - [lux #* - ["." io (#+ IO io)] - [control - ["M" monad (#+ do Monad)] - ["ex" exception]] - [data - ["e" error] - [text - format]] - [concurrency - ["P" promise ("promise/." Monad)] - ["T" task] - ["&" actor (#+ actor: message:)]]] - lux/test) - -(actor: Counter - Nat - - ((handle message state self) - (do T.Monad - [#let [_ (log! "BEFORE")] - output (message state self) - #let [_ (log! "AFTER")]] - (wrap output))) - - ((stop cause state) - (promise/wrap (log! (if (ex.match? &.poisoned cause) - (format "Counter was poisoned: " (%n state)) - cause))))) - -(message: #export Counter - (count! {increment Nat} state self Nat) - (let [state' (n/+ increment state)] - (T.return [state' state']))) - -(context: "Actors" - ($_ seq - (test "Can check if an actor is alive." - (io.run (do io.Monad - [counter (new@Counter 0)] - (wrap (&.alive? counter))))) - - (test "Can poison actors." - (io.run (do io.Monad - [counter (new@Counter 0) - poisoned? (&.poison counter)] - (wrap (and poisoned? - (not (&.alive? counter))))))) - - (test "Cannot poison an already dead actor." - (io.run (do io.Monad - [counter (new@Counter 0) - first-time (&.poison counter) - second-time (&.poison counter)] - (wrap (and first-time - (not second-time)))))) - - (wrap (do P.Monad - [result (do T.Monad - [#let [counter (io.run (new@Counter 0))] - output-1 (count! 1 counter) - output-2 (count! 1 counter) - output-3 (count! 1 counter)] - (wrap (and (n/= 1 output-1) - (n/= 2 output-2) - (n/= 3 output-3))))] - (assert "Can send messages to actors." - (case result - (#e.Success outcome) - outcome - - (#e.Error error) - #0)))) - )) diff --git a/stdlib/test/test/lux/concurrency/atom.lux b/stdlib/test/test/lux/concurrency/atom.lux deleted file mode 100644 index a10edcae7..000000000 --- a/stdlib/test/test/lux/concurrency/atom.lux +++ /dev/null @@ -1,34 +0,0 @@ -(.module: - [lux #* - ["." io] - [control - ["M" monad (#+ do Monad)]] - [concurrency - ["&" atom]] - [math - ["r" random]]] - lux/test) - -(context: "Atoms" - (<| (times 100) - (do @ - [value r.nat - swap-value r.nat - set-value r.nat - #let [box (&.atom value)]] - ($_ seq - (test "Can obtain the value of an atom." - (n/= value (io.run (&.read box)))) - - (test "Can swap the value of an atom." - (and (io.run (&.compare-and-swap value swap-value box)) - (n/= swap-value (io.run (&.read box))))) - - (test "Can update the value of an atom." - (exec (io.run (&.update inc box)) - (n/= (inc swap-value) (io.run (&.read box))))) - - (test "Can immediately set the value of an atom." - (exec (io.run (&.write set-value box)) - (n/= set-value (io.run (&.read box))))) - )))) diff --git a/stdlib/test/test/lux/concurrency/frp.lux b/stdlib/test/test/lux/concurrency/frp.lux deleted file mode 100644 index 46db40889..000000000 --- a/stdlib/test/test/lux/concurrency/frp.lux +++ /dev/null @@ -1,122 +0,0 @@ -(.module: - [lux #* - ["." io (#+ IO io)] - [control - ["." monad (#+ do Monad)]] - [data - ["." number] - [collection - ["." list]]] - [concurrency - ["." promise ("promise/." Monad)] - ["." frp (#+ Channel)] - ["." atom (#+ Atom atom)]]] - lux/test) - -(def: (write! values channel) - (All [a] (-> (List a) (Channel a) (IO Any))) - (do io.Monad - [_ (monad.map @ (frp.publish channel) values)] - (wrap []))) - -(def: (read! channel) - (All [a] (-> (Channel a) (IO (Atom (List a))))) - (do io.Monad - [#let [output (atom (list))] - _ (frp.listen (function (_ value) - ## TODO: Simplify when possible. - (do @ - [_ (atom.update (|>> (#.Cons value)) output)] - (wrap []))) - channel)] - (wrap output))) - -(context: "FRP" - (let [(^open "list/.") (list.Equivalence number.Equivalence)] - ($_ seq - (wrap (do promise.Monad - [#let [values (list +0 +1 +2 +3 +4 +5)] - output (promise.future - (do io.Monad - [#let [input (: (Channel Int) (frp.channel []))] - output (read! input) - _ (write! values input)] - (wrap output))) - _ (promise.wait 100) - output (promise.future (atom.read output))] - (assert "Can pipe one channel into another." - (list/= values - (list.reverse output))))) - - (wrap (do promise.Monad - [output (promise.future - (do io.Monad - [#let [input (: (Channel Int) (frp.channel [])) - elems (frp.filter i/even? input)] - output (read! elems) - _ (write! (list +0 +1 +2 +3 +4 +5) input)] - (wrap output))) - _ (promise.wait 100) - output (promise.future (atom.read output))] - (assert "Can filter a channel's elements." - (list/= (list +0 +2 +4) - (list.reverse output))))) - - (wrap (do promise.Monad - [output (promise.future - (do io.Monad - [#let [left (: (Channel Int) (frp.channel [])) - right (: (Channel Int) (frp.channel []))] - merged (frp.merge (list left right)) - output (read! merged) - _ (write! (list +0 +1 +2 +3 +4 +5) left) - _ (write! (list +0 -1 -2 -3 -4 -5) right)] - (wrap output))) - _ (promise.wait 100) - output (promise.future (atom.read output))] - (assert "Can merge channels." - (list/= (list +0 +1 +2 +3 +4 +5 +0 -1 -2 -3 -4 -5) - (list.reverse output))))) - - (wrap (do promise.Monad - [output (promise.future - (do io.Monad - [#let [inputs (: (Channel Int) (frp.channel [])) - mapped (:: frp.Functor map inc inputs)] - output (read! mapped) - _ (write! (list +0 +1 +2 +3 +4 +5) inputs)] - (wrap output))) - _ (promise.wait 100) - output (promise.future (atom.read output))] - (assert "Functor goes over every element in a channel." - (list/= (list +1 +2 +3 +4 +5 +6) - (list.reverse output))))) - - (wrap (do promise.Monad - [output (promise.future - (do io.Monad - [#let [>f< (: (Channel (-> Int Int)) (frp.channel [])) - >a< (: (Channel Int) (frp.channel []))] - output (read! (let [(^open ".") frp.Apply] - (apply >f< >a<))) - _ (write! (list inc) >f<) - _ (write! (list +12345) >a<)] - (wrap output))) - _ (promise.wait 100) - output (promise.future (atom.read output))] - (assert "Apply works over all channel values." - (list/= (list +12346) - (list.reverse output))))) - - (wrap (do promise.Monad - [output (promise.future - (read! (do frp.Monad - [f (frp.from-promise (promise.delay 100 inc)) - a (frp.from-promise (promise.delay 200 +12345))] - (frp.from-promise (promise.delay 300 (f a)))))) - _ (promise.wait 700) - output (promise.future (atom.read output))] - (assert "Valid monad." - (list/= (list +12346) - (list.reverse output))))) - ))) diff --git a/stdlib/test/test/lux/concurrency/promise.lux b/stdlib/test/test/lux/concurrency/promise.lux deleted file mode 100644 index e857d0708..000000000 --- a/stdlib/test/test/lux/concurrency/promise.lux +++ /dev/null @@ -1,72 +0,0 @@ -(.module: - [lux #* - ["." io] - [control - ["M" monad (#+ do Monad)] - pipe] - [concurrency - ["&" promise ("&/." Monad)]] - [math - ["r" random]]] - lux/test) - -(context: "Promises" - ($_ seq - (wrap (do &.Monad - [running? (&.future (io.io #1))] - (assert "Can run IO actions in separate threads." - running?))) - - (wrap (do &.Monad - [_ (&.wait 500)] - (assert "Can wait for a specified amount of time." - #1))) - - (wrap (do &.Monad - [[left right] (&.and (&.future (io.io #1)) - (&.future (io.io #0)))] - (assert "Can combine promises sequentially." - (and left (not right))))) - - (wrap (do &.Monad - [?left (&.or (&.delay 100 #1) - (&.delay 200 #0)) - ?right (&.or (&.delay 200 #1) - (&.delay 100 #0))] - (assert "Can combine promises alternatively." - (case [?left ?right] - [(#.Left #1) (#.Right #0)] - #1 - - _ - #0)))) - - (wrap (do &.Monad - [?left (&.either (&.delay 100 #1) - (&.delay 200 #0)) - ?right (&.either (&.delay 200 #1) - (&.delay 100 #0))] - (assert "Can combine promises alternatively [Part 2]." - (and ?left (not ?right))))) - - (test "Can poll a promise for its value." - (and (|> (&.poll (&/wrap #1)) - (case> (#.Some #1) #1 _ #0)) - (|> (&.poll (&.delay 200 #1)) - (case> #.None #1 _ #0)))) - - (test "Cannot re-resolve a resolved promise." - (and (not (io.run (&.resolve #0 (&/wrap #1)))) - (io.run (&.resolve #1 (: (&.Promise Bit) (&.promise #.None)))))) - - (wrap (do &.Monad - [?none (&.time-out 100 (&.delay 200 #1)) - ?some (&.time-out 200 (&.delay 100 #1))] - (assert "Can establish maximum waiting times for promises to be fulfilled." - (case [?none ?some] - [#.None (#.Some #1)] - #1 - - _ - #0)))) - )) diff --git a/stdlib/test/test/lux/concurrency/semaphore.lux b/stdlib/test/test/lux/concurrency/semaphore.lux deleted file mode 100644 index f309fcd0c..000000000 --- a/stdlib/test/test/lux/concurrency/semaphore.lux +++ /dev/null @@ -1,143 +0,0 @@ -(.module: - [lux #* - [control - ["." monad (#+ do)]] - [data - ["." maybe] - ["." text ("text/." Equivalence Monoid) - format] - [collection - ["." list ("list/." Functor)]]] - [concurrency - ["/" semaphore] - ["." promise (#+ Promise)] - ["." atom (#+ Atom)]] - ["." io] - [math - ["r" random]]] - lux/test) - -(def: (wait-many-times times semaphore) - (-> Nat /.Semaphore (Promise Any)) - (loop [steps times] - (if (n/> 0 steps) - (do promise.Monad - [_ (/.wait semaphore)] - (recur (dec steps))) - (:: promise.Monad wrap [])))) - -(context: "Semaphore." - (<| (times 100) - (do @ - [open-positions (|> r.nat (:: @ map (|>> (n/% 10) (n/max 1))))] - ($_ seq - (let [semaphore (/.semaphore open-positions)] - (wrap (do promise.Monad - [_ (wait-many-times open-positions semaphore)] - (assert "Can wait on a semaphore up to the number of open positions without blocking." - #1)))) - (let [semaphore (/.semaphore open-positions)] - (wrap (do promise.Monad - [result (<| (promise.time-out 100) - (wait-many-times (inc open-positions) semaphore))] - (assert "Waiting on a semaphore more than the number of open positions blocks the process." - (case result - (#.Some _) - #0 - - #.None - #1))))) - (let [semaphore (/.semaphore open-positions)] - (wrap (do promise.Monad - [_ (: (Promise Any) - (loop [steps (n/* 2 open-positions)] - (if (n/> 0 steps) - (do @ - [_ (/.wait semaphore) - _ (/.signal semaphore)] - (recur (dec steps))) - (wrap []))))] - (assert "Signaling a semaphore replenishes its open positions." - #1)))) - (let [semaphore (/.semaphore open-positions)] - (wrap (do promise.Monad - [#let [resource (atom.atom "") - blocked (do @ - [_ (wait-many-times open-positions semaphore) - _ (/.wait semaphore) - #let [_ (io.run (atom.update (|>> (format "B")) - resource))]] - (wrap []))] - _ (promise.wait 100) - _ (exec (io.run (atom.update (|>> (format "A")) - resource)) - (/.signal semaphore)) - _ blocked] - (assert "A blocked process can be un-blocked by a signal somewhere else." - (text/= "BA" - (io.run (atom.read resource))))))) - )))) - -(context: "Mutex." - (<| (times 100) - (do @ - [repetitions (|> r.nat (:: @ map (|>> (n/% 100) (n/max 10))))] - ($_ seq - (let [mutex (/.mutex [])] - (wrap (do promise.Monad - [#let [resource (atom.atom "") - expected-As (text.join-with "" (list.repeat repetitions "A")) - expected-Bs (text.join-with "" (list.repeat repetitions "B")) - processA (<| (/.synchronize mutex) - io.io - promise.future - (do io.Monad - [_ (<| (monad.seq @) - (list.repeat repetitions) - (atom.update (|>> (format "A")) resource))] - (wrap []))) - processB (<| (/.synchronize mutex) - io.io - promise.future - (do io.Monad - [_ (<| (monad.seq @) - (list.repeat repetitions) - (atom.update (|>> (format "B")) resource))] - (wrap [])))] - _ processA - _ processB - #let [outcome (io.run (atom.read resource))]] - (assert "Mutexes only allow one process to execute at a time." - (or (text/= (format expected-As expected-Bs) - outcome) - (text/= (format expected-Bs expected-As) - outcome)))))) - )))) - -(def: (waiter resource barrier id) - (-> (Atom Text) /.Barrier Nat (Promise Any)) - (do promise.Monad - [_ (/.block barrier) - #let [_ (io.run (atom.update (|>> (format (%n id))) resource))]] - (wrap []))) - -(context: "Barrier." - (let [limit 10 - barrier (/.barrier (maybe.assume (/.limit limit))) - resource (atom.atom "")] - ($_ seq - (wrap (do promise.Monad - [#let [ids (list.n/range 0 (dec limit)) - waiters (list/map (function (_ id) - (let [process (waiter resource barrier id)] - (exec (io.run (atom.update (|>> (format "_")) resource)) - process))) - ids)] - _ (monad.seq @ waiters) - #let [outcome (io.run (atom.read resource))]] - (assert "A barrier can stop all processes from acting, until an amount of waiting processes is reached, and then the barrier is un-blocked for all." - (and (text.ends-with? "__________" outcome) - (list.every? (function (_ id) - (text.contains? (%n id) outcome)) - ids) - ))))))) diff --git a/stdlib/test/test/lux/concurrency/stm.lux b/stdlib/test/test/lux/concurrency/stm.lux deleted file mode 100644 index ee84f193e..000000000 --- a/stdlib/test/test/lux/concurrency/stm.lux +++ /dev/null @@ -1,77 +0,0 @@ -(.module: - [lux #* - ["." io (#+ IO)] - [control - ["M" monad (#+ do Monad)]] - [data - ["." number] - [collection - ["." list ("list/." Functor)]]] - [concurrency - ["." atom (#+ Atom atom)] - ["&" stm] - ["." process] - ["." promise] - ["." frp (#+ Channel)]] - [math - ["r" random]]] - lux/test) - -(def: (read! channel) - (All [a] (-> (Channel a) (IO (Atom (List a))))) - (do io.Monad - [#let [output (atom (list))] - _ (frp.listen (function (_ value) - ## TODO: Simplify when possible. - (do @ - [_ (atom.update (|>> (#.Cons value)) output)] - (wrap []))) - channel)] - (wrap output))) - -(def: iterations-per-process Nat 100) - -(context: "STM" - ($_ seq - (wrap (do promise.Monad - [output (&.commit (&.read (&.var 0)))] - (assert "Can read STM vars." - (n/= 0 output)))) - (wrap (do promise.Monad - [#let [_var (&.var 0)] - output (&.commit (do &.Monad - [_ (&.write 5 _var)] - (&.read _var)))] - (assert "Can write STM vars." - (n/= 5 output)))) - (wrap (do promise.Monad - [#let [_var (&.var 5)] - output (&.commit (do &.Monad - [_ (&.update (n/* 3) _var)] - (&.read _var)))] - (assert "Can update STM vars." - (n/= 15 output)))) - (wrap (do promise.Monad - [#let [_var (&.var 0) - changes (io.run (read! (io.run (&.follow _var))))] - _ (&.commit (&.write 5 _var)) - _ (&.commit (&.update (n/* 3) _var)) - changes (promise.future (atom.read changes))] - (assert "Can follow all the changes to STM vars." - (:: (list.Equivalence number.Equivalence) = - (list 5 15) - (list.reverse changes))))) - (wrap (let [_concurrency-var (&.var 0)] - (do promise.Monad - [_ (|> process.parallelism - (list.n/range 1) - (list/map (function (_ _) - (|> iterations-per-process - (list.n/range 1) - (M.map @ (function (_ _) (&.commit (&.update inc _concurrency-var))))))) - (M.seq @)) - last-val (&.commit (&.read _concurrency-var))] - (assert "Can modify STM vars concurrently from multiple threads." - (|> process.parallelism - (n/* iterations-per-process) - (n/= last-val)))))))) diff --git a/stdlib/test/test/lux/control/concurrency/actor.lux b/stdlib/test/test/lux/control/concurrency/actor.lux new file mode 100644 index 000000000..90c3d6dd4 --- /dev/null +++ b/stdlib/test/test/lux/control/concurrency/actor.lux @@ -0,0 +1,75 @@ +(.module: + [lux #* + ["." io (#+ IO io)] + [control + ["M" monad (#+ do Monad)] + ["ex" exception] + [concurrency + ["P" promise ("promise/." Monad)] + ["T" task] + ["&" actor (#+ actor: message:)]]] + [data + ["e" error] + [text + format]]] + lux/test) + +(actor: Counter + Nat + + ((handle message state self) + (do T.Monad + [#let [_ (log! "BEFORE")] + output (message state self) + #let [_ (log! "AFTER")]] + (wrap output))) + + ((stop cause state) + (promise/wrap (log! (if (ex.match? &.poisoned cause) + (format "Counter was poisoned: " (%n state)) + cause))))) + +(message: #export Counter + (count! {increment Nat} state self Nat) + (let [state' (n/+ increment state)] + (T.return [state' state']))) + +(context: "Actors" + ($_ seq + (test "Can check if an actor is alive." + (io.run (do io.Monad + [counter (new@Counter 0)] + (wrap (&.alive? counter))))) + + (test "Can poison actors." + (io.run (do io.Monad + [counter (new@Counter 0) + poisoned? (&.poison counter)] + (wrap (and poisoned? + (not (&.alive? counter))))))) + + (test "Cannot poison an already dead actor." + (io.run (do io.Monad + [counter (new@Counter 0) + first-time (&.poison counter) + second-time (&.poison counter)] + (wrap (and first-time + (not second-time)))))) + + (wrap (do P.Monad + [result (do T.Monad + [#let [counter (io.run (new@Counter 0))] + output-1 (count! 1 counter) + output-2 (count! 1 counter) + output-3 (count! 1 counter)] + (wrap (and (n/= 1 output-1) + (n/= 2 output-2) + (n/= 3 output-3))))] + (assert "Can send messages to actors." + (case result + (#e.Success outcome) + outcome + + (#e.Error error) + #0)))) + )) diff --git a/stdlib/test/test/lux/control/concurrency/atom.lux b/stdlib/test/test/lux/control/concurrency/atom.lux new file mode 100644 index 000000000..720547e27 --- /dev/null +++ b/stdlib/test/test/lux/control/concurrency/atom.lux @@ -0,0 +1,34 @@ +(.module: + [lux #* + ["." io] + [control + ["M" monad (#+ do Monad)] + [concurrency + ["&" atom]]] + [math + ["r" random]]] + lux/test) + +(context: "Atoms" + (<| (times 100) + (do @ + [value r.nat + swap-value r.nat + set-value r.nat + #let [box (&.atom value)]] + ($_ seq + (test "Can obtain the value of an atom." + (n/= value (io.run (&.read box)))) + + (test "Can swap the value of an atom." + (and (io.run (&.compare-and-swap value swap-value box)) + (n/= swap-value (io.run (&.read box))))) + + (test "Can update the value of an atom." + (exec (io.run (&.update inc box)) + (n/= (inc swap-value) (io.run (&.read box))))) + + (test "Can immediately set the value of an atom." + (exec (io.run (&.write set-value box)) + (n/= set-value (io.run (&.read box))))) + )))) diff --git a/stdlib/test/test/lux/control/concurrency/frp.lux b/stdlib/test/test/lux/control/concurrency/frp.lux new file mode 100644 index 000000000..04ddd5986 --- /dev/null +++ b/stdlib/test/test/lux/control/concurrency/frp.lux @@ -0,0 +1,122 @@ +(.module: + [lux #* + ["." io (#+ IO io)] + [control + ["." monad (#+ do Monad)] + [concurrency + ["." promise ("promise/." Monad)] + ["." frp (#+ Channel)] + ["." atom (#+ Atom atom)]]] + [data + ["." number] + [collection + ["." list]]]] + lux/test) + +(def: (write! values channel) + (All [a] (-> (List a) (Channel a) (IO Any))) + (do io.Monad + [_ (monad.map @ (frp.publish channel) values)] + (wrap []))) + +(def: (read! channel) + (All [a] (-> (Channel a) (IO (Atom (List a))))) + (do io.Monad + [#let [output (atom (list))] + _ (frp.listen (function (_ value) + ## TODO: Simplify when possible. + (do @ + [_ (atom.update (|>> (#.Cons value)) output)] + (wrap []))) + channel)] + (wrap output))) + +(context: "FRP" + (let [(^open "list/.") (list.Equivalence number.Equivalence)] + ($_ seq + (wrap (do promise.Monad + [#let [values (list +0 +1 +2 +3 +4 +5)] + output (promise.future + (do io.Monad + [#let [input (: (Channel Int) (frp.channel []))] + output (read! input) + _ (write! values input)] + (wrap output))) + _ (promise.wait 100) + output (promise.future (atom.read output))] + (assert "Can pipe one channel into another." + (list/= values + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [input (: (Channel Int) (frp.channel [])) + elems (frp.filter i/even? input)] + output (read! elems) + _ (write! (list +0 +1 +2 +3 +4 +5) input)] + (wrap output))) + _ (promise.wait 100) + output (promise.future (atom.read output))] + (assert "Can filter a channel's elements." + (list/= (list +0 +2 +4) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [left (: (Channel Int) (frp.channel [])) + right (: (Channel Int) (frp.channel []))] + merged (frp.merge (list left right)) + output (read! merged) + _ (write! (list +0 +1 +2 +3 +4 +5) left) + _ (write! (list +0 -1 -2 -3 -4 -5) right)] + (wrap output))) + _ (promise.wait 100) + output (promise.future (atom.read output))] + (assert "Can merge channels." + (list/= (list +0 +1 +2 +3 +4 +5 +0 -1 -2 -3 -4 -5) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [inputs (: (Channel Int) (frp.channel [])) + mapped (:: frp.Functor map inc inputs)] + output (read! mapped) + _ (write! (list +0 +1 +2 +3 +4 +5) inputs)] + (wrap output))) + _ (promise.wait 100) + output (promise.future (atom.read output))] + (assert "Functor goes over every element in a channel." + (list/= (list +1 +2 +3 +4 +5 +6) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [>f< (: (Channel (-> Int Int)) (frp.channel [])) + >a< (: (Channel Int) (frp.channel []))] + output (read! (let [(^open ".") frp.Apply] + (apply >f< >a<))) + _ (write! (list inc) >f<) + _ (write! (list +12345) >a<)] + (wrap output))) + _ (promise.wait 100) + output (promise.future (atom.read output))] + (assert "Apply works over all channel values." + (list/= (list +12346) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (read! (do frp.Monad + [f (frp.from-promise (promise.delay 100 inc)) + a (frp.from-promise (promise.delay 200 +12345))] + (frp.from-promise (promise.delay 300 (f a)))))) + _ (promise.wait 700) + output (promise.future (atom.read output))] + (assert "Valid monad." + (list/= (list +12346) + (list.reverse output))))) + ))) diff --git a/stdlib/test/test/lux/control/concurrency/promise.lux b/stdlib/test/test/lux/control/concurrency/promise.lux new file mode 100644 index 000000000..d666f3b31 --- /dev/null +++ b/stdlib/test/test/lux/control/concurrency/promise.lux @@ -0,0 +1,72 @@ +(.module: + [lux #* + ["." io] + [control + ["M" monad (#+ do Monad)] + pipe + [concurrency + ["&" promise ("&/." Monad)]]] + [math + ["r" random]]] + lux/test) + +(context: "Promises" + ($_ seq + (wrap (do &.Monad + [running? (&.future (io.io #1))] + (assert "Can run IO actions in separate threads." + running?))) + + (wrap (do &.Monad + [_ (&.wait 500)] + (assert "Can wait for a specified amount of time." + #1))) + + (wrap (do &.Monad + [[left right] (&.and (&.future (io.io #1)) + (&.future (io.io #0)))] + (assert "Can combine promises sequentially." + (and left (not right))))) + + (wrap (do &.Monad + [?left (&.or (&.delay 100 #1) + (&.delay 200 #0)) + ?right (&.or (&.delay 200 #1) + (&.delay 100 #0))] + (assert "Can combine promises alternatively." + (case [?left ?right] + [(#.Left #1) (#.Right #0)] + #1 + + _ + #0)))) + + (wrap (do &.Monad + [?left (&.either (&.delay 100 #1) + (&.delay 200 #0)) + ?right (&.either (&.delay 200 #1) + (&.delay 100 #0))] + (assert "Can combine promises alternatively [Part 2]." + (and ?left (not ?right))))) + + (test "Can poll a promise for its value." + (and (|> (&.poll (&/wrap #1)) + (case> (#.Some #1) #1 _ #0)) + (|> (&.poll (&.delay 200 #1)) + (case> #.None #1 _ #0)))) + + (test "Cannot re-resolve a resolved promise." + (and (not (io.run (&.resolve #0 (&/wrap #1)))) + (io.run (&.resolve #1 (: (&.Promise Bit) (&.promise #.None)))))) + + (wrap (do &.Monad + [?none (&.time-out 100 (&.delay 200 #1)) + ?some (&.time-out 200 (&.delay 100 #1))] + (assert "Can establish maximum waiting times for promises to be fulfilled." + (case [?none ?some] + [#.None (#.Some #1)] + #1 + + _ + #0)))) + )) diff --git a/stdlib/test/test/lux/control/concurrency/semaphore.lux b/stdlib/test/test/lux/control/concurrency/semaphore.lux new file mode 100644 index 000000000..5c09f4bac --- /dev/null +++ b/stdlib/test/test/lux/control/concurrency/semaphore.lux @@ -0,0 +1,143 @@ +(.module: + [lux #* + [control + ["." monad (#+ do)] + [concurrency + ["/" semaphore] + ["." promise (#+ Promise)] + ["." atom (#+ Atom)]]] + [data + ["." maybe] + ["." text ("text/." Equivalence Monoid) + format] + [collection + ["." list ("list/." Functor)]]] + ["." io] + [math + ["r" random]]] + lux/test) + +## (def: (wait-many-times times semaphore) +## (-> Nat /.Semaphore (Promise Any)) +## (loop [steps times] +## (if (n/> 0 steps) +## (do promise.Monad +## [_ (/.wait semaphore)] +## (recur (dec steps))) +## (:: promise.Monad wrap [])))) + +## (context: "Semaphore." +## (<| (times 100) +## (do @ +## [open-positions (|> r.nat (:: @ map (|>> (n/% 10) (n/max 1))))] +## ($_ seq +## (let [semaphore (/.semaphore open-positions)] +## (wrap (do promise.Monad +## [_ (wait-many-times open-positions semaphore)] +## (assert "Can wait on a semaphore up to the number of open positions without blocking." +## true)))) +## (let [semaphore (/.semaphore open-positions)] +## (wrap (do promise.Monad +## [result (<| (promise.time-out 100) +## (wait-many-times (inc open-positions) semaphore))] +## (assert "Waiting on a semaphore more than the number of open positions blocks the process." +## (case result +## (#.Some _) +## false + +## #.None +## true))))) +## (let [semaphore (/.semaphore open-positions)] +## (wrap (do promise.Monad +## [_ (: (Promise Any) +## (loop [steps (n/* 2 open-positions)] +## (if (n/> 0 steps) +## (do @ +## [_ (/.wait semaphore) +## _ (/.signal semaphore)] +## (recur (dec steps))) +## (wrap []))))] +## (assert "Signaling a semaphore replenishes its open positions." +## true)))) +## (let [semaphore (/.semaphore open-positions)] +## (wrap (do promise.Monad +## [#let [resource (atom.atom "") +## blocked (do @ +## [_ (wait-many-times open-positions semaphore) +## _ (/.wait semaphore) +## #let [_ (io.run (atom.update (|>> (format "B")) +## resource))]] +## (wrap []))] +## _ (promise.wait 100) +## _ (exec (io.run (atom.update (|>> (format "A")) +## resource)) +## (/.signal semaphore)) +## _ blocked] +## (assert "A blocked process can be un-blocked by a signal somewhere else." +## (text/= "BA" +## (io.run (atom.read resource))))))) +## )))) + +## (context: "Mutex." +## (<| (times 100) +## (do @ +## [repetitions (|> r.nat (:: @ map (|>> (n/% 100) (n/max 10))))] +## ($_ seq +## (let [mutex (/.mutex [])] +## (wrap (do promise.Monad +## [#let [resource (atom.atom "") +## expected-As (text.join-with "" (list.repeat repetitions "A")) +## expected-Bs (text.join-with "" (list.repeat repetitions "B")) +## processA (<| (/.synchronize mutex) +## io.io +## promise.future +## (do io.Monad +## [_ (<| (monad.seq @) +## (list.repeat repetitions) +## (atom.update (|>> (format "A")) resource))] +## (wrap []))) +## processB (<| (/.synchronize mutex) +## io.io +## promise.future +## (do io.Monad +## [_ (<| (monad.seq @) +## (list.repeat repetitions) +## (atom.update (|>> (format "B")) resource))] +## (wrap [])))] +## _ processA +## _ processB +## #let [outcome (io.run (atom.read resource))]] +## (assert "Mutexes only allow one process to execute at a time." +## (or (text/= (format expected-As expected-Bs) +## outcome) +## (text/= (format expected-Bs expected-As) +## outcome)))))) +## )))) + +## (def: (waiter resource barrier id) +## (-> (Atom Text) /.Barrier Nat (Promise Any)) +## (do promise.Monad +## [_ (/.block barrier) +## #let [_ (io.run (atom.update (|>> (format (%n id))) resource))]] +## (wrap []))) + +## (context: "Barrier." +## (let [limit 10 +## barrier (/.barrier (maybe.assume (/.limit limit))) +## resource (atom.atom "")] +## ($_ seq +## (wrap (do promise.Monad +## [#let [ids (list.n/range 0 (dec limit)) +## waiters (list/map (function (_ id) +## (let [process (waiter resource barrier id)] +## (exec (io.run (atom.update (|>> (format "_")) resource)) +## process))) +## ids)] +## _ (monad.seq @ waiters) +## #let [outcome (io.run (atom.read resource))]] +## (assert "A barrier can stop all processes from acting, until an amount of waiting processes is reached, and then the barrier is un-blocked for all." +## (and (text.ends-with? "__________" outcome) +## (list.every? (function (_ id) +## (text.contains? (%n id) outcome)) +## ids) +## ))))))) diff --git a/stdlib/test/test/lux/control/concurrency/stm.lux b/stdlib/test/test/lux/control/concurrency/stm.lux new file mode 100644 index 000000000..c84ce44cc --- /dev/null +++ b/stdlib/test/test/lux/control/concurrency/stm.lux @@ -0,0 +1,77 @@ +(.module: + [lux #* + ["." io (#+ IO)] + [control + ["M" monad (#+ do Monad)] + [concurrency + ["." atom (#+ Atom atom)] + ["&" stm] + ["." process] + ["." promise] + ["." frp (#+ Channel)]]] + [data + ["." number] + [collection + ["." list ("list/." Functor)]]] + [math + ["r" random]]] + lux/test) + +(def: (read! channel) + (All [a] (-> (Channel a) (IO (Atom (List a))))) + (do io.Monad + [#let [output (atom (list))] + _ (frp.listen (function (_ value) + ## TODO: Simplify when possible. + (do @ + [_ (atom.update (|>> (#.Cons value)) output)] + (wrap []))) + channel)] + (wrap output))) + +(def: iterations-per-process Nat 100) + +(context: "STM" + ($_ seq + (wrap (do promise.Monad + [output (&.commit (&.read (&.var 0)))] + (assert "Can read STM vars." + (n/= 0 output)))) + (wrap (do promise.Monad + [#let [_var (&.var 0)] + output (&.commit (do &.Monad + [_ (&.write 5 _var)] + (&.read _var)))] + (assert "Can write STM vars." + (n/= 5 output)))) + (wrap (do promise.Monad + [#let [_var (&.var 5)] + output (&.commit (do &.Monad + [_ (&.update (n/* 3) _var)] + (&.read _var)))] + (assert "Can update STM vars." + (n/= 15 output)))) + (wrap (do promise.Monad + [#let [_var (&.var 0) + changes (io.run (read! (io.run (&.follow _var))))] + _ (&.commit (&.write 5 _var)) + _ (&.commit (&.update (n/* 3) _var)) + changes (promise.future (atom.read changes))] + (assert "Can follow all the changes to STM vars." + (:: (list.Equivalence number.Equivalence) = + (list 5 15) + (list.reverse changes))))) + (wrap (let [_concurrency-var (&.var 0)] + (do promise.Monad + [_ (|> process.parallelism + (list.n/range 1) + (list/map (function (_ _) + (|> iterations-per-process + (list.n/range 1) + (M.map @ (function (_ _) (&.commit (&.update inc _concurrency-var))))))) + (M.seq @)) + last-val (&.commit (&.read _concurrency-var))] + (assert "Can modify STM vars concurrently from multiple threads." + (|> process.parallelism + (n/* iterations-per-process) + (n/= last-val)))))))) -- cgit v1.2.3