diff options
author | Eduardo Julian | 2018-01-30 01:13:07 -0400 |
---|---|---|
committer | Eduardo Julian | 2018-01-30 01:13:07 -0400 |
commit | 927694bdd07f25105f28649cf3c93a4275321a12 (patch) | |
tree | 8ff4b270b71497e1ceed7d618b13ff5b4951323f /stdlib/source | |
parent | dc0bddf24a9c016756700b84e1905886fed1050b (diff) |
- Implemented semaphores, mutexes and barriers.
- Fixed a bug when checking "last-index-of" for Text.
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/lux/concurrency/atom.lux | 14 | ||||
-rw-r--r-- | stdlib/source/lux/concurrency/frp.lux | 16 | ||||
-rw-r--r-- | stdlib/source/lux/concurrency/semaphore.lux | 147 | ||||
-rw-r--r-- | stdlib/source/lux/concurrency/stm.lux | 6 | ||||
-rw-r--r-- | stdlib/source/lux/data/text.lux | 12 |
5 files changed, 174 insertions, 21 deletions
diff --git a/stdlib/source/lux/concurrency/atom.lux b/stdlib/source/lux/concurrency/atom.lux index bd3041979..c60edd01e 100644 --- a/stdlib/source/lux/concurrency/atom.lux +++ b/stdlib/source/lux/concurrency/atom.lux @@ -28,12 +28,14 @@ If it fails to update it (because some other process wrote to it first), it will retry until it succeeds. The retries will be done with the new values of the atom, as they show up."} - (All [a] (-> (-> a a) (Atom a) (IO Unit))) - (io (let [old ("lux atom read" atom)] - (if ("lux atom compare-and-swap" atom old (f old)) - [] - (io.run (update f atom)))))) + (All [a] (-> (-> a a) (Atom a) (IO a))) + (io (loop [_ []] + (let [old ("lux atom read" atom) + new (f old)] + (if ("lux atom compare-and-swap" atom old new) + new + (recur [])))))) (def: #export (write value atom) - (All [a] (-> a (Atom a) (IO Unit))) + (All [a] (-> a (Atom a) (IO Top))) (update (function.const value) atom)) diff --git a/stdlib/source/lux/concurrency/frp.lux b/stdlib/source/lux/concurrency/frp.lux index fbe55efee..98de28d39 100644 --- a/stdlib/source/lux/concurrency/frp.lux +++ b/stdlib/source/lux/concurrency/frp.lux @@ -19,16 +19,18 @@ (@abstraction (atom (list)))) (def: #export (listen listener (^@representation channel)) - (All [a] (-> (-> a (IO Top)) (Channel a) (IO Unit))) - (atom.update (|>> (#.Cons listener)) channel)) + (All [a] (-> (-> a (IO Top)) (Channel a) (IO Top))) + ## TODO: Simplify when possible. + (do io.Monad<IO> + [_ (atom.update (|>> (#.Cons listener)) channel)] + (wrap []))) (def: #export (publish (^@representation channel) value) {#.doc "Publish to a channel."} - (All [a] (-> (Channel a) a (IO Unit))) + (All [a] (-> (Channel a) a (IO Top))) (do io.Monad<IO> - [listeners (atom.read channel) - _ (monad.map @ (function [listener] (listener value)) listeners)] - (wrap []))) + [listeners (atom.read channel)] + (monad.map @ (function [listener] (listener value)) listeners))) ) ## [Values] @@ -44,7 +46,7 @@ (def: #export (pipe output input) {#.doc "Copy/pipe the contents of a channel on to another."} - (All [a] (-> (Channel a) (Channel a) (IO Unit))) + (All [a] (-> (Channel a) (Channel a) (IO Top))) (listen (publish output) input)) diff --git a/stdlib/source/lux/concurrency/semaphore.lux b/stdlib/source/lux/concurrency/semaphore.lux new file mode 100644 index 000000000..c4391fbc9 --- /dev/null +++ b/stdlib/source/lux/concurrency/semaphore.lux @@ -0,0 +1,147 @@ +(.module: + lux + (lux (control [monad #+ do]) + (concurrency [atom #+ Atom] + [promise #+ Promise]) + [io #+ IO] + (type abstract + [refinement]))) + +(type: State + {#open-positions Nat + #waiting-list (List (Promise Unit))}) + +(abstract: #export Semaphore + {#.doc "A tool for controlling access to resources by multiple concurrent processes."} + + (Atom State) + + (def: #export (semaphore init-open-positions) + (-> Nat Semaphore) + (@abstraction (atom.atom {#open-positions init-open-positions + #waiting-list (list)}))) + + (def: #export (wait semaphore) + (Ex [k] (-> Semaphore (Promise Unit))) + (let [semaphore (@representation semaphore)] + (io.run + (loop [signal (: (Promise Unit) + (promise.promise #.None))] + (do io.Monad<IO> + [state (atom.read semaphore) + #let [[ready? state'] (: [Bool State] + (case (get@ #open-positions state) + +0 [false (update@ #waiting-list (|>> (#.Cons signal)) + state)] + _ [true (update@ #open-positions n/dec + state)]))] + success? (atom.compare-and-swap state state' semaphore) + _ (if ready? + (promise.resolve [] signal) + (wrap false))] + (if success? + (wrap signal) + (recur signal))))))) + + (def: #export (signal semaphore) + (Ex [k] (-> Semaphore (Promise Unit))) + (let [semaphore (@representation semaphore)] + (promise.future + (loop [_ []] + (do io.Monad<IO> + [state (atom.read semaphore) + #let [[?signal state'] (: [(Maybe (Promise Unit)) State] + (case (get@ #waiting-list state) + #.Nil + [#.None (update@ #open-positions n/inc state)] + + (#.Cons head tail) + [(#.Some head) (set@ #waiting-list tail state)]))] + success? (atom.compare-and-swap state state' semaphore)] + (if success? + (do @ + [_ (case ?signal + #.None + (wrap true) + + (#.Some signal) + (promise.resolve [] signal))] + (wrap [])) + (recur []))))))) + ) + +(abstract: #export Mutex + {#.doc "A mutual-exclusion lock that can only be acquired by one process at a time."} + + Semaphore + + (def: #export (mutex _) + (-> Top Mutex) + (@abstraction (semaphore +1))) + + (def: (acquire mutex) + (-> Mutex (Promise Unit)) + (wait (@representation mutex))) + + (def: (release mutex) + (-> Mutex (Promise Unit)) + (signal (@representation mutex))) + + (def: #export (synchronize mutex procedure) + (All [a] (-> Mutex (IO (Promise a)) (Promise a))) + (do promise.Monad<Promise> + [_ (acquire mutex) + output (io.run procedure) + _ (release mutex)] + (wrap output))) + ) + +(def: #export limit (refinement.refinement (n/> +0))) +(type: #export Limit (~ (refinement.type limit))) + +(abstract: #export Barrier + {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."} + + {#limit Limit + #count (Atom Nat) + #start-turnstile Semaphore + #end-turnstile Semaphore} + + (def: #export (barrier limit) + (-> Limit Barrier) + (@abstraction {#limit limit + #count (atom.atom +0) + #start-turnstile (semaphore +0) + #end-turnstile (semaphore +0)})) + + (def: (un-block times turnstile) + (-> Nat Semaphore (Promise Top)) + (loop [step +0] + (if (n/< times step) + (do promise.Monad<Promise> + [_ (signal turnstile)] + (recur (n/inc step))) + (:: promise.Monad<Promise> wrap [])))) + + (do-template [<phase> <update> <goal> <turnstile>] + [(def: (<phase> (^@representation barrier)) + (-> Barrier (Promise Unit)) + (do promise.Monad<Promise> + [#let [limit (refinement.un-refine (get@ #limit barrier)) + goal <goal> + count (io.run (atom.update <update> (get@ #count barrier)))] + _ (if (n/= goal count) + (un-block limit (get@ <turnstile> barrier)) + (wrap []))] + (wait (get@ <turnstile> barrier))))] + + [start n/inc limit #start-turnstile] + [end n/dec +0 #end-turnstile] + ) + + (def: #export (block barrier) + (-> Barrier (Promise Unit)) + (do promise.Monad<Promise> + [_ (start barrier)] + (end barrier))) + ) diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux index 4e2d82905..69a36461a 100644 --- a/stdlib/source/lux/concurrency/stm.lux +++ b/stdlib/source/lux/concurrency/stm.lux @@ -14,7 +14,8 @@ (abstract: #export (Var a) {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} - (Atom [a (List (-> a (IO Unit)))]) + + (Atom [a (List (-> a (IO Top)))]) (def: #export (var value) {#.doc "Creates a new STM var, with a default value."} @@ -43,6 +44,7 @@ (wrap [])) (write! new-value (@abstraction var))))) + ## TODO: Remove when possible (def: (helper|follow var) (All [a] (-> (Var a) (frp.Channel a))) (frp.channel [])) @@ -185,7 +187,7 @@ (atom false)) (def: (issue-commit commit) - (-> Commit (IO Unit)) + (-> Commit (IO Top)) (let [entry [commit (promise #.None)]] (loop [|commits| (io.run (atom.read pending-commits))] (case (promise.poll |commits|) diff --git a/stdlib/source/lux/data/text.lux b/stdlib/source/lux/data/text.lux index 0fdbb376f..aeaf858cc 100644 --- a/stdlib/source/lux/data/text.lux +++ b/stdlib/source/lux/data/text.lux @@ -56,20 +56,20 @@ (-> Text Text (Maybe Nat)) ("lux text index" input pattern +0)) -(def: (last-index-of'' part part-size since text) - (-> Text Nat Nat Text (Maybe Nat)) - (case ("lux text index" text part (n/+ part-size since)) +(def: (last-index-of'' part since text) + (-> Text Nat Text (Maybe Nat)) + (case ("lux text index" text part (n/inc since)) #.None (#.Some since) (#.Some since') - (last-index-of'' part part-size since' text))) + (last-index-of'' part since' text))) (def: #export (last-index-of' part from text) (-> Text Nat Text (Maybe Nat)) (case ("lux text index" text part from) (#.Some since) - (last-index-of'' part ("lux text size" part) since text) + (last-index-of'' part since text) #.None #.None)) @@ -78,7 +78,7 @@ (-> Text Text (Maybe Nat)) (case ("lux text index" text part +0) (#.Some since) - (last-index-of'' part ("lux text size" part) since text) + (last-index-of'' part since text) #.None #.None)) |