diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency')
-rw-r--r-- | stdlib/source/lux/control/concurrency/actor.lux | 18 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/frp.lux | 28 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/promise.lux | 8 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/semaphore.lux | 6 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/stm.lux | 24 |
5 files changed, 42 insertions, 42 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux index f8458caf3..3c423692a 100644 --- a/stdlib/source/lux/control/concurrency/actor.lux +++ b/stdlib/source/lux/control/concurrency/actor.lux @@ -53,11 +53,11 @@ [(Promise [a Mailbox]) (Resolver [a Mailbox])]) (IO (List a)))) - (do {@ io.monad} + (do {! io.monad} [current (promise.poll read)] (case current (#.Some [head tail]) - (:: @ map (|>> (#.Cons head)) + (:: ! map (|>> (#.Cons head)) (pending tail)) #.None @@ -99,12 +99,12 @@ (promise.promise [])) process (loop [state init [|mailbox| _] (io.run (atom.read (get@ #mailbox (:representation self))))] - (do {@ promise.monad} + (do {! promise.monad} [[head tail] |mailbox| ?state' (handle head state self)] (case ?state' (#try.Failure error) - (do @ + (do ! [_ (end error state)] (let [[_ resolve] (get@ #obituary (:representation self))] (exec (io.run @@ -137,21 +137,21 @@ (def: #export (send message actor) {#.doc "Communicate with an actor through message passing."} (All [s] (-> (Message s) (Actor s) (IO Bit))) - (do {@ io.monad} + (do {! io.monad} [alive? (..alive? actor)] (if alive? (let [entry [message (promise.promise [])]] - (do @ + (do ! [|mailbox|&resolve (atom.read (get@ #mailbox (:representation actor)))] (loop [[|mailbox| resolve] |mailbox|&resolve] - (do @ + (do ! [|mailbox| (promise.poll |mailbox|)] (case |mailbox| #.None - (do @ + (do ! [resolved? (resolve entry)] (if resolved? - (do @ + (do ! [_ (atom.write (product.right entry) (get@ #mailbox (:representation actor)))] (wrap true)) (recur |mailbox|&resolve))) diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux index 2850f454f..50c26e769 100644 --- a/stdlib/source/lux/control/concurrency/frp.lux +++ b/stdlib/source/lux/control/concurrency/frp.lux @@ -40,14 +40,14 @@ (structure (def: close (loop [_ []] - (do {@ io.monad} + (do {! io.monad} [current (atom.read sink) stopped? (current #.None)] (if stopped? ## I closed the sink. (wrap (exception.return [])) ## Someone else interacted with the sink. - (do @ + (do ! [latter (atom.read sink)] (if (is? current latter) ## Someone else closed the sink. @@ -57,7 +57,7 @@ (def: (feed value) (loop [_ []] - (do {@ io.monad} + (do {! io.monad} [current (atom.read sink) #let [[next resolve-next] (:share [a] {(promise.Resolver (Maybe [a (Channel a)])) @@ -68,11 +68,11 @@ fed? (current (#.Some [value next]))] (if fed? ## I fed the sink. - (do @ + (do ! [_ (atom.compare-and-swap current resolve-next sink)] (wrap (exception.return []))) ## Someone else interacted with the sink. - (do @ + (do ! [latter (atom.read sink)] (if (is? current latter) ## Someone else closed the sink while I was feeding it. @@ -124,13 +124,13 @@ (let [[output sink] (channel [])] (exec (: (Promise Any) (loop [mma mma] - (do {@ promise.monad} + (do {! promise.monad} [?mma mma] (case ?mma (#.Some [ma mma']) - (do @ + (do ! [_ (loop [ma ma] - (do @ + (do ! [?ma ma] (case ?ma (#.Some [a ma']) @@ -185,14 +185,14 @@ (All [a b] (-> (-> b a (Promise a)) a (Channel b) (Promise a))) - (do {@ promise.monad} + (do {! promise.monad} [cons channel] (case cons #.None (wrap init) (#.Some [head tail]) - (do @ + (do ! [init' (f head init)] (fold f init' tail))))) @@ -201,14 +201,14 @@ (All [a b] (-> (-> b a (Promise a)) a (Channel b) (Channel a))) - (do {@ promise.monad} + (do {! promise.monad} [cons channel] (case cons #.None (wrap (#.Some [init (wrap #.None)])) (#.Some [head tail]) - (do @ + (do ! [init' (f head init)] (wrap (#.Some [init (folds f init' tail)])))))) @@ -265,11 +265,11 @@ (def: #export (consume channel) {#.doc "Reads the entirety of a channel's content and returns it as a list."} (All [a] (-> (Channel a) (Promise (List a)))) - (do {@ promise.monad} + (do {! promise.monad} [cons channel] (case cons (#.Some [head tail]) - (:: @ map (|>> (#.Cons head)) + (:: ! map (|>> (#.Cons head)) (consume tail)) #.None diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux index e396b0769..3b6341cf1 100644 --- a/stdlib/source/lux/control/concurrency/promise.lux +++ b/stdlib/source/lux/control/concurrency/promise.lux @@ -29,19 +29,19 @@ (All [a] (-> (Promise a) (Resolver a))) (function (resolve value) (let [promise (:representation promise)] - (do {@ io.monad} + (do {! io.monad} [(^@ old [_value _observers]) (atom.read promise)] (case _value (#.Some _) (wrap #0) #.None - (do @ + (do ! [#let [new [(#.Some value) #.None]] succeeded? (atom.compare-and-swap old new promise)] (if succeeded? - (do @ - [_ (monad.map @ (function (_ f) (f value)) + (do ! + [_ (monad.map ! (function (_ f) (f value)) _observers)] (wrap #1)) (resolve value)))))))) diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux index 83e5ad005..36ac7cd34 100644 --- a/stdlib/source/lux/control/concurrency/semaphore.lux +++ b/stdlib/source/lux/control/concurrency/semaphore.lux @@ -75,7 +75,7 @@ (let [semaphore (:representation semaphore)] (promise.future (loop [_ []] - (do {@ io.monad} + (do {! io.monad} [state (atom.read semaphore) #let [[?sink state' maxed-out?] (: [(Maybe (Resolver Any)) State Bit] (case (queue.peek (get@ #waiting-list state)) @@ -97,11 +97,11 @@ false]))]] (if maxed-out? (wrap (exception.throw ..semaphore-is-maxed-out [(get@ #max-positions state)])) - (do @ + (do ! [#let [open-positions (get@ #open-positions state')] success? (atom.compare-and-swap state state' semaphore)] (if success? - (do @ + (do ! [_ (case ?sink #.None (wrap true) diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux index 0743a0720..259511eb7 100644 --- a/stdlib/source/lux/control/concurrency/stm.lux +++ b/stdlib/source/lux/control/concurrency/stm.lux @@ -46,14 +46,14 @@ (def: (write! new-value var) (All [a] (-> a (Var a) (IO Any))) - (do {@ io.monad} + (do {! io.monad} [#let [var' (:representation var)] (^@ old [old-value observers]) (atom.read var') succeeded? (atom.compare-and-swap old [new-value observers] var')] (if succeeded? - (do @ - [_ (monad.map @ (function (_ sink) - (do @ + (do ! + [_ (monad.map ! (function (_ sink) + (do ! [result (:: sink feed new-value)] (case result (#try.Success _) @@ -217,10 +217,10 @@ (def: (issue-commit commit) (All [a] (-> (Commit a) (IO Any))) (let [entry [commit (promise.promise [])]] - (do {@ io.monad} + (do {! io.monad} [|commits|&resolve (atom.read pending-commits)] (loop [[|commits| resolve] |commits|&resolve] - (do @ + (do ! [|commits| (promise.poll |commits|)] (case |commits| #.None @@ -238,24 +238,24 @@ (let [[stm-proc output resolve] commit [finished-tx value] (stm-proc fresh-tx)] (if (can-commit? finished-tx) - (do {@ io.monad} - [_ (monad.map @ commit-var! finished-tx)] + (do {! io.monad} + [_ (monad.map ! commit-var! finished-tx)] (resolve value)) (issue-commit commit)))) (def: init-processor! (IO Any) - (do {@ io.monad} + (do {! io.monad} [flag (atom.read commit-processor-flag)] (if flag (wrap []) - (do @ + (do ! [was-first? (atom.compare-and-swap flag #1 commit-processor-flag)] (if was-first? - (do @ + (do ! [[promise resolve] (atom.read pending-commits)] (promise.await (function (recur [head [tail _resolve]]) - (do @ + (do ! [_ (process-commit head)] (promise.await recur tail))) promise)) |