aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/control/concurrency')
-rw-r--r--stdlib/source/lux/control/concurrency/actor.lux63
-rw-r--r--stdlib/source/lux/control/concurrency/promise.lux37
-rw-r--r--stdlib/source/lux/control/concurrency/stm.lux22
3 files changed, 71 insertions, 51 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux
index a4c345967..0f38c4c3d 100644
--- a/stdlib/source/lux/control/concurrency/actor.lux
+++ b/stdlib/source/lux/control/concurrency/actor.lux
@@ -3,6 +3,7 @@
[abstract
monad]
[control
+ [pipe (#+ case>)]
["." function]
["." try (#+ Try)]
["." exception (#+ exception:)]
@@ -49,13 +50,16 @@
(-> (Rec Mailbox
[(Promise [a Mailbox])
(Resolver [a Mailbox])])
- (List a)))
- (case (promise.poll read)
- (#.Some [head tail])
- (#.Cons head (pending tail))
-
- #.None
- #.Nil))
+ (IO (List a))))
+ (do io.monad
+ [current (promise.poll read)]
+ (case current
+ (#.Some [head tail])
+ (:: @ map (|>> (#.Cons head))
+ (pending tail))
+
+ #.None
+ (wrap #.Nil))))
(abstract: #export (Actor s)
{#.doc "An actor, defined as all the necessities it requires."}
@@ -101,7 +105,10 @@
(do @
[_ (end error state)]
(let [[_ resolve] (get@ #obituary (:representation self))]
- (exec (io.run (resolve [error state (#.Cons head (..pending tail))]))
+ (exec (io.run
+ (do io.monad
+ [pending (..pending tail)]
+ (resolve [error state (#.Cons head pending)])))
(wrap []))))
(#try.Success state')
@@ -111,17 +118,19 @@
(def: #export (alive? actor)
(All [s] (-> (Actor s) (IO Bit)))
(let [[obituary _] (get@ #obituary (:representation actor))]
- (io.io (case (promise.poll obituary)
- #.None
- yes
+ (|> obituary
+ promise.poll
+ (:: io.functor map
+ (|>> (case> #.None
+ yes
- _
- no))))
+ _
+ no))))))
(def: #export (obituary actor)
(All [s] (-> (Actor s) (IO (Maybe (Obituary s)))))
(let [[obituary _] (get@ #obituary (:representation actor))]
- (io.io (promise.poll obituary))))
+ (promise.poll obituary)))
(def: #export (send message actor)
{#.doc "Communicate with an actor through message passing."}
@@ -133,18 +142,20 @@
(do @
[|mailbox|&resolve (atom.read (get@ #mailbox (:representation actor)))]
(loop [[|mailbox| resolve] |mailbox|&resolve]
- (case (promise.poll |mailbox|)
- #.None
- (do @
- [resolved? (resolve entry)]
- (if resolved?
- (do @
- [_ (atom.write (product.right entry) (get@ #mailbox (:representation actor)))]
- (wrap true))
- (recur |mailbox|&resolve)))
-
- (#.Some [_ |mailbox|'])
- (recur |mailbox|')))))
+ (do @
+ [|mailbox| (promise.poll |mailbox|)]
+ (case |mailbox|
+ #.None
+ (do @
+ [resolved? (resolve entry)]
+ (if resolved?
+ (do @
+ [_ (atom.write (product.right entry) (get@ #mailbox (:representation actor)))]
+ (wrap true))
+ (recur |mailbox|&resolve)))
+
+ (#.Some [_ |mailbox|'])
+ (recur |mailbox|'))))))
(wrap false))))
)
)
diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux
index 49d4247b4..def999622 100644
--- a/stdlib/source/lux/control/concurrency/promise.lux
+++ b/stdlib/source/lux/control/concurrency/promise.lux
@@ -5,6 +5,7 @@
[apply (#+ Apply)]
["." monad (#+ Monad do)]]
[control
+ [pipe (#+ case>)]
["." function]
["." io (#+ IO io)]]
[data
@@ -55,11 +56,10 @@
(def: #export poll
{#.doc "Polls a promise's value."}
- (All [a] (-> (Promise a) (Maybe a)))
+ (All [a] (-> (Promise a) (IO (Maybe a))))
(|>> :representation
atom.read
- io.run
- product.left))
+ (:: io.functor map product.left)))
(def: #export (await f promise)
(All [a] (-> (-> a (IO Any)) (Promise a) (IO Any)))
@@ -76,23 +76,28 @@
(await f (:abstraction promise)))))))
)
-(def: #export (resolved? promise)
+(def: #export resolved?
{#.doc "Checks whether a promise's value has already been resolved."}
- (All [a] (-> (Promise a) Bit))
- (case (poll promise)
- #.None
- #0
-
- (#.Some _)
- #1))
-
-(structure: #export functor (Functor Promise)
+ (All [a] (-> (Promise a) (IO Bit)))
+ (|>> ..poll
+ (:: io.functor map
+ (|>> (case> #.None
+ #0
+
+ (#.Some _)
+ #1)))))
+
+(structure: #export functor
+ (Functor Promise)
+
(def: (map f fa)
(let [[fb resolve] (..promise [])]
(exec (io.run (await (|>> f resolve) fa))
fb))))
-(structure: #export apply (Apply Promise)
+(structure: #export apply
+ (Apply Promise)
+
(def: &functor ..functor)
(def: (apply ff fa)
@@ -102,7 +107,9 @@
ff))
fb))))
-(structure: #export monad (Monad Promise)
+(structure: #export monad
+ (Monad Promise)
+
(def: &functor ..functor)
(def: wrap ..resolved)
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux
index 9d97b389f..783bc2117 100644
--- a/stdlib/source/lux/control/concurrency/stm.lux
+++ b/stdlib/source/lux/control/concurrency/stm.lux
@@ -202,16 +202,18 @@
(do io.monad
[|commits|&resolve (atom.read pending-commits)]
(loop [[|commits| resolve] |commits|&resolve]
- (case (promise.poll |commits|)
- #.None
- (do io.monad
- [resolved? (resolve entry)]
- (if resolved?
- (atom.write (product.right entry) pending-commits)
- (recur |commits|&resolve)))
-
- (#.Some [head tail])
- (recur tail))))))
+ (do @
+ [|commits| (promise.poll |commits|)]
+ (case |commits|
+ #.None
+ (do io.monad
+ [resolved? (resolve entry)]
+ (if resolved?
+ (atom.write (product.right entry) pending-commits)
+ (recur |commits|&resolve)))
+
+ (#.Some [head tail])
+ (recur tail)))))))
(def: (process-commit commit)
(All [a] (-> (Commit a) (IO Any)))