aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/concurrency/actor.lux
diff options
context:
space:
mode:
authorEduardo Julian2016-12-01 11:00:44 -0400
committerEduardo Julian2016-12-01 11:00:44 -0400
commit7f66c54f4c9753b94dbf46ec50b8b16549daf324 (patch)
tree1b5b896cfba870a66a99a03315b09df842eb5737 /stdlib/source/lux/concurrency/actor.lux
parent9c30546af022f8fe36b73e7e93414257ff28ee75 (diff)
- Collected the Lux compiler's repo, the Standard Library's, the Leiningen plugin's and the Emacs mode's into a big monorepo, to keep development unified.
Diffstat (limited to 'stdlib/source/lux/concurrency/actor.lux')
-rw-r--r--stdlib/source/lux/concurrency/actor.lux278
1 files changed, 278 insertions, 0 deletions
diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux
new file mode 100644
index 000000000..1eb3cee21
--- /dev/null
+++ b/stdlib/source/lux/concurrency/actor.lux
@@ -0,0 +1,278 @@
+## Copyright (c) Eduardo Julian. All rights reserved.
+## This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
+## If a copy of the MPL was not distributed with this file,
+## You can obtain one at http://mozilla.org/MPL/2.0/.
+
+(;module:
+ lux
+ (lux (control monad)
+ (codata [io #- run]
+ function)
+ (data error
+ text/format
+ (struct [list "List/" Monoid<List> Monad<List>])
+ [product]
+ [number "Nat/" Codec<Text,Nat>])
+ [compiler #+ with-gensyms]
+ (macro [ast]
+ ["s" syntax #+ syntax: Syntax]
+ (syntax [common]))
+ [type])
+ (.. [promise #+ Monad<Promise>]
+ [stm #+ Monad<STM>]
+ [frp]))
+
+## [Types]
+(type: #export (Actor s m)
+ {#mailbox (stm;Var m)
+ #kill-signal (promise;Promise Unit)
+ #obituary (promise;Promise [(Maybe Text) s (List m)])})
+
+(type: #export (Proc s m)
+ {#step (-> (Actor s m) (-> m s (promise;Promise (Error s))))
+ #end (-> (Maybe Text) s (promise;Promise Unit))})
+
+## [Values]
+(def: #export (spawn init [proc on-death])
+ {#;doc "Given a procedure and initial state, launches an actor and returns it."}
+ (All [s m] (-> s (Proc s m) (IO (Actor s m))))
+ (io (let [mailbox (stm;var (:! ($ 1) []))
+ kill-signal (promise;promise Unit)
+ obituary (promise;promise [(Maybe Text) ($ 0) (List ($ 1))])
+ self {#mailbox mailbox
+ #kill-signal kill-signal
+ #obituary obituary}
+ mailbox-chan (io;run (stm;follow "\tmailbox\t" mailbox))
+ proc (proc self)
+ |mailbox| (stm;var mailbox-chan)
+ _ (:: Monad<Promise> map
+ (lambda [_]
+ (io;run (do Monad<IO>
+ [mb (stm;read! |mailbox|)]
+ (frp;close mb))))
+ kill-signal)
+ process (loop [state init
+ messages mailbox-chan]
+ (do Monad<Promise>
+ [?messages+ messages]
+ (case ?messages+
+ ## No kill-signal so far, so I may proceed...
+ (#;Some [message messages'])
+ (do Monad<Promise>
+ [#let [_ (io;run (stm;write! messages' |mailbox|))]
+ ?state' (proc message state)]
+ (case ?state'
+ (#;Left error)
+ (do @
+ [#let [_ (io;run (promise;resolve [] kill-signal))
+ _ (io;run (frp;close messages'))
+ death-message (#;Some error)]
+ _ (on-death death-message state)
+ remaining-messages (frp;consume messages')]
+ (wrap [death-message state (#;Cons message remaining-messages)]))
+
+ (#;Right state')
+ (recur state' messages')))
+
+ ## Otherwise, clean-up and return current state.
+ #;None
+ (do Monad<Promise>
+ [#let [_ (io;run (frp;close messages))
+ death-message #;None]
+ _ (on-death death-message state)]
+ (wrap [death-message state (list)])))))]
+ self)))
+
+(def: #export poison
+ {#;doc "Immediately kills the given actor (if it's not already dead)."}
+ (All [s m] (-> (Actor s m) (io;IO Bool)))
+ (|>. (get@ #kill-signal) (promise;resolve [])))
+
+(def: #export (alive? actor)
+ (All [s m] (-> (Actor s m) Bool))
+ (case [(promise;poll (get@ #kill-signal actor))
+ (promise;poll (get@ #obituary actor))]
+ [#;None #;None]
+ true
+
+ _
+ false))
+
+(def: #export (send message actor)
+ (All [s m] (-> m (Actor s m) (promise;Promise Bool)))
+ (if (alive? actor)
+ (exec (io;run (stm;write! message (get@ #mailbox actor)))
+ (:: Monad<Promise> wrap true))
+ (:: Monad<Promise> wrap false)))
+
+(def: #export (keep-alive init proc)
+ {#;doc "Given initial-state and a procedure, launches and actor that will reboot if it dies of errors.
+ However, it can still be killed."}
+ (All [s m] (-> s (Proc s m) (IO (Actor s m))))
+ (io (let [ka-actor (: (Actor (Actor ($ 0) ($ 1)) ($ 1))
+ (io;run (spawn (io;run (spawn init proc))
+ {#step (lambda [*self* message server]
+ (do Monad<Promise>
+ [was-sent? (send message server)]
+ (if was-sent?
+ (wrap (#;Right server))
+ (do @
+ [[?cause state unprocessed-messages] (get@ #obituary server)]
+ (exec (log! (format "ACTOR DIED:\n" (default "" ?cause) "\n RESTARTING"))
+ (do @
+ [#let [new-server (io;run (spawn state proc))
+ mailbox (get@ #mailbox new-server)]
+ _ (promise;future (mapM io;Monad<IO> ((flip stm;write!) mailbox) (#;Cons message unprocessed-messages)))]
+ (wrap (#;Right new-server))))
+ ))))
+ #end (lambda [_ server] (exec (io;run (poison server))
+ (:: Monad<Promise> wrap [])))})))]
+ (update@ #obituary (: (-> (promise;Promise [(Maybe Text) (Actor ($ 0) ($ 1)) (List ($ 1))])
+ (promise;Promise [(Maybe Text) ($ 0) (List ($ 1))]))
+ (lambda [process]
+ (do Monad<Promise>
+ [[_ server unprocessed-messages-0] process
+ [cause state unprocessed-messages-1] (get@ #obituary server)]
+ (wrap [cause state (List/append unprocessed-messages-0 unprocessed-messages-1)]))))
+ ka-actor))))
+
+## [Syntax]
+(type: Method
+ {#name Text
+ #vars (List Text)
+ #args (List [Text AST])
+ #return AST
+ #body AST})
+
+(def: method^
+ (Syntax Method)
+ (s;form (do s;Monad<Syntax>
+ [_ (s;symbol! ["" "method:"])
+ vars (s;default (list) (s;tuple (s;some s;local-symbol)))
+ [name args] (s;form ($_ s;seq
+ s;local-symbol
+ (s;many common;typed-arg)
+ ))
+ return s;any
+ body s;any]
+ (wrap {#name name
+ #vars vars
+ #args args
+ #return return
+ #body body}))))
+
+(def: stop^
+ (Syntax AST)
+ (s;form (do s;Monad<Syntax>
+ [_ (s;symbol! ["" "stop:"])]
+ s;any)))
+
+(def: actor-decl^
+ (Syntax [(List Text) Text (List [Text AST])])
+ (s;seq (s;default (list) (s;tuple (s;some s;local-symbol)))
+ (s;either (s;form (s;seq s;local-symbol (s;many common;typed-arg)))
+ (s;seq s;local-symbol (:: s;Monad<Syntax> wrap (list))))))
+
+(def: (actor-def-decl [_vars _name _args] return-type)
+ (-> [(List Text) Text (List [Text AST])] AST (List AST))
+ (let [decl (` ((~ (ast;symbol ["" (format _name "//new")])) (~@ (List/map (|>. product;left [""] ast;symbol) _args))))
+ base-type (` (-> (~@ (List/map product;right _args))
+ (~ return-type)))
+ type (case _vars
+ #;Nil
+ base-type
+
+ _
+ (` (All [(~@ (List/map (|>. [""] ast;symbol) _vars))]
+ (~ base-type))))]
+ (list decl
+ type)))
+
+(syntax: #export (actor: {_ex-lev common;export-level}
+ {(^@ decl [_vars _name _args]) actor-decl^}
+ state-type
+ {methods (s;many method^)}
+ {?stop (s;opt stop^)})
+ {#;doc (doc "Allows defining an actor, with a set of methods that can be called on it."
+ "The methods can return promisehronous outputs."
+ "The methods can access the actor's state through the *state* variable."
+ "The methods can also access the actor itself through the *self* variable."
+
+ (actor: #export Adder
+ Int
+
+ (method: (count! {to-add Int})
+ [Int Int]
+ (if (>= 0 to-add)
+ (do Monad<Promise>
+ [#let [new-state (+ to-add *state*)]]
+ (wrap (#;Right [new-state [*state* new-state]])))
+ (do Monad<Promise>
+ []
+ (wrap (#;Left "Can't add negative numbers!")))))
+ ))}
+ (with-gensyms [g!message g!error g!return g!error g!output]
+ (let [g!state-name (ast;symbol ["" (format _name "//STATE")])
+ g!protocol-name (ast;symbol ["" (format _name "//PROTOCOL")])
+ g!self (ast;symbol ["" "*self*"])
+ g!state (ast;symbol ["" "*state*"])
+ g!cause (ast;symbol ["" "*cause*"])
+ g!stop-body (default (` (:: promise;Monad<Promise> (~' wrap) [])) ?stop)
+ protocol (List/map (lambda [(^slots [#name #vars #args #return #body])]
+ (` ((~ (ast;tag ["" name])) [(~@ (List/map product;right args))] (promise;Promise (~ return)))))
+ methods)
+ protocol-pm (List/map (: (-> Method [AST AST])
+ (lambda [(^slots [#name #vars #args #return #body])]
+ (let [arg-names (|> (list;size args) (list;range+ +1) (List/map (|>. Nat/encode [""] ast;symbol)))
+ body-func (` (: (-> (~ g!state-name) (~@ (List/map product;right args)) (promise;Promise (Error [(~ g!state-name) (~ return)])))
+ (lambda (~ (ast;symbol ["" _name])) [(~ g!state) (~@ (List/map (|>. product;left [""] ast;symbol) args))]
+ (do promise;Monad<Promise>
+ []
+ (~ body)))))]
+ [(` [[(~@ arg-names)] (~ g!return)])
+ (` (do promise;Monad<Promise>
+ [(~ g!output) ((~ body-func) (~ g!state) (~@ arg-names))]
+ (case (~ g!output)
+ (#;Right [(~ g!state) (~ g!output)])
+ (exec (io;run (promise;resolve (~ g!output) (~ g!return)))
+ ((~' wrap) (#;Right (~ g!state))))
+
+ (#;Left (~ g!error))
+ ((~' wrap) (#;Left (~ g!error))))
+ ))])))
+ methods)
+ g!proc (` {#step (lambda [(~ g!self) (~ g!message) (~ g!state)]
+ (case (~ g!message)
+ (~@ (if (=+ +1 (list;size protocol-pm))
+ (List/join (List/map (lambda [[pattern clause]]
+ (list pattern clause))
+ protocol-pm))
+ (List/join (List/map (lambda [[method [pattern clause]]]
+ (list (` ((~ (ast;tag ["" (get@ #name method)])) (~ pattern)))
+ clause))
+ (list;zip2 methods protocol-pm)))))
+ ))
+ #end (lambda [(~ g!cause) (~ g!state)]
+ (do promise;Monad<Promise>
+ []
+ (~ g!stop-body)))})
+ g!actor-name (ast;symbol ["" _name])
+ g!methods (List/map (: (-> Method AST)
+ (lambda [(^slots [#name #vars #args #return #body])]
+ (let [arg-names (|> (list;size args) (list;range+ +1) (List/map (|>. Nat/encode [""] ast;symbol)))
+ type (` (-> (~@ (List/map product;right args))
+ (~ g!actor-name)
+ (promise;Promise (~ return))))]
+ (` (def: (~@ (common;gen-export-level _ex-lev)) ((~ (ast;symbol ["" name])) (~@ arg-names) (~ g!self))
+ (~ type)
+ (let [(~ g!output) (promise;promise (~ return))]
+ (exec (send ((~ (ast;tag ["" name])) [[(~@ arg-names)] (~ g!output)]) (~ g!self))
+ (~ g!output))))))))
+ methods)]
+ (wrap (list& (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!state-name) (~ state-type)))
+ (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!protocol-name) (~@ protocol)))
+ (` (type: (~@ (common;gen-export-level _ex-lev)) (~ g!actor-name) (Actor (~ g!state-name) (~ g!protocol-name))))
+ (` (def: (~@ (common;gen-export-level _ex-lev)) (~@ (actor-def-decl decl (` (Proc (~ g!state-name) (~ g!protocol-name)))))
+ (~ g!proc)))
+ g!methods))
+ )))