aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--stdlib/source/lux/concurrency/actor.lux197
-rw-r--r--stdlib/source/lux/concurrency/promise.lux29
-rw-r--r--stdlib/source/lux/concurrency/stm.lux66
-rw-r--r--stdlib/source/lux/test.lux7
-rw-r--r--stdlib/test/test/lux/concurrency/actor.lux21
5 files changed, 138 insertions, 182 deletions
diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux
index f91b3c57f..3a032e00f 100644
--- a/stdlib/source/lux/concurrency/actor.lux
+++ b/stdlib/source/lux/concurrency/actor.lux
@@ -4,36 +4,49 @@
["p" parser]
["ex" exception #+ exception:])
[io #- run "io/" Monad<IO>]
- (data text/format
- (coll [list "list/" Monoid<List> Monad<List> Fold<List>])
- [product])
+ (data [product]
+ ["e" error]
+ text/format
+ (coll [list "list/" Monoid<List> Monad<List> Fold<List>]))
[macro #+ with-gensyms Monad<Meta>]
(macro [code]
["s" syntax #+ syntax: Syntax]
(syntax ["cs" common]
(common ["csr" reader]
["csw" writer])))
- (type abstract)
- (lang [type]))
- (// ["A" atom]
- ["P" promise "P/" Monad<Promise>]
- ["T" task]
- [stm #+ Monad<STM>]
- [frp]))
+ (type abstract))
+ (// [atom #+ Atom atom]
+ [promise #+ Promise promise "promise/" Monad<Promise>]
+ [task #+ Task]))
(exception: #export Poisoned)
-(exception: #export Killed)
(exception: #export Dead)
## [Types]
(with-expansions
- [<Message> (as-is (-> s (Actor s) (T.Task s)))
- <Obituary> (as-is [Text s (List <Message>)])]
+ [<Message> (as-is (-> s (Actor s) (Task s)))
+ <Obituary> (as-is [Text s (List <Message>)])
+ <Mailbox> (as-is (Rec Mailbox (Promise [<Message> Mailbox])))]
+
+ (def: (obituary mailbox)
+ (All [a] (-> (Rec Mailbox (Promise [a Mailbox])) (List a)))
+ (case (promise.poll mailbox)
+ (#.Some [head tail])
+ (#.Cons head (obituary tail))
+
+ #.None
+ #.Nil))
+
(abstract: #export (Actor s)
{#.doc "An actor, defined as all the necessities it requires."}
- {#mailbox (stm.Var <Message>)
- #kill-switch (P.Promise Unit)
- #obituary (P.Promise <Obituary>)}
+ {#mailbox (Atom <Mailbox>)
+ #obituary (Promise <Obituary>)}
+
+ ## TODO: Delete after new-luxc becomes the new standard compiler.
+ (def: (actor mailbox obituary)
+ (All [s] (-> (Atom <Mailbox>) (Promise <Obituary>) (Actor s)))
+ (@abstract {#mailbox mailbox
+ #obituary obituary}))
(type: #export (Message s)
<Message>)
@@ -43,60 +56,36 @@
(type: #export (Behavior s)
{#.doc "An actor's behavior when messages are received."}
- {#handle (-> (Message s) s (Actor s) (T.Task s))
- #end (-> Text s (P.Promise Unit))})
+ {#handle (-> (Message s) s (Actor s) (Task s))
+ #end (-> Text s (Promise Unit))})
(def: #export (spawn behavior init)
{#.doc "Given a behavior and initial state, spawns an actor and returns it."}
(All [s] (-> (Behavior s) s (IO (Actor s))))
(io (let [[handle end] behavior
- self (@abstract {#mailbox (stm.var (:! Message []))
- #kill-switch (P.promise #.None)
- #obituary (P.promise #.None)})
- mailbox-channel (io.run (stm.follow (get@ #mailbox (@repr self))))
- |mailbox| (stm.var mailbox-channel)
- _ (P/map (function [_]
- (io.run (do Monad<IO>
- [mb (stm.read! |mailbox|)]
- (frp.close mb))))
- (get@ #kill-switch (@repr self)))
+ self (actor (atom (promise #.None))
+ (promise #.None))
process (loop [state init
- messages mailbox-channel]
- (do P.Monad<Promise>
- [?messages+ messages]
- (case ?messages+
- ## No kill-switch so far, so I may proceed...
- (#.Some [message messages'])
- (do P.Monad<Promise>
- [#let [_ (io.run (stm.write! messages' |mailbox|))]
- ?state' (handle message state self)]
- (case ?state'
- (#.Left error)
- (do @
- [#let [_ (io.run (do Monad<IO>
- [_ (P.resolve [] (get@ #kill-switch (@repr self)))]
- (frp.close messages')))]
- _ (end error state)
- remaining-messages (frp.consume messages')]
- (wrap [error state (#.Cons message remaining-messages)]))
-
- (#.Right state')
- (recur state' messages')))
-
- ## Otherwise, clean-up and return current state.
- #.None
- (do P.Monad<Promise>
- [#let [_ (io.run (frp.close messages))
- death-message (Killed "")]
- _ (end death-message state)]
- (wrap [death-message state (list)])))))]
+ |mailbox| (io.run (atom.read (get@ #mailbox (@repr self))))]
+ (do promise.Monad<Promise>
+ [[head tail] |mailbox|
+ ?state' (handle head state self)]
+ (case ?state'
+ (#e.Error error)
+ (do @
+ [_ (end error state)]
+ (exec (io.run (promise.resolve [error state (#.Cons head (obituary tail))]
+ (get@ #obituary (@repr self))))
+ (wrap [])))
+
+ (#e.Success state')
+ (recur state' tail))))]
self)))
(def: #export (alive? actor)
(All [s] (-> (Actor s) Bool))
- (case [(P.poll (get@ #kill-switch (@repr actor)))
- (P.poll (get@ #obituary (@repr actor)))]
- [#.None #.None]
+ (case (promise.poll (get@ #obituary (@repr actor)))
+ #.None
true
_
@@ -106,27 +95,33 @@
{#.doc "Communicate with an actor through message passing."}
(All [s] (-> (Message s) (Actor s) (IO Bool)))
(if (alive? actor)
- (do Monad<IO>
- [_ (stm.write! message (get@ #mailbox (@repr actor)))]
- (wrap true))
- (io/wrap false)))
-
- (def: #export (kill actor)
- {#.doc "Immediately kills the given actor (if it is not already dead)."}
- (All [s] (-> (Actor s) (io.IO Bool)))
- (if (alive? actor)
- (|> actor @repr (get@ #kill-switch) (P.resolve []))
+ (let [entry [message (promise #.None)]]
+ (do Monad<IO>
+ [|mailbox| (atom.read (get@ #mailbox (@repr actor)))]
+ (loop [|mailbox| |mailbox|]
+ (case (promise.poll |mailbox|)
+ #.None
+ (do @
+ [resolved? (promise.resolve entry |mailbox|)]
+ (if resolved?
+ (do @
+ [_ (atom.write (product.right entry) (get@ #mailbox (@repr actor)))]
+ (wrap true))
+ (recur |mailbox|)))
+
+ (#.Some [_ |mailbox|'])
+ (recur |mailbox|')))))
(io/wrap false)))
))
## [Values]
-(def: #export (default-handle message state self)
- (All [s] (-> (Message s) s (Actor s) (T.Task s)))
+(def: (default-handle message state self)
+ (All [s] (-> (Message s) s (Actor s) (Task s)))
(message state self))
-(def: #export (default-end cause state)
- (All [s] (-> Text s (P.Promise Unit)))
- (P/wrap []))
+(def: (default-end cause state)
+ (All [s] (-> Text s (Promise Unit)))
+ (promise/wrap []))
(def: #export default-behavior
(All [s] (Behavior s))
@@ -138,7 +133,7 @@
but allows the actor to handle previous messages."}
(All [s] (-> (Actor s) (IO Bool)))
(send (function [state self]
- (T.throw Poisoned ""))
+ (task.throw Poisoned ""))
actor))
## [Syntax]
@@ -206,16 +201,16 @@
Nat
((stop cause state)
- (:: P.Monad<Promise> wrap
- (log! (if (ex.match? ..Killed cause)
- (format "Counter was killed: " (%n state))
+ (:: promise.Monad<Promise> wrap
+ (log! (if (ex.match? ..Poisoned cause)
+ (format "Counter was poisoned: " (%n state))
cause)))))
(actor: #export (Stack a)
(List a)
((handle message state self)
- (do T.Monad<Task>
+ (do task.Monad<Task>
[#let [_ (log! "BEFORE")]
output (message state self)
#let [_ (log! "AFTER")]]
@@ -240,23 +235,23 @@
(..Behavior ((~ g!type) (~+ g!vars))))
{#..handle (~ (case ?handle
#.None
- (` ..default-handle)
+ (` (~! ..default-handle))
(#.Some [[messageN stateN selfN] bodyC])
(` (function [(~ (code.local-symbol messageN))
(~ (code.local-symbol stateN))
(~ (code.local-symbol selfN))]
- (do T.Monad<Task>
+ (do task.Monad<Task>
[]
(~ bodyC))))))
#..end (~ (case ?stop
#.None
- (` ..default-end)
+ (` (~! ..default-end))
(#.Some [[causeN stateN] bodyC])
(` (function [(~ (code.local-symbol causeN))
(~ (code.local-symbol stateN))]
- (do P.Monad<Promise>
+ (do promise.Monad<Promise>
[]
(~ bodyC))))))}))
(` (def: (~+ (csw.export export)) ((~ g!new) (~ g!init))
@@ -300,16 +295,18 @@
(message: #export Counter
(count! [increment Nat] state self Nat)
(let [state' (n/+ increment state)]
- (T.return [state' state'])))
+ (task.return [state' state'])))
(message: #export (Stack a)
(push [value a] state self (List a))
(let [state' (#.Cons value state)]
- (T.return [state' state']))))}
+ (task.return [state' state']))))}
(with-gensyms [g!return g!error g!task g!sent?]
(do @
- [actor-name (resolve-actor actor-name)
- #let [g!type (code.symbol (product.both id state-name actor-name))
+ [current-module macro.current-module-name
+ actor-name (resolve-actor actor-name)
+ #let [message-name [current-module (get@ #name signature)]
+ g!type (code.symbol (product.both id state-name actor-name))
g!message (code.local-symbol (get@ #name signature))
g!actor-vars (list/map code.local-symbol actor-vars)
actorC (` ((~ (code.symbol actor-name)) (~+ g!actor-vars)))
@@ -338,28 +335,30 @@
(~ (|> annotations
(with-message actor-name)
csw.annotations))
- (All [(~+ g!all-vars)] (-> (~+ g!inputsT) (~ actorC) (T.Task (~ (get@ #output signature)))))
- (let [(~ g!task) (T.task (~ g!outputT))]
+ (All [(~+ g!all-vars)] (-> (~+ g!inputsT) (~ actorC) (Task (~ (get@ #output signature)))))
+ (let [(~ g!task) (task.task (~ g!outputT))]
(io.run (do io.Monad<IO>
[(~ g!sent?) (..send (function [(~ g!state) (~ g!self)]
- (do P.Monad<Promise>
- [(~ g!return) (: (T.Task [((~ g!type) (~+ g!actor-refs))
- (~ g!outputT)])
- (do T.Monad<Task>
+ (do promise.Monad<Promise>
+ [(~ g!return) (: (Task [((~ g!type) (~+ g!actor-refs))
+ (~ g!outputT)])
+ (do task.Monad<Task>
[]
(~ body)))]
(case (~ g!return)
(#.Right [(~ g!state) (~ g!return)])
- (exec (io.run (P.resolve (#.Right (~ g!return)) (~ g!task)))
- (T.return (~ g!state)))
+ (exec (io.run (promise.resolve (#.Right (~ g!return)) (~ g!task)))
+ (task.return (~ g!state)))
(#.Left (~ g!error))
- (exec (io.run (P.resolve (#.Left (~ g!error)) (~ g!task)))
- (T.fail (~ g!error))))
+ (exec (io.run (promise.resolve (#.Left (~ g!error)) (~ g!task)))
+ (task.fail (~ g!error))))
))
(~ g!self))]
(if (~ g!sent?)
((~' wrap) (~ g!task))
- ((~' wrap) (T.throw ..Dead ""))))))))
+ ((~' wrap) (<| (task.throw ..Dead)
+ (~ (code.text (format " Actor: " (%ident actor-name) "\n"
+ "Message: " (%ident message-name) "\n")))))))))))
))
)))
diff --git a/stdlib/source/lux/concurrency/promise.lux b/stdlib/source/lux/concurrency/promise.lux
index d45e8e55a..429a11931 100644
--- a/stdlib/source/lux/concurrency/promise.lux
+++ b/stdlib/source/lux/concurrency/promise.lux
@@ -5,31 +5,27 @@
(control [functor #+ Functor]
[applicative #+ Applicative]
[monad #+ do Monad])
+ (data [product])
(concurrency [atom #+ Atom atom])))
(def: #export concurrency-level
Nat
("lux process concurrency-level"))
-(type: (Promise-State a)
- {#value (Maybe a)
- #observers (List (-> a (IO Top)))})
-
(type: #export (Promise a)
{#.doc "Represents values produced by asynchronous computations (unlike IO, which is synchronous)."}
- (Atom (Promise-State a)))
+ (Atom [(Maybe a) (List (-> a (IO Top)))]))
(def: #export (promise ?value)
(All [a] (-> (Maybe a) (Promise a)))
- (atom {#value ?value
- #observers (list)}))
+ (atom [?value (list)]))
(def: #export (poll promise)
{#.doc "Polls a promise's value."}
(All [a] (-> (Promise a) (Maybe a)))
(|> (atom.read promise)
io.run
- (get@ #value)))
+ product.left))
(def: #export (resolved? promise)
{#.doc "Checks whether a promise's value has already been resolved."}
@@ -45,31 +41,31 @@
{#.doc "Sets an promise's value if it has not been done yet."}
(All [a] (-> a (Promise a) (IO Bool)))
(do io.Monad<IO>
- [old (atom.read promise)]
- (case (get@ #value old)
+ [(^@ old [_value _observers]) (atom.read promise)]
+ (case _value
(#.Some _)
(wrap false)
#.None
(do @
- [#let [new (set@ #value (#.Some value) old)]
+ [#let [new [(#.Some value) _observers]]
succeeded? (atom.compare-and-swap old new promise)]
(if succeeded?
(do @
[_ (monad.map @ (function [f] (f value))
- (get@ #observers old))]
+ _observers)]
(wrap true))
(resolve value promise))))))
(def: #export (await f promise)
(All [a] (-> (-> a (IO Top)) (Promise a) Top))
- (let [old (io.run (atom.read promise))]
- (case (get@ #value old)
+ (let [(^@ old [_value _observers]) (io.run (atom.read promise))]
+ (case _value
(#.Some value)
(io.run (f value))
#.None
- (let [new (update@ #observers (|>> (#.Cons f)) old)]
+ (let [new [_value (#.Cons f _observers)]]
(if (io.run (atom.compare-and-swap old new promise))
[]
(await f promise))))))
@@ -85,8 +81,7 @@
(def: functor Functor<Promise>)
(def: (wrap a)
- (atom {#value (#.Some a)
- #observers (list)}))
+ (atom [(#.Some a) (list)]))
(def: (apply ff fa)
(let [fb (promise #.None)]
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux
index 0fe9ee9df..09daa5e2d 100644
--- a/stdlib/source/lux/concurrency/stm.lux
+++ b/stdlib/source/lux/concurrency/stm.lux
@@ -4,28 +4,20 @@
[applicative #+ Applicative]
[monad #+ do Monad])
[io #- run]
- (data (coll [list "list/" Functor<List> Fold<List>]
- [dict #+ Dict])
- [product]
- [text]
- maybe
+ (data [product]
+ [maybe]
[number "nat/" Codec<Text,Nat>]
- text/format)
- [macro]
- (macro [code]
- ["s" syntax #+ syntax: Syntax])
+ [text]
+ (coll [list "list/" Functor<List> Fold<List>]
+ [dict #+ Dict]))
(concurrency [atom #+ Atom atom]
["P" promise]
[frp "frp/" Functor<Channel>])
))
-(type: (Var-State a)
- {#value a
- #observers (Dict Text (-> a (IO Unit)))})
-
(type: #export (Var a)
{#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."}
- (Atom (Var-State a)))
+ (Atom [a (Dict Text (-> a (IO Unit)))]))
(type: (Tx-Frame a)
{#var (Var a)
@@ -42,12 +34,11 @@
(def: #export (var value)
{#.doc "Creates a new STM var, with a default value."}
(All [a] (-> a (Var a)))
- (atom.atom {#value value
- #observers (dict.new text.Hash<Text>)}))
+ (atom.atom [value (dict.new text.Hash<Text>)]))
(def: raw-read
(All [a] (-> (Var a) a))
- (|>> atom.read io.run (get@ #value)))
+ (|>> atom.read io.run product.left))
(def: (find-var-value var tx)
(All [a] (-> (Var a) Tx (Maybe a)))
@@ -55,8 +46,8 @@
(list.find (function [[_var _original _current]]
(is (:! (Var Unit) var)
(:! (Var Unit) _var))))
- (:: Monad<Maybe> map (function [[_var _original _current]]
- _current))
+ (:: maybe.Monad<Maybe> map (function [[_var _original _current]]
+ _current))
(:!!)
))
@@ -77,7 +68,7 @@
(All [a] (-> (Var a) (IO a)))
(|> var
atom.read
- (:: Functor<IO> map (get@ #value))))
+ (:: Functor<IO> map product.left)))
(def: (update-tx-value var value tx)
(All [a] (-> (Var a) a Tx Tx))
@@ -99,6 +90,7 @@
))
(def: #export (write value var)
+ {#.doc "Writes value to var."}
(All [a] (-> a (Var a) (STM Unit)))
(function [tx]
(case (find-var-value var tx)
@@ -110,18 +102,14 @@
[(#.Cons [var (raw-read var) value] tx)
[]])))
-(def: #export (write! new-value var)
- {#.doc "Writes value to var immediately, without going through a transaction."}
+(def: (write! new-value var)
(All [a] (-> a (Var a) (IO Unit)))
(do Monad<IO>
- [old (atom.read var)
- #let [old-value (get@ #value old)
- new (set@ #value new-value old)]
- succeeded? (atom.compare-and-swap old new var)]
+ [(^@ old [_value _observers]) (atom.read var)
+ succeeded? (atom.compare-and-swap old [new-value _observers] var)]
(if succeeded?
(do @
- [_ (|> old
- (get@ #observers)
+ [_ (|> _observers
dict.values
(monad.map @ (function [f] (f new-value))))]
(wrap []))
@@ -185,18 +173,6 @@
(let [[tx' ma] (mma tx)]
(ma tx')))))
-(def: #export (update! f var)
- {#.doc "Will update a Var's value, and return a tuple with the old and the new values."}
- (All [a] (-> (-> a a) (Var a) (IO [a a])))
- (io (loop [_ []]
- (let [(^@ state [value observers]) (io.run (atom.read var))
- value' (f value)]
- (if (io.run (atom.compare-and-swap state
- [value' observers]
- var))
- [value value']
- (recur []))))))
-
(def: #export (update f var)
{#.doc "Will update a Var's value, and return a tuple with the old and the new values."}
(All [a] (-> (-> a a) (Var a) (STM [a a])))
@@ -212,7 +188,7 @@
(is _original (raw-read _var)))
tx))
-(def: (commit-var [_var _original _current])
+(def: (commit-var! [_var _original _current])
(-> (Ex [a] (Tx-Frame a)) Unit)
(if (is _original _current)
[]
@@ -231,10 +207,10 @@
(def: (process-commit [stm-proc output])
(-> [(STM Unit) (P.Promise Unit)] Top)
(let [[finished-tx value] (stm-proc fresh-tx)]
- (if (can-commit? finished-tx)
- (exec (list/map commit-var finished-tx)
- (io.run (P.resolve value output)))
- (io.run (write! [stm-proc output] pending-commits)))))
+ (io.run (if (can-commit? finished-tx)
+ (exec (list/map commit-var! finished-tx)
+ (P.resolve value output))
+ (write! [stm-proc output] pending-commits)))))
(def: init-processor!
(IO Unit)
diff --git a/stdlib/source/lux/test.lux b/stdlib/source/lux/test.lux
index b755299cd..e000f6130 100644
--- a/stdlib/source/lux/test.lux
+++ b/stdlib/source/lux/test.lux
@@ -56,9 +56,10 @@
(def: #export (assert message condition)
{#.doc "Check that a condition is true, and fail with the given message otherwise."}
(-> Text Bool (Promise [Counters Text]))
- (if condition
- (:: promise.Monad<Promise> wrap [success (format "[Success] " message)])
- (:: promise.Monad<Promise> wrap [failure (format " [Error] " message)])))
+ (<| (:: promise.Monad<Promise> wrap)
+ (if condition
+ [success (format "[Success] " message)]
+ [failure (format " [Error] " message)])))
(def: #export (test message condition)
{#.doc "Check that a condition is true, and fail with the given message otherwise."}
diff --git a/stdlib/test/test/lux/concurrency/actor.lux b/stdlib/test/test/lux/concurrency/actor.lux
index f041ebe54..a5403d7d8 100644
--- a/stdlib/test/test/lux/concurrency/actor.lux
+++ b/stdlib/test/test/lux/concurrency/actor.lux
@@ -22,8 +22,8 @@
(wrap output)))
((stop cause state)
- (P/wrap (log! (if (ex.match? &.Killed cause)
- (format "Counter was killed: " (%n state))
+ (P/wrap (log! (if (ex.match? &.Poisoned cause)
+ (format "Counter was poisoned: " (%n state))
cause)))))
(message: #export Counter
@@ -37,13 +37,6 @@
(io.run (do io.Monad<IO>
[counter (new@Counter +0)]
(wrap (&.alive? counter)))))
-
- (test "Can kill actors."
- (io.run (do io.Monad<IO>
- [counter (new@Counter +0)
- killed? (&.kill counter)]
- (wrap (and killed?
- (not (&.alive? counter)))))))
(test "Can poison actors."
(io.run (do io.Monad<IO>
@@ -52,18 +45,10 @@
(wrap (and poisoned?
(not (&.alive? counter)))))))
- (test "Cannot kill an already dead actor."
- (io.run (do io.Monad<IO>
- [counter (new@Counter +0)
- first-time (&.kill counter)
- second-time (&.kill counter)]
- (wrap (and first-time
- (not second-time))))))
-
(test "Cannot poison an already dead actor."
(io.run (do io.Monad<IO>
[counter (new@Counter +0)
- first-time (&.kill counter)
+ first-time (&.poison counter)
second-time (&.poison counter)]
(wrap (and first-time
(not second-time))))))