diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency/semaphore.lux')
-rw-r--r-- | stdlib/source/lux/control/concurrency/semaphore.lux | 173 |
1 files changed, 0 insertions, 173 deletions
diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux deleted file mode 100644 index 0e8fa2b94..000000000 --- a/stdlib/source/lux/control/concurrency/semaphore.lux +++ /dev/null @@ -1,173 +0,0 @@ -(.module: - [lux #* - [abstract - [monad (#+ do)]] - [control - [pipe (#+ if>)] - ["." io (#+ IO)] - ["." try (#+ Try)] - ["." exception (#+ exception:)]] - [data - [text - ["%" format (#+ format)]] - [collection - ["." queue (#+ Queue)]]] - [math - [number - ["n" nat] - ["i" int]]] - [type - abstract - ["." refinement]]] - [// - ["." atom (#+ Atom)] - ["." promise (#+ Promise Resolver)]]) - -(type: State - {#max_positions Nat - #open_positions Int - #waiting_list (Queue (Resolver Any))}) - -(abstract: #export Semaphore - (Atom State) - - {#.doc "A tool for controlling access to resources by multiple concurrent processes."} - - (def: most_positions_possible - (.nat (\ i.interval top))) - - (def: #export (semaphore initial_open_positions) - (-> Nat Semaphore) - (let [max_positions (n.min initial_open_positions - ..most_positions_possible)] - (:abstraction (atom.atom {#max_positions max_positions - #open_positions (.int max_positions) - #waiting_list queue.empty})))) - - (def: #export (wait semaphore) - (Ex [k] (-> Semaphore (Promise Any))) - (let [semaphore (:representation semaphore) - [signal sink] (: [(Promise Any) (Resolver Any)] - (promise.promise []))] - (exec (io.run - (with_expansions [<had_open_position?> (as_is (get@ #open_positions) (i.> -1))] - (do io.monad - [[_ state'] (atom.update (|>> (update@ #open_positions dec) - (if> [<had_open_position?>] - [] - [(update@ #waiting_list (queue.push sink))])) - semaphore)] - (with_expansions [<go_ahead> (sink []) - <get_in_line> (wrap false)] - (if (|> state' <had_open_position?>) - <go_ahead> - <get_in_line>))))) - signal))) - - (exception: #export (semaphore_is_maxed_out {max_positions Nat}) - (exception.report - ["Max Positions" (%.nat max_positions)])) - - (def: #export (signal semaphore) - (Ex [k] (-> Semaphore (Promise (Try Int)))) - (let [semaphore (:representation semaphore)] - (promise.future - (do {! io.monad} - [[pre post] (atom.update (function (_ state) - (if (i.= (.int (get@ #max_positions state)) - (get@ #open_positions state)) - state - (|> state - (update@ #open_positions inc) - (update@ #waiting_list queue.pop)))) - semaphore)] - (if (is? pre post) - (wrap (exception.throw ..semaphore_is_maxed_out [(get@ #max_positions pre)])) - (do ! - [_ (case (queue.peek (get@ #waiting_list pre)) - #.None - (wrap true) - - (#.Some sink) - (sink []))] - (wrap (#try.Success (get@ #open_positions post))))))))) - ) - -(abstract: #export Mutex - Semaphore - - {#.doc "A mutual-exclusion lock that can only be acquired by one process at a time."} - - (def: #export (mutex _) - (-> Any Mutex) - (:abstraction (semaphore 1))) - - (def: acquire - (-> Mutex (Promise Any)) - (|>> :representation ..wait)) - - (def: release - (-> Mutex (Promise Any)) - (|>> :representation ..signal)) - - (def: #export (synchronize mutex procedure) - (All [a] (-> Mutex (IO (Promise a)) (Promise a))) - (do promise.monad - [_ (..acquire mutex) - output (io.run procedure) - _ (..release mutex)] - (wrap output))) - ) - -(def: #export limit - (refinement.refinement (n.> 0))) - -(type: #export Limit - (:~ (refinement.type limit))) - -(abstract: #export Barrier - {#limit Limit - #count (Atom Nat) - #start_turnstile Semaphore - #end_turnstile Semaphore} - - {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."} - - (def: #export (barrier limit) - (-> Limit Barrier) - (:abstraction {#limit limit - #count (atom.atom 0) - #start_turnstile (..semaphore 0) - #end_turnstile (..semaphore 0)})) - - (def: (un_block times turnstile) - (-> Nat Semaphore (Promise Any)) - (loop [step 0] - (if (n.< times step) - (do promise.monad - [outcome (..signal turnstile)] - (recur (inc step))) - (\ promise.monad wrap [])))) - - (template [<phase> <update> <goal> <turnstile>] - [(def: (<phase> (^:representation barrier)) - (-> Barrier (Promise Any)) - (do promise.monad - [#let [limit (refinement.un_refine (get@ #limit barrier)) - goal <goal> - [_ count] (io.run (atom.update <update> (get@ #count barrier))) - reached? (n.= goal count)]] - (if reached? - (..un_block (dec limit) (get@ <turnstile> barrier)) - (..wait (get@ <turnstile> barrier)))))] - - [start inc limit #start_turnstile] - [end dec 0 #end_turnstile] - ) - - (def: #export (block barrier) - (-> Barrier (Promise Any)) - (do promise.monad - [_ (..start barrier)] - (..end barrier))) - ) |