From 927694bdd07f25105f28649cf3c93a4275321a12 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Tue, 30 Jan 2018 01:13:07 -0400 Subject: - Implemented semaphores, mutexes and barriers. - Fixed a bug when checking "last-index-of" for Text. --- stdlib/test/test/lux/concurrency/frp.lux | 5 +- stdlib/test/test/lux/concurrency/semaphore.lux | 138 +++++++++++++++++++++++++ stdlib/test/test/lux/concurrency/stm.lux | 5 +- 3 files changed, 146 insertions(+), 2 deletions(-) create mode 100644 stdlib/test/test/lux/concurrency/semaphore.lux (limited to 'stdlib/test') diff --git a/stdlib/test/test/lux/concurrency/frp.lux b/stdlib/test/test/lux/concurrency/frp.lux index 5360dcda2..5f7245d0f 100644 --- a/stdlib/test/test/lux/concurrency/frp.lux +++ b/stdlib/test/test/lux/concurrency/frp.lux @@ -21,7 +21,10 @@ (do io.Monad [#let [output (atom (list))] _ (frp.listen (function [value] - (atom.update (|>> (#.Cons value)) output)) + ## TODO: Simplify when possible. + (do @ + [_ (atom.update (|>> (#.Cons value)) output)] + (wrap []))) channel)] (wrap output))) diff --git a/stdlib/test/test/lux/concurrency/semaphore.lux b/stdlib/test/test/lux/concurrency/semaphore.lux new file mode 100644 index 000000000..59a9618fa --- /dev/null +++ b/stdlib/test/test/lux/concurrency/semaphore.lux @@ -0,0 +1,138 @@ +(.module: + lux + (lux (control [monad #+ do]) + (data [maybe] + [text "text/" Eq Monoid] + text/format + (coll [list "list/" Functor])) + (concurrency ["/" semaphore] + [promise #+ Promise] + [atom #+ Atom]) + [io] + ["r" math/random]) + lux/test) + +(def: (wait-many-times times semaphore) + (-> Nat /.Semaphore (Promise Top)) + (loop [steps times] + (if (n/> +0 steps) + (do promise.Monad + [_ (/.wait semaphore)] + (recur (n/dec steps))) + (:: promise.Monad wrap [])))) + +(context: "Semaphore." + (<| (times +100) + (do @ + [open-positions (|> r.nat (:: @ map (|>> (n/% +10) (n/max +1))))] + ($_ seq + (let [semaphore (/.semaphore open-positions)] + (wrap (do promise.Monad + [_ (wait-many-times open-positions semaphore)] + (assert "Can wait on a semaphore up to the number of open positions without blocking." + true)))) + (let [semaphore (/.semaphore open-positions)] + (wrap (do promise.Monad + [result (<| (promise.time-out +100) + (wait-many-times (n/inc open-positions) semaphore))] + (assert "Waiting on a semaphore more than the number of open positions blocks the process." + (case result + (#.Some _) + false + + #.None + true))))) + (let [semaphore (/.semaphore open-positions)] + (wrap (do promise.Monad + [_ (: (Promise Top) + (loop [steps (n/* +2 open-positions)] + (if (n/> +0 steps) + (do @ + [_ (/.wait semaphore) + _ (/.signal semaphore)] + (recur (n/dec steps))) + (wrap []))))] + (assert "Signaling a semaphore replenishes its open positions." + true)))) + (let [semaphore (/.semaphore open-positions)] + (wrap (do promise.Monad + [#let [resource (atom.atom "") + blocked (do @ + [_ (wait-many-times open-positions semaphore) + _ (/.wait semaphore) + #let [_ (io.run (atom.update (|>> (format "B")) + resource))]] + (wrap []))] + _ (promise.wait +100) + _ (exec (io.run (atom.update (|>> (format "A")) + resource)) + (/.signal semaphore)) + _ blocked] + (assert "A blocked process can be un-blocked by a signal somewhere else." + (text/= "BA" + (io.run (atom.read resource))))))) + )))) + +(context: "Mutex." + (<| (times +100) + (do @ + [repetitions (|> r.nat (:: @ map (|>> (n/% +100) (n/max +10))))] + ($_ seq + (let [mutex (/.mutex [])] + (wrap (do promise.Monad + [#let [resource (atom.atom "") + expected-As (text.join-with "" (list.repeat repetitions "A")) + expected-Bs (text.join-with "" (list.repeat repetitions "B")) + processA (<| (/.synchronize mutex) + io.io + promise.future + (do io.Monad + [_ (<| (monad.seq @) + (list.repeat repetitions) + (atom.update (|>> (format "A")) resource))] + (wrap []))) + processB (<| (/.synchronize mutex) + io.io + promise.future + (do io.Monad + [_ (<| (monad.seq @) + (list.repeat repetitions) + (atom.update (|>> (format "B")) resource))] + (wrap [])))] + _ processA + _ processB + #let [outcome (io.run (atom.read resource))]] + (assert "Mutexes only allow one process to execute at a time." + (or (text/= (format expected-As expected-Bs) + outcome) + (text/= (format expected-Bs expected-As) + outcome)))))) + )))) + +(def: (waiter resource barrier id) + (-> (Atom Text) /.Barrier Nat (Promise Top)) + (do promise.Monad + [_ (/.block barrier) + #let [_ (io.run (atom.update (|>> (format (%n id))) resource))]] + (wrap []))) + +(context: "Barrier." + (let [limit +10 + barrier (/.barrier (maybe.assume (/.limit limit))) + resource (atom.atom "")] + ($_ seq + (wrap (do promise.Monad + [#let [ids (list.n/range +0 (n/dec limit)) + waiters (list/map (function [id] + (let [process (waiter resource barrier id)] + (exec (io.run (atom.update (|>> (format "_")) resource)) + process))) + ids)] + _ (monad.seq @ waiters) + #let [outcome (io.run (atom.read resource))]] + (assert "A barrier can stop all processes from acting, until an amount of waiting processes is reached, and then the barrier is un-blocked for all." + (and (text.ends-with? "__________" outcome) + (list.every? (function [id] + (text.contains? (%n id) outcome)) + ids) + ))))))) diff --git a/stdlib/test/test/lux/concurrency/stm.lux b/stdlib/test/test/lux/concurrency/stm.lux index 1ca5482bf..d7764dfa2 100644 --- a/stdlib/test/test/lux/concurrency/stm.lux +++ b/stdlib/test/test/lux/concurrency/stm.lux @@ -18,7 +18,10 @@ (do io.Monad [#let [output (atom (list))] _ (frp.listen (function [value] - (atom.update (|>> (#.Cons value)) output)) + ## TODO: Simplify when possible. + (do @ + [_ (atom.update (|>> (#.Cons value)) output)] + (wrap []))) channel)] (wrap output))) -- cgit v1.2.3