aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/library/lux/control/concurrency/semaphore.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/library/lux/control/concurrency/semaphore.lux')
-rw-r--r--stdlib/source/library/lux/control/concurrency/semaphore.lux234
1 files changed, 114 insertions, 120 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/semaphore.lux b/stdlib/source/library/lux/control/concurrency/semaphore.lux
index 3b0461579..bcbd71158 100644
--- a/stdlib/source/library/lux/control/concurrency/semaphore.lux
+++ b/stdlib/source/library/lux/control/concurrency/semaphore.lux
@@ -31,95 +31,91 @@
#waiting_list (Queue (Resolver Any))]))
(abstract: .public Semaphore
- {}
-
(Atom State)
- (def: most_positions_possible
- (.nat (\ i.interval top)))
-
- (def: .public (semaphore initial_open_positions)
- (-> Nat Semaphore)
- (let [max_positions (n.min initial_open_positions
- ..most_positions_possible)]
- (:abstraction (atom.atom [#max_positions max_positions
- #open_positions (.int max_positions)
- #waiting_list queue.empty]))))
-
- (def: .public (wait! semaphore)
- (Ex (_ k) (-> Semaphore (Async Any)))
- (let [semaphore (:representation semaphore)
- [signal sink] (: [(Async Any) (Resolver Any)]
- (async.async []))]
- (exec
- (io.run!
- (with_expansions [<had_open_position?> (as_is (value@ #open_positions) (i.> -1))]
- (do io.monad
- [[_ state'] (atom.update! (|>> (revised@ #open_positions --)
- (if> [<had_open_position?>]
- []
- [(revised@ #waiting_list (queue.end sink))]))
- semaphore)]
- (with_expansions [<go_ahead> (sink [])
- <get_in_line> (in false)]
- (if (|> state' <had_open_position?>)
- <go_ahead>
- <get_in_line>)))))
- signal)))
-
- (exception: .public (semaphore_is_maxed_out {max_positions Nat})
- (exception.report
- ["Max Positions" (%.nat max_positions)]))
-
- (def: .public (signal! semaphore)
- (Ex (_ k) (-> Semaphore (Async (Try Int))))
- (let [semaphore (:representation semaphore)]
- (async.future
- (do [! io.monad]
- [[pre post] (atom.update! (function (_ state)
- (if (i.= (.int (value@ #max_positions state))
- (value@ #open_positions state))
- state
- (|> state
- (revised@ #open_positions ++)
- (revised@ #waiting_list queue.next))))
- semaphore)]
- (if (same? pre post)
- (in (exception.except ..semaphore_is_maxed_out [(value@ #max_positions pre)]))
- (do !
- [_ (case (queue.front (value@ #waiting_list pre))
- #.None
- (in true)
-
- (#.Some sink)
- (sink []))]
- (in (#try.Success (value@ #open_positions post)))))))))
+ [(def: most_positions_possible
+ (.nat (\ i.interval top)))
+
+ (def: .public (semaphore initial_open_positions)
+ (-> Nat Semaphore)
+ (let [max_positions (n.min initial_open_positions
+ ..most_positions_possible)]
+ (:abstraction (atom.atom [#max_positions max_positions
+ #open_positions (.int max_positions)
+ #waiting_list queue.empty]))))
+
+ (def: .public (wait! semaphore)
+ (Ex (_ k) (-> Semaphore (Async Any)))
+ (let [semaphore (:representation semaphore)
+ [signal sink] (: [(Async Any) (Resolver Any)]
+ (async.async []))]
+ (exec
+ (io.run!
+ (with_expansions [<had_open_position?> (as_is (value@ #open_positions) (i.> -1))]
+ (do io.monad
+ [[_ state'] (atom.update! (|>> (revised@ #open_positions --)
+ (if> [<had_open_position?>]
+ []
+ [(revised@ #waiting_list (queue.end sink))]))
+ semaphore)]
+ (with_expansions [<go_ahead> (sink [])
+ <get_in_line> (in false)]
+ (if (|> state' <had_open_position?>)
+ <go_ahead>
+ <get_in_line>)))))
+ signal)))
+
+ (exception: .public (semaphore_is_maxed_out {max_positions Nat})
+ (exception.report
+ ["Max Positions" (%.nat max_positions)]))
+
+ (def: .public (signal! semaphore)
+ (Ex (_ k) (-> Semaphore (Async (Try Int))))
+ (let [semaphore (:representation semaphore)]
+ (async.future
+ (do [! io.monad]
+ [[pre post] (atom.update! (function (_ state)
+ (if (i.= (.int (value@ #max_positions state))
+ (value@ #open_positions state))
+ state
+ (|> state
+ (revised@ #open_positions ++)
+ (revised@ #waiting_list queue.next))))
+ semaphore)]
+ (if (same? pre post)
+ (in (exception.except ..semaphore_is_maxed_out [(value@ #max_positions pre)]))
+ (do !
+ [_ (case (queue.front (value@ #waiting_list pre))
+ #.None
+ (in true)
+
+ (#.Some sink)
+ (sink []))]
+ (in (#try.Success (value@ #open_positions post)))))))))]
)
(abstract: .public Mutex
- {}
-
Semaphore
- (def: .public (mutex _)
- (-> Any Mutex)
- (:abstraction (semaphore 1)))
-
- (def: acquire!
- (-> Mutex (Async Any))
- (|>> :representation ..wait!))
-
- (def: release!
- (-> Mutex (Async Any))
- (|>> :representation ..signal!))
-
- (def: .public (synchronize! mutex procedure)
- (All (_ a) (-> Mutex (IO (Async a)) (Async a)))
- (do async.monad
- [_ (..acquire! mutex)
- output (io.run! procedure)
- _ (..release! mutex)]
- (in output)))
+ [(def: .public (mutex _)
+ (-> Any Mutex)
+ (:abstraction (semaphore 1)))
+
+ (def: acquire!
+ (-> Mutex (Async Any))
+ (|>> :representation ..wait!))
+
+ (def: release!
+ (-> Mutex (Async Any))
+ (|>> :representation ..signal!))
+
+ (def: .public (synchronize! mutex procedure)
+ (All (_ a) (-> Mutex (IO (Async a)) (Async a)))
+ (do async.monad
+ [_ (..acquire! mutex)
+ output (io.run! procedure)
+ _ (..release! mutex)]
+ (in output)))]
)
(def: .public limit
@@ -129,49 +125,47 @@
(:~ (refinement.type limit)))
(abstract: .public Barrier
- {}
-
(Record
[#limit Limit
#count (Atom Nat)
#start_turnstile Semaphore
#end_turnstile Semaphore])
- (def: .public (barrier limit)
- (-> Limit Barrier)
- (:abstraction [#limit limit
- #count (atom.atom 0)
- #start_turnstile (..semaphore 0)
- #end_turnstile (..semaphore 0)]))
-
- (def: (un_block! times turnstile)
- (-> Nat Semaphore (Async Any))
- (loop [step 0]
- (if (n.< times step)
+ [(def: .public (barrier limit)
+ (-> Limit Barrier)
+ (:abstraction [#limit limit
+ #count (atom.atom 0)
+ #start_turnstile (..semaphore 0)
+ #end_turnstile (..semaphore 0)]))
+
+ (def: (un_block! times turnstile)
+ (-> Nat Semaphore (Async Any))
+ (loop [step 0]
+ (if (n.< times step)
+ (do async.monad
+ [outcome (..signal! turnstile)]
+ (recur (++ step)))
+ (\ async.monad in []))))
+
+ (template [<phase> <update> <goal> <turnstile>]
+ [(def: (<phase> (^:representation barrier))
+ (-> Barrier (Async Any))
(do async.monad
- [outcome (..signal! turnstile)]
- (recur (++ step)))
- (\ async.monad in []))))
-
- (template [<phase> <update> <goal> <turnstile>]
- [(def: (<phase> (^:representation barrier))
- (-> Barrier (Async Any))
- (do async.monad
- [.let [limit (refinement.value (value@ #limit barrier))
- goal <goal>
- [_ count] (io.run! (atom.update! <update> (value@ #count barrier)))
- reached? (n.= goal count)]]
- (if reached?
- (..un_block! (-- limit) (value@ <turnstile> barrier))
- (..wait! (value@ <turnstile> barrier)))))]
-
- [start! ++ limit #start_turnstile]
- [end! -- 0 #end_turnstile]
- )
-
- (def: .public (block! barrier)
- (-> Barrier (Async Any))
- (do async.monad
- [_ (..start! barrier)]
- (..end! barrier)))
+ [.let [limit (refinement.value (value@ #limit barrier))
+ goal <goal>
+ [_ count] (io.run! (atom.update! <update> (value@ #count barrier)))
+ reached? (n.= goal count)]]
+ (if reached?
+ (..un_block! (-- limit) (value@ <turnstile> barrier))
+ (..wait! (value@ <turnstile> barrier)))))]
+
+ [start! ++ limit #start_turnstile]
+ [end! -- 0 #end_turnstile]
+ )
+
+ (def: .public (block! barrier)
+ (-> Barrier (Async Any))
+ (do async.monad
+ [_ (..start! barrier)]
+ (..end! barrier)))]
)