diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency/semaphore.lux')
-rw-r--r-- | stdlib/source/lux/control/concurrency/semaphore.lux | 111 |
1 files changed, 47 insertions, 64 deletions
diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux index 5be5582de..0e8fa2b94 100644 --- a/stdlib/source/lux/control/concurrency/semaphore.lux +++ b/stdlib/source/lux/control/concurrency/semaphore.lux @@ -3,6 +3,7 @@ [abstract [monad (#+ do)]] [control + [pipe (#+ if>)] ["." io (#+ IO)] ["." try (#+ Try)] ["." exception (#+ exception:)]] @@ -48,23 +49,19 @@ (let [semaphore (:representation semaphore) [signal sink] (: [(Promise Any) (Resolver Any)] (promise.promise []))] - (exec (promise.future - (loop [_ []] + (exec (io.run + (with_expansions [<had_open_position?> (as_is (get@ #open_positions) (i.> -1))] (do io.monad - [state (atom.read semaphore) - #let [[ready? state'] (: [Bit State] - (if (i.> +0 (get@ #open_positions state)) - [true (|> state - (update@ #open_positions dec))] - [false (|> state - (update@ #open_positions dec) - (update@ #waiting_list (queue.push sink)))]))] - success? (atom.compare_and_swap state state' semaphore)] - (if success? - (if ready? - (sink []) - (wrap false)) - (recur []))))) + [[_ state'] (atom.update (|>> (update@ #open_positions dec) + (if> [<had_open_position?>] + [] + [(update@ #waiting_list (queue.push sink))])) + semaphore)] + (with_expansions [<go_ahead> (sink []) + <get_in_line> (wrap false)] + (if (|> state' <had_open_position?>) + <go_ahead> + <get_in_line>))))) signal))) (exception: #export (semaphore_is_maxed_out {max_positions Nat}) @@ -75,42 +72,25 @@ (Ex [k] (-> Semaphore (Promise (Try Int)))) (let [semaphore (:representation semaphore)] (promise.future - (loop [_ []] - (do {! io.monad} - [state (atom.read semaphore) - #let [[?sink state' maxed_out?] (: [(Maybe (Resolver Any)) State Bit] - (case (queue.peek (get@ #waiting_list state)) - #.None - (if (n.= (get@ #max_positions state) - (.nat (get@ #open_positions state))) - [#.None - state - true] - [#.None - (update@ #open_positions inc state) - false]) - - (#.Some head) - [(#.Some head) - (|> state - (update@ #open_positions inc) - (update@ #waiting_list queue.pop)) - false]))]] - (if maxed_out? - (wrap (exception.throw ..semaphore_is_maxed_out [(get@ #max_positions state)])) - (do ! - [#let [open_positions (get@ #open_positions state')] - success? (atom.compare_and_swap state state' semaphore)] - (if success? - (do ! - [_ (case ?sink - #.None - (wrap true) - - (#.Some sink) - (sink []))] - (wrap (#try.Success open_positions))) - (recur []))))))))) + (do {! io.monad} + [[pre post] (atom.update (function (_ state) + (if (i.= (.int (get@ #max_positions state)) + (get@ #open_positions state)) + state + (|> state + (update@ #open_positions inc) + (update@ #waiting_list queue.pop)))) + semaphore)] + (if (is? pre post) + (wrap (exception.throw ..semaphore_is_maxed_out [(get@ #max_positions pre)])) + (do ! + [_ (case (queue.peek (get@ #waiting_list pre)) + #.None + (wrap true) + + (#.Some sink) + (sink []))] + (wrap (#try.Success (get@ #open_positions post))))))))) ) (abstract: #export Mutex @@ -124,23 +104,26 @@ (def: acquire (-> Mutex (Promise Any)) - (|>> :representation wait)) + (|>> :representation ..wait)) (def: release (-> Mutex (Promise Any)) - (|>> :representation signal)) + (|>> :representation ..signal)) (def: #export (synchronize mutex procedure) (All [a] (-> Mutex (IO (Promise a)) (Promise a))) (do promise.monad - [_ (acquire mutex) + [_ (..acquire mutex) output (io.run procedure) - _ (release mutex)] + _ (..release mutex)] (wrap output))) ) -(def: #export limit (refinement.refinement (n.> 0))) -(type: #export Limit (:~ (refinement.type limit))) +(def: #export limit + (refinement.refinement (n.> 0))) + +(type: #export Limit + (:~ (refinement.type limit))) (abstract: #export Barrier {#limit Limit @@ -154,15 +137,15 @@ (-> Limit Barrier) (:abstraction {#limit limit #count (atom.atom 0) - #start_turnstile (semaphore 0) - #end_turnstile (semaphore 0)})) + #start_turnstile (..semaphore 0) + #end_turnstile (..semaphore 0)})) (def: (un_block times turnstile) (-> Nat Semaphore (Promise Any)) (loop [step 0] (if (n.< times step) (do promise.monad - [_ (..signal turnstile)] + [outcome (..signal turnstile)] (recur (inc step))) (\ promise.monad wrap [])))) @@ -172,11 +155,11 @@ (do promise.monad [#let [limit (refinement.un_refine (get@ #limit barrier)) goal <goal> - count (io.run (atom.update <update> (get@ #count barrier))) + [_ count] (io.run (atom.update <update> (get@ #count barrier))) reached? (n.= goal count)]] (if reached? - (un_block limit (get@ <turnstile> barrier)) - (wait (get@ <turnstile> barrier)))))] + (..un_block (dec limit) (get@ <turnstile> barrier)) + (..wait (get@ <turnstile> barrier)))))] [start inc limit #start_turnstile] [end dec 0 #end_turnstile] |