aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEduardo Julian2018-12-15 20:52:26 -0400
committerEduardo Julian2018-12-15 20:52:26 -0400
commit43cca9e6b4f25a0a731ba4262e1ee7fae0c6dab7 (patch)
treeb4c49855a827493e0f11654f1c63218836ec2329
parent00b9d6913fd39266208c7c8b61250967bb0f6636 (diff)
Separated reading promises and writing to promises.
-rw-r--r--stdlib/source/lux/control/concurrency/actor.lux84
-rw-r--r--stdlib/source/lux/control/concurrency/promise.lux100
-rw-r--r--stdlib/source/lux/control/concurrency/stm.lux86
-rw-r--r--stdlib/source/lux/control/concurrency/task.lux26
-rw-r--r--stdlib/test/test/lux/control/concurrency/promise.lux4
5 files changed, 165 insertions, 135 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux
index 6f4ddf2ad..9b20dcfde 100644
--- a/stdlib/source/lux/control/concurrency/actor.lux
+++ b/stdlib/source/lux/control/concurrency/actor.lux
@@ -23,7 +23,7 @@
abstract]]
[//
["." atom (#+ Atom atom)]
- ["." promise (#+ Promise promise) ("promise/." Monad<Promise>)]
+ ["." promise (#+ Promise Resolver) ("promise/." Monad<Promise>)]
["." task (#+ Task)]])
(exception: #export poisoned)
@@ -37,11 +37,17 @@
(with-expansions
[<Message> (as-is (-> s (Actor s) (Task s)))
<Obituary> (as-is [Text s (List <Message>)])
- <Mailbox> (as-is (Rec Mailbox (Promise [<Message> Mailbox])))]
-
- (def: (obituary mailbox)
- (All [a] (-> (Rec Mailbox (Promise [a Mailbox])) (List a)))
- (case (promise.poll mailbox)
+ <Mailbox> (as-is (Rec Mailbox
+ [(Promise [<Message> Mailbox])
+ (Resolver [<Message> Mailbox])]))]
+
+ (def: (obituary [read write])
+ (All [a]
+ (-> (Rec Mailbox
+ [(Promise [a Mailbox])
+ (Resolver [a Mailbox])])
+ (List a)))
+ (case (promise.poll read)
(#.Some [head tail])
(#.Cons head (obituary tail))
@@ -50,12 +56,18 @@
(abstract: #export (Actor s)
{#.doc "An actor, defined as all the necessities it requires."}
+
{#mailbox (Atom <Mailbox>)
- #obituary (Promise <Obituary>)}
+ #obituary [(Promise <Obituary>)
+ (Resolver <Obituary>)]}
## TODO: Delete after new-luxc becomes the new standard compiler.
(def: (actor mailbox obituary)
- (All [s] (-> (Atom <Mailbox>) (Promise <Obituary>) (Actor s)))
+ (All [s]
+ (-> (Atom <Mailbox>)
+ [(Promise <Obituary>)
+ (Resolver <Obituary>)]
+ (Actor s)))
(:abstraction {#mailbox mailbox
#obituary obituary}))
@@ -74,10 +86,10 @@
{#.doc "Given a behavior and initial state, spawns an actor and returns it."}
(All [s] (-> (Behavior s) s (IO (Actor s))))
(io (let [[handle end] behavior
- self (actor (atom (promise #.None))
- (promise #.None))
+ self (actor (atom (promise.promise []))
+ (promise.promise []))
process (loop [state init
- |mailbox| (io.run (atom.read (get@ #mailbox (:representation self))))]
+ [|mailbox| _] (io.run (atom.read (get@ #mailbox (:representation self))))]
(do promise.Monad<Promise>
[[head tail] |mailbox|
?state' (handle head state self)]
@@ -85,9 +97,9 @@
(#e.Failure error)
(do @
[_ (end error state)]
- (exec (io.run (promise.resolve [error state (#.Cons head (obituary tail))]
- (get@ #obituary (:representation self))))
- (wrap [])))
+ (let [[_ resolve] (get@ #obituary (:representation self))]
+ (exec (io.run (resolve [error state (#.Cons head (..obituary tail))]))
+ (wrap []))))
(#e.Success state')
(recur state' tail))))]
@@ -95,35 +107,37 @@
(def: #export (alive? actor)
(All [s] (-> (Actor s) Bit))
- (case (promise.poll (get@ #obituary (:representation actor)))
- #.None
- #1
+ (let [[obituary _] (get@ #obituary (:representation actor))]
+ (case (promise.poll obituary)
+ #.None
+ #1
- _
- #0))
+ _
+ #0)))
(def: #export (send message actor)
{#.doc "Communicate with an actor through message passing."}
(All [s] (-> (Message s) (Actor s) (IO Bit)))
(if (alive? actor)
- (let [entry [message (promise #.None)]]
+ (let [entry [message (promise.promise [])]]
(do Monad<IO>
- [|mailbox| (atom.read (get@ #mailbox (:representation actor)))]
- (loop [|mailbox| |mailbox|]
+ [|mailbox|&resolve (atom.read (get@ #mailbox (:representation actor)))]
+ (loop [[|mailbox| resolve] |mailbox|&resolve]
(case (promise.poll |mailbox|)
#.None
(do @
- [resolved? (promise.resolve entry |mailbox|)]
+ [resolved? (resolve entry)]
(if resolved?
(do @
[_ (atom.write (product.right entry) (get@ #mailbox (:representation actor)))]
(wrap #1))
- (recur |mailbox|)))
+ (recur |mailbox|&resolve)))
(#.Some [_ |mailbox|'])
(recur |mailbox|')))))
(io/wrap #0)))
- ))
+ )
+ )
## [Values]
(def: (default-handle message state self)
@@ -153,7 +167,7 @@
(-> Name cs.Annotations cs.Annotations)
(|>> (#.Cons [(name-of <tag>)
(code.tag name)])))
-
+
(def: #export (<resolve> name)
(-> Name (Meta Name))
(do Monad<Meta>
@@ -211,16 +225,16 @@
{#.doc (doc "Defines an actor, with its behavior and internal state."
(actor: #export Counter
Nat
-
+
((stop cause state)
(:: promise.Monad<Promise> wrap
(log! (if (ex.match? ..poisoned cause)
(format "Counter was poisoned: " (%n state))
cause)))))
-
+
(actor: #export (Stack a)
(List a)
-
+
((handle message state self)
(do task.Monad<Task>
[#let [_ (log! "BEFORE")]
@@ -317,7 +331,7 @@
(push [value a] state self (List a))
(let [state' (#.Cons value state)]
(task.return [state' state']))))}
- (with-gensyms [g!_ g!return g!error g!task g!sent?]
+ (with-gensyms [g!_ g!return g!error g!task g!sent? g!resolve]
(do @
[current-module macro.current-module-name
actor-name (resolve-actor actor-name)
@@ -350,7 +364,9 @@
(with-message actor-name)
csw.annotations))
(All [(~+ g!all-vars)] (-> (~+ g!inputsT) (~ actorC) (Task (~ (get@ #output signature)))))
- (let [(~ g!task) (task.task (~ g!outputT))]
+ (let [[(~ g!task) (~ g!resolve)] (: [(task.Task (~ g!outputT))
+ (task.Resolver (~ g!outputT))]
+ (task.task []))]
(io.run (do io.Monad<IO>
[(~ g!sent?) (..send (function ((~ g!_) (~ g!state) (~ g!self))
(do promise.Monad<Promise>
@@ -361,11 +377,11 @@
(~ body)))]
(case (~ g!return)
(#.Right [(~ g!state) (~ g!return)])
- (exec (io.run (promise.resolve (#.Right (~ g!return)) (~ g!task)))
+ (exec (io.run ((~ g!resolve) (#.Right (~ g!return))))
(task.return (~ g!state)))
-
+
(#.Left (~ g!error))
- (exec (io.run (promise.resolve (#.Left (~ g!error)) (~ g!task)))
+ (exec (io.run ((~ g!resolve) (#.Left (~ g!error))))
(task.fail (~ g!error))))
))
(~ g!self))]
diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux
index 1a471022f..2530d6080 100644
--- a/stdlib/source/lux/control/concurrency/promise.lux
+++ b/stdlib/source/lux/control/concurrency/promise.lux
@@ -18,9 +18,38 @@
{#.doc "Represents values produced by asynchronous computations (unlike IO, which is synchronous)."}
(Atom [(Maybe a) (List (-> a (IO Any)))])
- (def: #export (promise ?value)
- (All [a] (-> (Maybe a) (Promise a)))
- (:abstraction (atom [?value (list)])))
+ (type: #export (Resolver a)
+ (-> a (IO Bit)))
+
+ (def: (resolver (^:representation promise))
+ {#.doc "Sets an promise's value if it has not been done yet."}
+ (All [a] (-> (Promise a) (Resolver a)))
+ (function (resolve value)
+ (do io.Monad<IO>
+ [(^@ old [_value _observers]) (atom.read promise)]
+ (case _value
+ (#.Some _)
+ (wrap #0)
+
+ #.None
+ (do @
+ [#let [new [(#.Some value) #.None]]
+ succeeded? (atom.compare-and-swap old new promise)]
+ (if succeeded?
+ (do @
+ [_ (monad.map @ (function (_ f) (f value))
+ _observers)]
+ (wrap #1))
+ (resolve value)))))))
+
+ (def: #export (resolved value)
+ (All [a] (-> a (Promise a)))
+ (:abstraction (atom [(#.Some value) (list)])))
+
+ (def: #export (promise _)
+ (All [a] (-> Any [(Promise a) (Resolver a)]))
+ (let [promise (:abstraction (atom [#.None (list)]))]
+ [promise (..resolver promise)]))
(def: #export (poll (^:representation promise))
{#.doc "Polls a promise's value."}
@@ -29,37 +58,17 @@
io.run
product.left))
- (def: #export (resolve value (^:representation promise))
- {#.doc "Sets an promise's value if it has not been done yet."}
- (All [a] (-> a (Promise a) (IO Bit)))
- (do io.Monad<IO>
- [(^@ old [_value _observers]) (atom.read promise)]
- (case _value
- (#.Some _)
- (wrap #0)
-
- #.None
- (do @
- [#let [new [(#.Some value) #.None]]
- succeeded? (atom.compare-and-swap old new promise)]
- (if succeeded?
- (do @
- [_ (monad.map @ (function (_ f) (f value))
- _observers)]
- (wrap #1))
- (resolve value (:abstraction promise)))))))
-
(def: #export (await f (^:representation promise))
- (All [a] (-> (-> a (IO Any)) (Promise a) Any))
+ (All [a] (-> (-> a (IO Any)) (Promise a) (IO Any)))
(let [(^@ old [_value _observers]) (io.run (atom.read promise))]
(case _value
(#.Some value)
- (io.run (f value))
+ (f value)
#.None
(let [new [_value (#.Cons f _observers)]]
(if (io.run (atom.compare-and-swap old new promise))
- []
+ (io.io [])
(await f (:abstraction promise)))))))
)
@@ -75,34 +84,31 @@
(structure: #export _ (Functor Promise)
(def: (map f fa)
- (let [fb (promise #.None)]
- (exec (await (function (_ a) (resolve (f a) fb))
- fa)
+ (let [[fb resolve] (..promise [])]
+ (exec (io.run (await (|>> f resolve) fa))
fb))))
(structure: #export _ (Apply Promise)
(def: functor Functor<Promise>)
(def: (apply ff fa)
- (let [fb (promise #.None)]
- (exec (await (function (_ f)
- (io (await (function (_ a) (resolve (f a) fb))
- fa)))
- ff)
+ (let [[fb resolve] (..promise [])]
+ (exec (io.run (await (function (_ f)
+ (await (|>> f resolve) fa))
+ ff))
fb))))
(structure: #export _ (Monad Promise)
(def: functor Functor<Promise>)
(def: (wrap a)
- (promise (#.Some a)))
+ (..resolved a))
(def: (join mma)
- (let [ma (promise #.None)]
- (exec (await (function (_ ma')
- (io (await (function (_ a') (resolve a' ma))
- ma')))
- mma)
+ (let [[ma resolve] (promise [])]
+ (exec (io.run (await (function (_ ma')
+ (await resolve ma'))
+ mma))
ma))))
(def: #export (and left right)
@@ -116,11 +122,10 @@
(def: #export (or left right)
{#.doc "Heterogeneous alternative combinator."}
(All [a b] (-> (Promise a) (Promise b) (Promise (| a b))))
- (let [a|b (promise #.None)]
+ (let [[a|b resolve] (..promise [])]
(with-expansions
[<sides> (do-template [<promise> <tag>]
- [(await (function (_ value) (resolve (<tag> value) a|b))
- <promise>)]
+ [(io.run (await (|>> <tag> resolve) <promise>))]
[left #.Left]
[right #.Right]
@@ -131,10 +136,9 @@
(def: #export (either left right)
{#.doc "Homogeneous alternative combinator."}
(All [a] (-> (Promise a) (Promise a) (Promise a)))
- (let [left||right (promise #.None)]
+ (let [[left||right resolve] (..promise [])]
(`` (exec (~~ (do-template [<promise>]
- [(await (function (_ value) (resolve value left||right))
- <promise>)]
+ [(io.run (await resolve <promise>))]
[left]
[right]))
@@ -144,10 +148,10 @@
{#.doc (doc "Runs an I/O computation on its own process (after a specified delay)."
"Returns a Promise that will eventually host its result.")}
(All [a] (-> Nat (IO a) (Promise a)))
- (let [!out (promise #.None)]
+ (let [[!out resolve] (..promise [])]
(exec (|> (do io.Monad<IO>
[value computation]
- (resolve value !out))
+ (resolve value))
(process.schedule millis-delay)
io.run)
!out)))
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux
index 3203b2d52..f54e16baf 100644
--- a/stdlib/source/lux/control/concurrency/stm.lux
+++ b/stdlib/source/lux/control/concurrency/stm.lux
@@ -9,12 +9,12 @@
["." product]
["." maybe]
[collection
- ["." list ("list/." Functor<List> Fold<List>)]]]
+ ["." list]]]
[type
abstract]]
[//
["." atom (#+ Atom atom)]
- ["." promise (#+ Promise promise)]
+ ["." promise (#+ Promise Resolver)]
["." frp ("frp/." Functor<Channel>)]])
(type: #export (Observer a)
@@ -177,45 +177,54 @@
tx))
(def: (commit-var! [_var _original _current])
- (-> (Ex [a] (Tx-Frame a)) Any)
+ (-> (Ex [a] (Tx-Frame a)) (IO Any))
(if (is? _original _current)
- []
- (io.run (write! _current _var))))
+ (io [])
+ (write! _current _var)))
(def: fresh-tx Tx (list))
-(type: Commit (Ex [a] [(STM a) (Promise a)]))
+(type: (Commit a)
+ [(STM a)
+ (Promise a)
+ (Resolver a)])
(def: pending-commits
- (Atom (Rec Commits (Promise [Commit Commits])))
- (atom (promise #.None)))
+ (Atom (Rec Commits
+ [(Promise [(Ex [a] (Commit a)) Commits])
+ (Resolver [(Ex [a] (Commit a)) Commits])]))
+ (atom (promise.promise [])))
(def: commit-processor-flag
(Atom Bit)
(atom #0))
(def: (issue-commit commit)
- (-> Commit (IO Any))
- (let [entry [commit (promise #.None)]]
- (loop [|commits| (io.run (atom.read pending-commits))]
- (case (promise.poll |commits|)
- #.None
- (do io.Monad<IO>
- [resolved? (promise.resolve entry |commits|)]
- (if resolved?
- (atom.write (product.right entry) pending-commits)
- (recur |commits|)))
-
- (#.Some [head tail])
- (recur tail)))))
-
-(def: (process-commit [stm-proc output])
- (-> [(STM Any) (Promise Any)] Any)
- (let [[finished-tx value] (stm-proc fresh-tx)]
- (io.run (if (can-commit? finished-tx)
- (exec (list/map commit-var! finished-tx)
- (promise.resolve value output))
- (issue-commit [stm-proc output])))))
+ (All [a] (-> (Commit a) (IO Any)))
+ (let [entry [commit (promise.promise [])]]
+ (do io.Monad<IO>
+ [|commits|&resolve (atom.read pending-commits)]
+ (loop [[|commits| resolve] |commits|&resolve]
+ (case (promise.poll |commits|)
+ #.None
+ (do io.Monad<IO>
+ [resolved? (resolve entry)]
+ (if resolved?
+ (atom.write (product.right entry) pending-commits)
+ (recur |commits|&resolve)))
+
+ (#.Some [head tail])
+ (recur tail))))))
+
+(def: (process-commit commit)
+ (All [a] (-> (Commit a) (IO Any)))
+ (let [[stm-proc output resolve] commit
+ [finished-tx value] (stm-proc fresh-tx)]
+ (if (can-commit? finished-tx)
+ (do io.Monad<IO>
+ [_ (monad.map @ commit-var! finished-tx)]
+ (resolve value))
+ (issue-commit commit))))
(def: init-processor!
(IO Any)
@@ -226,11 +235,13 @@
(do @
[was-first? (atom.compare-and-swap flag #1 commit-processor-flag)]
(if was-first?
- (exec (|> (io.run (atom.read pending-commits))
- (promise.await (function (recur [head tail])
- (io (exec (process-commit (:coerce [(STM Any) (Promise Any)] head))
- (promise.await recur tail))))))
- (wrap []))
+ (do @
+ [[promise resolve] (atom.read pending-commits)]
+ (promise.await (function (recur [head [tail _resolve]])
+ (do @
+ [_ (process-commit head)]
+ (promise.await recur tail)))
+ promise))
(wrap [])))
)))
@@ -239,7 +250,8 @@
"Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first."
"For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")}
(All [a] (-> (STM a) (Promise a)))
- (let [output (promise #.None)]
- (exec (io.run init-processor!)
- (io.run (issue-commit [stm-proc output]))
+ (let [[output resolver] (promise.promise [])]
+ (exec (io.run (do io.Monad<IO>
+ [_ init-processor!]
+ (issue-commit [stm-proc output resolver])))
output)))
diff --git a/stdlib/source/lux/control/concurrency/task.lux b/stdlib/source/lux/control/concurrency/task.lux
index 96bc40f0a..a5bf17819 100644
--- a/stdlib/source/lux/control/concurrency/task.lux
+++ b/stdlib/source/lux/control/concurrency/task.lux
@@ -15,19 +15,23 @@
(type: #export (Task a)
(Promise (Error a)))
-(def: #export (fail error)
- (All [a] (-> Text (Task a)))
- (:: promise.Monad<Promise> wrap (#error.Failure error)))
+(type: #export (Resolver a)
+ (promise.Resolver (Error a)))
+
+(do-template [<name> <input> <tag>]
+ [(def: #export <name>
+ (All [a] (-> <input> (Task a)))
+ (|>> <tag> promise.resolved))]
+
+ [return a #error.Success]
+ [fail Text #error.Failure]
+ )
(def: #export (throw exception message)
(All [e a] (-> (Exception e) e (Task a)))
(:: promise.Monad<Promise> wrap
(ex.throw exception message)))
-(def: #export (return value)
- (All [a] (-> a (Task a)))
- (:: promise.Monad<Promise> wrap (#error.Success value)))
-
(def: #export (try computation)
(All [a] (-> (Task a) (Task (Error a))))
(:: promise.Functor<Promise> map (|>> #error.Success) computation))
@@ -71,11 +75,9 @@
(#error.Success ma)
ma))))
-(syntax: #export (task {type s.any})
- {#.doc (doc "Makes an uninitialized Task (in this example, of Any)."
- (task Any))}
- (wrap (list (` (: (..Task (~ type))
- (promise.promise #.None))))))
+(def: #export task
+ (All [a] (-> Any [(Task a) (Resolver a)]))
+ promise.promise)
(def: #export (from-promise promise)
(All [a] (-> (Promise a) (Task a)))
diff --git a/stdlib/test/test/lux/control/concurrency/promise.lux b/stdlib/test/test/lux/control/concurrency/promise.lux
index d666f3b31..0ea05c46a 100644
--- a/stdlib/test/test/lux/control/concurrency/promise.lux
+++ b/stdlib/test/test/lux/control/concurrency/promise.lux
@@ -55,10 +55,6 @@
(|> (&.poll (&.delay 200 #1))
(case> #.None #1 _ #0))))
- (test "Cannot re-resolve a resolved promise."
- (and (not (io.run (&.resolve #0 (&/wrap #1))))
- (io.run (&.resolve #1 (: (&.Promise Bit) (&.promise #.None))))))
-
(wrap (do &.Monad<Promise>
[?none (&.time-out 100 (&.delay 200 #1))
?some (&.time-out 200 (&.delay 100 #1))]