aboutsummaryrefslogtreecommitdiff
path: root/stdlib/test
diff options
context:
space:
mode:
authorEduardo Julian2018-01-30 01:13:07 -0400
committerEduardo Julian2018-01-30 01:13:07 -0400
commit927694bdd07f25105f28649cf3c93a4275321a12 (patch)
tree8ff4b270b71497e1ceed7d618b13ff5b4951323f /stdlib/test
parentdc0bddf24a9c016756700b84e1905886fed1050b (diff)
- Implemented semaphores, mutexes and barriers.
- Fixed a bug when checking "last-index-of" for Text.
Diffstat (limited to 'stdlib/test')
-rw-r--r--stdlib/test/test/lux/concurrency/frp.lux5
-rw-r--r--stdlib/test/test/lux/concurrency/semaphore.lux138
-rw-r--r--stdlib/test/test/lux/concurrency/stm.lux5
3 files changed, 146 insertions, 2 deletions
diff --git a/stdlib/test/test/lux/concurrency/frp.lux b/stdlib/test/test/lux/concurrency/frp.lux
index 5360dcda2..5f7245d0f 100644
--- a/stdlib/test/test/lux/concurrency/frp.lux
+++ b/stdlib/test/test/lux/concurrency/frp.lux
@@ -21,7 +21,10 @@
(do io.Monad<IO>
[#let [output (atom (list))]
_ (frp.listen (function [value]
- (atom.update (|>> (#.Cons value)) output))
+ ## TODO: Simplify when possible.
+ (do @
+ [_ (atom.update (|>> (#.Cons value)) output)]
+ (wrap [])))
channel)]
(wrap output)))
diff --git a/stdlib/test/test/lux/concurrency/semaphore.lux b/stdlib/test/test/lux/concurrency/semaphore.lux
new file mode 100644
index 000000000..59a9618fa
--- /dev/null
+++ b/stdlib/test/test/lux/concurrency/semaphore.lux
@@ -0,0 +1,138 @@
+(.module:
+ lux
+ (lux (control [monad #+ do])
+ (data [maybe]
+ [text "text/" Eq<Text> Monoid<Text>]
+ text/format
+ (coll [list "list/" Functor<List>]))
+ (concurrency ["/" semaphore]
+ [promise #+ Promise]
+ [atom #+ Atom])
+ [io]
+ ["r" math/random])
+ lux/test)
+
+(def: (wait-many-times times semaphore)
+ (-> Nat /.Semaphore (Promise Top))
+ (loop [steps times]
+ (if (n/> +0 steps)
+ (do promise.Monad<Promise>
+ [_ (/.wait semaphore)]
+ (recur (n/dec steps)))
+ (:: promise.Monad<Promise> wrap []))))
+
+(context: "Semaphore."
+ (<| (times +100)
+ (do @
+ [open-positions (|> r.nat (:: @ map (|>> (n/% +10) (n/max +1))))]
+ ($_ seq
+ (let [semaphore (/.semaphore open-positions)]
+ (wrap (do promise.Monad<Promise>
+ [_ (wait-many-times open-positions semaphore)]
+ (assert "Can wait on a semaphore up to the number of open positions without blocking."
+ true))))
+ (let [semaphore (/.semaphore open-positions)]
+ (wrap (do promise.Monad<Promise>
+ [result (<| (promise.time-out +100)
+ (wait-many-times (n/inc open-positions) semaphore))]
+ (assert "Waiting on a semaphore more than the number of open positions blocks the process."
+ (case result
+ (#.Some _)
+ false
+
+ #.None
+ true)))))
+ (let [semaphore (/.semaphore open-positions)]
+ (wrap (do promise.Monad<Promise>
+ [_ (: (Promise Top)
+ (loop [steps (n/* +2 open-positions)]
+ (if (n/> +0 steps)
+ (do @
+ [_ (/.wait semaphore)
+ _ (/.signal semaphore)]
+ (recur (n/dec steps)))
+ (wrap []))))]
+ (assert "Signaling a semaphore replenishes its open positions."
+ true))))
+ (let [semaphore (/.semaphore open-positions)]
+ (wrap (do promise.Monad<Promise>
+ [#let [resource (atom.atom "")
+ blocked (do @
+ [_ (wait-many-times open-positions semaphore)
+ _ (/.wait semaphore)
+ #let [_ (io.run (atom.update (|>> (format "B"))
+ resource))]]
+ (wrap []))]
+ _ (promise.wait +100)
+ _ (exec (io.run (atom.update (|>> (format "A"))
+ resource))
+ (/.signal semaphore))
+ _ blocked]
+ (assert "A blocked process can be un-blocked by a signal somewhere else."
+ (text/= "BA"
+ (io.run (atom.read resource)))))))
+ ))))
+
+(context: "Mutex."
+ (<| (times +100)
+ (do @
+ [repetitions (|> r.nat (:: @ map (|>> (n/% +100) (n/max +10))))]
+ ($_ seq
+ (let [mutex (/.mutex [])]
+ (wrap (do promise.Monad<Promise>
+ [#let [resource (atom.atom "")
+ expected-As (text.join-with "" (list.repeat repetitions "A"))
+ expected-Bs (text.join-with "" (list.repeat repetitions "B"))
+ processA (<| (/.synchronize mutex)
+ io.io
+ promise.future
+ (do io.Monad<IO>
+ [_ (<| (monad.seq @)
+ (list.repeat repetitions)
+ (atom.update (|>> (format "A")) resource))]
+ (wrap [])))
+ processB (<| (/.synchronize mutex)
+ io.io
+ promise.future
+ (do io.Monad<IO>
+ [_ (<| (monad.seq @)
+ (list.repeat repetitions)
+ (atom.update (|>> (format "B")) resource))]
+ (wrap [])))]
+ _ processA
+ _ processB
+ #let [outcome (io.run (atom.read resource))]]
+ (assert "Mutexes only allow one process to execute at a time."
+ (or (text/= (format expected-As expected-Bs)
+ outcome)
+ (text/= (format expected-Bs expected-As)
+ outcome))))))
+ ))))
+
+(def: (waiter resource barrier id)
+ (-> (Atom Text) /.Barrier Nat (Promise Top))
+ (do promise.Monad<Promise>
+ [_ (/.block barrier)
+ #let [_ (io.run (atom.update (|>> (format (%n id))) resource))]]
+ (wrap [])))
+
+(context: "Barrier."
+ (let [limit +10
+ barrier (/.barrier (maybe.assume (/.limit limit)))
+ resource (atom.atom "")]
+ ($_ seq
+ (wrap (do promise.Monad<Promise>
+ [#let [ids (list.n/range +0 (n/dec limit))
+ waiters (list/map (function [id]
+ (let [process (waiter resource barrier id)]
+ (exec (io.run (atom.update (|>> (format "_")) resource))
+ process)))
+ ids)]
+ _ (monad.seq @ waiters)
+ #let [outcome (io.run (atom.read resource))]]
+ (assert "A barrier can stop all processes from acting, until an amount of waiting processes is reached, and then the barrier is un-blocked for all."
+ (and (text.ends-with? "__________" outcome)
+ (list.every? (function [id]
+ (text.contains? (%n id) outcome))
+ ids)
+ )))))))
diff --git a/stdlib/test/test/lux/concurrency/stm.lux b/stdlib/test/test/lux/concurrency/stm.lux
index 1ca5482bf..d7764dfa2 100644
--- a/stdlib/test/test/lux/concurrency/stm.lux
+++ b/stdlib/test/test/lux/concurrency/stm.lux
@@ -18,7 +18,10 @@
(do io.Monad<IO>
[#let [output (atom (list))]
_ (frp.listen (function [value]
- (atom.update (|>> (#.Cons value)) output))
+ ## TODO: Simplify when possible.
+ (do @
+ [_ (atom.update (|>> (#.Cons value)) output)]
+ (wrap [])))
channel)]
(wrap output)))