aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--stdlib/source/lux/concurrency/atom.lux14
-rw-r--r--stdlib/source/lux/concurrency/frp.lux16
-rw-r--r--stdlib/source/lux/concurrency/semaphore.lux147
-rw-r--r--stdlib/source/lux/concurrency/stm.lux6
-rw-r--r--stdlib/source/lux/data/text.lux12
-rw-r--r--stdlib/test/test/lux/concurrency/frp.lux5
-rw-r--r--stdlib/test/test/lux/concurrency/semaphore.lux138
-rw-r--r--stdlib/test/test/lux/concurrency/stm.lux5
8 files changed, 320 insertions, 23 deletions
diff --git a/stdlib/source/lux/concurrency/atom.lux b/stdlib/source/lux/concurrency/atom.lux
index bd3041979..c60edd01e 100644
--- a/stdlib/source/lux/concurrency/atom.lux
+++ b/stdlib/source/lux/concurrency/atom.lux
@@ -28,12 +28,14 @@
If it fails to update it (because some other process wrote to it first), it will retry until it succeeds.
The retries will be done with the new values of the atom, as they show up."}
- (All [a] (-> (-> a a) (Atom a) (IO Unit)))
- (io (let [old ("lux atom read" atom)]
- (if ("lux atom compare-and-swap" atom old (f old))
- []
- (io.run (update f atom))))))
+ (All [a] (-> (-> a a) (Atom a) (IO a)))
+ (io (loop [_ []]
+ (let [old ("lux atom read" atom)
+ new (f old)]
+ (if ("lux atom compare-and-swap" atom old new)
+ new
+ (recur []))))))
(def: #export (write value atom)
- (All [a] (-> a (Atom a) (IO Unit)))
+ (All [a] (-> a (Atom a) (IO Top)))
(update (function.const value) atom))
diff --git a/stdlib/source/lux/concurrency/frp.lux b/stdlib/source/lux/concurrency/frp.lux
index fbe55efee..98de28d39 100644
--- a/stdlib/source/lux/concurrency/frp.lux
+++ b/stdlib/source/lux/concurrency/frp.lux
@@ -19,16 +19,18 @@
(@abstraction (atom (list))))
(def: #export (listen listener (^@representation channel))
- (All [a] (-> (-> a (IO Top)) (Channel a) (IO Unit)))
- (atom.update (|>> (#.Cons listener)) channel))
+ (All [a] (-> (-> a (IO Top)) (Channel a) (IO Top)))
+ ## TODO: Simplify when possible.
+ (do io.Monad<IO>
+ [_ (atom.update (|>> (#.Cons listener)) channel)]
+ (wrap [])))
(def: #export (publish (^@representation channel) value)
{#.doc "Publish to a channel."}
- (All [a] (-> (Channel a) a (IO Unit)))
+ (All [a] (-> (Channel a) a (IO Top)))
(do io.Monad<IO>
- [listeners (atom.read channel)
- _ (monad.map @ (function [listener] (listener value)) listeners)]
- (wrap [])))
+ [listeners (atom.read channel)]
+ (monad.map @ (function [listener] (listener value)) listeners)))
)
## [Values]
@@ -44,7 +46,7 @@
(def: #export (pipe output input)
{#.doc "Copy/pipe the contents of a channel on to another."}
- (All [a] (-> (Channel a) (Channel a) (IO Unit)))
+ (All [a] (-> (Channel a) (Channel a) (IO Top)))
(listen (publish output)
input))
diff --git a/stdlib/source/lux/concurrency/semaphore.lux b/stdlib/source/lux/concurrency/semaphore.lux
new file mode 100644
index 000000000..c4391fbc9
--- /dev/null
+++ b/stdlib/source/lux/concurrency/semaphore.lux
@@ -0,0 +1,147 @@
+(.module:
+ lux
+ (lux (control [monad #+ do])
+ (concurrency [atom #+ Atom]
+ [promise #+ Promise])
+ [io #+ IO]
+ (type abstract
+ [refinement])))
+
+(type: State
+ {#open-positions Nat
+ #waiting-list (List (Promise Unit))})
+
+(abstract: #export Semaphore
+ {#.doc "A tool for controlling access to resources by multiple concurrent processes."}
+
+ (Atom State)
+
+ (def: #export (semaphore init-open-positions)
+ (-> Nat Semaphore)
+ (@abstraction (atom.atom {#open-positions init-open-positions
+ #waiting-list (list)})))
+
+ (def: #export (wait semaphore)
+ (Ex [k] (-> Semaphore (Promise Unit)))
+ (let [semaphore (@representation semaphore)]
+ (io.run
+ (loop [signal (: (Promise Unit)
+ (promise.promise #.None))]
+ (do io.Monad<IO>
+ [state (atom.read semaphore)
+ #let [[ready? state'] (: [Bool State]
+ (case (get@ #open-positions state)
+ +0 [false (update@ #waiting-list (|>> (#.Cons signal))
+ state)]
+ _ [true (update@ #open-positions n/dec
+ state)]))]
+ success? (atom.compare-and-swap state state' semaphore)
+ _ (if ready?
+ (promise.resolve [] signal)
+ (wrap false))]
+ (if success?
+ (wrap signal)
+ (recur signal)))))))
+
+ (def: #export (signal semaphore)
+ (Ex [k] (-> Semaphore (Promise Unit)))
+ (let [semaphore (@representation semaphore)]
+ (promise.future
+ (loop [_ []]
+ (do io.Monad<IO>
+ [state (atom.read semaphore)
+ #let [[?signal state'] (: [(Maybe (Promise Unit)) State]
+ (case (get@ #waiting-list state)
+ #.Nil
+ [#.None (update@ #open-positions n/inc state)]
+
+ (#.Cons head tail)
+ [(#.Some head) (set@ #waiting-list tail state)]))]
+ success? (atom.compare-and-swap state state' semaphore)]
+ (if success?
+ (do @
+ [_ (case ?signal
+ #.None
+ (wrap true)
+
+ (#.Some signal)
+ (promise.resolve [] signal))]
+ (wrap []))
+ (recur [])))))))
+ )
+
+(abstract: #export Mutex
+ {#.doc "A mutual-exclusion lock that can only be acquired by one process at a time."}
+
+ Semaphore
+
+ (def: #export (mutex _)
+ (-> Top Mutex)
+ (@abstraction (semaphore +1)))
+
+ (def: (acquire mutex)
+ (-> Mutex (Promise Unit))
+ (wait (@representation mutex)))
+
+ (def: (release mutex)
+ (-> Mutex (Promise Unit))
+ (signal (@representation mutex)))
+
+ (def: #export (synchronize mutex procedure)
+ (All [a] (-> Mutex (IO (Promise a)) (Promise a)))
+ (do promise.Monad<Promise>
+ [_ (acquire mutex)
+ output (io.run procedure)
+ _ (release mutex)]
+ (wrap output)))
+ )
+
+(def: #export limit (refinement.refinement (n/> +0)))
+(type: #export Limit (~ (refinement.type limit)))
+
+(abstract: #export Barrier
+ {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."}
+
+ {#limit Limit
+ #count (Atom Nat)
+ #start-turnstile Semaphore
+ #end-turnstile Semaphore}
+
+ (def: #export (barrier limit)
+ (-> Limit Barrier)
+ (@abstraction {#limit limit
+ #count (atom.atom +0)
+ #start-turnstile (semaphore +0)
+ #end-turnstile (semaphore +0)}))
+
+ (def: (un-block times turnstile)
+ (-> Nat Semaphore (Promise Top))
+ (loop [step +0]
+ (if (n/< times step)
+ (do promise.Monad<Promise>
+ [_ (signal turnstile)]
+ (recur (n/inc step)))
+ (:: promise.Monad<Promise> wrap []))))
+
+ (do-template [<phase> <update> <goal> <turnstile>]
+ [(def: (<phase> (^@representation barrier))
+ (-> Barrier (Promise Unit))
+ (do promise.Monad<Promise>
+ [#let [limit (refinement.un-refine (get@ #limit barrier))
+ goal <goal>
+ count (io.run (atom.update <update> (get@ #count barrier)))]
+ _ (if (n/= goal count)
+ (un-block limit (get@ <turnstile> barrier))
+ (wrap []))]
+ (wait (get@ <turnstile> barrier))))]
+
+ [start n/inc limit #start-turnstile]
+ [end n/dec +0 #end-turnstile]
+ )
+
+ (def: #export (block barrier)
+ (-> Barrier (Promise Unit))
+ (do promise.Monad<Promise>
+ [_ (start barrier)]
+ (end barrier)))
+ )
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux
index 4e2d82905..69a36461a 100644
--- a/stdlib/source/lux/concurrency/stm.lux
+++ b/stdlib/source/lux/concurrency/stm.lux
@@ -14,7 +14,8 @@
(abstract: #export (Var a)
{#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."}
- (Atom [a (List (-> a (IO Unit)))])
+
+ (Atom [a (List (-> a (IO Top)))])
(def: #export (var value)
{#.doc "Creates a new STM var, with a default value."}
@@ -43,6 +44,7 @@
(wrap []))
(write! new-value (@abstraction var)))))
+ ## TODO: Remove when possible
(def: (helper|follow var)
(All [a] (-> (Var a) (frp.Channel a)))
(frp.channel []))
@@ -185,7 +187,7 @@
(atom false))
(def: (issue-commit commit)
- (-> Commit (IO Unit))
+ (-> Commit (IO Top))
(let [entry [commit (promise #.None)]]
(loop [|commits| (io.run (atom.read pending-commits))]
(case (promise.poll |commits|)
diff --git a/stdlib/source/lux/data/text.lux b/stdlib/source/lux/data/text.lux
index 0fdbb376f..aeaf858cc 100644
--- a/stdlib/source/lux/data/text.lux
+++ b/stdlib/source/lux/data/text.lux
@@ -56,20 +56,20 @@
(-> Text Text (Maybe Nat))
("lux text index" input pattern +0))
-(def: (last-index-of'' part part-size since text)
- (-> Text Nat Nat Text (Maybe Nat))
- (case ("lux text index" text part (n/+ part-size since))
+(def: (last-index-of'' part since text)
+ (-> Text Nat Text (Maybe Nat))
+ (case ("lux text index" text part (n/inc since))
#.None
(#.Some since)
(#.Some since')
- (last-index-of'' part part-size since' text)))
+ (last-index-of'' part since' text)))
(def: #export (last-index-of' part from text)
(-> Text Nat Text (Maybe Nat))
(case ("lux text index" text part from)
(#.Some since)
- (last-index-of'' part ("lux text size" part) since text)
+ (last-index-of'' part since text)
#.None
#.None))
@@ -78,7 +78,7 @@
(-> Text Text (Maybe Nat))
(case ("lux text index" text part +0)
(#.Some since)
- (last-index-of'' part ("lux text size" part) since text)
+ (last-index-of'' part since text)
#.None
#.None))
diff --git a/stdlib/test/test/lux/concurrency/frp.lux b/stdlib/test/test/lux/concurrency/frp.lux
index 5360dcda2..5f7245d0f 100644
--- a/stdlib/test/test/lux/concurrency/frp.lux
+++ b/stdlib/test/test/lux/concurrency/frp.lux
@@ -21,7 +21,10 @@
(do io.Monad<IO>
[#let [output (atom (list))]
_ (frp.listen (function [value]
- (atom.update (|>> (#.Cons value)) output))
+ ## TODO: Simplify when possible.
+ (do @
+ [_ (atom.update (|>> (#.Cons value)) output)]
+ (wrap [])))
channel)]
(wrap output)))
diff --git a/stdlib/test/test/lux/concurrency/semaphore.lux b/stdlib/test/test/lux/concurrency/semaphore.lux
new file mode 100644
index 000000000..59a9618fa
--- /dev/null
+++ b/stdlib/test/test/lux/concurrency/semaphore.lux
@@ -0,0 +1,138 @@
+(.module:
+ lux
+ (lux (control [monad #+ do])
+ (data [maybe]
+ [text "text/" Eq<Text> Monoid<Text>]
+ text/format
+ (coll [list "list/" Functor<List>]))
+ (concurrency ["/" semaphore]
+ [promise #+ Promise]
+ [atom #+ Atom])
+ [io]
+ ["r" math/random])
+ lux/test)
+
+(def: (wait-many-times times semaphore)
+ (-> Nat /.Semaphore (Promise Top))
+ (loop [steps times]
+ (if (n/> +0 steps)
+ (do promise.Monad<Promise>
+ [_ (/.wait semaphore)]
+ (recur (n/dec steps)))
+ (:: promise.Monad<Promise> wrap []))))
+
+(context: "Semaphore."
+ (<| (times +100)
+ (do @
+ [open-positions (|> r.nat (:: @ map (|>> (n/% +10) (n/max +1))))]
+ ($_ seq
+ (let [semaphore (/.semaphore open-positions)]
+ (wrap (do promise.Monad<Promise>
+ [_ (wait-many-times open-positions semaphore)]
+ (assert "Can wait on a semaphore up to the number of open positions without blocking."
+ true))))
+ (let [semaphore (/.semaphore open-positions)]
+ (wrap (do promise.Monad<Promise>
+ [result (<| (promise.time-out +100)
+ (wait-many-times (n/inc open-positions) semaphore))]
+ (assert "Waiting on a semaphore more than the number of open positions blocks the process."
+ (case result
+ (#.Some _)
+ false
+
+ #.None
+ true)))))
+ (let [semaphore (/.semaphore open-positions)]
+ (wrap (do promise.Monad<Promise>
+ [_ (: (Promise Top)
+ (loop [steps (n/* +2 open-positions)]
+ (if (n/> +0 steps)
+ (do @
+ [_ (/.wait semaphore)
+ _ (/.signal semaphore)]
+ (recur (n/dec steps)))
+ (wrap []))))]
+ (assert "Signaling a semaphore replenishes its open positions."
+ true))))
+ (let [semaphore (/.semaphore open-positions)]
+ (wrap (do promise.Monad<Promise>
+ [#let [resource (atom.atom "")
+ blocked (do @
+ [_ (wait-many-times open-positions semaphore)
+ _ (/.wait semaphore)
+ #let [_ (io.run (atom.update (|>> (format "B"))
+ resource))]]
+ (wrap []))]
+ _ (promise.wait +100)
+ _ (exec (io.run (atom.update (|>> (format "A"))
+ resource))
+ (/.signal semaphore))
+ _ blocked]
+ (assert "A blocked process can be un-blocked by a signal somewhere else."
+ (text/= "BA"
+ (io.run (atom.read resource)))))))
+ ))))
+
+(context: "Mutex."
+ (<| (times +100)
+ (do @
+ [repetitions (|> r.nat (:: @ map (|>> (n/% +100) (n/max +10))))]
+ ($_ seq
+ (let [mutex (/.mutex [])]
+ (wrap (do promise.Monad<Promise>
+ [#let [resource (atom.atom "")
+ expected-As (text.join-with "" (list.repeat repetitions "A"))
+ expected-Bs (text.join-with "" (list.repeat repetitions "B"))
+ processA (<| (/.synchronize mutex)
+ io.io
+ promise.future
+ (do io.Monad<IO>
+ [_ (<| (monad.seq @)
+ (list.repeat repetitions)
+ (atom.update (|>> (format "A")) resource))]
+ (wrap [])))
+ processB (<| (/.synchronize mutex)
+ io.io
+ promise.future
+ (do io.Monad<IO>
+ [_ (<| (monad.seq @)
+ (list.repeat repetitions)
+ (atom.update (|>> (format "B")) resource))]
+ (wrap [])))]
+ _ processA
+ _ processB
+ #let [outcome (io.run (atom.read resource))]]
+ (assert "Mutexes only allow one process to execute at a time."
+ (or (text/= (format expected-As expected-Bs)
+ outcome)
+ (text/= (format expected-Bs expected-As)
+ outcome))))))
+ ))))
+
+(def: (waiter resource barrier id)
+ (-> (Atom Text) /.Barrier Nat (Promise Top))
+ (do promise.Monad<Promise>
+ [_ (/.block barrier)
+ #let [_ (io.run (atom.update (|>> (format (%n id))) resource))]]
+ (wrap [])))
+
+(context: "Barrier."
+ (let [limit +10
+ barrier (/.barrier (maybe.assume (/.limit limit)))
+ resource (atom.atom "")]
+ ($_ seq
+ (wrap (do promise.Monad<Promise>
+ [#let [ids (list.n/range +0 (n/dec limit))
+ waiters (list/map (function [id]
+ (let [process (waiter resource barrier id)]
+ (exec (io.run (atom.update (|>> (format "_")) resource))
+ process)))
+ ids)]
+ _ (monad.seq @ waiters)
+ #let [outcome (io.run (atom.read resource))]]
+ (assert "A barrier can stop all processes from acting, until an amount of waiting processes is reached, and then the barrier is un-blocked for all."
+ (and (text.ends-with? "__________" outcome)
+ (list.every? (function [id]
+ (text.contains? (%n id) outcome))
+ ids)
+ )))))))
diff --git a/stdlib/test/test/lux/concurrency/stm.lux b/stdlib/test/test/lux/concurrency/stm.lux
index 1ca5482bf..d7764dfa2 100644
--- a/stdlib/test/test/lux/concurrency/stm.lux
+++ b/stdlib/test/test/lux/concurrency/stm.lux
@@ -18,7 +18,10 @@
(do io.Monad<IO>
[#let [output (atom (list))]
_ (frp.listen (function [value]
- (atom.update (|>> (#.Cons value)) output))
+ ## TODO: Simplify when possible.
+ (do @
+ [_ (atom.update (|>> (#.Cons value)) output)]
+ (wrap [])))
channel)]
(wrap output)))