aboutsummaryrefslogtreecommitdiff
path: root/stdlib
diff options
context:
space:
mode:
authorEduardo Julian2017-06-24 23:58:46 -0400
committerEduardo Julian2017-06-24 23:58:46 -0400
commitfed9d0eb94a8808fe119f39fddf882754cd58788 (patch)
treee4ae053453913c63b80f2fe2b120b67405df42bb /stdlib
parent6b04fc4a718b7eb40e72fdb02a8fa4f7cf4ea64a (diff)
- Re-designed actors so that their messages are now functions with access to the actor's state, and to the actor itself.
- When creating channels and promises, the type is now mandatory. - Minor refactorings.
Diffstat (limited to 'stdlib')
-rw-r--r--stdlib/source/lux/concurrency/actor.lux353
-rw-r--r--stdlib/source/lux/concurrency/frp.lux34
-rw-r--r--stdlib/source/lux/concurrency/promise.lux16
-rw-r--r--stdlib/source/lux/concurrency/task.lux26
-rw-r--r--stdlib/source/lux/control/exception.lux15
-rw-r--r--stdlib/test/test/lux/concurrency/actor.lux133
-rw-r--r--stdlib/test/test/lux/concurrency/frp.lux46
-rw-r--r--stdlib/test/test/lux/concurrency/promise.lux10
8 files changed, 311 insertions, 322 deletions
diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux
index 5f75dc912..0b9c1032f 100644
--- a/stdlib/source/lux/concurrency/actor.lux
+++ b/stdlib/source/lux/concurrency/actor.lux
@@ -1,14 +1,12 @@
(;module: {#;doc "The actor model of concurrency."}
lux
(lux (control monad
- ["p" parser])
- [io #- run]
- function
- (data ["R" result]
- text/format
- (coll [list "List/" Monoid<List> Monad<List>])
- [product]
- [number "Nat/" Codec<Text,Nat>])
+ ["p" parser]
+ ["ex" exception #+ exception:])
+ [io #- run "io/" Monad<IO>]
+ (data text/format
+ (coll [list "L/" Monoid<List> Monad<List>])
+ [product])
[macro #+ with-gensyms]
(macro [code]
["s" syntax #+ syntax: Syntax]
@@ -16,82 +14,86 @@
(common ["csr" reader]
["csw" writer])))
[type])
- (.. ["P" promise #+ Monad<Promise>]
+ (.. ["A" atom]
+ ["P" promise "P/" Monad<Promise>]
+ ["T" task]
[stm #+ Monad<STM>]
[frp]))
+(exception: #export Poisoned)
+(exception: #export Killed)
+
## [Types]
-(type: #export (Actor s m)
- {#;doc "An actor, defined as all the necessities it requires."}
- {#mailbox (stm;Var m)
- #kill-signal (P;Promise Unit)
- #obituary (P;Promise [(Maybe Text) s (List m)])})
+(with-expansions
+ [<Message> (as-is (-> s (Actor s) (T;Task s)))
+ <Obituary> (as-is [Text s (List <Message>)])]
+ (type: #export (Actor s)
+ {#;doc "An actor, defined as all the necessities it requires."}
+ {#mailbox (stm;Var <Message>)
+ #kill-switch (P;Promise Unit)
+ #obituary (P;Promise <Obituary>)})
+
+ (type: #export (Message s)
+ <Message>)
+
+ (type: #export (Obituary s)
+ <Obituary>))
-(type: #export (Behavior s m)
+(type: #export (Behavior s)
{#;doc "An actor's behavior when messages are received."}
- {#step (-> (Actor s m) (-> m s (P;Promise (R;Result s))))
- #end (-> (Maybe Text) s (P;Promise Unit))})
+ {#step (-> (Message s) s (Actor s) (T;Task s))
+ #end (-> Text s (P;Promise Unit))})
## [Values]
(def: #export (spawn init behavior)
{#;doc "Given a behavior and initial state, spawns an actor and returns it."}
- (All [s m] (-> s (Behavior s m) (IO (Actor s m))))
- (io (let [[step on-death] behavior
- mailbox (stm;var (:! ($ +1) []))
- kill-signal (P;promise Unit)
- obituary (P;promise [(Maybe Text) ($ +0) (List ($ +1))])
- self {#mailbox mailbox
- #kill-signal kill-signal
- #obituary obituary}
- mailbox-chan (io;run (stm;follow mailbox))
- step (step self)
+ (All [s] (-> s (Behavior s) (IO (Actor s))))
+ (io (let [[step end] behavior
+ self (: (Actor ($ +0))
+ {#mailbox (stm;var (:! (Message ($ +0)) []))
+ #kill-switch (P;promise Unit)
+ #obituary (P;promise (Obituary ($ +0)))})
+ mailbox-chan (io;run (stm;follow (get@ #mailbox self)))
|mailbox| (stm;var mailbox-chan)
- _ (:: Monad<Promise> map
- (function [_]
- (io;run (do Monad<IO>
- [mb (stm;read! |mailbox|)]
- (frp;close mb))))
- kill-signal)
+ _ (P/map (function [_]
+ (io;run (do Monad<IO>
+ [mb (stm;read! |mailbox|)]
+ (frp;close mb))))
+ (get@ #kill-switch self))
process (loop [state init
messages mailbox-chan]
- (do Monad<Promise>
+ (do P;Monad<Promise>
[?messages+ messages]
(case ?messages+
- ## No kill-signal so far, so I may proceed...
+ ## No kill-switch so far, so I may proceed...
(#;Some [message messages'])
- (do Monad<Promise>
+ (do P;Monad<Promise>
[#let [_ (io;run (stm;write! messages' |mailbox|))]
- ?state' (step message state)]
+ ?state' (step message state self)]
(case ?state'
(#;Left error)
(do @
- [#let [_ (io;run (P;resolve [] kill-signal))
- _ (io;run (frp;close messages'))
- death-message (#;Some error)]
- _ (on-death death-message state)
+ [#let [_ (io;run (P;resolve [] (get@ #kill-switch self)))
+ _ (io;run (frp;close messages'))]
+ _ (end error state)
remaining-messages (frp;consume messages')]
- (wrap [death-message state (#;Cons message remaining-messages)]))
+ (wrap [error state (#;Cons message remaining-messages)]))
(#;Right state')
(recur state' messages')))
## Otherwise, clean-up and return current state.
#;None
- (do Monad<Promise>
+ (do P;Monad<Promise>
[#let [_ (io;run (frp;close messages))
- death-message #;None]
- _ (on-death death-message state)]
+ death-message (Killed "")]
+ _ (end death-message state)]
(wrap [death-message state (list)])))))]
self)))
-(def: #export poison
- {#;doc "Immediately kills the given actor (if it's not already dead)."}
- (All [s m] (-> (Actor s m) (io;IO Bool)))
- (|>. (get@ #kill-signal) (P;resolve [])))
-
(def: #export (alive? actor)
- (All [s m] (-> (Actor s m) Bool))
- (case [(P;poll (get@ #kill-signal actor))
+ (All [s] (-> (Actor s) Bool))
+ (case [(P;poll (get@ #kill-switch actor))
(P;poll (get@ #obituary actor))]
[#;None #;None]
true
@@ -101,76 +103,62 @@
(def: #export (send message actor)
{#;doc "Communicate with an actor through message passing."}
- (All [s m] (-> m (Actor s m) (P;Promise Bool)))
+ (All [s] (-> (Message s) (Actor s) (IO Bool)))
(if (alive? actor)
(exec (io;run (stm;write! message (get@ #mailbox actor)))
- (:: Monad<Promise> wrap true))
- (:: Monad<Promise> wrap false)))
+ (io/wrap true))
+ (io/wrap false)))
-(def: #export (keep-alive init behavior)
- {#;doc "Given initial-state and a behavior, spawns an actor that will reboot if it dies of errors.
+(def: #export (kill actor)
+ {#;doc "Immediately kills the given actor (if it is not already dead)."}
+ (All [s] (-> (Actor s) (io;IO Bool)))
+ (if (alive? actor)
+ (|> actor (get@ #kill-switch) (P;resolve []))
+ (io/wrap false)))
- However, if it is killed, it will not re-spawn."}
- (All [s m] (-> s (Behavior s m) (IO (Actor s m))))
- (io (let [ka-actor (: (Actor (Actor ($ +0) ($ +1)) ($ +1))
- (io;run (spawn (io;run (spawn init behavior))
- {#step (function [*self* message server]
- (do Monad<Promise>
- [was-sent? (send message server)]
- (if was-sent?
- (wrap (#;Right server))
- (do @
- [[?cause state unprocessed-messages] (get@ #obituary server)]
- (exec (log! (format "ACTOR DIED:" "\n"
- (default "" ?cause) "\n"
- "RESTARTING" "\n"))
- (do @
- [#let [new-server (io;run (spawn state behavior))
- mailbox (get@ #mailbox new-server)]
- _ (P;future (mapM io;Monad<IO> ((flip stm;write!) mailbox) (#;Cons message unprocessed-messages)))]
- (wrap (#;Right new-server))))
- ))))
- #end (function [_ server] (exec (io;run (poison server))
- (:: Monad<Promise> wrap [])))})))]
- (update@ #obituary (: (-> (P;Promise [(Maybe Text) (Actor ($ +0) ($ +1)) (List ($ +1))])
- (P;Promise [(Maybe Text) ($ +0) (List ($ +1))]))
- (function [process]
- (do Monad<Promise>
- [[_ server unprocessed-messages-0] process
- [cause state unprocessed-messages-1] (get@ #obituary server)]
- (wrap [cause state (List/append unprocessed-messages-0 unprocessed-messages-1)]))))
- ka-actor))))
+(def: #export (poison actor)
+ {#;doc "Kills the actor by sending a message that will kill it upon processing,
+ but allows the actor to handle previous messages."}
+ (All [s] (-> (Actor s) (IO Bool)))
+ (send (function [state self]
+ (T;throw Poisoned ""))
+ actor))
## [Syntax]
(type: Method
{#name Text
#vars (List Text)
#args (List [Text Code])
+ #state Text
+ #self Text
#return Code
#body Code})
(def: method^
(Syntax Method)
(s;form (do p;Monad<Parser>
- [_ (s;this (' method:))
- vars (p;default (list) (s;tuple (p;some s;local-symbol)))
- [name args] (s;form ($_ p;seq
- s;local-symbol
- (p;many csr;typed-input)
- ))
+ [vars (p;default (list) (s;tuple (p;some s;local-symbol)))
+ [name args state self] (s;form ($_ p;seq
+ s;local-symbol
+ (p;some csr;typed-input)
+ s;local-symbol
+ s;local-symbol
+ ))
return s;any
body s;any]
(wrap {#name name
#vars vars
#args args
+ #state state
+ #self self
#return return
#body body}))))
(def: stop^
- (Syntax Code)
- (s;form (do p;Monad<Parser>
- [_ (s;this (' stop:))]
- s;any)))
+ (Syntax [[Text Text] Code])
+ (s;form (p;seq (s;tuple (p;seq s;local-symbol
+ s;local-symbol))
+ s;any)))
(def: actor-decl^
(Syntax [(List Text) Text (List [Text Code])])
@@ -178,105 +166,96 @@
(p;either (s;form (p;seq s;local-symbol (p;many csr;typed-input)))
(p;seq s;local-symbol (:: p;Monad<Parser> wrap (list))))))
-(def: (actor-def-decl [_vars _name _args] return-type)
- (-> [(List Text) Text (List [Text Code])] Code (List Code))
- (let [decl (` ((~ (code;symbol ["" (format _name "//new")])) (~@ (List/map (|>. product;left [""] code;symbol) _args))))
- base-type (` (-> (~@ (List/map product;right _args))
- (~ return-type)))
- type (case _vars
- #;Nil
- base-type
-
- _
- (` (All [(~@ (List/map (|>. [""] code;symbol) _vars))]
- (~ base-type))))]
- (list decl
- type)))
-
-(syntax: #export (actor: [_ex-lev csr;export]
- [(^@ decl [_vars _name _args]) actor-decl^]
+(syntax: #export (actor: [export csr;export]
+ [[_vars _name _args] actor-decl^]
state-type
- [methods (p;many method^)]
+ [methods (p;some method^)]
[?stop (p;opt stop^)])
- {#;doc (doc "Allows defining an actor, with a pice of state and a set of methods that can be called on it."
- "A method can access the actor's state through the *state* variable."
- "A method can also access the actor itself through the *self* variable."
+ {#;doc (doc "Defines an actor, with internal state and methods that can be called on it."
+ "A method can access the actor's state through the state parameter."
+ "A method can also access the actor itself through the self parameter."
"A method may succeed or fail (in case of failure, the actor dies). This is handled through the Either type."
"A method's output must be a promise containing a 2-tuple with the updated state and a return value."
"All methods are run implicitly within the Promise monad."
- (actor: #export Adder
+ (actor: #export Counter
Int
- (method: (count! [to-add Int])
- [Int Int]
- (if (i.>= 0 to-add)
- (let [new-state (i.+ to-add *state*)]
- (wrap (#;Right [new-state [*state* new-state]])))
- (wrap (#;Left "Cannot add negative numbers!"))))
+ ((count! [increment Int] state self)
+ [Int Int]
+ (if (i.>= 0 increment)
+ (let [state' (i.+ increment state)]
+ (T;return [state' [state state']]))
+ (T;fail "Cannot add negative numbers!")))
+
+ ([cause state]
+ (:: P;Monad<Promise> wrap
+ (log! (if (ex;match? ;;Killed cause)
+ (format "Counter was killed: " (%i state))
+ cause))))
))}
- (with-gensyms [g!message g!error g!return g!error g!output]
- (let [g!state-name (code;symbol ["" (format _name "//STATE")])
- g!protocol-name (code;symbol ["" (format _name "//PROTOCOL")])
- g!self (code;symbol ["" "*self*"])
- g!state (code;symbol ["" "*state*"])
- g!cause (code;symbol ["" "*cause*"])
- g!stop-body (default (` (:: P;Monad<Promise> (~' wrap) [])) ?stop)
- protocol (List/map (function [(^slots [#name #vars #args #return #body])]
- (` ((~ (code;tag ["" name])) [(~@ (List/map product;right args))] (P;Promise (~ return)))))
- methods)
- protocol-pm (List/map (: (-> Method [Code Code])
- (function [(^slots [#name #vars #args #return #body])]
- (let [arg-names (|> (list;size args) (list;n.range +1) (List/map (|>. Nat/encode [""] code;symbol)))
- body-func (` (: (-> (~ g!state-name) (~@ (List/map product;right args)) (P;Promise (R;Result [(~ g!state-name) (~ return)])))
- (function (~ (code;symbol ["" _name])) [(~ g!state) (~@ (List/map (|>. product;left [""] code;symbol) args))]
- (do P;Monad<Promise>
- []
- (~ body)))))]
- [(` [[(~@ arg-names)] (~ g!return)])
- (` (do P;Monad<Promise>
- [(~ g!output) ((~ body-func) (~ g!state) (~@ arg-names))]
- (case (~ g!output)
- (#;Right [(~ g!state) (~ g!output)])
- (exec (io;run (P;resolve (~ g!output) (~ g!return)))
- ((~' wrap) (#;Right (~ g!state))))
+ (with-gensyms [g!message g!self g!state g!init g!error g!return g!output]
+ (let [g!state-type (code;symbol ["" (format _name "//Actor:State")])
+ g!behavior (code;symbol ["" (format _name "//Actor:Behavior")])
+ g!actor (code;symbol ["" _name])
+ g!methods (L/map (: (-> Method Code)
+ (function [(^slots [#name #vars #args #state #self #return #body])]
+ (let [g!method (code;symbol ["" name])
+ g!vars (L/map (|>. [""] code;symbol) vars)
+ g!var-refs (: (List Code)
+ (if (list;empty? vars)
+ (list)
+ (|> vars list;size n.dec
+ (list;n.range +0) (L/map (|>. code;nat (~) ($) (`))))))
+ g!args-names (L/map (|>. product;left [""] code;symbol) args)
+ g!arg-types (L/map product;right args)
+ g!state (code;symbol ["" state])
+ g!self (code;symbol ["" self])]
+ (` (def: (~@ (csw;export export)) ((~ g!method) (~@ g!args-names) (~ g!self))
+ (All [(~@ g!vars)]
+ (-> (~@ g!arg-types) (~ g!actor) (T;Task (~ return))))
+ (let [(~ g!output) (T;task (~ return))]
+ (exec (;;send (function [(~ g!state) (~ g!self)]
+ (do P;Monad<Promise>
+ [(~ g!return) (: (T;Task [((~ g!state-type) (~@ g!var-refs))
+ (~ return)])
+ (~ body))]
+ (case (~ g!return)
+ (#;Right [(~ g!state) (~ g!return)])
+ (exec (io;run (P;resolve (#;Right (~ g!return)) (~ g!output)))
+ (T;return (~ g!state)))
+
+ (#;Left (~ g!error))
+ (exec (io;run (P;resolve (#;Left (~ g!error)) (~ g!output)))
+ (T;fail (~ g!error))))))
+ (~ g!self))
+ (~ g!output))))))))
+ methods)
+ g!new (code;symbol ["" (format "new-" _name)])
+ g!vars (L/map (|>. [""] code;symbol) _vars)]
+ (wrap (list& (` (type: (~@ (csw;export export)) ((~ g!state-type) (~@ g!vars))
+ (~ state-type)))
+ (` (type: (~@ (csw;export export)) ((~ g!actor) (~@ g!vars))
+ (;;Actor ((~ g!state-type) (~@ g!vars)))))
+ (` (def: (~@ (csw;export export)) (~ g!behavior)
+ (All [(~@ g!vars)]
+ (Behavior ((~ g!state-type) (~@ g!vars))))
+ {#step (function [(~' message) (~' state) (~' self)]
+ ((~' message) (~' state) (~' self)))
+ #end (~ (case ?stop
+ (#;Some [[cause state] body])
+ (let [g!cause (code;symbol ["" cause])
+ g!state (code;symbol ["" state])]
+ (` (function [(~ g!cause) (~ g!state)]
+ (do P;Monad<Promise>
+ []
+ (~ body)))))
- (#;Left (~ g!error))
- ((~' wrap) (#;Left (~ g!error))))
- ))])))
- methods)
- g!behavior (` {#step (function [(~ g!self) (~ g!message) (~ g!state)]
- (case (~ g!message)
- (~@ (if (n.= +1 (list;size protocol-pm))
- (List/join (List/map (function [[pattern clause]]
- (list pattern clause))
- protocol-pm))
- (List/join (List/map (function [[method [pattern clause]]]
- (list (` ((~ (code;tag ["" (get@ #name method)])) (~ pattern)))
- clause))
- (list;zip2 methods protocol-pm)))))
- ))
- #end (function [(~ g!cause) (~ g!state)]
- (do P;Monad<Promise>
- []
- (~ g!stop-body)))})
- g!actor-name (code;symbol ["" _name])
- g!methods (List/map (: (-> Method Code)
- (function [(^slots [#name #vars #args #return #body])]
- (let [arg-names (|> (list;size args) (list;n.range +1) (List/map (|>. Nat/encode [""] code;symbol)))
- type (` (-> (~@ (List/map product;right args))
- (~ g!actor-name)
- (P;Promise (~ return))))]
- (` (def: (~@ (csw;export _ex-lev)) ((~ (code;symbol ["" name])) (~@ arg-names) (~ g!self))
- (~ type)
- (let [(~ g!output) (P;promise (~ return))]
- (exec (send ((~ (code;tag ["" name])) [[(~@ arg-names)] (~ g!output)]) (~ g!self))
- (~ g!output))))))))
- methods)]
- (wrap (list& (` (type: (~@ (csw;export _ex-lev)) (~ g!state-name) (~ state-type)))
- (` (type: (~@ (csw;export _ex-lev)) (~ g!protocol-name) (~@ protocol)))
- (` (type: (~@ (csw;export _ex-lev)) (~ g!actor-name) (Actor (~ g!state-name) (~ g!protocol-name))))
- (` (def: (~@ (csw;export _ex-lev)) (~@ (actor-def-decl decl (` (Behavior (~ g!state-name) (~ g!protocol-name)))))
- (~ g!behavior)))
+ #;None
+ (` (:: P;Monad<Promise> (~' wrap) []))))}))
+ (` (def: (~@ (csw;export export)) ((~ g!new) (~ g!init))
+ (All [(~@ g!vars)]
+ (-> ((~ g!state-type) (~@ g!vars)) (io;IO ((~ g!actor) (~@ g!vars)))))
+ (;;spawn (~ g!init) (~ g!behavior))))
g!methods))
)))
diff --git a/stdlib/source/lux/concurrency/frp.lux b/stdlib/source/lux/concurrency/frp.lux
index f71cf1797..a646f2b6e 100644
--- a/stdlib/source/lux/concurrency/frp.lux
+++ b/stdlib/source/lux/concurrency/frp.lux
@@ -20,19 +20,11 @@
(&;Promise (Maybe [a (Chan a)])))
## [Syntax]
-(syntax: #export (chan [?type (p;opt s;any)])
+(syntax: #export (chan [type s;any])
{#;doc (doc "Makes an uninitialized Chan (in this case, of Unit)."
- (chan Unit)
-
- "The type is optional."
- (chan))}
- (case ?type
- (#;Some type)
- (wrap (list (` (: (Chan (~ type))
- (&;promise)))))
-
- #;None
- (wrap (list (` (&;promise))))))
+ (chan Unit))}
+ (wrap (list (` (: (Chan (~ type))
+ (&;promise' #;None))))))
## [Values]
(def: #export (filter p xs)
@@ -45,28 +37,28 @@
(wrap (#;Some [x (filter p xs')]))
(filter p xs')))))
-(def: #export (write value chan)
+(def: #export (write value target)
{#;doc "Write to a channel, so long as it's still open."}
(All [a] (-> a (Chan a) (IO (Maybe (Chan a)))))
- (case (&;poll chan)
+ (case (&;poll target)
(^template [<case> <chan-to-write>]
<case>
(do Monad<IO>
- [#let [new-tail (&;promise)]
+ [#let [new-tail (chan ($ +0))]
done? (&;resolve (#;Some [value new-tail]) <chan-to-write>)]
(if done?
(wrap (#;Some new-tail))
(write value <chan-to-write>))))
- ([#;None chan]
- [(#;Some (#;Some [_ chan'])) chan'])
+ ([#;None target]
+ [(#;Some (#;Some [_ target'])) target'])
_
(:: Monad<IO> wrap #;None)
))
-(def: #export (close chan)
+(def: #export (close target)
(All [a] (-> (Chan a) (IO Bool)))
- (case (&;poll chan)
+ (case (&;poll target)
(^template [<case> <chan-to-write>]
<case>
(do Monad<IO>
@@ -74,8 +66,8 @@
(if done?
(wrap true)
(close <chan-to-write>))))
- ([#;None chan]
- [(#;Some (#;Some [_ chan'])) chan'])
+ ([#;None target]
+ [(#;Some (#;Some [_ target'])) target'])
_
(:: Monad<IO> wrap false)
diff --git a/stdlib/source/lux/concurrency/promise.lux b/stdlib/source/lux/concurrency/promise.lux
index f2a7ffc05..4492f955e 100644
--- a/stdlib/source/lux/concurrency/promise.lux
+++ b/stdlib/source/lux/concurrency/promise.lux
@@ -31,19 +31,11 @@
(atom {#value ?value
#observers (list)}))
-(syntax: #export (promise [?type (p;opt s;any)])
+(syntax: #export (promise [type s;any])
{#;doc (doc "Makes an uninitialized Promise (in this example, of Unit)."
- (promise Unit)
-
- "The type is optional."
- (promise))}
- (case ?type
- (#;Some type)
- (wrap (list (` (: (Promise (~ type))
- (promise' #;None)))))
-
- #;None
- (wrap (list (` (promise' #;None))))))
+ (promise Unit))}
+ (wrap (list (` (: (Promise (~ type))
+ (promise' #;None))))))
(def: #export (poll promise)
{#;doc "Polls a Promise's value."}
diff --git a/stdlib/source/lux/concurrency/task.lux b/stdlib/source/lux/concurrency/task.lux
index 239572bff..f46d1f0da 100644
--- a/stdlib/source/lux/concurrency/task.lux
+++ b/stdlib/source/lux/concurrency/task.lux
@@ -3,13 +3,28 @@
(lux (data ["R" result])
(control functor
applicative
- monad)
+ monad
+ ["ex" exception #+ Exception])
(concurrency ["P" promise])
+ [macro]
+ (macro ["s" syntax #+ syntax: Syntax])
))
(type: #export (Task a)
(P;Promise (R;Result a)))
+(def: #export (fail error)
+ (All [a] (-> Text (Task a)))
+ (:: P;Applicative<Promise> wrap (#R;Error error)))
+
+(def: #export (throw exception message)
+ (All [a] (-> Exception Text (Task a)))
+ (fail (exception message)))
+
+(def: #export (return value)
+ (All [a] (-> a (Task a)))
+ (:: P;Applicative<Promise> wrap (#R;Success value)))
+
(struct: #export _ (Functor Task)
(def: (map f fa)
(:: P;Functor<Promise> map
@@ -25,8 +40,7 @@
(struct: #export _ (Applicative Task)
(def: functor Functor<Task>)
- (def: (wrap a)
- (:: P;Applicative<Promise> wrap (#R;Success a)))
+ (def: wrap return)
(def: (apply ff fa)
(do P;Monad<Promise>
@@ -49,3 +63,9 @@
(#R;Success ma)
ma))))
+
+(syntax: #export (task [type s;any])
+ {#;doc (doc "Makes an uninitialized Task (in this example, of Unit)."
+ (task Unit))}
+ (wrap (list (` (: (;;Task (~ type))
+ (P;promise' #;None))))))
diff --git a/stdlib/source/lux/control/exception.lux b/stdlib/source/lux/control/exception.lux
index ef0419b98..401a3057c 100644
--- a/stdlib/source/lux/control/exception.lux
+++ b/stdlib/source/lux/control/exception.lux
@@ -2,7 +2,7 @@
lux
(lux (control monad)
(data ["R" result #- fail]
- [text])
+ [text "T/" Monoid<Text>])
[macro]
(macro [code]
["s" syntax #+ syntax: Syntax]
@@ -16,9 +16,13 @@
(-> Text Text))
## [Values]
-(def: #hidden _Text/append_
+(def: #hidden _T/append_
(-> Text Text Text)
- (:: text;Monoid<Text> append))
+ T/append)
+
+(def: #export (match? exception error)
+ (-> Exception Text Bool)
+ (text;starts-with? (exception "") error))
(def: #export (catch exception then try)
{#;doc "If a particular exception is detected on a possibly-erroneous value, handle it.
@@ -67,7 +71,8 @@
(exception: #export Some-Exception))}
(do @
[current-module macro;current-module-name
- #let [g!message (code;symbol ["" "message"])]]
+ #let [descriptor ($_ T/append "{" current-module ";" name "}" "\n")
+ g!message (code;symbol ["" "message"])]]
(wrap (list (` (def: (~@ (csw;export _ex-lev)) ((~ (code;symbol ["" name])) (~ g!message))
Exception
- ($_ _Text/append_ "[" (~ (code;text current-module)) ";" (~ (code;text name)) "]\t" (~ g!message))))))))
+ (_T/append_ (~ (code;text descriptor)) (~ g!message))))))))
diff --git a/stdlib/test/test/lux/concurrency/actor.lux b/stdlib/test/test/lux/concurrency/actor.lux
index a8f6ed7fb..8ec792baf 100644
--- a/stdlib/test/test/lux/concurrency/actor.lux
+++ b/stdlib/test/test/lux/concurrency/actor.lux
@@ -1,79 +1,80 @@
(;module:
lux
(lux [io #- run]
- (control monad)
+ (control monad
+ ["ex" exception])
(data [number]
text/format
["R" result])
- (concurrency [promise #+ Promise Monad<Promise> "Promise/" Monad<Promise>]
+ (concurrency ["P" promise "P/" Monad<Promise>]
+ ["T" task]
["&" actor #+ actor:]))
lux/test)
-(actor: Adder
- Int
+(actor: Counter
+ Nat
- (method: (add! [offset Int])
- [Int Int]
- (let [*state*' (i.+ offset *state*)]
- (wrap (#;Right [*state*' [*state* *state*']]))))
-
- (stop:
- (wrap [])))
+ ((count! state self)
+ Nat
+ (let [state' (n.inc state)]
+ (T;return [state' state'])))
+
+ ([cause state]
+ (P/wrap (log! (if (ex;match? &;Killed cause)
+ (format "Counter was killed: " (%n state))
+ cause)))))
(context: "Actors"
- (let [counter-proc (: (&;Behavior Int (Promise Int))
- [(function [self output state]
- (let [state' (i.inc state)]
- (exec (io;run (promise;resolve state' output))
- (Promise/wrap (#;Right state')))))
- (function [?error state] (Promise/wrap []))])]
- ($_ seq
- (test "Can check where an actor is alive."
- (let [counter (: (&;Actor Int (Promise Int))
- (io;run (&;spawn 0 counter-proc)))]
- (&;alive? counter)))
-
- (test "Can poison/kill actors."
- (let [counter (: (&;Actor Int (Promise Int))
- (io;run (&;spawn 0 counter-proc)))]
- (and (io;run (&;poison counter))
- (not (&;alive? counter)))))
-
- (test "Can't poison an already poisoned actor."
- (let [counter (: (&;Actor Int (Promise Int))
- (io;run (&;spawn 0 counter-proc)))]
- (and (io;run (&;poison counter))
- (not (io;run (&;poison counter))))))
-
- (do Monad<Promise>
- [#let [counter (: (&;Actor Int (Promise Int))
- (io;run (&;spawn 0 counter-proc)))
- output-1 (: (Promise Int) (promise;promise))
- output-2 (: (Promise Int) (promise;promise))
- output-3 (: (Promise Int) (promise;promise))]
- _ (&;send output-1 counter)
- _ (&;send output-2 counter)
- _ (&;send output-3 counter)
- =1 output-1
- =2 output-2
- =3 output-3]
- (test "Can send messages to actors."
- (and (i.= 1 =1)
- (i.= 2 =2)
- (i.= 3 =3))))
-
- (do Monad<Promise>
- [#let [adder (: Adder
- (io;run (&;spawn 0 Adder//new)))]
- t1 (add! 1 adder)
- t2 (add! 2 adder)
- t3 (add! 3 adder)
- #let [_ (io;run (&;poison adder))]]
- (test "Can use custom-defined actors."
- (case [t1 t2 t3]
- [[0 1] [1 3] [3 6]]
- true
+ ($_ seq
+ (test "Can check where an actor is alive."
+ (io;run (do Monad<IO>
+ [counter (new-Counter +0)]
+ (wrap (&;alive? counter)))))
+
+ (test "Can kill actors."
+ (io;run (do Monad<IO>
+ [counter (new-Counter +0)
+ killed? (&;kill counter)]
+ (wrap (and killed?
+ (not (&;alive? counter)))))))
+
+ (test "Can poison actors."
+ (io;run (do Monad<IO>
+ [counter (new-Counter +0)
+ poisoned? (&;poison counter)]
+ (wrap (and poisoned?
+ (not (&;alive? counter)))))))
+
+ (test "Cannot kill an already dead actor."
+ (io;run (do Monad<IO>
+ [counter (new-Counter +0)
+ first-time (&;kill counter)
+ second-time (&;kill counter)]
+ (wrap (and first-time
+ (not second-time))))))
+
+ (test "Cannot poison an already dead actor."
+ (io;run (do Monad<IO>
+ [counter (new-Counter +0)
+ first-time (&;kill counter)
+ second-time (&;poison counter)]
+ (wrap (and first-time
+ (not second-time))))))
+
+ (do P;Monad<Promise>
+ [result (do T;Monad<Task>
+ [#let [counter (io;run (new-Counter +0))]
+ output-1 (count! counter)
+ output-2 (count! counter)
+ output-3 (count! counter)]
+ (wrap (and (n.= +1 output-1)
+ (n.= +2 output-2)
+ (n.= +3 output-3))))]
+ (test "Can send messages to actors."
+ (case result
+ (#R;Success outcome)
+ outcome
- _
- false)))
- )))
+ (#R;Error error)
+ false)))
+ ))
diff --git a/stdlib/test/test/lux/concurrency/frp.lux b/stdlib/test/test/lux/concurrency/frp.lux
index 2d9a45167..3447a55b2 100644
--- a/stdlib/test/test/lux/concurrency/frp.lux
+++ b/stdlib/test/test/lux/concurrency/frp.lux
@@ -4,13 +4,13 @@
(control monad)
(data [number]
text/format)
- (concurrency [promise #+ Promise Monad<Promise> "Promise/" Monad<Promise>]
+ (concurrency ["P" promise #+ "P/" Monad<Promise>]
["&" frp]))
lux/test)
-(def: (List->Chan values)
+(def: (to-channel values)
(-> (List Int) (&;Chan Int))
- (let [_chan (: (&;Chan Int) (&;chan))]
+ (let [_chan (&;chan Int)]
(io;run (do Monad<IO>
[_ (mapM @ (function [value] (&;write value _chan))
values)
@@ -19,8 +19,8 @@
(context: "FRP"
($_ seq
- (do Monad<Promise>
- [elems (&;consume (List->Chan (list 0 1 2 3 4 5)))]
+ (do P;Monad<Promise>
+ [elems (&;consume (to-channel (list 0 1 2 3 4 5)))]
(test "Can consume a chan into a list."
(case elems
(^ (list 0 1 2 3 4 5))
@@ -29,9 +29,9 @@
_
false)))
- (do Monad<Promise>
- [elems (&;consume (let [input (List->Chan (list 0 1 2 3 4 5))
- output (: (&;Chan Int) (&;chan))]
+ (do P;Monad<Promise>
+ [elems (&;consume (let [input (to-channel (list 0 1 2 3 4 5))
+ output (&;chan Int)]
(exec (&;pipe input output)
output)))]
(test "Can pipe one channel into another."
@@ -42,8 +42,8 @@
_
false)))
- (do Monad<Promise>
- [elems (&;consume (&;filter i.even? (List->Chan (list 0 1 2 3 4 5))))]
+ (do P;Monad<Promise>
+ [elems (&;consume (&;filter i.even? (to-channel (list 0 1 2 3 4 5))))]
(test "Can filter a channel's elements."
(case elems
(^ (list 0 2 4))
@@ -52,9 +52,9 @@
_
false)))
- (do Monad<Promise>
- [elems (&;consume (&;merge (list (List->Chan (list 0 1 2 3 4 5))
- (List->Chan (list 0 -1 -2 -3 -4 -5)))))]
+ (do P;Monad<Promise>
+ [elems (&;consume (&;merge (list (to-channel (list 0 1 2 3 4 5))
+ (to-channel (list 0 -1 -2 -3 -4 -5)))))]
(test "Can merge channels."
(case elems
(^ (list 0 1 2 3 4 5 0 -1 -2 -3 -4 -5))
@@ -63,13 +63,13 @@
_
false)))
- (do Monad<Promise>
- [output (&;fold (function [base input] (Promise/wrap (i.+ input base))) 0 (List->Chan (list 0 1 2 3 4 5)))]
+ (do P;Monad<Promise>
+ [output (&;fold (function [base input] (P/wrap (i.+ input base))) 0 (to-channel (list 0 1 2 3 4 5)))]
(test "Can fold over a channel."
(i.= 15 output)))
- (do Monad<Promise>
- [elems (&;consume (&;distinct number;Eq<Int> (List->Chan (list 0 0 0 1 2 2 3 3 3 3 4 4 4 5 5))))]
+ (do P;Monad<Promise>
+ [elems (&;consume (&;distinct number;Eq<Int> (to-channel (list 0 0 0 1 2 2 3 3 3 3 4 4 4 5 5))))]
(test "Can avoid immediate repetition in the channel."
(case elems
(^ (list 0 1 2 3 4 5))
@@ -78,8 +78,8 @@
_
false)))
- (do Monad<Promise>
- [elems (&;consume (&;once (:: promise;Monad<Promise> wrap 12345)))]
+ (do P;Monad<Promise>
+ [elems (&;consume (&;once (:: @ wrap 12345)))]
(test "Can convert a promise into a single-value channel."
(case elems
(^ (list 12345))
@@ -88,8 +88,8 @@
_
false)))
- (do Monad<Promise>
- [elems (&;consume (:: &;Functor<Chan> map i.inc (List->Chan (list 0 1 2 3 4 5))))]
+ (do P;Monad<Promise>
+ [elems (&;consume (:: &;Functor<Chan> map i.inc (to-channel (list 0 1 2 3 4 5))))]
(test "Functor goes over every element in a channel."
(case elems
(^ (list 1 2 3 4 5 6))
@@ -98,7 +98,7 @@
_
false)))
- (do Monad<Promise>
+ (do P;Monad<Promise>
[elems (&;consume (let [(^open) &;Applicative<Chan>]
(apply (wrap i.inc) (wrap 12345))))]
(test "Applicative works over all channel values."
@@ -109,7 +109,7 @@
_
false)))
- (do Monad<Promise>
+ (do P;Monad<Promise>
[elems (&;consume (do &;Monad<Chan>
[f (wrap i.inc)
a (wrap 12345)]
diff --git a/stdlib/test/test/lux/concurrency/promise.lux b/stdlib/test/test/lux/concurrency/promise.lux
index 30802085b..305cfe0f9 100644
--- a/stdlib/test/test/lux/concurrency/promise.lux
+++ b/stdlib/test/test/lux/concurrency/promise.lux
@@ -5,7 +5,7 @@
pipe)
(data [number]
text/format)
- (concurrency ["&" promise])
+ (concurrency ["&" promise "&/" Monad<Promise>])
["R" math/random])
lux/test)
@@ -49,14 +49,14 @@
(and ?left (not ?right))))
(test "Can poll a promise for its value."
- (and (|> (&;poll (:: &;Monad<Promise> wrap true))
+ (and (|> (&;poll (&/wrap true))
(case> (#;Some true) true _ false))
(|> (&;poll (&;delay +200 true))
(case> #;None true _ false))))
- (test "Cant re-resolve a resolved promise."
- (and (not (io;run (&;resolve false (:: &;Monad<Promise> wrap true))))
- (io;run (&;resolve true (: (&;Promise Bool) (&;promise))))))
+ (test "Cannot re-resolve a resolved promise."
+ (and (not (io;run (&;resolve false (&/wrap true))))
+ (io;run (&;resolve true (&;promise Bool)))))
(do &;Monad<Promise>
[?none (&;time-out +100 (&;delay +200 true))