From 0c3c53a714019a543e70e0751387b5dfccc3b872 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Thu, 3 Aug 2017 19:44:43 -0400 Subject: - Some refactoring and small bug fixes. --- stdlib/source/lux/concurrency/actor.lux | 229 +++++++++++++++++--------------- 1 file changed, 121 insertions(+), 108 deletions(-) diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux index dd34a7ae8..94dc81e48 100644 --- a/stdlib/source/lux/concurrency/actor.lux +++ b/stdlib/source/lux/concurrency/actor.lux @@ -13,7 +13,8 @@ (syntax ["cs" common] (common ["csr" reader] ["csw" writer]))) - [type]) + [type] + (type model)) (.. ["A" atom] ["P" promise "P/" Monad] ["T" task] @@ -22,27 +23,102 @@ (exception: #export Poisoned) (exception: #export Killed) +(exception: #export Dead) ## [Types] (with-expansions [ (as-is (-> s (Actor s) (T;Task s))) (as-is [Text s (List )])] - (type: #export (Actor s) + (model: #export (Actor s) {#;doc "An actor, defined as all the necessities it requires."} {#mailbox (stm;Var ) #kill-switch (P;Promise Unit) - #obituary (P;Promise )}) - - (type: #export (Message s) - ) - - (type: #export (Obituary s) - )) - -(type: #export (Behavior s) - {#;doc "An actor's behavior when messages are received."} - {#handle (-> (Message s) s (Actor s) (T;Task s)) - #end (-> Text s (P;Promise Unit))}) + #obituary (P;Promise )} + + (type: #export (Message s) + ) + + (type: #export (Obituary s) + ) + + (type: #export (Behavior s) + {#;doc "An actor's behavior when messages are received."} + {#handle (-> (Message s) s (Actor s) (T;Task s)) + #end (-> Text s (P;Promise Unit))}) + + (def: #export (spawn behavior init) + {#;doc "Given a behavior and initial state, spawns an actor and returns it."} + (All [s] (-> (Behavior s) s (IO (Actor s)))) + (io (let [[handle end] behavior + self (: (Actor ($ +0)) + (@model {#mailbox (stm;var (:! (Message ($ +0)) [])) + #kill-switch (P;promise Unit) + #obituary (P;promise (Obituary ($ +0)))})) + mailbox-channel (io;run (stm;follow (get@ #mailbox (@repr self)))) + |mailbox| (stm;var mailbox-channel) + _ (P/map (function [_] + (io;run (do Monad + [mb (stm;read! |mailbox|)] + (frp;close mb)))) + (get@ #kill-switch (@repr self))) + process (loop [state init + messages mailbox-channel] + (do P;Monad + [?messages+ messages] + (case ?messages+ + ## No kill-switch so far, so I may proceed... + (#;Some [message messages']) + (do P;Monad + [#let [_ (io;run (stm;write! messages' |mailbox|))] + ?state' (handle message state self)] + (case ?state' + (#;Left error) + (do @ + [#let [_ (io;run (do Monad + [_ (P;resolve [] (get@ #kill-switch (@repr self)))] + (frp;close messages')))] + _ (end error state) + remaining-messages (frp;consume messages')] + (wrap [error state (#;Cons message remaining-messages)])) + + (#;Right state') + (recur state' messages'))) + + ## Otherwise, clean-up and return current state. + #;None + (do P;Monad + [#let [_ (io;run (frp;close messages)) + death-message (Killed "")] + _ (end death-message state)] + (wrap [death-message state (list)])))))] + self))) + + (def: #export (alive? actor) + (All [s] (-> (Actor s) Bool)) + (case [(P;poll (get@ #kill-switch (@repr actor))) + (P;poll (get@ #obituary (@repr actor)))] + [#;None #;None] + true + + _ + false)) + + (def: #export (send message actor) + {#;doc "Communicate with an actor through message passing."} + (All [s] (-> (Message s) (Actor s) (IO Bool))) + (if (alive? actor) + (do Monad + [_ (stm;write! message (get@ #mailbox (@repr actor)))] + (wrap true)) + (io/wrap false))) + + (def: #export (kill actor) + {#;doc "Immediately kills the given actor (if it is not already dead)."} + (All [s] (-> (Actor s) (io;IO Bool))) + (if (alive? actor) + (|> actor @repr (get@ #kill-switch) (P;resolve [])) + (io/wrap false))) + )) ## [Values] (def: #export (default-handle message state self) @@ -53,76 +129,10 @@ (All [s] (-> Text s (P;Promise Unit))) (P/wrap [])) -(def: #export (spawn behavior init) - {#;doc "Given a behavior and initial state, spawns an actor and returns it."} - (All [s] (-> (Behavior s) s (IO (Actor s)))) - (io (let [[handle end] behavior - self (: (Actor ($ +0)) - {#mailbox (stm;var (:! (Message ($ +0)) [])) - #kill-switch (P;promise Unit) - #obituary (P;promise (Obituary ($ +0)))}) - mailbox-channel (io;run (stm;follow (get@ #mailbox self))) - |mailbox| (stm;var mailbox-channel) - _ (P/map (function [_] - (io;run (do Monad - [mb (stm;read! |mailbox|)] - (frp;close mb)))) - (get@ #kill-switch self)) - process (loop [state init - messages mailbox-channel] - (do P;Monad - [?messages+ messages] - (case ?messages+ - ## No kill-switch so far, so I may proceed... - (#;Some [message messages']) - (do P;Monad - [#let [_ (io;run (stm;write! messages' |mailbox|))] - ?state' (handle message state self)] - (case ?state' - (#;Left error) - (do @ - [#let [_ (io;run (P;resolve [] (get@ #kill-switch self))) - _ (io;run (frp;close messages'))] - _ (end error state) - remaining-messages (frp;consume messages')] - (wrap [error state (#;Cons message remaining-messages)])) - - (#;Right state') - (recur state' messages'))) - - ## Otherwise, clean-up and return current state. - #;None - (do P;Monad - [#let [_ (io;run (frp;close messages)) - death-message (Killed "")] - _ (end death-message state)] - (wrap [death-message state (list)])))))] - self))) - -(def: #export (alive? actor) - (All [s] (-> (Actor s) Bool)) - (case [(P;poll (get@ #kill-switch actor)) - (P;poll (get@ #obituary actor))] - [#;None #;None] - true - - _ - false)) - -(def: #export (send message actor) - {#;doc "Communicate with an actor through message passing."} - (All [s] (-> (Message s) (Actor s) (IO Bool))) - (if (alive? actor) - (exec (io;run (stm;write! message (get@ #mailbox actor))) - (io/wrap true)) - (io/wrap false))) - -(def: #export (kill actor) - {#;doc "Immediately kills the given actor (if it is not already dead)."} - (All [s] (-> (Actor s) (io;IO Bool))) - (if (alive? actor) - (|> actor (get@ #kill-switch) (P;resolve [])) - (io/wrap false))) +(def: #export default-behavior + (All [s] (Behavior s)) + {#handle default-handle + #end default-end}) (def: #export (poison actor) {#;doc "Kills the actor by sending a message that will kill it upon processing, @@ -134,12 +144,12 @@ ## [Syntax] (do-template [ ] - [(def: ( name) + [(def: #hidden ( name) (-> Ident cs;Annotations cs;Annotations) (|>. (#;Cons [(ident-for ) (code;tag name)]))) - (def: ( name) + (def: #hidden ( name) (-> Ident (Lux Ident)) (do Monad [name (macro;normalize name) @@ -161,7 +171,7 @@ (p;seq s;local-symbol (:: p;Monad wrap (list))))) (do-template [ ] - [(def: + [(def: #hidden (-> Text Text) (|>. (format "@")))] @@ -202,7 +212,7 @@ (log! (if (ex;match? ;;Killed cause) (format "Counter was killed: " (%n state)) cause))))) - + (actor: #export (Stack a) (List a) @@ -243,7 +253,7 @@ (~ bodyC)))))) #;;end (~ (case ?stop #;None - (` ;;default-handle) + (` ;;default-end) (#;Some [[causeN stateN] bodyC]) (` (function [(~ (code;local-symbol causeN)) @@ -298,7 +308,7 @@ (push [value a] state self (List a)) (let [state' (#;Cons value state)] (T;return [state' state']))))} - (with-gensyms [g!return g!error g!task] + (with-gensyms [g!return g!error g!task g!sent?] (do @ [actor-name (resolve-actor actor-name) #let [g!type (code;symbol (product;both id state-name actor-name)) @@ -321,23 +331,26 @@ csw;annotations)) (All [(~@ g!tvars)] (-> (~@ g!inputsT) (~ g!actor) (T;Task (~ g!outputT)))) (let [(~ g!task) (T;task (~ g!outputT))] - (exec (;;send (function [(~ g!state) (~ g!self)] - (do P;Monad - [(~ g!return) (: (T;Task [((~ g!type) (~@ g!refs)) - (~ g!outputT)]) - (do T;Monad - [] - (~ body)))] - (case (~ g!return) - (#;Right [(~ g!state) (~ g!return)]) - (exec (io;run (P;resolve (#;Right (~ g!return)) (~ g!task))) - (T;return (~ g!state))) - - (#;Left (~ g!error)) - (exec (io;run (P;resolve (#;Left (~ g!error)) (~ g!task))) - (T;fail (~ g!error)))) - )) - (~ g!self)) - (~ g!task))))) + (io;run (do io;Monad + [(~ g!sent?) (;;send (function [(~ g!state) (~ g!self)] + (do P;Monad + [(~ g!return) (: (T;Task [((~ g!type) (~@ g!refs)) + (~ g!outputT)]) + (do T;Monad + [] + (~ body)))] + (case (~ g!return) + (#;Right [(~ g!state) (~ g!return)]) + (exec (io;run (P;resolve (#;Right (~ g!return)) (~ g!task))) + (T;return (~ g!state))) + + (#;Left (~ g!error)) + (exec (io;run (P;resolve (#;Left (~ g!error)) (~ g!task))) + (T;fail (~ g!error)))) + )) + (~ g!self))] + (if (~ g!sent?) + ((~' wrap) (~ g!task)) + ((~' wrap) (T;throw ;;Dead "")))))))) )) ))) -- cgit v1.2.3