aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/test/lux/control/concurrency/semaphore.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/test/lux/control/concurrency/semaphore.lux')
-rw-r--r--stdlib/source/test/lux/control/concurrency/semaphore.lux143
1 files changed, 143 insertions, 0 deletions
diff --git a/stdlib/source/test/lux/control/concurrency/semaphore.lux b/stdlib/source/test/lux/control/concurrency/semaphore.lux
new file mode 100644
index 000000000..0c4167ee7
--- /dev/null
+++ b/stdlib/source/test/lux/control/concurrency/semaphore.lux
@@ -0,0 +1,143 @@
+(.module:
+ [lux #*
+ [control
+ ["." monad (#+ do)]
+ [concurrency
+ ["/" semaphore]
+ ["." promise (#+ Promise)]
+ ["." atom (#+ Atom)]]]
+ [data
+ ["." maybe]
+ ["." text ("text/." equivalence monoid)
+ format]
+ [collection
+ ["." list ("list/." functor)]]]
+ ["." io]
+ [math
+ ["r" random]]]
+ lux/test)
+
+## (def: (wait-many-times times semaphore)
+## (-> Nat /.Semaphore (Promise Any))
+## (loop [steps times]
+## (if (n/> 0 steps)
+## (do promise.monad
+## [_ (/.wait semaphore)]
+## (recur (dec steps)))
+## (:: promise.monad wrap []))))
+
+## (context: "Semaphore."
+## (<| (times 100)
+## (do @
+## [open-positions (|> r.nat (:: @ map (|>> (n/% 10) (n/max 1))))]
+## ($_ seq
+## (let [semaphore (/.semaphore open-positions)]
+## (wrap (do promise.monad
+## [_ (wait-many-times open-positions semaphore)]
+## (assert "Can wait on a semaphore up to the number of open positions without blocking."
+## true))))
+## (let [semaphore (/.semaphore open-positions)]
+## (wrap (do promise.monad
+## [result (<| (promise.time-out 100)
+## (wait-many-times (inc open-positions) semaphore))]
+## (assert "Waiting on a semaphore more than the number of open positions blocks the process."
+## (case result
+## (#.Some _)
+## false
+
+## #.None
+## true)))))
+## (let [semaphore (/.semaphore open-positions)]
+## (wrap (do promise.monad
+## [_ (: (Promise Any)
+## (loop [steps (n/* 2 open-positions)]
+## (if (n/> 0 steps)
+## (do @
+## [_ (/.wait semaphore)
+## _ (/.signal semaphore)]
+## (recur (dec steps)))
+## (wrap []))))]
+## (assert "Signaling a semaphore replenishes its open positions."
+## true))))
+## (let [semaphore (/.semaphore open-positions)]
+## (wrap (do promise.monad
+## [#let [resource (atom.atom "")
+## blocked (do @
+## [_ (wait-many-times open-positions semaphore)
+## _ (/.wait semaphore)
+## #let [_ (io.run (atom.update (|>> (format "B"))
+## resource))]]
+## (wrap []))]
+## _ (promise.wait 100)
+## _ (exec (io.run (atom.update (|>> (format "A"))
+## resource))
+## (/.signal semaphore))
+## _ blocked]
+## (assert "A blocked process can be un-blocked by a signal somewhere else."
+## (text/= "BA"
+## (io.run (atom.read resource)))))))
+## ))))
+
+## (context: "Mutex."
+## (<| (times 100)
+## (do @
+## [repetitions (|> r.nat (:: @ map (|>> (n/% 100) (n/max 10))))]
+## ($_ seq
+## (let [mutex (/.mutex [])]
+## (wrap (do promise.monad
+## [#let [resource (atom.atom "")
+## expected-As (text.join-with "" (list.repeat repetitions "A"))
+## expected-Bs (text.join-with "" (list.repeat repetitions "B"))
+## processA (<| (/.synchronize mutex)
+## io.io
+## promise.future
+## (do io.monad
+## [_ (<| (monad.seq @)
+## (list.repeat repetitions)
+## (atom.update (|>> (format "A")) resource))]
+## (wrap [])))
+## processB (<| (/.synchronize mutex)
+## io.io
+## promise.future
+## (do io.monad
+## [_ (<| (monad.seq @)
+## (list.repeat repetitions)
+## (atom.update (|>> (format "B")) resource))]
+## (wrap [])))]
+## _ processA
+## _ processB
+## #let [outcome (io.run (atom.read resource))]]
+## (assert "Mutexes only allow one process to execute at a time."
+## (or (text/= (format expected-As expected-Bs)
+## outcome)
+## (text/= (format expected-Bs expected-As)
+## outcome))))))
+## ))))
+
+## (def: (waiter resource barrier id)
+## (-> (Atom Text) /.Barrier Nat (Promise Any))
+## (do promise.monad
+## [_ (/.block barrier)
+## #let [_ (io.run (atom.update (|>> (format (%n id))) resource))]]
+## (wrap [])))
+
+## (context: "Barrier."
+## (let [limit 10
+## barrier (/.barrier (maybe.assume (/.limit limit)))
+## resource (atom.atom "")]
+## ($_ seq
+## (wrap (do promise.monad
+## [#let [ids (list.n/range 0 (dec limit))
+## waiters (list/map (function (_ id)
+## (let [process (waiter resource barrier id)]
+## (exec (io.run (atom.update (|>> (format "_")) resource))
+## process)))
+## ids)]
+## _ (monad.seq @ waiters)
+## #let [outcome (io.run (atom.read resource))]]
+## (assert "A barrier can stop all processes from acting, until an amount of waiting processes is reached, and then the barrier is un-blocked for all."
+## (and (text.ends-with? "__________" outcome)
+## (list.every? (function (_ id)
+## (text.contains? (%n id) outcome))
+## ids)
+## )))))))