From 6352437a8403b09fa0c83843984323ce1e67e980 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Sun, 10 Dec 2017 14:14:08 -0400 Subject: - Some minor refactoring. - Eliminated the dependency of actors upon FRP and STM. - Fixed some bugs. --- stdlib/source/lux/concurrency/actor.lux | 197 ++++++++++++++--------------- stdlib/source/lux/concurrency/promise.lux | 29 ++--- stdlib/source/lux/concurrency/stm.lux | 66 +++------- stdlib/source/lux/test.lux | 7 +- stdlib/test/test/lux/concurrency/actor.lux | 21 +-- 5 files changed, 138 insertions(+), 182 deletions(-) (limited to 'stdlib') diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux index f91b3c57f..3a032e00f 100644 --- a/stdlib/source/lux/concurrency/actor.lux +++ b/stdlib/source/lux/concurrency/actor.lux @@ -4,36 +4,49 @@ ["p" parser] ["ex" exception #+ exception:]) [io #- run "io/" Monad] - (data text/format - (coll [list "list/" Monoid Monad Fold]) - [product]) + (data [product] + ["e" error] + text/format + (coll [list "list/" Monoid Monad Fold])) [macro #+ with-gensyms Monad] (macro [code] ["s" syntax #+ syntax: Syntax] (syntax ["cs" common] (common ["csr" reader] ["csw" writer]))) - (type abstract) - (lang [type])) - (// ["A" atom] - ["P" promise "P/" Monad] - ["T" task] - [stm #+ Monad] - [frp])) + (type abstract)) + (// [atom #+ Atom atom] + [promise #+ Promise promise "promise/" Monad] + [task #+ Task])) (exception: #export Poisoned) -(exception: #export Killed) (exception: #export Dead) ## [Types] (with-expansions - [ (as-is (-> s (Actor s) (T.Task s))) - (as-is [Text s (List )])] + [ (as-is (-> s (Actor s) (Task s))) + (as-is [Text s (List )]) + (as-is (Rec Mailbox (Promise [ Mailbox])))] + + (def: (obituary mailbox) + (All [a] (-> (Rec Mailbox (Promise [a Mailbox])) (List a))) + (case (promise.poll mailbox) + (#.Some [head tail]) + (#.Cons head (obituary tail)) + + #.None + #.Nil)) + (abstract: #export (Actor s) {#.doc "An actor, defined as all the necessities it requires."} - {#mailbox (stm.Var ) - #kill-switch (P.Promise Unit) - #obituary (P.Promise )} + {#mailbox (Atom ) + #obituary (Promise )} + + ## TODO: Delete after new-luxc becomes the new standard compiler. + (def: (actor mailbox obituary) + (All [s] (-> (Atom ) (Promise ) (Actor s))) + (@abstract {#mailbox mailbox + #obituary obituary})) (type: #export (Message s) ) @@ -43,60 +56,36 @@ (type: #export (Behavior s) {#.doc "An actor's behavior when messages are received."} - {#handle (-> (Message s) s (Actor s) (T.Task s)) - #end (-> Text s (P.Promise Unit))}) + {#handle (-> (Message s) s (Actor s) (Task s)) + #end (-> Text s (Promise Unit))}) (def: #export (spawn behavior init) {#.doc "Given a behavior and initial state, spawns an actor and returns it."} (All [s] (-> (Behavior s) s (IO (Actor s)))) (io (let [[handle end] behavior - self (@abstract {#mailbox (stm.var (:! Message [])) - #kill-switch (P.promise #.None) - #obituary (P.promise #.None)}) - mailbox-channel (io.run (stm.follow (get@ #mailbox (@repr self)))) - |mailbox| (stm.var mailbox-channel) - _ (P/map (function [_] - (io.run (do Monad - [mb (stm.read! |mailbox|)] - (frp.close mb)))) - (get@ #kill-switch (@repr self))) + self (actor (atom (promise #.None)) + (promise #.None)) process (loop [state init - messages mailbox-channel] - (do P.Monad - [?messages+ messages] - (case ?messages+ - ## No kill-switch so far, so I may proceed... - (#.Some [message messages']) - (do P.Monad - [#let [_ (io.run (stm.write! messages' |mailbox|))] - ?state' (handle message state self)] - (case ?state' - (#.Left error) - (do @ - [#let [_ (io.run (do Monad - [_ (P.resolve [] (get@ #kill-switch (@repr self)))] - (frp.close messages')))] - _ (end error state) - remaining-messages (frp.consume messages')] - (wrap [error state (#.Cons message remaining-messages)])) - - (#.Right state') - (recur state' messages'))) - - ## Otherwise, clean-up and return current state. - #.None - (do P.Monad - [#let [_ (io.run (frp.close messages)) - death-message (Killed "")] - _ (end death-message state)] - (wrap [death-message state (list)])))))] + |mailbox| (io.run (atom.read (get@ #mailbox (@repr self))))] + (do promise.Monad + [[head tail] |mailbox| + ?state' (handle head state self)] + (case ?state' + (#e.Error error) + (do @ + [_ (end error state)] + (exec (io.run (promise.resolve [error state (#.Cons head (obituary tail))] + (get@ #obituary (@repr self)))) + (wrap []))) + + (#e.Success state') + (recur state' tail))))] self))) (def: #export (alive? actor) (All [s] (-> (Actor s) Bool)) - (case [(P.poll (get@ #kill-switch (@repr actor))) - (P.poll (get@ #obituary (@repr actor)))] - [#.None #.None] + (case (promise.poll (get@ #obituary (@repr actor))) + #.None true _ @@ -106,27 +95,33 @@ {#.doc "Communicate with an actor through message passing."} (All [s] (-> (Message s) (Actor s) (IO Bool))) (if (alive? actor) - (do Monad - [_ (stm.write! message (get@ #mailbox (@repr actor)))] - (wrap true)) - (io/wrap false))) - - (def: #export (kill actor) - {#.doc "Immediately kills the given actor (if it is not already dead)."} - (All [s] (-> (Actor s) (io.IO Bool))) - (if (alive? actor) - (|> actor @repr (get@ #kill-switch) (P.resolve [])) + (let [entry [message (promise #.None)]] + (do Monad + [|mailbox| (atom.read (get@ #mailbox (@repr actor)))] + (loop [|mailbox| |mailbox|] + (case (promise.poll |mailbox|) + #.None + (do @ + [resolved? (promise.resolve entry |mailbox|)] + (if resolved? + (do @ + [_ (atom.write (product.right entry) (get@ #mailbox (@repr actor)))] + (wrap true)) + (recur |mailbox|))) + + (#.Some [_ |mailbox|']) + (recur |mailbox|'))))) (io/wrap false))) )) ## [Values] -(def: #export (default-handle message state self) - (All [s] (-> (Message s) s (Actor s) (T.Task s))) +(def: (default-handle message state self) + (All [s] (-> (Message s) s (Actor s) (Task s))) (message state self)) -(def: #export (default-end cause state) - (All [s] (-> Text s (P.Promise Unit))) - (P/wrap [])) +(def: (default-end cause state) + (All [s] (-> Text s (Promise Unit))) + (promise/wrap [])) (def: #export default-behavior (All [s] (Behavior s)) @@ -138,7 +133,7 @@ but allows the actor to handle previous messages."} (All [s] (-> (Actor s) (IO Bool))) (send (function [state self] - (T.throw Poisoned "")) + (task.throw Poisoned "")) actor)) ## [Syntax] @@ -206,16 +201,16 @@ Nat ((stop cause state) - (:: P.Monad wrap - (log! (if (ex.match? ..Killed cause) - (format "Counter was killed: " (%n state)) + (:: promise.Monad wrap + (log! (if (ex.match? ..Poisoned cause) + (format "Counter was poisoned: " (%n state)) cause))))) (actor: #export (Stack a) (List a) ((handle message state self) - (do T.Monad + (do task.Monad [#let [_ (log! "BEFORE")] output (message state self) #let [_ (log! "AFTER")]] @@ -240,23 +235,23 @@ (..Behavior ((~ g!type) (~+ g!vars)))) {#..handle (~ (case ?handle #.None - (` ..default-handle) + (` (~! ..default-handle)) (#.Some [[messageN stateN selfN] bodyC]) (` (function [(~ (code.local-symbol messageN)) (~ (code.local-symbol stateN)) (~ (code.local-symbol selfN))] - (do T.Monad + (do task.Monad [] (~ bodyC)))))) #..end (~ (case ?stop #.None - (` ..default-end) + (` (~! ..default-end)) (#.Some [[causeN stateN] bodyC]) (` (function [(~ (code.local-symbol causeN)) (~ (code.local-symbol stateN))] - (do P.Monad + (do promise.Monad [] (~ bodyC))))))})) (` (def: (~+ (csw.export export)) ((~ g!new) (~ g!init)) @@ -300,16 +295,18 @@ (message: #export Counter (count! [increment Nat] state self Nat) (let [state' (n/+ increment state)] - (T.return [state' state']))) + (task.return [state' state']))) (message: #export (Stack a) (push [value a] state self (List a)) (let [state' (#.Cons value state)] - (T.return [state' state']))))} + (task.return [state' state']))))} (with-gensyms [g!return g!error g!task g!sent?] (do @ - [actor-name (resolve-actor actor-name) - #let [g!type (code.symbol (product.both id state-name actor-name)) + [current-module macro.current-module-name + actor-name (resolve-actor actor-name) + #let [message-name [current-module (get@ #name signature)] + g!type (code.symbol (product.both id state-name actor-name)) g!message (code.local-symbol (get@ #name signature)) g!actor-vars (list/map code.local-symbol actor-vars) actorC (` ((~ (code.symbol actor-name)) (~+ g!actor-vars))) @@ -338,28 +335,30 @@ (~ (|> annotations (with-message actor-name) csw.annotations)) - (All [(~+ g!all-vars)] (-> (~+ g!inputsT) (~ actorC) (T.Task (~ (get@ #output signature))))) - (let [(~ g!task) (T.task (~ g!outputT))] + (All [(~+ g!all-vars)] (-> (~+ g!inputsT) (~ actorC) (Task (~ (get@ #output signature))))) + (let [(~ g!task) (task.task (~ g!outputT))] (io.run (do io.Monad [(~ g!sent?) (..send (function [(~ g!state) (~ g!self)] - (do P.Monad - [(~ g!return) (: (T.Task [((~ g!type) (~+ g!actor-refs)) - (~ g!outputT)]) - (do T.Monad + (do promise.Monad + [(~ g!return) (: (Task [((~ g!type) (~+ g!actor-refs)) + (~ g!outputT)]) + (do task.Monad [] (~ body)))] (case (~ g!return) (#.Right [(~ g!state) (~ g!return)]) - (exec (io.run (P.resolve (#.Right (~ g!return)) (~ g!task))) - (T.return (~ g!state))) + (exec (io.run (promise.resolve (#.Right (~ g!return)) (~ g!task))) + (task.return (~ g!state))) (#.Left (~ g!error)) - (exec (io.run (P.resolve (#.Left (~ g!error)) (~ g!task))) - (T.fail (~ g!error)))) + (exec (io.run (promise.resolve (#.Left (~ g!error)) (~ g!task))) + (task.fail (~ g!error)))) )) (~ g!self))] (if (~ g!sent?) ((~' wrap) (~ g!task)) - ((~' wrap) (T.throw ..Dead "")))))))) + ((~' wrap) (<| (task.throw ..Dead) + (~ (code.text (format " Actor: " (%ident actor-name) "\n" + "Message: " (%ident message-name) "\n"))))))))))) )) ))) diff --git a/stdlib/source/lux/concurrency/promise.lux b/stdlib/source/lux/concurrency/promise.lux index d45e8e55a..429a11931 100644 --- a/stdlib/source/lux/concurrency/promise.lux +++ b/stdlib/source/lux/concurrency/promise.lux @@ -5,31 +5,27 @@ (control [functor #+ Functor] [applicative #+ Applicative] [monad #+ do Monad]) + (data [product]) (concurrency [atom #+ Atom atom]))) (def: #export concurrency-level Nat ("lux process concurrency-level")) -(type: (Promise-State a) - {#value (Maybe a) - #observers (List (-> a (IO Top)))}) - (type: #export (Promise a) {#.doc "Represents values produced by asynchronous computations (unlike IO, which is synchronous)."} - (Atom (Promise-State a))) + (Atom [(Maybe a) (List (-> a (IO Top)))])) (def: #export (promise ?value) (All [a] (-> (Maybe a) (Promise a))) - (atom {#value ?value - #observers (list)})) + (atom [?value (list)])) (def: #export (poll promise) {#.doc "Polls a promise's value."} (All [a] (-> (Promise a) (Maybe a))) (|> (atom.read promise) io.run - (get@ #value))) + product.left)) (def: #export (resolved? promise) {#.doc "Checks whether a promise's value has already been resolved."} @@ -45,31 +41,31 @@ {#.doc "Sets an promise's value if it has not been done yet."} (All [a] (-> a (Promise a) (IO Bool))) (do io.Monad - [old (atom.read promise)] - (case (get@ #value old) + [(^@ old [_value _observers]) (atom.read promise)] + (case _value (#.Some _) (wrap false) #.None (do @ - [#let [new (set@ #value (#.Some value) old)] + [#let [new [(#.Some value) _observers]] succeeded? (atom.compare-and-swap old new promise)] (if succeeded? (do @ [_ (monad.map @ (function [f] (f value)) - (get@ #observers old))] + _observers)] (wrap true)) (resolve value promise)))))) (def: #export (await f promise) (All [a] (-> (-> a (IO Top)) (Promise a) Top)) - (let [old (io.run (atom.read promise))] - (case (get@ #value old) + (let [(^@ old [_value _observers]) (io.run (atom.read promise))] + (case _value (#.Some value) (io.run (f value)) #.None - (let [new (update@ #observers (|>> (#.Cons f)) old)] + (let [new [_value (#.Cons f _observers)]] (if (io.run (atom.compare-and-swap old new promise)) [] (await f promise)))))) @@ -85,8 +81,7 @@ (def: functor Functor) (def: (wrap a) - (atom {#value (#.Some a) - #observers (list)})) + (atom [(#.Some a) (list)])) (def: (apply ff fa) (let [fb (promise #.None)] diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux index 0fe9ee9df..09daa5e2d 100644 --- a/stdlib/source/lux/concurrency/stm.lux +++ b/stdlib/source/lux/concurrency/stm.lux @@ -4,28 +4,20 @@ [applicative #+ Applicative] [monad #+ do Monad]) [io #- run] - (data (coll [list "list/" Functor Fold] - [dict #+ Dict]) - [product] - [text] - maybe + (data [product] + [maybe] [number "nat/" Codec] - text/format) - [macro] - (macro [code] - ["s" syntax #+ syntax: Syntax]) + [text] + (coll [list "list/" Functor Fold] + [dict #+ Dict])) (concurrency [atom #+ Atom atom] ["P" promise] [frp "frp/" Functor]) )) -(type: (Var-State a) - {#value a - #observers (Dict Text (-> a (IO Unit)))}) - (type: #export (Var a) {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} - (Atom (Var-State a))) + (Atom [a (Dict Text (-> a (IO Unit)))])) (type: (Tx-Frame a) {#var (Var a) @@ -42,12 +34,11 @@ (def: #export (var value) {#.doc "Creates a new STM var, with a default value."} (All [a] (-> a (Var a))) - (atom.atom {#value value - #observers (dict.new text.Hash)})) + (atom.atom [value (dict.new text.Hash)])) (def: raw-read (All [a] (-> (Var a) a)) - (|>> atom.read io.run (get@ #value))) + (|>> atom.read io.run product.left)) (def: (find-var-value var tx) (All [a] (-> (Var a) Tx (Maybe a))) @@ -55,8 +46,8 @@ (list.find (function [[_var _original _current]] (is (:! (Var Unit) var) (:! (Var Unit) _var)))) - (:: Monad map (function [[_var _original _current]] - _current)) + (:: maybe.Monad map (function [[_var _original _current]] + _current)) (:!!) )) @@ -77,7 +68,7 @@ (All [a] (-> (Var a) (IO a))) (|> var atom.read - (:: Functor map (get@ #value)))) + (:: Functor map product.left))) (def: (update-tx-value var value tx) (All [a] (-> (Var a) a Tx Tx)) @@ -99,6 +90,7 @@ )) (def: #export (write value var) + {#.doc "Writes value to var."} (All [a] (-> a (Var a) (STM Unit))) (function [tx] (case (find-var-value var tx) @@ -110,18 +102,14 @@ [(#.Cons [var (raw-read var) value] tx) []]))) -(def: #export (write! new-value var) - {#.doc "Writes value to var immediately, without going through a transaction."} +(def: (write! new-value var) (All [a] (-> a (Var a) (IO Unit))) (do Monad - [old (atom.read var) - #let [old-value (get@ #value old) - new (set@ #value new-value old)] - succeeded? (atom.compare-and-swap old new var)] + [(^@ old [_value _observers]) (atom.read var) + succeeded? (atom.compare-and-swap old [new-value _observers] var)] (if succeeded? (do @ - [_ (|> old - (get@ #observers) + [_ (|> _observers dict.values (monad.map @ (function [f] (f new-value))))] (wrap [])) @@ -185,18 +173,6 @@ (let [[tx' ma] (mma tx)] (ma tx'))))) -(def: #export (update! f var) - {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} - (All [a] (-> (-> a a) (Var a) (IO [a a]))) - (io (loop [_ []] - (let [(^@ state [value observers]) (io.run (atom.read var)) - value' (f value)] - (if (io.run (atom.compare-and-swap state - [value' observers] - var)) - [value value'] - (recur [])))))) - (def: #export (update f var) {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} (All [a] (-> (-> a a) (Var a) (STM [a a]))) @@ -212,7 +188,7 @@ (is _original (raw-read _var))) tx)) -(def: (commit-var [_var _original _current]) +(def: (commit-var! [_var _original _current]) (-> (Ex [a] (Tx-Frame a)) Unit) (if (is _original _current) [] @@ -231,10 +207,10 @@ (def: (process-commit [stm-proc output]) (-> [(STM Unit) (P.Promise Unit)] Top) (let [[finished-tx value] (stm-proc fresh-tx)] - (if (can-commit? finished-tx) - (exec (list/map commit-var finished-tx) - (io.run (P.resolve value output))) - (io.run (write! [stm-proc output] pending-commits))))) + (io.run (if (can-commit? finished-tx) + (exec (list/map commit-var! finished-tx) + (P.resolve value output)) + (write! [stm-proc output] pending-commits))))) (def: init-processor! (IO Unit) diff --git a/stdlib/source/lux/test.lux b/stdlib/source/lux/test.lux index b755299cd..e000f6130 100644 --- a/stdlib/source/lux/test.lux +++ b/stdlib/source/lux/test.lux @@ -56,9 +56,10 @@ (def: #export (assert message condition) {#.doc "Check that a condition is true, and fail with the given message otherwise."} (-> Text Bool (Promise [Counters Text])) - (if condition - (:: promise.Monad wrap [success (format "[Success] " message)]) - (:: promise.Monad wrap [failure (format " [Error] " message)]))) + (<| (:: promise.Monad wrap) + (if condition + [success (format "[Success] " message)] + [failure (format " [Error] " message)]))) (def: #export (test message condition) {#.doc "Check that a condition is true, and fail with the given message otherwise."} diff --git a/stdlib/test/test/lux/concurrency/actor.lux b/stdlib/test/test/lux/concurrency/actor.lux index f041ebe54..a5403d7d8 100644 --- a/stdlib/test/test/lux/concurrency/actor.lux +++ b/stdlib/test/test/lux/concurrency/actor.lux @@ -22,8 +22,8 @@ (wrap output))) ((stop cause state) - (P/wrap (log! (if (ex.match? &.Killed cause) - (format "Counter was killed: " (%n state)) + (P/wrap (log! (if (ex.match? &.Poisoned cause) + (format "Counter was poisoned: " (%n state)) cause))))) (message: #export Counter @@ -37,13 +37,6 @@ (io.run (do io.Monad [counter (new@Counter +0)] (wrap (&.alive? counter))))) - - (test "Can kill actors." - (io.run (do io.Monad - [counter (new@Counter +0) - killed? (&.kill counter)] - (wrap (and killed? - (not (&.alive? counter))))))) (test "Can poison actors." (io.run (do io.Monad @@ -52,18 +45,10 @@ (wrap (and poisoned? (not (&.alive? counter))))))) - (test "Cannot kill an already dead actor." - (io.run (do io.Monad - [counter (new@Counter +0) - first-time (&.kill counter) - second-time (&.kill counter)] - (wrap (and first-time - (not second-time)))))) - (test "Cannot poison an already dead actor." (io.run (do io.Monad [counter (new@Counter +0) - first-time (&.kill counter) + first-time (&.poison counter) second-time (&.poison counter)] (wrap (and first-time (not second-time)))))) -- cgit v1.2.3