aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/control/concurrency')
-rw-r--r--stdlib/source/lux/control/concurrency/actor.lux18
-rw-r--r--stdlib/source/lux/control/concurrency/frp.lux28
-rw-r--r--stdlib/source/lux/control/concurrency/promise.lux8
-rw-r--r--stdlib/source/lux/control/concurrency/semaphore.lux6
-rw-r--r--stdlib/source/lux/control/concurrency/stm.lux24
5 files changed, 42 insertions, 42 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux
index f8458caf3..3c423692a 100644
--- a/stdlib/source/lux/control/concurrency/actor.lux
+++ b/stdlib/source/lux/control/concurrency/actor.lux
@@ -53,11 +53,11 @@
[(Promise [a Mailbox])
(Resolver [a Mailbox])])
(IO (List a))))
- (do {@ io.monad}
+ (do {! io.monad}
[current (promise.poll read)]
(case current
(#.Some [head tail])
- (:: @ map (|>> (#.Cons head))
+ (:: ! map (|>> (#.Cons head))
(pending tail))
#.None
@@ -99,12 +99,12 @@
(promise.promise []))
process (loop [state init
[|mailbox| _] (io.run (atom.read (get@ #mailbox (:representation self))))]
- (do {@ promise.monad}
+ (do {! promise.monad}
[[head tail] |mailbox|
?state' (handle head state self)]
(case ?state'
(#try.Failure error)
- (do @
+ (do !
[_ (end error state)]
(let [[_ resolve] (get@ #obituary (:representation self))]
(exec (io.run
@@ -137,21 +137,21 @@
(def: #export (send message actor)
{#.doc "Communicate with an actor through message passing."}
(All [s] (-> (Message s) (Actor s) (IO Bit)))
- (do {@ io.monad}
+ (do {! io.monad}
[alive? (..alive? actor)]
(if alive?
(let [entry [message (promise.promise [])]]
- (do @
+ (do !
[|mailbox|&resolve (atom.read (get@ #mailbox (:representation actor)))]
(loop [[|mailbox| resolve] |mailbox|&resolve]
- (do @
+ (do !
[|mailbox| (promise.poll |mailbox|)]
(case |mailbox|
#.None
- (do @
+ (do !
[resolved? (resolve entry)]
(if resolved?
- (do @
+ (do !
[_ (atom.write (product.right entry) (get@ #mailbox (:representation actor)))]
(wrap true))
(recur |mailbox|&resolve)))
diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux
index 2850f454f..50c26e769 100644
--- a/stdlib/source/lux/control/concurrency/frp.lux
+++ b/stdlib/source/lux/control/concurrency/frp.lux
@@ -40,14 +40,14 @@
(structure
(def: close
(loop [_ []]
- (do {@ io.monad}
+ (do {! io.monad}
[current (atom.read sink)
stopped? (current #.None)]
(if stopped?
## I closed the sink.
(wrap (exception.return []))
## Someone else interacted with the sink.
- (do @
+ (do !
[latter (atom.read sink)]
(if (is? current latter)
## Someone else closed the sink.
@@ -57,7 +57,7 @@
(def: (feed value)
(loop [_ []]
- (do {@ io.monad}
+ (do {! io.monad}
[current (atom.read sink)
#let [[next resolve-next] (:share [a]
{(promise.Resolver (Maybe [a (Channel a)]))
@@ -68,11 +68,11 @@
fed? (current (#.Some [value next]))]
(if fed?
## I fed the sink.
- (do @
+ (do !
[_ (atom.compare-and-swap current resolve-next sink)]
(wrap (exception.return [])))
## Someone else interacted with the sink.
- (do @
+ (do !
[latter (atom.read sink)]
(if (is? current latter)
## Someone else closed the sink while I was feeding it.
@@ -124,13 +124,13 @@
(let [[output sink] (channel [])]
(exec (: (Promise Any)
(loop [mma mma]
- (do {@ promise.monad}
+ (do {! promise.monad}
[?mma mma]
(case ?mma
(#.Some [ma mma'])
- (do @
+ (do !
[_ (loop [ma ma]
- (do @
+ (do !
[?ma ma]
(case ?ma
(#.Some [a ma'])
@@ -185,14 +185,14 @@
(All [a b]
(-> (-> b a (Promise a)) a (Channel b)
(Promise a)))
- (do {@ promise.monad}
+ (do {! promise.monad}
[cons channel]
(case cons
#.None
(wrap init)
(#.Some [head tail])
- (do @
+ (do !
[init' (f head init)]
(fold f init' tail)))))
@@ -201,14 +201,14 @@
(All [a b]
(-> (-> b a (Promise a)) a (Channel b)
(Channel a)))
- (do {@ promise.monad}
+ (do {! promise.monad}
[cons channel]
(case cons
#.None
(wrap (#.Some [init (wrap #.None)]))
(#.Some [head tail])
- (do @
+ (do !
[init' (f head init)]
(wrap (#.Some [init (folds f init' tail)]))))))
@@ -265,11 +265,11 @@
(def: #export (consume channel)
{#.doc "Reads the entirety of a channel's content and returns it as a list."}
(All [a] (-> (Channel a) (Promise (List a))))
- (do {@ promise.monad}
+ (do {! promise.monad}
[cons channel]
(case cons
(#.Some [head tail])
- (:: @ map (|>> (#.Cons head))
+ (:: ! map (|>> (#.Cons head))
(consume tail))
#.None
diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux
index e396b0769..3b6341cf1 100644
--- a/stdlib/source/lux/control/concurrency/promise.lux
+++ b/stdlib/source/lux/control/concurrency/promise.lux
@@ -29,19 +29,19 @@
(All [a] (-> (Promise a) (Resolver a)))
(function (resolve value)
(let [promise (:representation promise)]
- (do {@ io.monad}
+ (do {! io.monad}
[(^@ old [_value _observers]) (atom.read promise)]
(case _value
(#.Some _)
(wrap #0)
#.None
- (do @
+ (do !
[#let [new [(#.Some value) #.None]]
succeeded? (atom.compare-and-swap old new promise)]
(if succeeded?
- (do @
- [_ (monad.map @ (function (_ f) (f value))
+ (do !
+ [_ (monad.map ! (function (_ f) (f value))
_observers)]
(wrap #1))
(resolve value))))))))
diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux
index 83e5ad005..36ac7cd34 100644
--- a/stdlib/source/lux/control/concurrency/semaphore.lux
+++ b/stdlib/source/lux/control/concurrency/semaphore.lux
@@ -75,7 +75,7 @@
(let [semaphore (:representation semaphore)]
(promise.future
(loop [_ []]
- (do {@ io.monad}
+ (do {! io.monad}
[state (atom.read semaphore)
#let [[?sink state' maxed-out?] (: [(Maybe (Resolver Any)) State Bit]
(case (queue.peek (get@ #waiting-list state))
@@ -97,11 +97,11 @@
false]))]]
(if maxed-out?
(wrap (exception.throw ..semaphore-is-maxed-out [(get@ #max-positions state)]))
- (do @
+ (do !
[#let [open-positions (get@ #open-positions state')]
success? (atom.compare-and-swap state state' semaphore)]
(if success?
- (do @
+ (do !
[_ (case ?sink
#.None
(wrap true)
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux
index 0743a0720..259511eb7 100644
--- a/stdlib/source/lux/control/concurrency/stm.lux
+++ b/stdlib/source/lux/control/concurrency/stm.lux
@@ -46,14 +46,14 @@
(def: (write! new-value var)
(All [a] (-> a (Var a) (IO Any)))
- (do {@ io.monad}
+ (do {! io.monad}
[#let [var' (:representation var)]
(^@ old [old-value observers]) (atom.read var')
succeeded? (atom.compare-and-swap old [new-value observers] var')]
(if succeeded?
- (do @
- [_ (monad.map @ (function (_ sink)
- (do @
+ (do !
+ [_ (monad.map ! (function (_ sink)
+ (do !
[result (:: sink feed new-value)]
(case result
(#try.Success _)
@@ -217,10 +217,10 @@
(def: (issue-commit commit)
(All [a] (-> (Commit a) (IO Any)))
(let [entry [commit (promise.promise [])]]
- (do {@ io.monad}
+ (do {! io.monad}
[|commits|&resolve (atom.read pending-commits)]
(loop [[|commits| resolve] |commits|&resolve]
- (do @
+ (do !
[|commits| (promise.poll |commits|)]
(case |commits|
#.None
@@ -238,24 +238,24 @@
(let [[stm-proc output resolve] commit
[finished-tx value] (stm-proc fresh-tx)]
(if (can-commit? finished-tx)
- (do {@ io.monad}
- [_ (monad.map @ commit-var! finished-tx)]
+ (do {! io.monad}
+ [_ (monad.map ! commit-var! finished-tx)]
(resolve value))
(issue-commit commit))))
(def: init-processor!
(IO Any)
- (do {@ io.monad}
+ (do {! io.monad}
[flag (atom.read commit-processor-flag)]
(if flag
(wrap [])
- (do @
+ (do !
[was-first? (atom.compare-and-swap flag #1 commit-processor-flag)]
(if was-first?
- (do @
+ (do !
[[promise resolve] (atom.read pending-commits)]
(promise.await (function (recur [head [tail _resolve]])
- (do @
+ (do !
[_ (process-commit head)]
(promise.await recur tail)))
promise))