diff options
author | Eduardo Julian | 2021-09-08 03:08:13 -0400 |
---|---|---|
committer | Eduardo Julian | 2021-09-08 03:08:13 -0400 |
commit | 392582885500d8201bbe502943ca4b02c5c77ac0 (patch) | |
tree | 6e7410546048547560c767dba9c303d3f2f9597a /stdlib/source/library/lux/control/concurrency/semaphore.lux | |
parent | 609cc6c16e75c13d87183c38245136fa038b0496 (diff) |
Normalized the syntax of "abstract:" and "actor:".
Diffstat (limited to 'stdlib/source/library/lux/control/concurrency/semaphore.lux')
-rw-r--r-- | stdlib/source/library/lux/control/concurrency/semaphore.lux | 234 |
1 files changed, 114 insertions, 120 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/semaphore.lux b/stdlib/source/library/lux/control/concurrency/semaphore.lux index 3b0461579..bcbd71158 100644 --- a/stdlib/source/library/lux/control/concurrency/semaphore.lux +++ b/stdlib/source/library/lux/control/concurrency/semaphore.lux @@ -31,95 +31,91 @@ #waiting_list (Queue (Resolver Any))])) (abstract: .public Semaphore - {} - (Atom State) - (def: most_positions_possible - (.nat (\ i.interval top))) - - (def: .public (semaphore initial_open_positions) - (-> Nat Semaphore) - (let [max_positions (n.min initial_open_positions - ..most_positions_possible)] - (:abstraction (atom.atom [#max_positions max_positions - #open_positions (.int max_positions) - #waiting_list queue.empty])))) - - (def: .public (wait! semaphore) - (Ex (_ k) (-> Semaphore (Async Any))) - (let [semaphore (:representation semaphore) - [signal sink] (: [(Async Any) (Resolver Any)] - (async.async []))] - (exec - (io.run! - (with_expansions [<had_open_position?> (as_is (value@ #open_positions) (i.> -1))] - (do io.monad - [[_ state'] (atom.update! (|>> (revised@ #open_positions --) - (if> [<had_open_position?>] - [] - [(revised@ #waiting_list (queue.end sink))])) - semaphore)] - (with_expansions [<go_ahead> (sink []) - <get_in_line> (in false)] - (if (|> state' <had_open_position?>) - <go_ahead> - <get_in_line>))))) - signal))) - - (exception: .public (semaphore_is_maxed_out {max_positions Nat}) - (exception.report - ["Max Positions" (%.nat max_positions)])) - - (def: .public (signal! semaphore) - (Ex (_ k) (-> Semaphore (Async (Try Int)))) - (let [semaphore (:representation semaphore)] - (async.future - (do [! io.monad] - [[pre post] (atom.update! (function (_ state) - (if (i.= (.int (value@ #max_positions state)) - (value@ #open_positions state)) - state - (|> state - (revised@ #open_positions ++) - (revised@ #waiting_list queue.next)))) - semaphore)] - (if (same? pre post) - (in (exception.except ..semaphore_is_maxed_out [(value@ #max_positions pre)])) - (do ! - [_ (case (queue.front (value@ #waiting_list pre)) - #.None - (in true) - - (#.Some sink) - (sink []))] - (in (#try.Success (value@ #open_positions post))))))))) + [(def: most_positions_possible + (.nat (\ i.interval top))) + + (def: .public (semaphore initial_open_positions) + (-> Nat Semaphore) + (let [max_positions (n.min initial_open_positions + ..most_positions_possible)] + (:abstraction (atom.atom [#max_positions max_positions + #open_positions (.int max_positions) + #waiting_list queue.empty])))) + + (def: .public (wait! semaphore) + (Ex (_ k) (-> Semaphore (Async Any))) + (let [semaphore (:representation semaphore) + [signal sink] (: [(Async Any) (Resolver Any)] + (async.async []))] + (exec + (io.run! + (with_expansions [<had_open_position?> (as_is (value@ #open_positions) (i.> -1))] + (do io.monad + [[_ state'] (atom.update! (|>> (revised@ #open_positions --) + (if> [<had_open_position?>] + [] + [(revised@ #waiting_list (queue.end sink))])) + semaphore)] + (with_expansions [<go_ahead> (sink []) + <get_in_line> (in false)] + (if (|> state' <had_open_position?>) + <go_ahead> + <get_in_line>))))) + signal))) + + (exception: .public (semaphore_is_maxed_out {max_positions Nat}) + (exception.report + ["Max Positions" (%.nat max_positions)])) + + (def: .public (signal! semaphore) + (Ex (_ k) (-> Semaphore (Async (Try Int)))) + (let [semaphore (:representation semaphore)] + (async.future + (do [! io.monad] + [[pre post] (atom.update! (function (_ state) + (if (i.= (.int (value@ #max_positions state)) + (value@ #open_positions state)) + state + (|> state + (revised@ #open_positions ++) + (revised@ #waiting_list queue.next)))) + semaphore)] + (if (same? pre post) + (in (exception.except ..semaphore_is_maxed_out [(value@ #max_positions pre)])) + (do ! + [_ (case (queue.front (value@ #waiting_list pre)) + #.None + (in true) + + (#.Some sink) + (sink []))] + (in (#try.Success (value@ #open_positions post)))))))))] ) (abstract: .public Mutex - {} - Semaphore - (def: .public (mutex _) - (-> Any Mutex) - (:abstraction (semaphore 1))) - - (def: acquire! - (-> Mutex (Async Any)) - (|>> :representation ..wait!)) - - (def: release! - (-> Mutex (Async Any)) - (|>> :representation ..signal!)) - - (def: .public (synchronize! mutex procedure) - (All (_ a) (-> Mutex (IO (Async a)) (Async a))) - (do async.monad - [_ (..acquire! mutex) - output (io.run! procedure) - _ (..release! mutex)] - (in output))) + [(def: .public (mutex _) + (-> Any Mutex) + (:abstraction (semaphore 1))) + + (def: acquire! + (-> Mutex (Async Any)) + (|>> :representation ..wait!)) + + (def: release! + (-> Mutex (Async Any)) + (|>> :representation ..signal!)) + + (def: .public (synchronize! mutex procedure) + (All (_ a) (-> Mutex (IO (Async a)) (Async a))) + (do async.monad + [_ (..acquire! mutex) + output (io.run! procedure) + _ (..release! mutex)] + (in output)))] ) (def: .public limit @@ -129,49 +125,47 @@ (:~ (refinement.type limit))) (abstract: .public Barrier - {} - (Record [#limit Limit #count (Atom Nat) #start_turnstile Semaphore #end_turnstile Semaphore]) - (def: .public (barrier limit) - (-> Limit Barrier) - (:abstraction [#limit limit - #count (atom.atom 0) - #start_turnstile (..semaphore 0) - #end_turnstile (..semaphore 0)])) - - (def: (un_block! times turnstile) - (-> Nat Semaphore (Async Any)) - (loop [step 0] - (if (n.< times step) + [(def: .public (barrier limit) + (-> Limit Barrier) + (:abstraction [#limit limit + #count (atom.atom 0) + #start_turnstile (..semaphore 0) + #end_turnstile (..semaphore 0)])) + + (def: (un_block! times turnstile) + (-> Nat Semaphore (Async Any)) + (loop [step 0] + (if (n.< times step) + (do async.monad + [outcome (..signal! turnstile)] + (recur (++ step))) + (\ async.monad in [])))) + + (template [<phase> <update> <goal> <turnstile>] + [(def: (<phase> (^:representation barrier)) + (-> Barrier (Async Any)) (do async.monad - [outcome (..signal! turnstile)] - (recur (++ step))) - (\ async.monad in [])))) - - (template [<phase> <update> <goal> <turnstile>] - [(def: (<phase> (^:representation barrier)) - (-> Barrier (Async Any)) - (do async.monad - [.let [limit (refinement.value (value@ #limit barrier)) - goal <goal> - [_ count] (io.run! (atom.update! <update> (value@ #count barrier))) - reached? (n.= goal count)]] - (if reached? - (..un_block! (-- limit) (value@ <turnstile> barrier)) - (..wait! (value@ <turnstile> barrier)))))] - - [start! ++ limit #start_turnstile] - [end! -- 0 #end_turnstile] - ) - - (def: .public (block! barrier) - (-> Barrier (Async Any)) - (do async.monad - [_ (..start! barrier)] - (..end! barrier))) + [.let [limit (refinement.value (value@ #limit barrier)) + goal <goal> + [_ count] (io.run! (atom.update! <update> (value@ #count barrier))) + reached? (n.= goal count)]] + (if reached? + (..un_block! (-- limit) (value@ <turnstile> barrier)) + (..wait! (value@ <turnstile> barrier)))))] + + [start! ++ limit #start_turnstile] + [end! -- 0 #end_turnstile] + ) + + (def: .public (block! barrier) + (-> Barrier (Async Any)) + (do async.monad + [_ (..start! barrier)] + (..end! barrier)))] ) |