aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency/semaphore.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/control/concurrency/semaphore.lux')
-rw-r--r--stdlib/source/lux/control/concurrency/semaphore.lux148
1 files changed, 92 insertions, 56 deletions
diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux
index ade45984e..39bac32a1 100644
--- a/stdlib/source/lux/control/concurrency/semaphore.lux
+++ b/stdlib/source/lux/control/concurrency/semaphore.lux
@@ -2,79 +2,115 @@
[lux #*
[abstract
[monad (#+ do)]]
+ [control
+ ["." io (#+ IO)]
+ ["." try (#+ Try)]
+ ["." exception (#+ exception:)]]
[data
+ [text
+ ["%" format (#+ format)]]
[number
- ["n" nat]]]
- [control
- ["." io (#+ IO)]]
+ ["n" nat]
+ ["i" int]]
+ [collection
+ ["." queue (#+ Queue)]]]
[type
abstract
- ["." refinement]]]
+ ["." refinement]]
+ ["." macro]]
[//
["." atom (#+ Atom)]
- ["." promise (#+ Promise)]])
+ ["." promise (#+ Promise Resolver)]])
(type: State
- {#open-positions Nat
- #waiting-list (List (Promise Any))})
+ {#max-positions Nat
+ #open-positions Int
+ #waiting-list (Queue (Resolver Any))})
(abstract: #export Semaphore
{#.doc "A tool for controlling access to resources by multiple concurrent processes."}
(Atom State)
- (def: #export (semaphore init-open-positions)
+ (def: most-positions-possible
+ (.nat (:: i.interval top)))
+
+ (def: #export (semaphore initial-open-positions)
(-> Nat Semaphore)
- (:abstraction (atom.atom {#open-positions init-open-positions
- #waiting-list (list)})))
+ (let [max-positions (n.min initial-open-positions
+ ..most-positions-possible)]
+ (:abstraction (atom.atom {#max-positions max-positions
+ #open-positions (.int max-positions)
+ #waiting-list queue.empty}))))
(def: #export (wait semaphore)
(Ex [k] (-> Semaphore (Promise Any)))
- (let [semaphore (:representation semaphore)]
- (io.run
- (loop [signal (: (Promise Any)
- (promise.promise #.None))]
- (do io.monad
- [state (atom.read semaphore)
- #let [[ready? state'] (: [Bit State]
- (case (get@ #open-positions state)
- 0 [#0 (update@ #waiting-list (|>> (#.Cons signal))
- state)]
- _ [#1 (update@ #open-positions dec
- state)]))]
- success? (atom.compare-and-swap state state' semaphore)
- _ (if ready?
- (promise.resolve [] signal)
- (wrap #0))]
- (if success?
- (wrap signal)
- (recur signal)))))))
+ (let [semaphore (:representation semaphore)
+ [signal sink] (: [(Promise Any) (Resolver Any)]
+ (promise.promise []))]
+ (exec (promise.future
+ (loop [_ []]
+ (do io.monad
+ [state (atom.read semaphore)
+ #let [[ready? state'] (: [Bit State]
+ (if (i.> +0 (get@ #open-positions state))
+ [true (|> state
+ (update@ #open-positions dec))]
+ [false (|> state
+ (update@ #open-positions dec)
+ (update@ #waiting-list (queue.push sink)))]))]
+ success? (atom.compare-and-swap state state' semaphore)]
+ (if success?
+ (if ready?
+ (sink [])
+ (wrap false))
+ (recur [])))))
+ signal)))
+
+ (exception: #export (semaphore-is-maxed-out {max-positions Nat})
+ (exception.report
+ ["Max Positions" (%.nat max-positions)]))
(def: #export (signal semaphore)
- (Ex [k] (-> Semaphore (Promise Any)))
+ (Ex [k] (-> Semaphore (Promise (Try Int))))
(let [semaphore (:representation semaphore)]
(promise.future
(loop [_ []]
(do io.monad
[state (atom.read semaphore)
- #let [[?signal state'] (: [(Maybe (Promise Any)) State]
- (case (get@ #waiting-list state)
- #.Nil
- [#.None (update@ #open-positions inc state)]
-
- (#.Cons head tail)
- [(#.Some head) (set@ #waiting-list tail state)]))]
- success? (atom.compare-and-swap state state' semaphore)]
- (if success?
+ #let [[?sink state' maxed-out?] (: [(Maybe (Resolver Any)) State Bit]
+ (case (queue.peek (get@ #waiting-list state))
+ #.None
+ (if (n.= (get@ #max-positions state)
+ (.nat (get@ #open-positions state)))
+ [#.None
+ state
+ true]
+ [#.None
+ (update@ #open-positions inc state)
+ false])
+
+ (#.Some head)
+ [(#.Some head)
+ (|> state
+ (update@ #open-positions inc)
+ (update@ #waiting-list queue.pop))
+ false]))]]
+ (if maxed-out?
+ (wrap (exception.throw ..semaphore-is-maxed-out [(get@ #max-positions state)]))
(do @
- [_ (case ?signal
- #.None
- (wrap #1)
-
- (#.Some signal)
- (promise.resolve [] signal))]
- (wrap []))
- (recur [])))))))
+ [#let [open-positions (get@ #open-positions state')]
+ success? (atom.compare-and-swap state state' semaphore)]
+ (if success?
+ (do @
+ [_ (case ?sink
+ #.None
+ (wrap true)
+
+ (#.Some sink)
+ (sink []))]
+ (wrap (#try.Success open-positions)))
+ (recur [])))))))))
)
(abstract: #export Mutex
@@ -104,7 +140,7 @@
)
(def: #export limit (refinement.refinement (n.> 0)))
-(`` (type: #export Limit (~~ (refinement.type limit))))
+(type: #export Limit (:~ (refinement.type limit)))
(abstract: #export Barrier
{#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."}
@@ -126,7 +162,7 @@
(loop [step 0]
(if (n.< times step)
(do promise.monad
- [_ (signal turnstile)]
+ [_ (..signal turnstile)]
(recur (inc step)))
(:: promise.monad wrap []))))
@@ -136,11 +172,11 @@
(do promise.monad
[#let [limit (refinement.un-refine (get@ #limit barrier))
goal <goal>
- count (io.run (atom.update <update> (get@ #count barrier)))]
- _ (if (n.= goal count)
- (un-block limit (get@ <turnstile> barrier))
- (wrap []))]
- (wait (get@ <turnstile> barrier))))]
+ count (io.run (atom.update <update> (get@ #count barrier)))
+ reached? (n.= goal count)]]
+ (if reached?
+ (un-block limit (get@ <turnstile> barrier))
+ (wait (get@ <turnstile> barrier)))))]
[start inc limit #start-turnstile]
[end dec 0 #end-turnstile]
@@ -149,6 +185,6 @@
(def: #export (block barrier)
(-> Barrier (Promise Any))
(do promise.monad
- [_ (start barrier)]
- (end barrier)))
+ [_ (..start barrier)]
+ (..end barrier)))
)