aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency/semaphore.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/control/concurrency/semaphore.lux')
-rw-r--r--stdlib/source/lux/control/concurrency/semaphore.lux149
1 files changed, 149 insertions, 0 deletions
diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux
new file mode 100644
index 000000000..46762ecf3
--- /dev/null
+++ b/stdlib/source/lux/control/concurrency/semaphore.lux
@@ -0,0 +1,149 @@
+(.module:
+ [lux #*
+ [control [monad (#+ do)]]
+ ["." io (#+ IO)]
+ [type
+ abstract
+ ["." refinement]]]
+ [//
+ ["." atom (#+ Atom)]
+ ["." promise (#+ Promise)]])
+
+(type: State
+ {#open-positions Nat
+ #waiting-list (List (Promise Any))})
+
+(abstract: #export Semaphore
+ {#.doc "A tool for controlling access to resources by multiple concurrent processes."}
+
+ (Atom State)
+
+ (def: #export (semaphore init-open-positions)
+ (-> Nat Semaphore)
+ (:abstraction (atom.atom {#open-positions init-open-positions
+ #waiting-list (list)})))
+
+ (def: #export (wait semaphore)
+ (Ex [k] (-> Semaphore (Promise Any)))
+ (let [semaphore (:representation semaphore)]
+ (io.run
+ (loop [signal (: (Promise Any)
+ (promise.promise #.None))]
+ (do io.Monad<IO>
+ [state (atom.read semaphore)
+ #let [[ready? state'] (: [Bit State]
+ (case (get@ #open-positions state)
+ 0 [#0 (update@ #waiting-list (|>> (#.Cons signal))
+ state)]
+ _ [#1 (update@ #open-positions dec
+ state)]))]
+ success? (atom.compare-and-swap state state' semaphore)
+ _ (if ready?
+ (promise.resolve [] signal)
+ (wrap #0))]
+ (if success?
+ (wrap signal)
+ (recur signal)))))))
+
+ (def: #export (signal semaphore)
+ (Ex [k] (-> Semaphore (Promise Any)))
+ (let [semaphore (:representation semaphore)]
+ (promise.future
+ (loop [_ []]
+ (do io.Monad<IO>
+ [state (atom.read semaphore)
+ #let [[?signal state'] (: [(Maybe (Promise Any)) State]
+ (case (get@ #waiting-list state)
+ #.Nil
+ [#.None (update@ #open-positions inc state)]
+
+ (#.Cons head tail)
+ [(#.Some head) (set@ #waiting-list tail state)]))]
+ success? (atom.compare-and-swap state state' semaphore)]
+ (if success?
+ (do @
+ [_ (case ?signal
+ #.None
+ (wrap #1)
+
+ (#.Some signal)
+ (promise.resolve [] signal))]
+ (wrap []))
+ (recur [])))))))
+ )
+
+(abstract: #export Mutex
+ {#.doc "A mutual-exclusion lock that can only be acquired by one process at a time."}
+
+ Semaphore
+
+ (def: #export (mutex _)
+ (-> Any Mutex)
+ (:abstraction (semaphore 1)))
+
+ (def: acquire
+ (-> Mutex (Promise Any))
+ (|>> :representation wait))
+
+ (def: release
+ (-> Mutex (Promise Any))
+ (|>> :representation signal))
+
+ (def: #export (synchronize mutex procedure)
+ (All [a] (-> Mutex (IO (Promise a)) (Promise a)))
+ (do promise.Monad<Promise>
+ [_ (acquire mutex)
+ output (io.run procedure)
+ _ (release mutex)]
+ (wrap output)))
+ )
+
+(def: #export limit (refinement.refinement (n/> 0)))
+(`` (type: #export Limit (~~ (refinement.type limit))))
+
+(abstract: #export Barrier
+ {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."}
+
+ {#limit Limit
+ #count (Atom Nat)
+ #start-turnstile Semaphore
+ #end-turnstile Semaphore}
+
+ (def: #export (barrier limit)
+ (-> Limit Barrier)
+ (:abstraction {#limit limit
+ #count (atom.atom 0)
+ #start-turnstile (semaphore 0)
+ #end-turnstile (semaphore 0)}))
+
+ (def: (un-block times turnstile)
+ (-> Nat Semaphore (Promise Any))
+ (loop [step 0]
+ (if (n/< times step)
+ (do promise.Monad<Promise>
+ [_ (signal turnstile)]
+ (recur (inc step)))
+ (:: promise.Monad<Promise> wrap []))))
+
+ (do-template [<phase> <update> <goal> <turnstile>]
+ [(def: (<phase> (^:representation barrier))
+ (-> Barrier (Promise Any))
+ (do promise.Monad<Promise>
+ [#let [limit (refinement.un-refine (get@ #limit barrier))
+ goal <goal>
+ count (io.run (atom.update <update> (get@ #count barrier)))]
+ _ (if (n/= goal count)
+ (un-block limit (get@ <turnstile> barrier))
+ (wrap []))]
+ (wait (get@ <turnstile> barrier))))]
+
+ [start inc limit #start-turnstile]
+ [end dec 0 #end-turnstile]
+ )
+
+ (def: #export (block barrier)
+ (-> Barrier (Promise Any))
+ (do promise.Monad<Promise>
+ [_ (start barrier)]
+ (end barrier)))
+ )