aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency/semaphore.lux
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--stdlib/source/lux/control/concurrency/semaphore.lux111
1 files changed, 47 insertions, 64 deletions
diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux
index 5be5582de..0e8fa2b94 100644
--- a/stdlib/source/lux/control/concurrency/semaphore.lux
+++ b/stdlib/source/lux/control/concurrency/semaphore.lux
@@ -3,6 +3,7 @@
[abstract
[monad (#+ do)]]
[control
+ [pipe (#+ if>)]
["." io (#+ IO)]
["." try (#+ Try)]
["." exception (#+ exception:)]]
@@ -48,23 +49,19 @@
(let [semaphore (:representation semaphore)
[signal sink] (: [(Promise Any) (Resolver Any)]
(promise.promise []))]
- (exec (promise.future
- (loop [_ []]
+ (exec (io.run
+ (with_expansions [<had_open_position?> (as_is (get@ #open_positions) (i.> -1))]
(do io.monad
- [state (atom.read semaphore)
- #let [[ready? state'] (: [Bit State]
- (if (i.> +0 (get@ #open_positions state))
- [true (|> state
- (update@ #open_positions dec))]
- [false (|> state
- (update@ #open_positions dec)
- (update@ #waiting_list (queue.push sink)))]))]
- success? (atom.compare_and_swap state state' semaphore)]
- (if success?
- (if ready?
- (sink [])
- (wrap false))
- (recur [])))))
+ [[_ state'] (atom.update (|>> (update@ #open_positions dec)
+ (if> [<had_open_position?>]
+ []
+ [(update@ #waiting_list (queue.push sink))]))
+ semaphore)]
+ (with_expansions [<go_ahead> (sink [])
+ <get_in_line> (wrap false)]
+ (if (|> state' <had_open_position?>)
+ <go_ahead>
+ <get_in_line>)))))
signal)))
(exception: #export (semaphore_is_maxed_out {max_positions Nat})
@@ -75,42 +72,25 @@
(Ex [k] (-> Semaphore (Promise (Try Int))))
(let [semaphore (:representation semaphore)]
(promise.future
- (loop [_ []]
- (do {! io.monad}
- [state (atom.read semaphore)
- #let [[?sink state' maxed_out?] (: [(Maybe (Resolver Any)) State Bit]
- (case (queue.peek (get@ #waiting_list state))
- #.None
- (if (n.= (get@ #max_positions state)
- (.nat (get@ #open_positions state)))
- [#.None
- state
- true]
- [#.None
- (update@ #open_positions inc state)
- false])
-
- (#.Some head)
- [(#.Some head)
- (|> state
- (update@ #open_positions inc)
- (update@ #waiting_list queue.pop))
- false]))]]
- (if maxed_out?
- (wrap (exception.throw ..semaphore_is_maxed_out [(get@ #max_positions state)]))
- (do !
- [#let [open_positions (get@ #open_positions state')]
- success? (atom.compare_and_swap state state' semaphore)]
- (if success?
- (do !
- [_ (case ?sink
- #.None
- (wrap true)
-
- (#.Some sink)
- (sink []))]
- (wrap (#try.Success open_positions)))
- (recur [])))))))))
+ (do {! io.monad}
+ [[pre post] (atom.update (function (_ state)
+ (if (i.= (.int (get@ #max_positions state))
+ (get@ #open_positions state))
+ state
+ (|> state
+ (update@ #open_positions inc)
+ (update@ #waiting_list queue.pop))))
+ semaphore)]
+ (if (is? pre post)
+ (wrap (exception.throw ..semaphore_is_maxed_out [(get@ #max_positions pre)]))
+ (do !
+ [_ (case (queue.peek (get@ #waiting_list pre))
+ #.None
+ (wrap true)
+
+ (#.Some sink)
+ (sink []))]
+ (wrap (#try.Success (get@ #open_positions post)))))))))
)
(abstract: #export Mutex
@@ -124,23 +104,26 @@
(def: acquire
(-> Mutex (Promise Any))
- (|>> :representation wait))
+ (|>> :representation ..wait))
(def: release
(-> Mutex (Promise Any))
- (|>> :representation signal))
+ (|>> :representation ..signal))
(def: #export (synchronize mutex procedure)
(All [a] (-> Mutex (IO (Promise a)) (Promise a)))
(do promise.monad
- [_ (acquire mutex)
+ [_ (..acquire mutex)
output (io.run procedure)
- _ (release mutex)]
+ _ (..release mutex)]
(wrap output)))
)
-(def: #export limit (refinement.refinement (n.> 0)))
-(type: #export Limit (:~ (refinement.type limit)))
+(def: #export limit
+ (refinement.refinement (n.> 0)))
+
+(type: #export Limit
+ (:~ (refinement.type limit)))
(abstract: #export Barrier
{#limit Limit
@@ -154,15 +137,15 @@
(-> Limit Barrier)
(:abstraction {#limit limit
#count (atom.atom 0)
- #start_turnstile (semaphore 0)
- #end_turnstile (semaphore 0)}))
+ #start_turnstile (..semaphore 0)
+ #end_turnstile (..semaphore 0)}))
(def: (un_block times turnstile)
(-> Nat Semaphore (Promise Any))
(loop [step 0]
(if (n.< times step)
(do promise.monad
- [_ (..signal turnstile)]
+ [outcome (..signal turnstile)]
(recur (inc step)))
(\ promise.monad wrap []))))
@@ -172,11 +155,11 @@
(do promise.monad
[#let [limit (refinement.un_refine (get@ #limit barrier))
goal <goal>
- count (io.run (atom.update <update> (get@ #count barrier)))
+ [_ count] (io.run (atom.update <update> (get@ #count barrier)))
reached? (n.= goal count)]]
(if reached?
- (un_block limit (get@ <turnstile> barrier))
- (wait (get@ <turnstile> barrier)))))]
+ (..un_block (dec limit) (get@ <turnstile> barrier))
+ (..wait (get@ <turnstile> barrier)))))]
[start inc limit #start_turnstile]
[end dec 0 #end_turnstile]