diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency')
-rw-r--r-- | stdlib/source/lux/control/concurrency/actor.lux | 63 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/promise.lux | 37 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/stm.lux | 22 |
3 files changed, 71 insertions, 51 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux index a4c345967..0f38c4c3d 100644 --- a/stdlib/source/lux/control/concurrency/actor.lux +++ b/stdlib/source/lux/control/concurrency/actor.lux @@ -3,6 +3,7 @@ [abstract monad] [control + [pipe (#+ case>)] ["." function] ["." try (#+ Try)] ["." exception (#+ exception:)] @@ -49,13 +50,16 @@ (-> (Rec Mailbox [(Promise [a Mailbox]) (Resolver [a Mailbox])]) - (List a))) - (case (promise.poll read) - (#.Some [head tail]) - (#.Cons head (pending tail)) - - #.None - #.Nil)) + (IO (List a)))) + (do io.monad + [current (promise.poll read)] + (case current + (#.Some [head tail]) + (:: @ map (|>> (#.Cons head)) + (pending tail)) + + #.None + (wrap #.Nil)))) (abstract: #export (Actor s) {#.doc "An actor, defined as all the necessities it requires."} @@ -101,7 +105,10 @@ (do @ [_ (end error state)] (let [[_ resolve] (get@ #obituary (:representation self))] - (exec (io.run (resolve [error state (#.Cons head (..pending tail))])) + (exec (io.run + (do io.monad + [pending (..pending tail)] + (resolve [error state (#.Cons head pending)]))) (wrap [])))) (#try.Success state') @@ -111,17 +118,19 @@ (def: #export (alive? actor) (All [s] (-> (Actor s) (IO Bit))) (let [[obituary _] (get@ #obituary (:representation actor))] - (io.io (case (promise.poll obituary) - #.None - yes + (|> obituary + promise.poll + (:: io.functor map + (|>> (case> #.None + yes - _ - no)))) + _ + no)))))) (def: #export (obituary actor) (All [s] (-> (Actor s) (IO (Maybe (Obituary s))))) (let [[obituary _] (get@ #obituary (:representation actor))] - (io.io (promise.poll obituary)))) + (promise.poll obituary))) (def: #export (send message actor) {#.doc "Communicate with an actor through message passing."} @@ -133,18 +142,20 @@ (do @ [|mailbox|&resolve (atom.read (get@ #mailbox (:representation actor)))] (loop [[|mailbox| resolve] |mailbox|&resolve] - (case (promise.poll |mailbox|) - #.None - (do @ - [resolved? (resolve entry)] - (if resolved? - (do @ - [_ (atom.write (product.right entry) (get@ #mailbox (:representation actor)))] - (wrap true)) - (recur |mailbox|&resolve))) - - (#.Some [_ |mailbox|']) - (recur |mailbox|'))))) + (do @ + [|mailbox| (promise.poll |mailbox|)] + (case |mailbox| + #.None + (do @ + [resolved? (resolve entry)] + (if resolved? + (do @ + [_ (atom.write (product.right entry) (get@ #mailbox (:representation actor)))] + (wrap true)) + (recur |mailbox|&resolve))) + + (#.Some [_ |mailbox|']) + (recur |mailbox|')))))) (wrap false)))) ) ) diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux index 49d4247b4..def999622 100644 --- a/stdlib/source/lux/control/concurrency/promise.lux +++ b/stdlib/source/lux/control/concurrency/promise.lux @@ -5,6 +5,7 @@ [apply (#+ Apply)] ["." monad (#+ Monad do)]] [control + [pipe (#+ case>)] ["." function] ["." io (#+ IO io)]] [data @@ -55,11 +56,10 @@ (def: #export poll {#.doc "Polls a promise's value."} - (All [a] (-> (Promise a) (Maybe a))) + (All [a] (-> (Promise a) (IO (Maybe a)))) (|>> :representation atom.read - io.run - product.left)) + (:: io.functor map product.left))) (def: #export (await f promise) (All [a] (-> (-> a (IO Any)) (Promise a) (IO Any))) @@ -76,23 +76,28 @@ (await f (:abstraction promise))))))) ) -(def: #export (resolved? promise) +(def: #export resolved? {#.doc "Checks whether a promise's value has already been resolved."} - (All [a] (-> (Promise a) Bit)) - (case (poll promise) - #.None - #0 - - (#.Some _) - #1)) - -(structure: #export functor (Functor Promise) + (All [a] (-> (Promise a) (IO Bit))) + (|>> ..poll + (:: io.functor map + (|>> (case> #.None + #0 + + (#.Some _) + #1))))) + +(structure: #export functor + (Functor Promise) + (def: (map f fa) (let [[fb resolve] (..promise [])] (exec (io.run (await (|>> f resolve) fa)) fb)))) -(structure: #export apply (Apply Promise) +(structure: #export apply + (Apply Promise) + (def: &functor ..functor) (def: (apply ff fa) @@ -102,7 +107,9 @@ ff)) fb)))) -(structure: #export monad (Monad Promise) +(structure: #export monad + (Monad Promise) + (def: &functor ..functor) (def: wrap ..resolved) diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux index 9d97b389f..783bc2117 100644 --- a/stdlib/source/lux/control/concurrency/stm.lux +++ b/stdlib/source/lux/control/concurrency/stm.lux @@ -202,16 +202,18 @@ (do io.monad [|commits|&resolve (atom.read pending-commits)] (loop [[|commits| resolve] |commits|&resolve] - (case (promise.poll |commits|) - #.None - (do io.monad - [resolved? (resolve entry)] - (if resolved? - (atom.write (product.right entry) pending-commits) - (recur |commits|&resolve))) - - (#.Some [head tail]) - (recur tail)))))) + (do @ + [|commits| (promise.poll |commits|)] + (case |commits| + #.None + (do io.monad + [resolved? (resolve entry)] + (if resolved? + (atom.write (product.right entry) pending-commits) + (recur |commits|&resolve))) + + (#.Some [head tail]) + (recur tail))))))) (def: (process-commit commit) (All [a] (-> (Commit a) (IO Any))) |