diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency/semaphore.lux')
-rw-r--r-- | stdlib/source/lux/control/concurrency/semaphore.lux | 76 |
1 files changed, 38 insertions, 38 deletions
diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux index a405b7b3e..9e6ff9b29 100644 --- a/stdlib/source/lux/control/concurrency/semaphore.lux +++ b/stdlib/source/lux/control/concurrency/semaphore.lux @@ -22,25 +22,25 @@ ["." promise (#+ Promise Resolver)]]) (type: State - {#max-positions Nat - #open-positions Int - #waiting-list (Queue (Resolver Any))}) + {#max_positions Nat + #open_positions Int + #waiting_list (Queue (Resolver Any))}) (abstract: #export Semaphore (Atom State) {#.doc "A tool for controlling access to resources by multiple concurrent processes."} - (def: most-positions-possible + (def: most_positions_possible (.nat (\ i.interval top))) - (def: #export (semaphore initial-open-positions) + (def: #export (semaphore initial_open_positions) (-> Nat Semaphore) - (let [max-positions (n.min initial-open-positions - ..most-positions-possible)] - (:abstraction (atom.atom {#max-positions max-positions - #open-positions (.int max-positions) - #waiting-list queue.empty})))) + (let [max_positions (n.min initial_open_positions + ..most_positions_possible)] + (:abstraction (atom.atom {#max_positions max_positions + #open_positions (.int max_positions) + #waiting_list queue.empty})))) (def: #export (wait semaphore) (Ex [k] (-> Semaphore (Promise Any))) @@ -52,13 +52,13 @@ (do io.monad [state (atom.read semaphore) #let [[ready? state'] (: [Bit State] - (if (i.> +0 (get@ #open-positions state)) + (if (i.> +0 (get@ #open_positions state)) [true (|> state - (update@ #open-positions dec))] + (update@ #open_positions dec))] [false (|> state - (update@ #open-positions dec) - (update@ #waiting-list (queue.push sink)))]))] - success? (atom.compare-and-swap state state' semaphore)] + (update@ #open_positions dec) + (update@ #waiting_list (queue.push sink)))]))] + success? (atom.compare_and_swap state state' semaphore)] (if success? (if ready? (sink []) @@ -66,9 +66,9 @@ (recur []))))) signal))) - (exception: #export (semaphore-is-maxed-out {max-positions Nat}) + (exception: #export (semaphore_is_maxed_out {max_positions Nat}) (exception.report - ["Max Positions" (%.nat max-positions)])) + ["Max Positions" (%.nat max_positions)])) (def: #export (signal semaphore) (Ex [k] (-> Semaphore (Promise (Try Int)))) @@ -77,29 +77,29 @@ (loop [_ []] (do {! io.monad} [state (atom.read semaphore) - #let [[?sink state' maxed-out?] (: [(Maybe (Resolver Any)) State Bit] - (case (queue.peek (get@ #waiting-list state)) + #let [[?sink state' maxed_out?] (: [(Maybe (Resolver Any)) State Bit] + (case (queue.peek (get@ #waiting_list state)) #.None - (if (n.= (get@ #max-positions state) - (.nat (get@ #open-positions state))) + (if (n.= (get@ #max_positions state) + (.nat (get@ #open_positions state))) [#.None state true] [#.None - (update@ #open-positions inc state) + (update@ #open_positions inc state) false]) (#.Some head) [(#.Some head) (|> state - (update@ #open-positions inc) - (update@ #waiting-list queue.pop)) + (update@ #open_positions inc) + (update@ #waiting_list queue.pop)) false]))]] - (if maxed-out? - (wrap (exception.throw ..semaphore-is-maxed-out [(get@ #max-positions state)])) + (if maxed_out? + (wrap (exception.throw ..semaphore_is_maxed_out [(get@ #max_positions state)])) (do ! - [#let [open-positions (get@ #open-positions state')] - success? (atom.compare-and-swap state state' semaphore)] + [#let [open_positions (get@ #open_positions state')] + success? (atom.compare_and_swap state state' semaphore)] (if success? (do ! [_ (case ?sink @@ -108,7 +108,7 @@ (#.Some sink) (sink []))] - (wrap (#try.Success open-positions))) + (wrap (#try.Success open_positions))) (recur []))))))))) ) @@ -144,8 +144,8 @@ (abstract: #export Barrier {#limit Limit #count (Atom Nat) - #start-turnstile Semaphore - #end-turnstile Semaphore} + #start_turnstile Semaphore + #end_turnstile Semaphore} {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."} @@ -153,10 +153,10 @@ (-> Limit Barrier) (:abstraction {#limit limit #count (atom.atom 0) - #start-turnstile (semaphore 0) - #end-turnstile (semaphore 0)})) + #start_turnstile (semaphore 0) + #end_turnstile (semaphore 0)})) - (def: (un-block times turnstile) + (def: (un_block times turnstile) (-> Nat Semaphore (Promise Any)) (loop [step 0] (if (n.< times step) @@ -169,16 +169,16 @@ [(def: (<phase> (^:representation barrier)) (-> Barrier (Promise Any)) (do promise.monad - [#let [limit (refinement.un-refine (get@ #limit barrier)) + [#let [limit (refinement.un_refine (get@ #limit barrier)) goal <goal> count (io.run (atom.update <update> (get@ #count barrier))) reached? (n.= goal count)]] (if reached? - (un-block limit (get@ <turnstile> barrier)) + (un_block limit (get@ <turnstile> barrier)) (wait (get@ <turnstile> barrier)))))] - [start inc limit #start-turnstile] - [end dec 0 #end-turnstile] + [start inc limit #start_turnstile] + [end dec 0 #end_turnstile] ) (def: #export (block barrier) |