diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency/semaphore.lux')
-rw-r--r-- | stdlib/source/lux/control/concurrency/semaphore.lux | 149 |
1 files changed, 149 insertions, 0 deletions
diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux new file mode 100644 index 000000000..46762ecf3 --- /dev/null +++ b/stdlib/source/lux/control/concurrency/semaphore.lux @@ -0,0 +1,149 @@ +(.module: + [lux #* + [control [monad (#+ do)]] + ["." io (#+ IO)] + [type + abstract + ["." refinement]]] + [// + ["." atom (#+ Atom)] + ["." promise (#+ Promise)]]) + +(type: State + {#open-positions Nat + #waiting-list (List (Promise Any))}) + +(abstract: #export Semaphore + {#.doc "A tool for controlling access to resources by multiple concurrent processes."} + + (Atom State) + + (def: #export (semaphore init-open-positions) + (-> Nat Semaphore) + (:abstraction (atom.atom {#open-positions init-open-positions + #waiting-list (list)}))) + + (def: #export (wait semaphore) + (Ex [k] (-> Semaphore (Promise Any))) + (let [semaphore (:representation semaphore)] + (io.run + (loop [signal (: (Promise Any) + (promise.promise #.None))] + (do io.Monad<IO> + [state (atom.read semaphore) + #let [[ready? state'] (: [Bit State] + (case (get@ #open-positions state) + 0 [#0 (update@ #waiting-list (|>> (#.Cons signal)) + state)] + _ [#1 (update@ #open-positions dec + state)]))] + success? (atom.compare-and-swap state state' semaphore) + _ (if ready? + (promise.resolve [] signal) + (wrap #0))] + (if success? + (wrap signal) + (recur signal))))))) + + (def: #export (signal semaphore) + (Ex [k] (-> Semaphore (Promise Any))) + (let [semaphore (:representation semaphore)] + (promise.future + (loop [_ []] + (do io.Monad<IO> + [state (atom.read semaphore) + #let [[?signal state'] (: [(Maybe (Promise Any)) State] + (case (get@ #waiting-list state) + #.Nil + [#.None (update@ #open-positions inc state)] + + (#.Cons head tail) + [(#.Some head) (set@ #waiting-list tail state)]))] + success? (atom.compare-and-swap state state' semaphore)] + (if success? + (do @ + [_ (case ?signal + #.None + (wrap #1) + + (#.Some signal) + (promise.resolve [] signal))] + (wrap [])) + (recur []))))))) + ) + +(abstract: #export Mutex + {#.doc "A mutual-exclusion lock that can only be acquired by one process at a time."} + + Semaphore + + (def: #export (mutex _) + (-> Any Mutex) + (:abstraction (semaphore 1))) + + (def: acquire + (-> Mutex (Promise Any)) + (|>> :representation wait)) + + (def: release + (-> Mutex (Promise Any)) + (|>> :representation signal)) + + (def: #export (synchronize mutex procedure) + (All [a] (-> Mutex (IO (Promise a)) (Promise a))) + (do promise.Monad<Promise> + [_ (acquire mutex) + output (io.run procedure) + _ (release mutex)] + (wrap output))) + ) + +(def: #export limit (refinement.refinement (n/> 0))) +(`` (type: #export Limit (~~ (refinement.type limit)))) + +(abstract: #export Barrier + {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."} + + {#limit Limit + #count (Atom Nat) + #start-turnstile Semaphore + #end-turnstile Semaphore} + + (def: #export (barrier limit) + (-> Limit Barrier) + (:abstraction {#limit limit + #count (atom.atom 0) + #start-turnstile (semaphore 0) + #end-turnstile (semaphore 0)})) + + (def: (un-block times turnstile) + (-> Nat Semaphore (Promise Any)) + (loop [step 0] + (if (n/< times step) + (do promise.Monad<Promise> + [_ (signal turnstile)] + (recur (inc step))) + (:: promise.Monad<Promise> wrap [])))) + + (do-template [<phase> <update> <goal> <turnstile>] + [(def: (<phase> (^:representation barrier)) + (-> Barrier (Promise Any)) + (do promise.Monad<Promise> + [#let [limit (refinement.un-refine (get@ #limit barrier)) + goal <goal> + count (io.run (atom.update <update> (get@ #count barrier)))] + _ (if (n/= goal count) + (un-block limit (get@ <turnstile> barrier)) + (wrap []))] + (wait (get@ <turnstile> barrier))))] + + [start inc limit #start-turnstile] + [end dec 0 #end-turnstile] + ) + + (def: #export (block barrier) + (-> Barrier (Promise Any)) + (do promise.Monad<Promise> + [_ (start barrier)] + (end barrier))) + ) |