## Copyright (c) Eduardo Julian. All rights reserved.
## This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
## If a copy of the MPL was not distributed with this file,
## You can obtain one at http://mozilla.org/MPL/2.0/.

(;module:
  lux
  (lux (control monad)
       (codata [io #- run]
               function)
       (data error
             text/format
             (struct [list "List/" Monoid Monad])
             [product]
             [number "Nat/" Codec])
       [compiler #+ with-gensyms]
       (macro [ast]
              ["s" syntax #+ syntax: Syntax]
              (syntax [common]))
       [type])
  (.. [promise #+ Monad]
      [stm #+ Monad]
      [frp]))

## [Types]
## An actor holding state of type 's' and receiving messages of type 'm'.
##   #mailbox     : STM variable through which messages are delivered.
##   #kill-signal : promise that gets resolved when the actor is killed (or dies).
##   #obituary    : promise for the final report of the actor:
##                  [cause-of-death (if any), last state, unprocessed messages].
(type: #export (Actor s m)
  {#mailbox (stm;Var m)
   #kill-signal (promise;Promise Unit)
   #obituary (promise;Promise [(Maybe Text) s (List m)])})

## The behavior of an actor.
##   #step : given the actor itself, processes a message against the current state,
##           producing either an error or the next state.
##   #end  : invoked with the cause of death (if any) and the last state when the actor stops.
(type: #export (Proc s m)
  {#step (-> (Actor s m) (-> m s (promise;Promise (Error s))))
   #end (-> (Maybe Text) s (promise;Promise Unit))})

## [Values]
(def: #export (spawn init [proc on-death])
  {#;doc "Given a procedure and initial state, launches an actor and returns it."}
  (All [s m] (-> s (Proc s m) (IO (Actor s m))))
  (io (let [mailbox (stm;var (:! ($ 1) []))
            kill-signal (promise;promise Unit)
            obituary (promise;promise [(Maybe Text) ($ 0) (List ($ 1))])
            self {#mailbox mailbox
                  #kill-signal kill-signal
                  #obituary obituary}
            mailbox-chan (io;run (stm;follow "\tmailbox\t" mailbox))
            proc (proc self)
            ## Tracks the not-yet-consumed part of the mailbox channel, so that it
            ## can be closed from the outside once the kill-signal fires.
            |mailbox| (stm;var mailbox-chan)
            _ (:: Monad map
                  (lambda [_]
                    (io;run (do Monad
                              [mb (stm;read! |mailbox|)]
                              (frp;close mb))))
                  kill-signal)
            process (loop [state init
                           messages mailbox-chan]
                      (do Monad
                        [?messages+ messages]
                        (case ?messages+
                          ## No kill-signal so far, so I may proceed...
                          (#;Some [message messages'])
                          (do Monad
                            [#let [_ (io;run (stm;write! messages' |mailbox|))]
                             ?state' (proc message state)]
                            (case ?state'
                              ## The step failed: signal death, close the channel,
                              ## run the on-death handler and collect what was left unprocessed.
                              (#;Left error)
                              (do @
                                [#let [_ (io;run (promise;resolve [] kill-signal))
                                       _ (io;run (frp;close messages'))
                                       death-message (#;Some error)]
                                 _ (on-death death-message state)
                                 remaining-messages (frp;consume messages')]
                                (wrap [death-message state (#;Cons message remaining-messages)]))

                              (#;Right state')
                              (recur state' messages')))

                          ## Otherwise, clean-up and return current state.
                          #;None
                          (do Monad
                            [#let [_ (io;run (frp;close messages))
                                   death-message #;None]
                             _ (on-death death-message state)]
                            (wrap [death-message state (list)])))))
            ## Publish the final report through the actor's obituary.
            ## Without this, the obituary promise would never be resolved and anything
            ## waiting on (get@ #obituary actor), such as keep-alive, would wait forever.
            _ (:: Monad map
                  (lambda [final-report]
                    (io;run (promise;resolve final-report obituary)))
                  process)]
        self)))

(def: #export poison
  {#;doc "Immediately kills the given actor (if it's not already dead)."}
  (All [s m] (-> (Actor s m) (io;IO Bool)))
  (|>. (get@ #kill-signal) (promise;resolve [])))

## An actor counts as alive only while polling both its kill-signal and
## its obituary yields #;None.
(def: #export (alive? actor)
  (All [s m] (-> (Actor s m) Bool))
  (case [(promise;poll (get@ #kill-signal actor))
         (promise;poll (get@ #obituary actor))]
    [#;None #;None]
    true

    _
    false))

## Writes the message to the actor's mailbox if the actor is alive.
## The resulting promise holds true if the message was written, false otherwise.
(def: #export (send message actor)
  (All [s m] (-> m (Actor s m) (promise;Promise Bool)))
  (if (alive? actor)
    (exec (io;run (stm;write! message (get@ #mailbox actor)))
      (:: Monad wrap true))
    (:: Monad wrap false)))

(def: #export (keep-alive init proc)
  {#;doc "Given initial-state and a procedure, launches an actor that will reboot if it dies of errors. However, it can still be killed."}
  (All [s m] (-> s (Proc s m) (IO (Actor s m))))
  ## The supervisor is itself an actor whose state is the supervised (server) actor.
  (io (let [ka-actor (: (Actor (Actor ($ 0) ($ 1)) ($ 1))
                        (io;run (spawn (io;run (spawn init proc))
                                       {#step (lambda [*self* message server]
                                                (do Monad
                                                  [was-sent? (send message server)]
                                                  (if was-sent?
                                                    (wrap (#;Right server))
                                                    ## The server is dead: wait for its obituary, spawn a
                                                    ## replacement from the last state, and re-deliver the
                                                    ## current message along with the unprocessed ones.
                                                    (do @
                                                      [[?cause state unprocessed-messages] (get@ #obituary server)]
                                                      (exec (log! (format "ACTOR DIED:\n" (default "" ?cause) "\n RESTARTING"))
                                                        (do @
                                                          [#let [new-server (io;run (spawn state proc))
                                                                 mailbox (get@ #mailbox new-server)]
                                                           _ (promise;future (mapM io;Monad ((flip stm;write!) mailbox) (#;Cons message unprocessed-messages)))]
                                                          (wrap (#;Right new-server))))
                                                      ))))
                                        ## When the supervisor stops, the supervised actor is poisoned too.
                                        #end (lambda [_ server]
                                               (exec (io;run (poison server))
                                                 (:: Monad wrap [])))})))]
        ## Expose an obituary in terms of the supervised actor's state,
        ## merging the unprocessed messages of both supervisor and server.
        (update@ #obituary (: (-> (promise;Promise [(Maybe Text) (Actor ($ 0) ($ 1)) (List ($ 1))])
                                  (promise;Promise [(Maybe Text) ($ 0) (List ($ 1))]))
                              (lambda [process]
                                (do Monad
                                  [[_ server unprocessed-messages-0] process
                                   [cause state unprocessed-messages-1] (get@ #obituary server)]
                                  (wrap [cause state (List/append unprocessed-messages-0 unprocessed-messages-1)]))))
                 ka-actor))))

## [Syntax]
## The parsed form of a (method: ...) declaration inside actor:.
(type: Method
  {#name Text
   #vars (List Text)
   #args (List [Text AST])
   #return AST
   #body AST})

## Parses: (method: [vars*]? (name typed-arg+) return-type body)
(def: method^
  (Syntax Method)
  (s;form (do s;Monad
            [_ (s;symbol! ["" "method:"])
             vars (s;default (list) (s;tuple (s;some s;local-symbol)))
             [name args] (s;form ($_ s;seq
                                     s;local-symbol
                                     (s;many common;typed-arg)
                                     ))
             return s;any
             body s;any]
            (wrap {#name name
                   #vars vars
                   #args args
                   #return return
                   #body body}))))

## Parses: (stop: body)
(def: stop^
  (Syntax AST)
  (s;form (do s;Monad
            [_ (s;symbol! ["" "stop:"])]
            s;any)))

## Parses the actor's declaration: optional type-variables, followed by either
## (name typed-arg+) or just a name (in which case there are no arguments).
(def: actor-decl^
  (Syntax [(List Text) Text (List [Text AST])])
  (s;seq (s;default (list) (s;tuple (s;some s;local-symbol)))
         (s;either (s;form (s;seq s;local-symbol (s;many common;typed-arg)))
                   (s;seq s;local-symbol (:: s;Monad wrap (list))))))

## Builds the declaration and the type for the generated "<name>//new" definition.
(def: (actor-def-decl [_vars _name _args] return-type)
  (-> [(List Text) Text (List [Text AST])] AST (List AST))
  (let [decl (` ((~ (ast;symbol ["" (format _name "//new")]))
                 (~@ (List/map (|>. product;left [""] ast;symbol) _args))))
        base-type (` (-> (~@ (List/map product;right _args))
                         (~ return-type)))
        type (case _vars
               #;Nil
               base-type

               _
               (` (All [(~@ (List/map (|>. [""] ast;symbol) _vars))]
                    (~ base-type))))]
    (list decl type)))

(syntax: #export (actor: {_ex-lev common;export-level}
                   {(^@ decl [_vars _name _args]) actor-decl^}
                   state-type
                   {methods (s;many method^)}
                   {?stop (s;opt stop^)})
  {#;doc (doc "Allows defining an actor, with a set of methods that can be called on it."
              "The methods can return asynchronous outputs."
              "The methods can access the actor's state through the *state* variable."
              "The methods can also access the actor itself through the *self* variable."
              (actor: #export Adder
                Int

                (method: (count! {to-add Int})
                  [Int Int]
                  (if (>= 0 to-add)
                    (do Monad
                      [#let [new-state (+ to-add *state*)]]
                      (wrap (#;Right [new-state [*state* new-state]])))
                    (do Monad
                      []
                      (wrap (#;Left "Can't add negative numbers!")))))
                ))}
  (with-gensyms [g!message g!error g!return g!output]
    (let [g!state-name (ast;symbol ["" (format _name "//STATE")])
          g!protocol-name (ast;symbol ["" (format _name "//PROTOCOL")])
          ## These symbols are deliberately unhygienic, so method bodies can refer to them.
          g!self (ast;symbol ["" "*self*"])
          g!state (ast;symbol ["" "*state*"])
          g!cause (ast;symbol ["" "*cause*"])
          g!stop-body (default (` (:: promise;Monad (~' wrap) [])) ?stop)
          ## One case per method: a tag carrying the argument tuple and
          ## the promise through which the method's output is returned.
          protocol (List/map (lambda [(^slots [#name #vars #args #return #body])]
                               (` ((~ (ast;tag ["" name])) [(~@ (List/map product;right args))] (promise;Promise (~ return)))))
                             methods)
          ## For every method: [pattern over the message payload, code that handles it].
          protocol-pm (List/map (: (-> Method [AST AST])
                                   (lambda [(^slots [#name #vars #args #return #body])]
                                     (let [arg-names (|> (list;size args) (list;range+ +1) (List/map (|>. Nat/encode [""] ast;symbol)))
                                           body-func (` (: (-> (~ g!state-name) (~@ (List/map product;right args)) (promise;Promise (Error [(~ g!state-name) (~ return)])))
                                                           (lambda (~ (ast;symbol ["" _name])) [(~ g!state) (~@ (List/map (|>. product;left [""] ast;symbol) args))]
                                                             (do promise;Monad
                                                               []
                                                               (~ body)))))]
                                       [(` [[(~@ arg-names)] (~ g!return)])
                                        (` (do promise;Monad
                                             [(~ g!output) ((~ body-func) (~ g!state) (~@ arg-names))]
                                             (case (~ g!output)
                                               ## On success, deliver the output to the caller and continue with the new state.
                                               (#;Right [(~ g!state) (~ g!output)])
                                               (exec (io;run (promise;resolve (~ g!output) (~ g!return)))
                                                 ((~' wrap) (#;Right (~ g!state))))

                                               (#;Left (~ g!error))
                                               ((~' wrap) (#;Left (~ g!error))))
                                             ))])))
                                methods)
          g!proc (` {#step (lambda [(~ g!self) (~ g!message) (~ g!state)]
                             (case (~ g!message)
                               ## With a single method, the payload is matched directly (no tag);
                               ## otherwise, each pattern is wrapped in the method's tag.
                               (~@ (if (=+ +1 (list;size protocol-pm))
                                     (List/join (List/map (lambda [[pattern clause]]
                                                            (list pattern clause))
                                                          protocol-pm))
                                     (List/join (List/map (lambda [[method [pattern clause]]]
                                                            (list (` ((~ (ast;tag ["" (get@ #name method)])) (~ pattern)))
                                                                  clause))
                                                          (list;zip2 methods protocol-pm)))))
                               ))
                      #end (lambda [(~ g!cause) (~ g!state)]
                             (do promise;Monad
                               []
                               (~ g!stop-body)))})
          g!actor-name (ast;symbol ["" _name])
          ## One function per method: builds the message, sends it to the actor,
          ## and returns the promise for the output.
          g!methods (List/map (: (-> Method AST)
                                 (lambda [(^slots [#name #vars #args #return #body])]
                                   (let [arg-names (|> (list;size args) (list;range+ +1) (List/map (|>. Nat/encode [""] ast;symbol)))
                                         type (` (-> (~@ (List/map product;right args))
                                                     (~ g!actor-name)
                                                     (promise;Promise (~ return))))]
                                     (` (def: (~@ (common;gen-export-level _ex-lev)) ((~ (ast;symbol ["" name])) (~@ arg-names) (~ g!self))
                                          (~ type)
                                          (let [(~ g!output) (promise;promise (~ return))]
                                            (exec (send ((~ (ast;tag ["" name])) [[(~@ arg-names)] (~ g!output)]) (~ g!self))
                                              (~ g!output))))))))
                              methods)]
      ## Generated definitions: state type, protocol type, actor type,
      ## the "<name>//new" procedure, and the method functions.
      (wrap (list& (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!state-name) (~ state-type)))
                   (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!protocol-name) (~@ protocol)))
                   (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!actor-name) (Actor (~ g!state-name) (~ g!protocol-name))))
                   (` (def: (~@ (common;gen-export-level _ex-lev)) (~@ (actor-def-decl decl (` (Proc (~ g!state-name) (~ g!protocol-name)))))
                        (~ g!proc)))
                   g!methods))
      )))