diff options
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/lux/control/concurrency/semaphore.lux | 148 |
1 files changed, 92 insertions, 56 deletions
diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux index ade45984e..39bac32a1 100644 --- a/stdlib/source/lux/control/concurrency/semaphore.lux +++ b/stdlib/source/lux/control/concurrency/semaphore.lux @@ -2,79 +2,115 @@ [lux #* [abstract [monad (#+ do)]] + [control + ["." io (#+ IO)] + ["." try (#+ Try)] + ["." exception (#+ exception:)]] [data + [text + ["%" format (#+ format)]] [number - ["n" nat]]] - [control - ["." io (#+ IO)]] + ["n" nat] + ["i" int]] + [collection + ["." queue (#+ Queue)]]] [type abstract - ["." refinement]]] + ["." refinement]] + ["." macro]] [// ["." atom (#+ Atom)] - ["." promise (#+ Promise)]]) + ["." promise (#+ Promise Resolver)]]) (type: State - {#open-positions Nat - #waiting-list (List (Promise Any))}) + {#max-positions Nat + #open-positions Int + #waiting-list (Queue (Resolver Any))}) (abstract: #export Semaphore {#.doc "A tool for controlling access to resources by multiple concurrent processes."} (Atom State) - (def: #export (semaphore init-open-positions) + (def: most-positions-possible + (.nat (:: i.interval top))) + + (def: #export (semaphore initial-open-positions) (-> Nat Semaphore) - (:abstraction (atom.atom {#open-positions init-open-positions - #waiting-list (list)}))) + (let [max-positions (n.min initial-open-positions + ..most-positions-possible)] + (:abstraction (atom.atom {#max-positions max-positions + #open-positions (.int max-positions) + #waiting-list queue.empty})))) (def: #export (wait semaphore) (Ex [k] (-> Semaphore (Promise Any))) - (let [semaphore (:representation semaphore)] - (io.run - (loop [signal (: (Promise Any) - (promise.promise #.None))] - (do io.monad - [state (atom.read semaphore) - #let [[ready? state'] (: [Bit State] - (case (get@ #open-positions state) - 0 [#0 (update@ #waiting-list (|>> (#.Cons signal)) - state)] - _ [#1 (update@ #open-positions dec - state)]))] - success? (atom.compare-and-swap state state' semaphore) - _ (if ready? - (promise.resolve [] signal) - (wrap #0))] - (if success? - (wrap signal) - (recur signal))))))) + (let [semaphore (:representation semaphore) + [signal sink] (: [(Promise Any) (Resolver Any)] + (promise.promise []))] + (exec (promise.future + (loop [_ []] + (do io.monad + [state (atom.read semaphore) + #let [[ready? state'] (: [Bit State] + (if (i.> +0 (get@ #open-positions state)) + [true (|> state + (update@ #open-positions dec))] + [false (|> state + (update@ #open-positions dec) + (update@ #waiting-list (queue.push sink)))]))] + success? (atom.compare-and-swap state state' semaphore)] + (if success? + (if ready? + (sink []) + (wrap false)) + (recur []))))) + signal))) + + (exception: #export (semaphore-is-maxed-out {max-positions Nat}) + (exception.report + ["Max Positions" (%.nat max-positions)])) (def: #export (signal semaphore) - (Ex [k] (-> Semaphore (Promise Any))) + (Ex [k] (-> Semaphore (Promise (Try Int)))) (let [semaphore (:representation semaphore)] (promise.future (loop [_ []] (do io.monad [state (atom.read semaphore) - #let [[?signal state'] (: [(Maybe (Promise Any)) State] - (case (get@ #waiting-list state) - #.Nil - [#.None (update@ #open-positions inc state)] - - (#.Cons head tail) - [(#.Some head) (set@ #waiting-list tail state)]))] - success? (atom.compare-and-swap state state' semaphore)] - (if success? + #let [[?sink state' maxed-out?] (: [(Maybe (Resolver Any)) State Bit] + (case (queue.peek (get@ #waiting-list state)) + #.None + (if (n.= (get@ #max-positions state) + (.nat (get@ #open-positions state))) + [#.None + state + true] + [#.None + (update@ #open-positions inc state) + false]) + + (#.Some head) + [(#.Some head) + (|> state + (update@ #open-positions inc) + (update@ #waiting-list queue.pop)) + false]))]] + (if maxed-out? + (wrap (exception.throw ..semaphore-is-maxed-out [(get@ #max-positions state)])) (do @ - [_ (case ?signal - #.None - (wrap #1) - - (#.Some signal) - (promise.resolve [] signal))] - (wrap [])) - (recur []))))))) + [#let [open-positions (get@ #open-positions state')] + success? (atom.compare-and-swap state state' semaphore)] + (if success? + (do @ + [_ (case ?sink + #.None + (wrap true) + + (#.Some sink) + (sink []))] + (wrap (#try.Success open-positions))) + (recur []))))))))) ) (abstract: #export Mutex @@ -104,7 +140,7 @@ ) (def: #export limit (refinement.refinement (n.> 0))) -(`` (type: #export Limit (~~ (refinement.type limit)))) +(type: #export Limit (:~ (refinement.type limit))) (abstract: #export Barrier {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."} @@ -126,7 +162,7 @@ (loop [step 0] (if (n.< times step) (do promise.monad - [_ (signal turnstile)] + [_ (..signal turnstile)] (recur (inc step))) (:: promise.monad wrap [])))) @@ -136,11 +172,11 @@ (do promise.monad [#let [limit (refinement.un-refine (get@ #limit barrier)) goal <goal> - count (io.run (atom.update <update> (get@ #count barrier)))] - _ (if (n.= goal count) - (un-block limit (get@ <turnstile> barrier)) - (wrap []))] - (wait (get@ <turnstile> barrier))))] + count (io.run (atom.update <update> (get@ #count barrier))) + reached? (n.= goal count)]] + (if reached? + (un-block limit (get@ <turnstile> barrier)) + (wait (get@ <turnstile> barrier)))))] [start inc limit #start-turnstile] [end dec 0 #end-turnstile] @@ -149,6 +185,6 @@ (def: #export (block barrier) (-> Barrier (Promise Any)) (do promise.monad - [_ (start barrier)] - (end barrier))) + [_ (..start barrier)] + (..end barrier))) ) |