diff options
author | Eduardo Julian | 2016-12-01 11:00:44 -0400 |
---|---|---|
committer | Eduardo Julian | 2016-12-01 11:00:44 -0400 |
commit | 7f66c54f4c9753b94dbf46ec50b8b16549daf324 (patch) | |
tree | 1b5b896cfba870a66a99a03315b09df842eb5737 /stdlib/source/lux/concurrency/actor.lux | |
parent | 9c30546af022f8fe36b73e7e93414257ff28ee75 (diff) |
- Collected the Lux compiler's repo, the Standard Library's, the Leiningen plugin's and the Emacs mode's into a big monorepo, to keep development unified.
Diffstat (limited to 'stdlib/source/lux/concurrency/actor.lux')
-rw-r--r-- | stdlib/source/lux/concurrency/actor.lux | 278 |
1 files changed, 278 insertions, 0 deletions
diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux new file mode 100644 index 000000000..1eb3cee21 --- /dev/null +++ b/stdlib/source/lux/concurrency/actor.lux @@ -0,0 +1,278 @@ +## Copyright (c) Eduardo Julian. All rights reserved. +## This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +## If a copy of the MPL was not distributed with this file, +## You can obtain one at http://mozilla.org/MPL/2.0/. + +(;module: + lux + (lux (control monad) + (codata [io #- run] + function) + (data error + text/format + (struct [list "List/" Monoid<List> Monad<List>]) + [product] + [number "Nat/" Codec<Text,Nat>]) + [compiler #+ with-gensyms] + (macro [ast] + ["s" syntax #+ syntax: Syntax] + (syntax [common])) + [type]) + (.. [promise #+ Monad<Promise>] + [stm #+ Monad<STM>] + [frp])) + +## [Types] +(type: #export (Actor s m) + {#mailbox (stm;Var m) + #kill-signal (promise;Promise Unit) + #obituary (promise;Promise [(Maybe Text) s (List m)])}) + +(type: #export (Proc s m) + {#step (-> (Actor s m) (-> m s (promise;Promise (Error s)))) + #end (-> (Maybe Text) s (promise;Promise Unit))}) + +## [Values] +(def: #export (spawn init [proc on-death]) + {#;doc "Given a procedure and initial state, launches an actor and returns it."} + (All [s m] (-> s (Proc s m) (IO (Actor s m)))) + (io (let [mailbox (stm;var (:! ($ 1) [])) + kill-signal (promise;promise Unit) + obituary (promise;promise [(Maybe Text) ($ 0) (List ($ 1))]) + self {#mailbox mailbox + #kill-signal kill-signal + #obituary obituary} + mailbox-chan (io;run (stm;follow "\tmailbox\t" mailbox)) + proc (proc self) + |mailbox| (stm;var mailbox-chan) + _ (:: Monad<Promise> map + (lambda [_] + (io;run (do Monad<IO> + [mb (stm;read! |mailbox|)] + (frp;close mb)))) + kill-signal) + process (loop [state init + messages mailbox-chan] + (do Monad<Promise> + [?messages+ messages] + (case ?messages+ + ## No kill-signal so far, so I may proceed... + (#;Some [message messages']) + (do Monad<Promise> + [#let [_ (io;run (stm;write! messages' |mailbox|))] + ?state' (proc message state)] + (case ?state' + (#;Left error) + (do @ + [#let [_ (io;run (promise;resolve [] kill-signal)) + _ (io;run (frp;close messages')) + death-message (#;Some error)] + _ (on-death death-message state) + remaining-messages (frp;consume messages')] + (wrap [death-message state (#;Cons message remaining-messages)])) + + (#;Right state') + (recur state' messages'))) + + ## Otherwise, clean-up and return current state. + #;None + (do Monad<Promise> + [#let [_ (io;run (frp;close messages)) + death-message #;None] + _ (on-death death-message state)] + (wrap [death-message state (list)])))))] + self))) + +(def: #export poison + {#;doc "Immediately kills the given actor (if it's not already dead)."} + (All [s m] (-> (Actor s m) (io;IO Bool))) + (|>. (get@ #kill-signal) (promise;resolve []))) + +(def: #export (alive? actor) + (All [s m] (-> (Actor s m) Bool)) + (case [(promise;poll (get@ #kill-signal actor)) + (promise;poll (get@ #obituary actor))] + [#;None #;None] + true + + _ + false)) + +(def: #export (send message actor) + (All [s m] (-> m (Actor s m) (promise;Promise Bool))) + (if (alive? actor) + (exec (io;run (stm;write! message (get@ #mailbox actor))) + (:: Monad<Promise> wrap true)) + (:: Monad<Promise> wrap false))) + +(def: #export (keep-alive init proc) + {#;doc "Given initial-state and a procedure, launches and actor that will reboot if it dies of errors. + However, it can still be killed."} + (All [s m] (-> s (Proc s m) (IO (Actor s m)))) + (io (let [ka-actor (: (Actor (Actor ($ 0) ($ 1)) ($ 1)) + (io;run (spawn (io;run (spawn init proc)) + {#step (lambda [*self* message server] + (do Monad<Promise> + [was-sent? (send message server)] + (if was-sent? + (wrap (#;Right server)) + (do @ + [[?cause state unprocessed-messages] (get@ #obituary server)] + (exec (log! (format "ACTOR DIED:\n" (default "" ?cause) "\n RESTARTING")) + (do @ + [#let [new-server (io;run (spawn state proc)) + mailbox (get@ #mailbox new-server)] + _ (promise;future (mapM io;Monad<IO> ((flip stm;write!) mailbox) (#;Cons message unprocessed-messages)))] + (wrap (#;Right new-server)))) + )))) + #end (lambda [_ server] (exec (io;run (poison server)) + (:: Monad<Promise> wrap [])))})))] + (update@ #obituary (: (-> (promise;Promise [(Maybe Text) (Actor ($ 0) ($ 1)) (List ($ 1))]) + (promise;Promise [(Maybe Text) ($ 0) (List ($ 1))])) + (lambda [process] + (do Monad<Promise> + [[_ server unprocessed-messages-0] process + [cause state unprocessed-messages-1] (get@ #obituary server)] + (wrap [cause state (List/append unprocessed-messages-0 unprocessed-messages-1)])))) + ka-actor)))) + +## [Syntax] +(type: Method + {#name Text + #vars (List Text) + #args (List [Text AST]) + #return AST + #body AST}) + +(def: method^ + (Syntax Method) + (s;form (do s;Monad<Syntax> + [_ (s;symbol! ["" "method:"]) + vars (s;default (list) (s;tuple (s;some s;local-symbol))) + [name args] (s;form ($_ s;seq + s;local-symbol + (s;many common;typed-arg) + )) + return s;any + body s;any] + (wrap {#name name + #vars vars + #args args + #return return + #body body})))) + +(def: stop^ + (Syntax AST) + (s;form (do s;Monad<Syntax> + [_ (s;symbol! ["" "stop:"])] + s;any))) + +(def: actor-decl^ + (Syntax [(List Text) Text (List [Text AST])]) + (s;seq (s;default (list) (s;tuple (s;some s;local-symbol))) + (s;either (s;form (s;seq s;local-symbol (s;many common;typed-arg))) + (s;seq s;local-symbol (:: s;Monad<Syntax> wrap (list)))))) + +(def: (actor-def-decl [_vars _name _args] return-type) + (-> [(List Text) Text (List [Text AST])] AST (List AST)) + (let [decl (` ((~ (ast;symbol ["" (format _name "//new")])) (~@ (List/map (|>. product;left [""] ast;symbol) _args)))) + base-type (` (-> (~@ (List/map product;right _args)) + (~ return-type))) + type (case _vars + #;Nil + base-type + + _ + (` (All [(~@ (List/map (|>. [""] ast;symbol) _vars))] + (~ base-type))))] + (list decl + type))) + +(syntax: #export (actor: {_ex-lev common;export-level} + {(^@ decl [_vars _name _args]) actor-decl^} + state-type + {methods (s;many method^)} + {?stop (s;opt stop^)}) + {#;doc (doc "Allows defining an actor, with a set of methods that can be called on it." + "The methods can return promisehronous outputs." + "The methods can access the actor's state through the *state* variable." + "The methods can also access the actor itself through the *self* variable." + + (actor: #export Adder + Int + + (method: (count! {to-add Int}) + [Int Int] + (if (>= 0 to-add) + (do Monad<Promise> + [#let [new-state (+ to-add *state*)]] + (wrap (#;Right [new-state [*state* new-state]]))) + (do Monad<Promise> + [] + (wrap (#;Left "Can't add negative numbers!"))))) + ))} + (with-gensyms [g!message g!error g!return g!error g!output] + (let [g!state-name (ast;symbol ["" (format _name "//STATE")]) + g!protocol-name (ast;symbol ["" (format _name "//PROTOCOL")]) + g!self (ast;symbol ["" "*self*"]) + g!state (ast;symbol ["" "*state*"]) + g!cause (ast;symbol ["" "*cause*"]) + g!stop-body (default (` (:: promise;Monad<Promise> (~' wrap) [])) ?stop) + protocol (List/map (lambda [(^slots [#name #vars #args #return #body])] + (` ((~ (ast;tag ["" name])) [(~@ (List/map product;right args))] (promise;Promise (~ return))))) + methods) + protocol-pm (List/map (: (-> Method [AST AST]) + (lambda [(^slots [#name #vars #args #return #body])] + (let [arg-names (|> (list;size args) (list;range+ +1) (List/map (|>. Nat/encode [""] ast;symbol))) + body-func (` (: (-> (~ g!state-name) (~@ (List/map product;right args)) (promise;Promise (Error [(~ g!state-name) (~ return)]))) + (lambda (~ (ast;symbol ["" _name])) [(~ g!state) (~@ (List/map (|>. product;left [""] ast;symbol) args))] + (do promise;Monad<Promise> + [] + (~ body)))))] + [(` [[(~@ arg-names)] (~ g!return)]) + (` (do promise;Monad<Promise> + [(~ g!output) ((~ body-func) (~ g!state) (~@ arg-names))] + (case (~ g!output) + (#;Right [(~ g!state) (~ g!output)]) + (exec (io;run (promise;resolve (~ g!output) (~ g!return))) + ((~' wrap) (#;Right (~ g!state)))) + + (#;Left (~ g!error)) + ((~' wrap) (#;Left (~ g!error)))) + ))]))) + methods) + g!proc (` {#step (lambda [(~ g!self) (~ g!message) (~ g!state)] + (case (~ g!message) + (~@ (if (=+ +1 (list;size protocol-pm)) + (List/join (List/map (lambda [[pattern clause]] + (list pattern clause)) + protocol-pm)) + (List/join (List/map (lambda [[method [pattern clause]]] + (list (` ((~ (ast;tag ["" (get@ #name method)])) (~ pattern))) + clause)) + (list;zip2 methods protocol-pm))))) + )) + #end (lambda [(~ g!cause) (~ g!state)] + (do promise;Monad<Promise> + [] + (~ g!stop-body)))}) + g!actor-name (ast;symbol ["" _name]) + g!methods (List/map (: (-> Method AST) + (lambda [(^slots [#name #vars #args #return #body])] + (let [arg-names (|> (list;size args) (list;range+ +1) (List/map (|>. Nat/encode [""] ast;symbol))) + type (` (-> (~@ (List/map product;right args)) + (~ g!actor-name) + (promise;Promise (~ return))))] + (` (def: (~@ (common;gen-export-level _ex-lev)) ((~ (ast;symbol ["" name])) (~@ arg-names) (~ g!self)) + (~ type) + (let [(~ g!output) (promise;promise (~ return))] + (exec (send ((~ (ast;tag ["" name])) [[(~@ arg-names)] (~ g!output)]) (~ g!self)) + (~ g!output)))))))) + methods)] + (wrap (list& (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!state-name) (~ state-type))) + (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!protocol-name) (~@ protocol))) + (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!actor-name) (Actor (~ g!state-name) (~ g!protocol-name)))) + (` (def: (~@ (common;gen-export-level _ex-lev)) (~@ (actor-def-decl decl (` (Proc (~ g!state-name) (~ g!protocol-name))))) + (~ g!proc))) + g!methods)) + ))) |