aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--stdlib/source/lux/control/concurrency/actor.lux89
-rw-r--r--stdlib/source/lux/control/concurrency/task.lux83
-rw-r--r--stdlib/source/lux/world/net/tcp.jvm.lux1
-rw-r--r--stdlib/source/lux/world/net/udp.jvm.lux3
-rw-r--r--stdlib/source/test/lux/control/concurrency/actor.lux83
5 files changed, 90 insertions, 169 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux
index 397a2fdb4..b47785142 100644
--- a/stdlib/source/lux/control/concurrency/actor.lux
+++ b/stdlib/source/lux/control/concurrency/actor.lux
@@ -7,7 +7,7 @@
["." io (#+ IO io) ("#/." monad)]
[data
["." product]
- ["." error]
+ ["." error (#+ Error)]
[text
format]
[collection
@@ -24,8 +24,7 @@
abstract]]
[//
["." atom (#+ Atom atom)]
- ["." promise (#+ Promise Resolver) ("#/." monad)]
- ["." task (#+ Task)]])
+ ["." promise (#+ Promise Resolver) ("#/." monad)]])
(exception: #export poisoned)
@@ -35,7 +34,7 @@
["Message" message-name]))
(with-expansions
- [<Message> (as-is (-> s (Actor s) (Task s)))
+ [<Message> (as-is (-> s (Actor s) (Promise (Error s))))
<Obituary> (as-is [Text s (List <Message>)])
<Mailbox> (as-is (Rec Mailbox
[(Promise [<Message> Mailbox])
@@ -79,7 +78,7 @@
(type: #export (Behavior s)
{#.doc "An actor's behavior when messages are received."}
- {#handle (-> (Message s) s (Actor s) (Task s))
+ {#handle (-> (Message s) s (Actor s) (Promise (Error s)))
#end (-> Text s (Promise Any))})
(def: #export (spawn behavior init)
@@ -140,7 +139,7 @@
)
(def: (default-handle message state self)
- (All [s] (-> (Message s) s (Actor s) (Task s)))
+ (All [s] (-> (Message s) s (Actor s) (Promise (Error s))))
(message state self))
(def: (default-end cause state)
@@ -157,7 +156,7 @@
"but allows the actor to handle previous messages.")}
(All [s] (-> (Actor s) (IO Bit)))
(send (function (_ state self)
- (task.throw poisoned []))
+ (promise.resolved (ex.throw ..poisoned [])))
actor))
(do-template [<with> <resolve> <tag> <desc>]
@@ -234,7 +233,7 @@
(List a)
((handle message state self)
- (do task.monad
+ (do (error.with promise.monad)
[#let [_ (log! "BEFORE")]
output (message state self)
#let [_ (log! "AFTER")]]
@@ -266,9 +265,9 @@
(~ (code.local-identifier messageN))
(~ (code.local-identifier stateN))
(~ (code.local-identifier selfN)))
- (do task.monad
- []
- (~ bodyC))))))
+ ((~! do) ((~! error.with) (~! promise.monad))
+ []
+ (~ bodyC))))))
#..end (~ (case ?stop
#.None
(` (~! ..default-end))
@@ -277,9 +276,9 @@
(` (function ((~ g!_)
(~ (code.local-identifier causeN))
(~ (code.local-identifier stateN)))
- (do promise.monad
- []
- (~ bodyC))))))}))
+ ((~! do) (~! promise.monad)
+ []
+ (~ bodyC))))))}))
(` (def: (~+ (csw.export export)) ((~ g!new) (~ g!init))
(All [(~+ g!vars)]
(-> ((~ g!type) (~+ g!vars)) (io.IO ((~ g!actor) (~+ g!vars)))))
@@ -323,12 +322,12 @@
(message: #export Counter
(count! [increment Nat] state self Nat)
(let [state' (n/+ increment state)]
- (task.return [state' state'])))
+ (promise.resolved (#error.Success [state' state']))))
(message: #export (Stack a)
(push [value a] state self (List a))
(let [state' (#.Cons value state)]
- (task.return [state' state']))))}
+ (promise.resolved (#error.Success [state' state'])))))}
(with-gensyms [g!_ g!return g!error g!task g!sent? g!resolve]
(do @
[current-module macro.current-module-name
@@ -361,30 +360,36 @@
(~ (|> annotations
(with-message actor-name)
csw.annotations))
- (All [(~+ g!all-vars)] (-> (~+ g!inputsT) (~ actorC) (Task (~ (get@ #output signature)))))
- (let [[(~ g!task) (~ g!resolve)] (: [(task.Task (~ g!outputT))
- (task.Resolver (~ g!outputT))]
- (task.task []))]
- (io.run (do io.monad
- [(~ g!sent?) (..send (function ((~ g!_) (~ g!state) (~ g!self))
- (do promise.monad
- [(~ g!return) (: (Task [((~ g!type) (~+ g!actor-refs))
- (~ g!outputT)])
- (do task.monad
- []
- (~ body)))]
- (case (~ g!return)
- (#.Right [(~ g!state) (~ g!return)])
- (exec (io.run ((~ g!resolve) (#.Right (~ g!return))))
- (task.return (~ g!state)))
-
- (#.Left (~ g!error))
- (exec (io.run ((~ g!resolve) (#.Left (~ g!error))))
- (task.fail (~ g!error))))
- ))
- (~ g!self))]
- (if (~ g!sent?)
- ((~' wrap) (~ g!task))
- ((~' wrap) (task.throw ..dead [(~ (code.text (%name actor-name)))
- (~ (code.text (%name message-name)))]))))))))
+ (All [(~+ g!all-vars)]
+ (-> (~+ g!inputsT)
+ (~ actorC)
+ ((~! promise.Promise) ((~! error.Error) (~ (get@ #output signature))))))
+ (let [[(~ g!task) (~ g!resolve)] (: [((~! promise.Promise) ((~! error.Error) (~ g!outputT)))
+ (promise.Resolver ((~! error.Error) (~ g!outputT)))]
+ (promise.promise []))]
+ ((~! io.run) ((~! do) (~! io.monad)
+ [(~ g!sent?) (..send (function ((~ g!_) (~ g!state) (~ g!self))
+ ((~! do) (~! promise.monad)
+ [(~ g!return) (: ((~! promise.Promise)
+ ((~! error.Error)
+ [((~ g!type) (~+ g!actor-refs))
+ (~ g!outputT)]))
+ ((~! do) ((~! error.with) (~! promise.monad))
+ []
+ (~ body)))]
+ (case (~ g!return)
+ (#error.Success [(~ g!state) (~ g!return)])
+ (exec ((~! io.run) ((~ g!resolve) (#error.Success (~ g!return))))
+ ((~! promise.resolved) (#error.Success (~ g!state))))
+
+ (#error.Failure (~ g!error))
+ (exec ((~! io.run) ((~ g!resolve) (#error.Failure (~ g!error))))
+ ((~! promise.resolved) (#error.Failure (~ g!error)))))
+ ))
+ (~ g!self))]
+ (if (~ g!sent?)
+ ((~' wrap) (~ g!task))
+ ((~' wrap) ((~! promise.resolved)
+ ((~! ex.throw) ..dead [(~ (code.text (%name actor-name)))
+ (~ (code.text (%name message-name)))])))))))))
)))))
diff --git a/stdlib/source/lux/control/concurrency/task.lux b/stdlib/source/lux/control/concurrency/task.lux
deleted file mode 100644
index 1f16da8ca..000000000
--- a/stdlib/source/lux/control/concurrency/task.lux
+++ /dev/null
@@ -1,83 +0,0 @@
-(.module:
- [lux #*
- [control
- [functor (#+ Functor)]
- [apply (#+ Apply)]
- [monad (#+ Monad do)]
- ["ex" exception (#+ Exception)]]
- [data
- ["." error (#+ Error)]]
- ["." macro
- ["s" syntax (#+ syntax: Syntax)]]]
- [//
- ["." promise (#+ Promise)]])
-
-(type: #export (Task a)
- (Promise (Error a)))
-
-(type: #export (Resolver a)
- (promise.Resolver (Error a)))
-
-(do-template [<name> <input> <tag>]
- [(def: #export <name>
- (All [a] (-> <input> (Task a)))
- (|>> <tag> promise.resolved))]
-
- [return a #error.Success]
- [fail Text #error.Failure]
- )
-
-(def: #export (throw exception message)
- (All [e a] (-> (Exception e) e (Task a)))
- (:: promise.monad wrap (ex.throw exception message)))
-
-(def: #export (try computation)
- (All [a] (-> (Task a) (Task (Error a))))
- (:: promise.functor map (|>> #error.Success) computation))
-
-(structure: #export functor (Functor Task)
- (def: (map f fa)
- (:: promise.functor map
- (function (_ fa')
- (case fa'
- (#error.Failure error)
- (#error.Failure error)
-
- (#error.Success a)
- (#error.Success (f a))))
- fa)))
-
-(structure: #export apply (Apply Task)
- (def: &functor ..functor)
-
- (def: (apply ff fa)
- (do promise.monad
- [ff' ff
- fa' fa]
- (wrap (do error.monad
- [f ff'
- a fa']
- (wrap (f a)))))))
-
-(structure: #export monad (Monad Task)
- (def: &functor ..functor)
-
- (def: wrap return)
-
- (def: (join mma)
- (do promise.monad
- [mma' mma]
- (case mma'
- (#error.Failure error)
- (wrap (#error.Failure error))
-
- (#error.Success ma)
- ma))))
-
-(def: #export task
- (All [a] (-> Any [(Task a) (Resolver a)]))
- promise.promise)
-
-(def: #export (from-promise promise)
- (All [a] (-> (Promise a) (Task a)))
- (:: promise.functor map (|>> #error.Success) promise))
diff --git a/stdlib/source/lux/world/net/tcp.jvm.lux b/stdlib/source/lux/world/net/tcp.jvm.lux
index fbdf47720..35b2e31f0 100644
--- a/stdlib/source/lux/world/net/tcp.jvm.lux
+++ b/stdlib/source/lux/world/net/tcp.jvm.lux
@@ -4,7 +4,6 @@
monad
[concurrency
["." promise (#+ Promise promise)]
- [task (#+ Task)]
["." frp]]]
[data
["." error (#+ Error)]]
diff --git a/stdlib/source/lux/world/net/udp.jvm.lux b/stdlib/source/lux/world/net/udp.jvm.lux
index 09726c5ae..833b72e08 100644
--- a/stdlib/source/lux/world/net/udp.jvm.lux
+++ b/stdlib/source/lux/world/net/udp.jvm.lux
@@ -4,8 +4,7 @@
monad
["ex" exception (#+ exception:)]
[concurrency
- ["." promise (#+ Promise)]
- [task (#+ Task)]]]
+ ["." promise (#+ Promise)]]]
[data
["." error (#+ Error)]
["." maybe]
diff --git a/stdlib/source/test/lux/control/concurrency/actor.lux b/stdlib/source/test/lux/control/concurrency/actor.lux
index dba286b22..90c88744c 100644
--- a/stdlib/source/test/lux/control/concurrency/actor.lux
+++ b/stdlib/source/test/lux/control/concurrency/actor.lux
@@ -6,8 +6,7 @@
["M" monad (#+ do Monad)]
["ex" exception]
[concurrency
- ["." promise ("#/." monad)]
- ["T" task]]]
+ ["." promise ("#/." monad)]]]
[data
["." error]
[text
@@ -21,7 +20,7 @@
Nat
((handle message state self)
- (do T.monad
+ (do (error.with promise.monad)
[#let [_ (log! "BEFORE")]
output (message state self)
#let [_ (log! "AFTER")]]
@@ -35,47 +34,49 @@
(message: #export Counter
(count! {increment Nat} state self Nat)
(let [state' (n/+ increment state)]
- (T.return [state' state'])))
+ (promise/wrap (#error.Success [state' state']))))
(def: #export test
Test
- (<| (_.context (%name (name-of /.Actor)))
- ($_ _.and
- (_.test "Can check if an actor is alive."
- (io.run (do io.monad
- [counter (new@Counter 0)]
- (wrap (/.alive? counter)))))
+ (do r.monad
+ [_ (wrap [])]
+ (<| (_.context (%name (name-of /.Actor)))
+ ($_ _.and
+ (_.test "Can check if an actor is alive."
+ (io.run (do io.monad
+ [counter (new@Counter 0)]
+ (wrap (/.alive? counter)))))
- (_.test "Can poison actors."
- (io.run (do io.monad
- [counter (new@Counter 0)
- poisoned? (/.poison counter)]
- (wrap (and poisoned?
- (not (/.alive? counter)))))))
-
- (_.test "Cannot poison an already dead actor."
- (io.run (do io.monad
- [counter (new@Counter 0)
- first-time (/.poison counter)
- second-time (/.poison counter)]
- (wrap (and first-time
- (not second-time))))))
+ (_.test "Can poison actors."
+ (io.run (do io.monad
+ [counter (new@Counter 0)
+ poisoned? (/.poison counter)]
+ (wrap (and poisoned?
+ (not (/.alive? counter)))))))
+
+ (_.test "Cannot poison an already dead actor."
+ (io.run (do io.monad
+ [counter (new@Counter 0)
+ first-time (/.poison counter)
+ second-time (/.poison counter)]
+ (wrap (and first-time
+ (not second-time))))))
- (:: r.monad wrap
- (do promise.monad
- [result (do T.monad
- [#let [counter (io.run (new@Counter 0))]
- output-1 (count! 1 counter)
- output-2 (count! 1 counter)
- output-3 (count! 1 counter)]
- (wrap (and (n/= 1 output-1)
- (n/= 2 output-2)
- (n/= 3 output-3))))]
- (_.assert "Can send messages to actors."
- (case result
- (#error.Success outcome)
- outcome
+ (:: r.monad wrap
+ (do promise.monad
+ [result (do (error.with promise.monad)
+ [#let [counter (io.run (new@Counter 0))]
+ output-1 (count! 1 counter)
+ output-2 (count! 1 counter)
+ output-3 (count! 1 counter)]
+ (wrap (and (n/= 1 output-1)
+ (n/= 2 output-2)
+ (n/= 3 output-3))))]
+ (_.assert "Can send messages to actors."
+ (case result
+ (#error.Success outcome)
+ outcome
- (#error.Failure error)
- #0))))
- )))
+ (#error.Failure error)
+ #0))))
+ ))))