aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency/semaphore.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/control/concurrency/semaphore.lux')
-rw-r--r--stdlib/source/lux/control/concurrency/semaphore.lux76
1 files changed, 38 insertions, 38 deletions
diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux
index a405b7b3e..9e6ff9b29 100644
--- a/stdlib/source/lux/control/concurrency/semaphore.lux
+++ b/stdlib/source/lux/control/concurrency/semaphore.lux
@@ -22,25 +22,25 @@
["." promise (#+ Promise Resolver)]])
(type: State
- {#max-positions Nat
- #open-positions Int
- #waiting-list (Queue (Resolver Any))})
+ {#max_positions Nat
+ #open_positions Int
+ #waiting_list (Queue (Resolver Any))})
(abstract: #export Semaphore
(Atom State)
{#.doc "A tool for controlling access to resources by multiple concurrent processes."}
- (def: most-positions-possible
+ (def: most_positions_possible
(.nat (\ i.interval top)))
- (def: #export (semaphore initial-open-positions)
+ (def: #export (semaphore initial_open_positions)
(-> Nat Semaphore)
- (let [max-positions (n.min initial-open-positions
- ..most-positions-possible)]
- (:abstraction (atom.atom {#max-positions max-positions
- #open-positions (.int max-positions)
- #waiting-list queue.empty}))))
+ (let [max_positions (n.min initial_open_positions
+ ..most_positions_possible)]
+ (:abstraction (atom.atom {#max_positions max_positions
+ #open_positions (.int max_positions)
+ #waiting_list queue.empty}))))
(def: #export (wait semaphore)
(Ex [k] (-> Semaphore (Promise Any)))
@@ -52,13 +52,13 @@
(do io.monad
[state (atom.read semaphore)
#let [[ready? state'] (: [Bit State]
- (if (i.> +0 (get@ #open-positions state))
+ (if (i.> +0 (get@ #open_positions state))
[true (|> state
- (update@ #open-positions dec))]
+ (update@ #open_positions dec))]
[false (|> state
- (update@ #open-positions dec)
- (update@ #waiting-list (queue.push sink)))]))]
- success? (atom.compare-and-swap state state' semaphore)]
+ (update@ #open_positions dec)
+ (update@ #waiting_list (queue.push sink)))]))]
+ success? (atom.compare_and_swap state state' semaphore)]
(if success?
(if ready?
(sink [])
@@ -66,9 +66,9 @@
(recur [])))))
signal)))
- (exception: #export (semaphore-is-maxed-out {max-positions Nat})
+ (exception: #export (semaphore_is_maxed_out {max_positions Nat})
(exception.report
- ["Max Positions" (%.nat max-positions)]))
+ ["Max Positions" (%.nat max_positions)]))
(def: #export (signal semaphore)
(Ex [k] (-> Semaphore (Promise (Try Int))))
@@ -77,29 +77,29 @@
(loop [_ []]
(do {! io.monad}
[state (atom.read semaphore)
- #let [[?sink state' maxed-out?] (: [(Maybe (Resolver Any)) State Bit]
- (case (queue.peek (get@ #waiting-list state))
+ #let [[?sink state' maxed_out?] (: [(Maybe (Resolver Any)) State Bit]
+ (case (queue.peek (get@ #waiting_list state))
#.None
- (if (n.= (get@ #max-positions state)
- (.nat (get@ #open-positions state)))
+ (if (n.= (get@ #max_positions state)
+ (.nat (get@ #open_positions state)))
[#.None
state
true]
[#.None
- (update@ #open-positions inc state)
+ (update@ #open_positions inc state)
false])
(#.Some head)
[(#.Some head)
(|> state
- (update@ #open-positions inc)
- (update@ #waiting-list queue.pop))
+ (update@ #open_positions inc)
+ (update@ #waiting_list queue.pop))
false]))]]
- (if maxed-out?
- (wrap (exception.throw ..semaphore-is-maxed-out [(get@ #max-positions state)]))
+ (if maxed_out?
+ (wrap (exception.throw ..semaphore_is_maxed_out [(get@ #max_positions state)]))
(do !
- [#let [open-positions (get@ #open-positions state')]
- success? (atom.compare-and-swap state state' semaphore)]
+ [#let [open_positions (get@ #open_positions state')]
+ success? (atom.compare_and_swap state state' semaphore)]
(if success?
(do !
[_ (case ?sink
@@ -108,7 +108,7 @@
(#.Some sink)
(sink []))]
- (wrap (#try.Success open-positions)))
+ (wrap (#try.Success open_positions)))
(recur [])))))))))
)
@@ -144,8 +144,8 @@
(abstract: #export Barrier
{#limit Limit
#count (Atom Nat)
- #start-turnstile Semaphore
- #end-turnstile Semaphore}
+ #start_turnstile Semaphore
+ #end_turnstile Semaphore}
{#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."}
@@ -153,10 +153,10 @@
(-> Limit Barrier)
(:abstraction {#limit limit
#count (atom.atom 0)
- #start-turnstile (semaphore 0)
- #end-turnstile (semaphore 0)}))
+ #start_turnstile (semaphore 0)
+ #end_turnstile (semaphore 0)}))
- (def: (un-block times turnstile)
+ (def: (un_block times turnstile)
(-> Nat Semaphore (Promise Any))
(loop [step 0]
(if (n.< times step)
@@ -169,16 +169,16 @@
[(def: (<phase> (^:representation barrier))
(-> Barrier (Promise Any))
(do promise.monad
- [#let [limit (refinement.un-refine (get@ #limit barrier))
+ [#let [limit (refinement.un_refine (get@ #limit barrier))
goal <goal>
count (io.run (atom.update <update> (get@ #count barrier)))
reached? (n.= goal count)]]
(if reached?
- (un-block limit (get@ <turnstile> barrier))
+ (un_block limit (get@ <turnstile> barrier))
(wait (get@ <turnstile> barrier)))))]
- [start inc limit #start-turnstile]
- [end dec 0 #end-turnstile]
+ [start inc limit #start_turnstile]
+ [end dec 0 #end_turnstile]
)
(def: #export (block barrier)