(;module: {#;doc "The actor model of concurrency."} lux (lux (control monad ["p" parser] ["ex" exception #+ exception:]) [io #- run "io/" Monad] (data text/format (coll [list "list/" Monoid Monad Fold]) [product]) [meta #+ with-gensyms Monad] (meta [code] ["s" syntax #+ syntax: Syntax] (syntax ["cs" common] (common ["csr" reader] ["csw" writer])) (type opaque)) (lang [type])) (.. ["A" atom] ["P" promise "P/" Monad] ["T" task] [stm #+ Monad] [frp])) (exception: #export Poisoned) (exception: #export Killed) (exception: #export Dead) ## [Types] (with-expansions [ (as-is (-> s (Actor s) (T;Task s))) (as-is [Text s (List )])] (opaque: #export (Actor s) {#;doc "An actor, defined as all the necessities it requires."} {#mailbox (stm;Var ) #kill-switch (P;Promise Unit) #obituary (P;Promise )} (type: #export (Message s) ) (type: #export (Obituary s) ) (type: #export (Behavior s) {#;doc "An actor's behavior when messages are received."} {#handle (-> (Message s) s (Actor s) (T;Task s)) #end (-> Text s (P;Promise Unit))}) (def: #export (spawn behavior init) {#;doc "Given a behavior and initial state, spawns an actor and returns it."} (All [s] (-> (Behavior s) s (IO (Actor s)))) (io (let [[handle end] behavior self (: (Actor ($ +0)) (@opaque {#mailbox (stm;var (:! (Message ($ +0)) [])) #kill-switch (P;promise Unit) #obituary (P;promise (Obituary ($ +0)))})) mailbox-channel (io;run (stm;follow (get@ #mailbox (@repr self)))) |mailbox| (stm;var mailbox-channel) _ (P/map (function [_] (io;run (do Monad [mb (stm;read! |mailbox|)] (frp;close mb)))) (get@ #kill-switch (@repr self))) process (loop [state init messages mailbox-channel] (do P;Monad [?messages+ messages] (case ?messages+ ## No kill-switch so far, so I may proceed... (#;Some [message messages']) (do P;Monad [#let [_ (io;run (stm;write! messages' |mailbox|))] ?state' (handle message state self)] (case ?state' (#;Left error) (do @ [#let [_ (io;run (do Monad [_ (P;resolve [] (get@ #kill-switch (@repr self)))] (frp;close messages')))] _ (end error state) remaining-messages (frp;consume messages')] (wrap [error state (#;Cons message remaining-messages)])) (#;Right state') (recur state' messages'))) ## Otherwise, clean-up and return current state. #;None (do P;Monad [#let [_ (io;run (frp;close messages)) death-message (Killed "")] _ (end death-message state)] (wrap [death-message state (list)])))))] self))) (def: #export (alive? actor) (All [s] (-> (Actor s) Bool)) (case [(P;poll (get@ #kill-switch (@repr actor))) (P;poll (get@ #obituary (@repr actor)))] [#;None #;None] true _ false)) (def: #export (send message actor) {#;doc "Communicate with an actor through message passing."} (All [s] (-> (Message s) (Actor s) (IO Bool))) (if (alive? actor) (do Monad [_ (stm;write! message (get@ #mailbox (@repr actor)))] (wrap true)) (io/wrap false))) (def: #export (kill actor) {#;doc "Immediately kills the given actor (if it is not already dead)."} (All [s] (-> (Actor s) (io;IO Bool))) (if (alive? actor) (|> actor @repr (get@ #kill-switch) (P;resolve [])) (io/wrap false))) )) ## [Values] (def: #export (default-handle message state self) (All [s] (-> (Message s) s (Actor s) (T;Task s))) (message state self)) (def: #export (default-end cause state) (All [s] (-> Text s (P;Promise Unit))) (P/wrap [])) (def: #export default-behavior (All [s] (Behavior s)) {#handle default-handle #end default-end}) (def: #export (poison actor) {#;doc "Kills the actor by sending a message that will kill it upon processing, but allows the actor to handle previous messages."} (All [s] (-> (Actor s) (IO Bool))) (send (function [state self] (T;throw Poisoned "")) actor)) ## [Syntax] (do-template [ ] [(def: #hidden ( name) (-> Ident cs;Annotations cs;Annotations) (|>. (#;Cons [(ident-for ) (code;tag name)]))) (def: #hidden ( name) (-> Ident (Meta Ident)) (do Monad [name (meta;normalize name) [_ annotations _] (meta;find-def name)] (case (meta;get-tag-ann (ident-for ) annotations) (#;Some actor-name) (wrap actor-name) _ (meta;fail (format "Definition is not " ".")))))] [with-actor resolve-actor #;;actor "an actor"] [with-message resolve-message #;;message "a message"] ) (def: actor-decl^ (Syntax [Text (List Text)]) (p;either (s;form (p;seq s;local-symbol (p;some s;local-symbol))) (p;seq s;local-symbol (:: p;Monad wrap (list))))) (do-template [ ] [(def: #hidden (-> Text Text) (|>. (format "@")))] [state-name "State"] [behavior-name "Behavior"] [new-name "new"] ) (type: HandleC [[Text Text Text] Code]) (type: StopC [[Text Text] Code]) (type: BehaviorC [(Maybe HandleC) (Maybe StopC)]) (def: behavior^ (s;Syntax BehaviorC) (let [handle-args ($_ p;seq s;local-symbol s;local-symbol s;local-symbol) stop-args ($_ p;seq s;local-symbol s;local-symbol)] (p;seq (p;maybe (s;form (p;seq (s;form (p;after (s;this (' handle)) handle-args)) s;any))) (p;maybe (s;form (p;seq (s;form (p;after (s;this (' stop)) stop-args)) s;any)))))) (syntax: #export (actor: [export csr;export] [[_name _vars] actor-decl^] [annotations (p;default cs;empty-annotations csr;annotations)] state-type [[?handle ?stop] behavior^]) {#;doc (doc "Defines an actor, with its behavior and internal state." (actor: #export Counter Nat ((stop cause state) (:: P;Monad wrap (log! (if (ex;match? ;;Killed cause) (format "Counter was killed: " (%n state)) cause))))) (actor: #export (Stack a) (List a) ((handle message state self) (do T;Monad [#let [_ (log! "BEFORE")] output (message state self) #let [_ (log! "AFTER")]] (wrap output)))))} (with-gensyms [g!message g!self g!state g!init g!error g!return g!output] (do @ [module meta;current-module-name #let [g!type (code;local-symbol (state-name _name)) g!behavior (code;local-symbol (behavior-name _name)) g!actor (code;local-symbol _name) g!new (code;local-symbol (new-name _name)) g!vars (list/map code;local-symbol _vars)]] (wrap (list (` (type: (~@ (csw;export export)) ((~ g!type) (~@ g!vars)) (~ state-type))) (` (type: (~@ (csw;export export)) ((~ g!actor) (~@ g!vars)) (~ (|> annotations (with-actor [module _name]) csw;annotations)) (;;Actor ((~ g!type) (~@ g!vars))))) (` (def: (~@ (csw;export export)) (~ g!behavior) (All [(~@ g!vars)] (;;Behavior ((~ g!type) (~@ g!vars)))) {#;;handle (~ (case ?handle #;None (` ;;default-handle) (#;Some [[messageN stateN selfN] bodyC]) (` (function [(~ (code;local-symbol messageN)) (~ (code;local-symbol stateN)) (~ (code;local-symbol selfN))] (do T;Monad [] (~ bodyC)))))) #;;end (~ (case ?stop #;None (` ;;default-end) (#;Some [[causeN stateN] bodyC]) (` (function [(~ (code;local-symbol causeN)) (~ (code;local-symbol stateN))] (do P;Monad [] (~ bodyC))))))})) (` (def: (~@ (csw;export export)) ((~ g!new) (~ g!init)) (All [(~@ g!vars)] (-> ((~ g!type) (~@ g!vars)) (io;IO ((~ g!actor) (~@ g!vars))))) (;;spawn (~ g!behavior) (~ g!init)))))) ))) (type: Signature {#vars (List Text) #name Text #inputs (List [Text Code]) #state Text #self Text #output Code}) (def: signature^ (s;Syntax Signature) (s;form ($_ p;seq (p;default (list) (s;tuple (p;some s;local-symbol))) s;local-symbol (p;some csr;typed-input) s;local-symbol s;local-symbol s;any))) (def: reference^ (s;Syntax [Ident (List Text)]) (p;either (s;form (p;seq s;symbol (p;some s;local-symbol))) (p;seq s;symbol (:: p;Monad wrap (list))))) (syntax: #export (message: [export csr;export] [[actor-name actor-vars] reference^] [signature signature^] [annotations (p;default cs;empty-annotations csr;annotations)] body) {#;doc (doc "A message can access the actor's state through the state parameter." "A message can also access the actor itself through the self parameter." "A message's output must be a task containing a 2-tuple with the updated state and a return value." "A message may succeed or fail (in case of failure, the actor dies)." (message: #export Counter (count! [increment Nat] state self Nat) (let [state' (n.+ increment state)] (T;return [state' state']))) (message: #export (Stack a) (push [value a] state self (List a)) (let [state' (#;Cons value state)] (T;return [state' state']))))} (with-gensyms [g!return g!error g!task g!sent?] (do @ [actor-name (resolve-actor actor-name) #let [g!type (code;symbol (product;both id state-name actor-name)) g!message (code;local-symbol (get@ #name signature)) g!actor-vars (list/map code;local-symbol actor-vars) g!actor (` ((~ (code;symbol actor-name)) (~@ g!actor-vars))) g!all-vars (|> (get@ #vars signature) (list/map code;local-symbol) (list/compose g!actor-vars)) g!inputsC (|> (get@ #inputs signature) (list/map (|>. product;left code;local-symbol))) g!inputsT (|> (get@ #inputs signature) (list/map product;right)) g!state (|> signature (get@ #state) code;local-symbol) g!self (|> signature (get@ #self) code;local-symbol) g!actor-refs (: (List Code) (if (list;empty? actor-vars) (list) (|> actor-vars list;size n.dec (list;n.range +0) (list/map (|>. code;nat (~) ($) (`)))))) ref-replacements (|> (if (list;empty? actor-vars) (list) (|> actor-vars list;size n.dec (list;n.range +0) (list/map (|>. code;nat (~) ($) (`))))) (: (List Code)) (list;zip2 g!all-vars) (: (List [Code Code]))) g!outputT (list/fold (function [[g!var g!ref] outputT] (code;replace g!var g!ref outputT)) (get@ #output signature) ref-replacements)]] (wrap (list (` (def: (~@ (csw;export export)) ((~ g!message) (~@ g!inputsC) (~ g!self)) (~ (|> annotations (with-message actor-name) csw;annotations)) (All [(~@ g!all-vars)] (-> (~@ g!inputsT) (~ g!actor) (T;Task (~ (get@ #output signature))))) (let [(~ g!task) (T;task (~ g!outputT))] (io;run (do io;Monad [(~ g!sent?) (;;send (function [(~ g!state) (~ g!self)] (do P;Monad [(~ g!return) (: (T;Task [((~ g!type) (~@ g!actor-refs)) (~ g!outputT)]) (do T;Monad [] (~ body)))] (case (~ g!return) (#;Right [(~ g!state) (~ g!return)]) (exec (io;run (P;resolve (#;Right (~ g!return)) (~ g!task))) (T;return (~ g!state))) (#;Left (~ g!error)) (exec (io;run (P;resolve (#;Left (~ g!error)) (~ g!task))) (T;fail (~ g!error)))) )) (~ g!self))] (if (~ g!sent?) ((~' wrap) (~ g!task)) ((~' wrap) (T;throw ;;Dead "")))))))) )) )))