From 43cca9e6b4f25a0a731ba4262e1ee7fae0c6dab7 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Sat, 15 Dec 2018 20:52:26 -0400 Subject: Separated reading promises and writing to promises. --- stdlib/source/lux/control/concurrency/actor.lux | 84 ++++++++++------- stdlib/source/lux/control/concurrency/promise.lux | 100 +++++++++++---------- stdlib/source/lux/control/concurrency/stm.lux | 86 ++++++++++-------- stdlib/source/lux/control/concurrency/task.lux | 26 +++--- .../test/test/lux/control/concurrency/promise.lux | 4 - 5 files changed, 165 insertions(+), 135 deletions(-) (limited to 'stdlib') diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux index 6f4ddf2ad..9b20dcfde 100644 --- a/stdlib/source/lux/control/concurrency/actor.lux +++ b/stdlib/source/lux/control/concurrency/actor.lux @@ -23,7 +23,7 @@ abstract]] [// ["." atom (#+ Atom atom)] - ["." promise (#+ Promise promise) ("promise/." Monad)] + ["." promise (#+ Promise Resolver) ("promise/." Monad)] ["." task (#+ Task)]]) (exception: #export poisoned) @@ -37,11 +37,17 @@ (with-expansions [ (as-is (-> s (Actor s) (Task s))) (as-is [Text s (List )]) - (as-is (Rec Mailbox (Promise [ Mailbox])))] - - (def: (obituary mailbox) - (All [a] (-> (Rec Mailbox (Promise [a Mailbox])) (List a))) - (case (promise.poll mailbox) + (as-is (Rec Mailbox + [(Promise [ Mailbox]) + (Resolver [ Mailbox])]))] + + (def: (obituary [read write]) + (All [a] + (-> (Rec Mailbox + [(Promise [a Mailbox]) + (Resolver [a Mailbox])]) + (List a))) + (case (promise.poll read) (#.Some [head tail]) (#.Cons head (obituary tail)) @@ -50,12 +56,18 @@ (abstract: #export (Actor s) {#.doc "An actor, defined as all the necessities it requires."} + {#mailbox (Atom ) - #obituary (Promise )} + #obituary [(Promise ) + (Resolver )]} ## TODO: Delete after new-luxc becomes the new standard compiler. (def: (actor mailbox obituary) - (All [s] (-> (Atom ) (Promise ) (Actor s))) + (All [s] + (-> (Atom ) + [(Promise ) + (Resolver )] + (Actor s))) (:abstraction {#mailbox mailbox #obituary obituary})) @@ -74,10 +86,10 @@ {#.doc "Given a behavior and initial state, spawns an actor and returns it."} (All [s] (-> (Behavior s) s (IO (Actor s)))) (io (let [[handle end] behavior - self (actor (atom (promise #.None)) - (promise #.None)) + self (actor (atom (promise.promise [])) + (promise.promise [])) process (loop [state init - |mailbox| (io.run (atom.read (get@ #mailbox (:representation self))))] + [|mailbox| _] (io.run (atom.read (get@ #mailbox (:representation self))))] (do promise.Monad [[head tail] |mailbox| ?state' (handle head state self)] @@ -85,9 +97,9 @@ (#e.Failure error) (do @ [_ (end error state)] - (exec (io.run (promise.resolve [error state (#.Cons head (obituary tail))] - (get@ #obituary (:representation self)))) - (wrap []))) + (let [[_ resolve] (get@ #obituary (:representation self))] + (exec (io.run (resolve [error state (#.Cons head (..obituary tail))])) + (wrap [])))) (#e.Success state') (recur state' tail))))] @@ -95,35 +107,37 @@ (def: #export (alive? actor) (All [s] (-> (Actor s) Bit)) - (case (promise.poll (get@ #obituary (:representation actor))) - #.None - #1 + (let [[obituary _] (get@ #obituary (:representation actor))] + (case (promise.poll obituary) + #.None + #1 - _ - #0)) + _ + #0))) (def: #export (send message actor) {#.doc "Communicate with an actor through message passing."} (All [s] (-> (Message s) (Actor s) (IO Bit))) (if (alive? actor) - (let [entry [message (promise #.None)]] + (let [entry [message (promise.promise [])]] (do Monad - [|mailbox| (atom.read (get@ #mailbox (:representation actor)))] - (loop [|mailbox| |mailbox|] + [|mailbox|&resolve (atom.read (get@ #mailbox (:representation actor)))] + (loop [[|mailbox| resolve] |mailbox|&resolve] (case (promise.poll |mailbox|) #.None (do @ - [resolved? (promise.resolve entry |mailbox|)] + [resolved? (resolve entry)] (if resolved? (do @ [_ (atom.write (product.right entry) (get@ #mailbox (:representation actor)))] (wrap #1)) - (recur |mailbox|))) + (recur |mailbox|&resolve))) (#.Some [_ |mailbox|']) (recur |mailbox|'))))) (io/wrap #0))) - )) + ) + ) ## [Values] (def: (default-handle message state self) @@ -153,7 +167,7 @@ (-> Name cs.Annotations cs.Annotations) (|>> (#.Cons [(name-of ) (code.tag name)]))) - + (def: #export ( name) (-> Name (Meta Name)) (do Monad @@ -211,16 +225,16 @@ {#.doc (doc "Defines an actor, with its behavior and internal state." (actor: #export Counter Nat - + ((stop cause state) (:: promise.Monad wrap (log! (if (ex.match? ..poisoned cause) (format "Counter was poisoned: " (%n state)) cause))))) - + (actor: #export (Stack a) (List a) - + ((handle message state self) (do task.Monad [#let [_ (log! "BEFORE")] @@ -317,7 +331,7 @@ (push [value a] state self (List a)) (let [state' (#.Cons value state)] (task.return [state' state']))))} - (with-gensyms [g!_ g!return g!error g!task g!sent?] + (with-gensyms [g!_ g!return g!error g!task g!sent? g!resolve] (do @ [current-module macro.current-module-name actor-name (resolve-actor actor-name) @@ -350,7 +364,9 @@ (with-message actor-name) csw.annotations)) (All [(~+ g!all-vars)] (-> (~+ g!inputsT) (~ actorC) (Task (~ (get@ #output signature))))) - (let [(~ g!task) (task.task (~ g!outputT))] + (let [[(~ g!task) (~ g!resolve)] (: [(task.Task (~ g!outputT)) + (task.Resolver (~ g!outputT))] + (task.task []))] (io.run (do io.Monad [(~ g!sent?) (..send (function ((~ g!_) (~ g!state) (~ g!self)) (do promise.Monad @@ -361,11 +377,11 @@ (~ body)))] (case (~ g!return) (#.Right [(~ g!state) (~ g!return)]) - (exec (io.run (promise.resolve (#.Right (~ g!return)) (~ g!task))) + (exec (io.run ((~ g!resolve) (#.Right (~ g!return)))) (task.return (~ g!state))) - + (#.Left (~ g!error)) - (exec (io.run (promise.resolve (#.Left (~ g!error)) (~ g!task))) + (exec (io.run ((~ g!resolve) (#.Left (~ g!error)))) (task.fail (~ g!error)))) )) (~ g!self))] diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux index 1a471022f..2530d6080 100644 --- a/stdlib/source/lux/control/concurrency/promise.lux +++ b/stdlib/source/lux/control/concurrency/promise.lux @@ -18,9 +18,38 @@ {#.doc "Represents values produced by asynchronous computations (unlike IO, which is synchronous)."} (Atom [(Maybe a) (List (-> a (IO Any)))]) - (def: #export (promise ?value) - (All [a] (-> (Maybe a) (Promise a))) - (:abstraction (atom [?value (list)]))) + (type: #export (Resolver a) + (-> a (IO Bit))) + + (def: (resolver (^:representation promise)) + {#.doc "Sets an promise's value if it has not been done yet."} + (All [a] (-> (Promise a) (Resolver a))) + (function (resolve value) + (do io.Monad + [(^@ old [_value _observers]) (atom.read promise)] + (case _value + (#.Some _) + (wrap #0) + + #.None + (do @ + [#let [new [(#.Some value) #.None]] + succeeded? (atom.compare-and-swap old new promise)] + (if succeeded? + (do @ + [_ (monad.map @ (function (_ f) (f value)) + _observers)] + (wrap #1)) + (resolve value))))))) + + (def: #export (resolved value) + (All [a] (-> a (Promise a))) + (:abstraction (atom [(#.Some value) (list)]))) + + (def: #export (promise _) + (All [a] (-> Any [(Promise a) (Resolver a)])) + (let [promise (:abstraction (atom [#.None (list)]))] + [promise (..resolver promise)])) (def: #export (poll (^:representation promise)) {#.doc "Polls a promise's value."} @@ -29,37 +58,17 @@ io.run product.left)) - (def: #export (resolve value (^:representation promise)) - {#.doc "Sets an promise's value if it has not been done yet."} - (All [a] (-> a (Promise a) (IO Bit))) - (do io.Monad - [(^@ old [_value _observers]) (atom.read promise)] - (case _value - (#.Some _) - (wrap #0) - - #.None - (do @ - [#let [new [(#.Some value) #.None]] - succeeded? (atom.compare-and-swap old new promise)] - (if succeeded? - (do @ - [_ (monad.map @ (function (_ f) (f value)) - _observers)] - (wrap #1)) - (resolve value (:abstraction promise))))))) - (def: #export (await f (^:representation promise)) - (All [a] (-> (-> a (IO Any)) (Promise a) Any)) + (All [a] (-> (-> a (IO Any)) (Promise a) (IO Any))) (let [(^@ old [_value _observers]) (io.run (atom.read promise))] (case _value (#.Some value) - (io.run (f value)) + (f value) #.None (let [new [_value (#.Cons f _observers)]] (if (io.run (atom.compare-and-swap old new promise)) - [] + (io.io []) (await f (:abstraction promise))))))) ) @@ -75,34 +84,31 @@ (structure: #export _ (Functor Promise) (def: (map f fa) - (let [fb (promise #.None)] - (exec (await (function (_ a) (resolve (f a) fb)) - fa) + (let [[fb resolve] (..promise [])] + (exec (io.run (await (|>> f resolve) fa)) fb)))) (structure: #export _ (Apply Promise) (def: functor Functor) (def: (apply ff fa) - (let [fb (promise #.None)] - (exec (await (function (_ f) - (io (await (function (_ a) (resolve (f a) fb)) - fa))) - ff) + (let [[fb resolve] (..promise [])] + (exec (io.run (await (function (_ f) + (await (|>> f resolve) fa)) + ff)) fb)))) (structure: #export _ (Monad Promise) (def: functor Functor) (def: (wrap a) - (promise (#.Some a))) + (..resolved a)) (def: (join mma) - (let [ma (promise #.None)] - (exec (await (function (_ ma') - (io (await (function (_ a') (resolve a' ma)) - ma'))) - mma) + (let [[ma resolve] (promise [])] + (exec (io.run (await (function (_ ma') + (await resolve ma')) + mma)) ma)))) (def: #export (and left right) @@ -116,11 +122,10 @@ (def: #export (or left right) {#.doc "Heterogeneous alternative combinator."} (All [a b] (-> (Promise a) (Promise b) (Promise (| a b)))) - (let [a|b (promise #.None)] + (let [[a|b resolve] (..promise [])] (with-expansions [ (do-template [ ] - [(await (function (_ value) (resolve ( value) a|b)) - )] + [(io.run (await (|>> resolve) ))] [left #.Left] [right #.Right] @@ -131,10 +136,9 @@ (def: #export (either left right) {#.doc "Homogeneous alternative combinator."} (All [a] (-> (Promise a) (Promise a) (Promise a))) - (let [left||right (promise #.None)] + (let [[left||right resolve] (..promise [])] (`` (exec (~~ (do-template [] - [(await (function (_ value) (resolve value left||right)) - )] + [(io.run (await resolve ))] [left] [right])) @@ -144,10 +148,10 @@ {#.doc (doc "Runs an I/O computation on its own process (after a specified delay)." "Returns a Promise that will eventually host its result.")} (All [a] (-> Nat (IO a) (Promise a))) - (let [!out (promise #.None)] + (let [[!out resolve] (..promise [])] (exec (|> (do io.Monad [value computation] - (resolve value !out)) + (resolve value)) (process.schedule millis-delay) io.run) !out))) diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux index 3203b2d52..f54e16baf 100644 --- a/stdlib/source/lux/control/concurrency/stm.lux +++ b/stdlib/source/lux/control/concurrency/stm.lux @@ -9,12 +9,12 @@ ["." product] ["." maybe] [collection - ["." list ("list/." Functor Fold)]]] + ["." list]]] [type abstract]] [// ["." atom (#+ Atom atom)] - ["." promise (#+ Promise promise)] + ["." promise (#+ Promise Resolver)] ["." frp ("frp/." Functor)]]) (type: #export (Observer a) @@ -177,45 +177,54 @@ tx)) (def: (commit-var! [_var _original _current]) - (-> (Ex [a] (Tx-Frame a)) Any) + (-> (Ex [a] (Tx-Frame a)) (IO Any)) (if (is? _original _current) - [] - (io.run (write! _current _var)))) + (io []) + (write! _current _var))) (def: fresh-tx Tx (list)) -(type: Commit (Ex [a] [(STM a) (Promise a)])) +(type: (Commit a) + [(STM a) + (Promise a) + (Resolver a)]) (def: pending-commits - (Atom (Rec Commits (Promise [Commit Commits]))) - (atom (promise #.None))) + (Atom (Rec Commits + [(Promise [(Ex [a] (Commit a)) Commits]) + (Resolver [(Ex [a] (Commit a)) Commits])])) + (atom (promise.promise []))) (def: commit-processor-flag (Atom Bit) (atom #0)) (def: (issue-commit commit) - (-> Commit (IO Any)) - (let [entry [commit (promise #.None)]] - (loop [|commits| (io.run (atom.read pending-commits))] - (case (promise.poll |commits|) - #.None - (do io.Monad - [resolved? (promise.resolve entry |commits|)] - (if resolved? - (atom.write (product.right entry) pending-commits) - (recur |commits|))) - - (#.Some [head tail]) - (recur tail))))) - -(def: (process-commit [stm-proc output]) - (-> [(STM Any) (Promise Any)] Any) - (let [[finished-tx value] (stm-proc fresh-tx)] - (io.run (if (can-commit? finished-tx) - (exec (list/map commit-var! finished-tx) - (promise.resolve value output)) - (issue-commit [stm-proc output]))))) + (All [a] (-> (Commit a) (IO Any))) + (let [entry [commit (promise.promise [])]] + (do io.Monad + [|commits|&resolve (atom.read pending-commits)] + (loop [[|commits| resolve] |commits|&resolve] + (case (promise.poll |commits|) + #.None + (do io.Monad + [resolved? (resolve entry)] + (if resolved? + (atom.write (product.right entry) pending-commits) + (recur |commits|&resolve))) + + (#.Some [head tail]) + (recur tail)))))) + +(def: (process-commit commit) + (All [a] (-> (Commit a) (IO Any))) + (let [[stm-proc output resolve] commit + [finished-tx value] (stm-proc fresh-tx)] + (if (can-commit? finished-tx) + (do io.Monad + [_ (monad.map @ commit-var! finished-tx)] + (resolve value)) + (issue-commit commit)))) (def: init-processor! (IO Any) @@ -226,11 +235,13 @@ (do @ [was-first? (atom.compare-and-swap flag #1 commit-processor-flag)] (if was-first? - (exec (|> (io.run (atom.read pending-commits)) - (promise.await (function (recur [head tail]) - (io (exec (process-commit (:coerce [(STM Any) (Promise Any)] head)) - (promise.await recur tail)))))) - (wrap [])) + (do @ + [[promise resolve] (atom.read pending-commits)] + (promise.await (function (recur [head [tail _resolve]]) + (do @ + [_ (process-commit head)] + (promise.await recur tail))) + promise)) (wrap []))) ))) @@ -239,7 +250,8 @@ "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first." "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")} (All [a] (-> (STM a) (Promise a))) - (let [output (promise #.None)] - (exec (io.run init-processor!) - (io.run (issue-commit [stm-proc output])) + (let [[output resolver] (promise.promise [])] + (exec (io.run (do io.Monad + [_ init-processor!] + (issue-commit [stm-proc output resolver]))) output))) diff --git a/stdlib/source/lux/control/concurrency/task.lux b/stdlib/source/lux/control/concurrency/task.lux index 96bc40f0a..a5bf17819 100644 --- a/stdlib/source/lux/control/concurrency/task.lux +++ b/stdlib/source/lux/control/concurrency/task.lux @@ -15,19 +15,23 @@ (type: #export (Task a) (Promise (Error a))) -(def: #export (fail error) - (All [a] (-> Text (Task a))) - (:: promise.Monad wrap (#error.Failure error))) +(type: #export (Resolver a) + (promise.Resolver (Error a))) + +(do-template [ ] + [(def: #export + (All [a] (-> (Task a))) + (|>> promise.resolved))] + + [return a #error.Success] + [fail Text #error.Failure] + ) (def: #export (throw exception message) (All [e a] (-> (Exception e) e (Task a))) (:: promise.Monad wrap (ex.throw exception message))) -(def: #export (return value) - (All [a] (-> a (Task a))) - (:: promise.Monad wrap (#error.Success value))) - (def: #export (try computation) (All [a] (-> (Task a) (Task (Error a)))) (:: promise.Functor map (|>> #error.Success) computation)) @@ -71,11 +75,9 @@ (#error.Success ma) ma)))) -(syntax: #export (task {type s.any}) - {#.doc (doc "Makes an uninitialized Task (in this example, of Any)." - (task Any))} - (wrap (list (` (: (..Task (~ type)) - (promise.promise #.None)))))) +(def: #export task + (All [a] (-> Any [(Task a) (Resolver a)])) + promise.promise) (def: #export (from-promise promise) (All [a] (-> (Promise a) (Task a))) diff --git a/stdlib/test/test/lux/control/concurrency/promise.lux b/stdlib/test/test/lux/control/concurrency/promise.lux index d666f3b31..0ea05c46a 100644 --- a/stdlib/test/test/lux/control/concurrency/promise.lux +++ b/stdlib/test/test/lux/control/concurrency/promise.lux @@ -55,10 +55,6 @@ (|> (&.poll (&.delay 200 #1)) (case> #.None #1 _ #0)))) - (test "Cannot re-resolve a resolved promise." - (and (not (io.run (&.resolve #0 (&/wrap #1)))) - (io.run (&.resolve #1 (: (&.Promise Bit) (&.promise #.None)))))) - (wrap (do &.Monad [?none (&.time-out 100 (&.delay 200 #1)) ?some (&.time-out 200 (&.delay 100 #1))] -- cgit v1.2.3