diff options
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/lux/control/concurrency/actor.lux | 89 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/task.lux | 83 | ||||
-rw-r--r-- | stdlib/source/lux/world/net/tcp.jvm.lux | 1 | ||||
-rw-r--r-- | stdlib/source/lux/world/net/udp.jvm.lux | 3 | ||||
-rw-r--r-- | stdlib/source/test/lux/control/concurrency/actor.lux | 83 |
5 files changed, 90 insertions, 169 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux index 397a2fdb4..b47785142 100644 --- a/stdlib/source/lux/control/concurrency/actor.lux +++ b/stdlib/source/lux/control/concurrency/actor.lux @@ -7,7 +7,7 @@ ["." io (#+ IO io) ("#/." monad)] [data ["." product] - ["." error] + ["." error (#+ Error)] [text format] [collection @@ -24,8 +24,7 @@ abstract]] [// ["." atom (#+ Atom atom)] - ["." promise (#+ Promise Resolver) ("#/." monad)] - ["." task (#+ Task)]]) + ["." promise (#+ Promise Resolver) ("#/." monad)]]) (exception: #export poisoned) @@ -35,7 +34,7 @@ ["Message" message-name])) (with-expansions - [<Message> (as-is (-> s (Actor s) (Task s))) + [<Message> (as-is (-> s (Actor s) (Promise (Error s)))) <Obituary> (as-is [Text s (List <Message>)]) <Mailbox> (as-is (Rec Mailbox [(Promise [<Message> Mailbox]) @@ -79,7 +78,7 @@ (type: #export (Behavior s) {#.doc "An actor's behavior when messages are received."} - {#handle (-> (Message s) s (Actor s) (Task s)) + {#handle (-> (Message s) s (Actor s) (Promise (Error s))) #end (-> Text s (Promise Any))}) (def: #export (spawn behavior init) @@ -140,7 +139,7 @@ ) (def: (default-handle message state self) - (All [s] (-> (Message s) s (Actor s) (Task s))) + (All [s] (-> (Message s) s (Actor s) (Promise (Error s)))) (message state self)) (def: (default-end cause state) @@ -157,7 +156,7 @@ "but allows the actor to handle previous messages.")} (All [s] (-> (Actor s) (IO Bit))) (send (function (_ state self) - (task.throw poisoned [])) + (promise.resolved (ex.throw ..poisoned []))) actor)) (do-template [<with> <resolve> <tag> <desc>] @@ -234,7 +233,7 @@ (List a) ((handle message state self) - (do task.monad + (do (error.with promise.monad) [#let [_ (log! "BEFORE")] output (message state self) #let [_ (log! "AFTER")]] @@ -266,9 +265,9 @@ (~ (code.local-identifier messageN)) (~ (code.local-identifier stateN)) (~ (code.local-identifier selfN))) - (do task.monad - [] - (~ bodyC)))))) + ((~! do) ((~! error.with) (~! promise.monad)) + [] + (~ bodyC)))))) #..end (~ (case ?stop #.None (` (~! ..default-end)) @@ -277,9 +276,9 @@ (` (function ((~ g!_) (~ (code.local-identifier causeN)) (~ (code.local-identifier stateN))) - (do promise.monad - [] - (~ bodyC))))))})) + ((~! do) (~! promise.monad) + [] + (~ bodyC))))))})) (` (def: (~+ (csw.export export)) ((~ g!new) (~ g!init)) (All [(~+ g!vars)] (-> ((~ g!type) (~+ g!vars)) (io.IO ((~ g!actor) (~+ g!vars))))) @@ -323,12 +322,12 @@ (message: #export Counter (count! [increment Nat] state self Nat) (let [state' (n/+ increment state)] - (task.return [state' state']))) + (promise.resolved (#error.Success [state' state'])))) (message: #export (Stack a) (push [value a] state self (List a)) (let [state' (#.Cons value state)] - (task.return [state' state']))))} + (promise.resolved (#error.Success [state' state'])))))} (with-gensyms [g!_ g!return g!error g!task g!sent? g!resolve] (do @ [current-module macro.current-module-name @@ -361,30 +360,36 @@ (~ (|> annotations (with-message actor-name) csw.annotations)) - (All [(~+ g!all-vars)] (-> (~+ g!inputsT) (~ actorC) (Task (~ (get@ #output signature))))) - (let [[(~ g!task) (~ g!resolve)] (: [(task.Task (~ g!outputT)) - (task.Resolver (~ g!outputT))] - (task.task []))] - (io.run (do io.monad - [(~ g!sent?) (..send (function ((~ g!_) (~ g!state) (~ g!self)) - (do promise.monad - [(~ g!return) (: (Task [((~ g!type) (~+ g!actor-refs)) - (~ g!outputT)]) - (do task.monad - [] - (~ body)))] - (case (~ g!return) - (#.Right [(~ g!state) (~ g!return)]) - (exec (io.run ((~ g!resolve) (#.Right (~ g!return)))) - (task.return (~ g!state))) - - (#.Left (~ g!error)) - (exec (io.run ((~ g!resolve) (#.Left (~ g!error)))) - (task.fail (~ g!error)))) - )) - (~ g!self))] - (if (~ g!sent?) - ((~' wrap) (~ g!task)) - ((~' wrap) (task.throw ..dead [(~ (code.text (%name actor-name))) - (~ (code.text (%name message-name)))])))))))) + (All [(~+ g!all-vars)] + (-> (~+ g!inputsT) + (~ actorC) + ((~! promise.Promise) ((~! error.Error) (~ (get@ #output signature)))))) + (let [[(~ g!task) (~ g!resolve)] (: [((~! promise.Promise) ((~! error.Error) (~ g!outputT))) + (promise.Resolver ((~! error.Error) (~ g!outputT)))] + (promise.promise []))] + ((~! io.run) ((~! do) (~! io.monad) + [(~ g!sent?) (..send (function ((~ g!_) (~ g!state) (~ g!self)) + ((~! do) (~! promise.monad) + [(~ g!return) (: ((~! promise.Promise) + ((~! error.Error) + [((~ g!type) (~+ g!actor-refs)) + (~ g!outputT)])) + ((~! do) ((~! error.with) (~! promise.monad)) + [] + (~ body)))] + (case (~ g!return) + (#error.Success [(~ g!state) (~ g!return)]) + (exec ((~! io.run) ((~ g!resolve) (#error.Success (~ g!return)))) + ((~! promise.resolved) (#error.Success (~ g!state)))) + + (#error.Failure (~ g!error)) + (exec ((~! io.run) ((~ g!resolve) (#error.Failure (~ g!error)))) + ((~! promise.resolved) (#error.Failure (~ g!error))))) + )) + (~ g!self))] + (if (~ g!sent?) + ((~' wrap) (~ g!task)) + ((~' wrap) ((~! promise.resolved) + ((~! ex.throw) ..dead [(~ (code.text (%name actor-name))) + (~ (code.text (%name message-name)))]))))))))) ))))) diff --git a/stdlib/source/lux/control/concurrency/task.lux b/stdlib/source/lux/control/concurrency/task.lux deleted file mode 100644 index 1f16da8ca..000000000 --- a/stdlib/source/lux/control/concurrency/task.lux +++ /dev/null @@ -1,83 +0,0 @@ -(.module: - [lux #* - [control - [functor (#+ Functor)] - [apply (#+ Apply)] - [monad (#+ Monad do)] - ["ex" exception (#+ Exception)]] - [data - ["." error (#+ Error)]] - ["." macro - ["s" syntax (#+ syntax: Syntax)]]] - [// - ["." promise (#+ Promise)]]) - -(type: #export (Task a) - (Promise (Error a))) - -(type: #export (Resolver a) - (promise.Resolver (Error a))) - -(do-template [<name> <input> <tag>] - [(def: #export <name> - (All [a] (-> <input> (Task a))) - (|>> <tag> promise.resolved))] - - [return a #error.Success] - [fail Text #error.Failure] - ) - -(def: #export (throw exception message) - (All [e a] (-> (Exception e) e (Task a))) - (:: promise.monad wrap (ex.throw exception message))) - -(def: #export (try computation) - (All [a] (-> (Task a) (Task (Error a)))) - (:: promise.functor map (|>> #error.Success) computation)) - -(structure: #export functor (Functor Task) - (def: (map f fa) - (:: promise.functor map - (function (_ fa') - (case fa' - (#error.Failure error) - (#error.Failure error) - - (#error.Success a) - (#error.Success (f a)))) - fa))) - -(structure: #export apply (Apply Task) - (def: &functor ..functor) - - (def: (apply ff fa) - (do promise.monad - [ff' ff - fa' fa] - (wrap (do error.monad - [f ff' - a fa'] - (wrap (f a))))))) - -(structure: #export monad (Monad Task) - (def: &functor ..functor) - - (def: wrap return) - - (def: (join mma) - (do promise.monad - [mma' mma] - (case mma' - (#error.Failure error) - (wrap (#error.Failure error)) - - (#error.Success ma) - ma)))) - -(def: #export task - (All [a] (-> Any [(Task a) (Resolver a)])) - promise.promise) - -(def: #export (from-promise promise) - (All [a] (-> (Promise a) (Task a))) - (:: promise.functor map (|>> #error.Success) promise)) diff --git a/stdlib/source/lux/world/net/tcp.jvm.lux b/stdlib/source/lux/world/net/tcp.jvm.lux index fbdf47720..35b2e31f0 100644 --- a/stdlib/source/lux/world/net/tcp.jvm.lux +++ b/stdlib/source/lux/world/net/tcp.jvm.lux @@ -4,7 +4,6 @@ monad [concurrency ["." promise (#+ Promise promise)] - [task (#+ Task)] ["." frp]]] [data ["." error (#+ Error)]] diff --git a/stdlib/source/lux/world/net/udp.jvm.lux b/stdlib/source/lux/world/net/udp.jvm.lux index 09726c5ae..833b72e08 100644 --- a/stdlib/source/lux/world/net/udp.jvm.lux +++ b/stdlib/source/lux/world/net/udp.jvm.lux @@ -4,8 +4,7 @@ monad ["ex" exception (#+ exception:)] [concurrency - ["." promise (#+ Promise)] - [task (#+ Task)]]] + ["." promise (#+ Promise)]]] [data ["." error (#+ Error)] ["." maybe] diff --git a/stdlib/source/test/lux/control/concurrency/actor.lux b/stdlib/source/test/lux/control/concurrency/actor.lux index dba286b22..90c88744c 100644 --- a/stdlib/source/test/lux/control/concurrency/actor.lux +++ b/stdlib/source/test/lux/control/concurrency/actor.lux @@ -6,8 +6,7 @@ ["M" monad (#+ do Monad)] ["ex" exception] [concurrency - ["." promise ("#/." monad)] - ["T" task]]] + ["." promise ("#/." monad)]]] [data ["." error] [text @@ -21,7 +20,7 @@ Nat ((handle message state self) - (do T.monad + (do (error.with promise.monad) [#let [_ (log! "BEFORE")] output (message state self) #let [_ (log! "AFTER")]] @@ -35,47 +34,49 @@ (message: #export Counter (count! {increment Nat} state self Nat) (let [state' (n/+ increment state)] - (T.return [state' state']))) + (promise/wrap (#error.Success [state' state'])))) (def: #export test Test - (<| (_.context (%name (name-of /.Actor))) - ($_ _.and - (_.test "Can check if an actor is alive." - (io.run (do io.monad - [counter (new@Counter 0)] - (wrap (/.alive? counter))))) + (do r.monad + [_ (wrap [])] + (<| (_.context (%name (name-of /.Actor))) + ($_ _.and + (_.test "Can check if an actor is alive." + (io.run (do io.monad + [counter (new@Counter 0)] + (wrap (/.alive? counter))))) - (_.test "Can poison actors." - (io.run (do io.monad - [counter (new@Counter 0) - poisoned? (/.poison counter)] - (wrap (and poisoned? - (not (/.alive? counter))))))) - - (_.test "Cannot poison an already dead actor." - (io.run (do io.monad - [counter (new@Counter 0) - first-time (/.poison counter) - second-time (/.poison counter)] - (wrap (and first-time - (not second-time)))))) + (_.test "Can poison actors." + (io.run (do io.monad + [counter (new@Counter 0) + poisoned? (/.poison counter)] + (wrap (and poisoned? + (not (/.alive? counter))))))) + + (_.test "Cannot poison an already dead actor." + (io.run (do io.monad + [counter (new@Counter 0) + first-time (/.poison counter) + second-time (/.poison counter)] + (wrap (and first-time + (not second-time)))))) - (:: r.monad wrap - (do promise.monad - [result (do T.monad - [#let [counter (io.run (new@Counter 0))] - output-1 (count! 1 counter) - output-2 (count! 1 counter) - output-3 (count! 1 counter)] - (wrap (and (n/= 1 output-1) - (n/= 2 output-2) - (n/= 3 output-3))))] - (_.assert "Can send messages to actors." - (case result - (#error.Success outcome) - outcome + (:: r.monad wrap + (do promise.monad + [result (do (error.with promise.monad) + [#let [counter (io.run (new@Counter 0))] + output-1 (count! 1 counter) + output-2 (count! 1 counter) + output-3 (count! 1 counter)] + (wrap (and (n/= 1 output-1) + (n/= 2 output-2) + (n/= 3 output-3))))] + (_.assert "Can send messages to actors." + (case result + (#error.Success outcome) + outcome - (#error.Failure error) - #0)))) - ))) + (#error.Failure error) + #0)))) + )))) |