aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency/semaphore.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/control/concurrency/semaphore.lux')
-rw-r--r--stdlib/source/lux/control/concurrency/semaphore.lux173
1 files changed, 0 insertions, 173 deletions
diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux
deleted file mode 100644
index 0e8fa2b94..000000000
--- a/stdlib/source/lux/control/concurrency/semaphore.lux
+++ /dev/null
@@ -1,173 +0,0 @@
-(.module:
- [lux #*
- [abstract
- [monad (#+ do)]]
- [control
- [pipe (#+ if>)]
- ["." io (#+ IO)]
- ["." try (#+ Try)]
- ["." exception (#+ exception:)]]
- [data
- [text
- ["%" format (#+ format)]]
- [collection
- ["." queue (#+ Queue)]]]
- [math
- [number
- ["n" nat]
- ["i" int]]]
- [type
- abstract
- ["." refinement]]]
- [//
- ["." atom (#+ Atom)]
- ["." promise (#+ Promise Resolver)]])
-
-(type: State
- {#max_positions Nat
- #open_positions Int
- #waiting_list (Queue (Resolver Any))})
-
-(abstract: #export Semaphore
- (Atom State)
-
- {#.doc "A tool for controlling access to resources by multiple concurrent processes."}
-
- (def: most_positions_possible
- (.nat (\ i.interval top)))
-
- (def: #export (semaphore initial_open_positions)
- (-> Nat Semaphore)
- (let [max_positions (n.min initial_open_positions
- ..most_positions_possible)]
- (:abstraction (atom.atom {#max_positions max_positions
- #open_positions (.int max_positions)
- #waiting_list queue.empty}))))
-
- (def: #export (wait semaphore)
- (Ex [k] (-> Semaphore (Promise Any)))
- (let [semaphore (:representation semaphore)
- [signal sink] (: [(Promise Any) (Resolver Any)]
- (promise.promise []))]
- (exec (io.run
- (with_expansions [<had_open_position?> (as_is (get@ #open_positions) (i.> -1))]
- (do io.monad
- [[_ state'] (atom.update (|>> (update@ #open_positions dec)
- (if> [<had_open_position?>]
- []
- [(update@ #waiting_list (queue.push sink))]))
- semaphore)]
- (with_expansions [<go_ahead> (sink [])
- <get_in_line> (wrap false)]
- (if (|> state' <had_open_position?>)
- <go_ahead>
- <get_in_line>)))))
- signal)))
-
- (exception: #export (semaphore_is_maxed_out {max_positions Nat})
- (exception.report
- ["Max Positions" (%.nat max_positions)]))
-
- (def: #export (signal semaphore)
- (Ex [k] (-> Semaphore (Promise (Try Int))))
- (let [semaphore (:representation semaphore)]
- (promise.future
- (do {! io.monad}
- [[pre post] (atom.update (function (_ state)
- (if (i.= (.int (get@ #max_positions state))
- (get@ #open_positions state))
- state
- (|> state
- (update@ #open_positions inc)
- (update@ #waiting_list queue.pop))))
- semaphore)]
- (if (is? pre post)
- (wrap (exception.throw ..semaphore_is_maxed_out [(get@ #max_positions pre)]))
- (do !
- [_ (case (queue.peek (get@ #waiting_list pre))
- #.None
- (wrap true)
-
- (#.Some sink)
- (sink []))]
- (wrap (#try.Success (get@ #open_positions post)))))))))
- )
-
-(abstract: #export Mutex
- Semaphore
-
- {#.doc "A mutual-exclusion lock that can only be acquired by one process at a time."}
-
- (def: #export (mutex _)
- (-> Any Mutex)
- (:abstraction (semaphore 1)))
-
- (def: acquire
- (-> Mutex (Promise Any))
- (|>> :representation ..wait))
-
- (def: release
- (-> Mutex (Promise Any))
- (|>> :representation ..signal))
-
- (def: #export (synchronize mutex procedure)
- (All [a] (-> Mutex (IO (Promise a)) (Promise a)))
- (do promise.monad
- [_ (..acquire mutex)
- output (io.run procedure)
- _ (..release mutex)]
- (wrap output)))
- )
-
-(def: #export limit
- (refinement.refinement (n.> 0)))
-
-(type: #export Limit
- (:~ (refinement.type limit)))
-
-(abstract: #export Barrier
- {#limit Limit
- #count (Atom Nat)
- #start_turnstile Semaphore
- #end_turnstile Semaphore}
-
- {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."}
-
- (def: #export (barrier limit)
- (-> Limit Barrier)
- (:abstraction {#limit limit
- #count (atom.atom 0)
- #start_turnstile (..semaphore 0)
- #end_turnstile (..semaphore 0)}))
-
- (def: (un_block times turnstile)
- (-> Nat Semaphore (Promise Any))
- (loop [step 0]
- (if (n.< times step)
- (do promise.monad
- [outcome (..signal turnstile)]
- (recur (inc step)))
- (\ promise.monad wrap []))))
-
- (template [<phase> <update> <goal> <turnstile>]
- [(def: (<phase> (^:representation barrier))
- (-> Barrier (Promise Any))
- (do promise.monad
- [#let [limit (refinement.un_refine (get@ #limit barrier))
- goal <goal>
- [_ count] (io.run (atom.update <update> (get@ #count barrier)))
- reached? (n.= goal count)]]
- (if reached?
- (..un_block (dec limit) (get@ <turnstile> barrier))
- (..wait (get@ <turnstile> barrier)))))]
-
- [start inc limit #start_turnstile]
- [end dec 0 #end_turnstile]
- )
-
- (def: #export (block barrier)
- (-> Barrier (Promise Any))
- (do promise.monad
- [_ (..start barrier)]
- (..end barrier)))
- )