From 9671484b6cb3f3c56d6a3053a4a55b4634c14a89 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Mon, 22 Aug 2022 20:45:33 -0400 Subject: Added support for the agent model. --- .../library/lux/control/concurrency/actor.lux | 54 +++++++++++++++++++--- .../library/lux/control/concurrency/agent.lux | 25 ++++++++++ 2 files changed, 72 insertions(+), 7 deletions(-) create mode 100644 stdlib/source/library/lux/control/concurrency/agent.lux (limited to 'stdlib/source/library') diff --git a/stdlib/source/library/lux/control/concurrency/actor.lux b/stdlib/source/library/lux/control/concurrency/actor.lux index d4c50b896..7b430da17 100644 --- a/stdlib/source/library/lux/control/concurrency/actor.lux +++ b/stdlib/source/library/lux/control/concurrency/actor.lux @@ -7,7 +7,9 @@ ["[0]" pipe] ["[0]" try (.only Try)] ["[0]" exception] - ["[0]" io (.only IO io)]] + ["[0]" io (.only IO io)] + [function + [predicate (.only Predicate)]]] [data ["[0]" bit] ["[0]" product]] @@ -18,7 +20,7 @@ [primitive (.only primitive representation abstraction)]]]]] [// ["[0]" atom (.only Atom atom)] - ["[0]" async (.only Async Resolver)] + ["[0]" async (.only Async Resolver) (.use "[1]#[0]" monad)] ["[0]" frp (.only Channel Channel')]]) (exception.def .public poisoned) @@ -79,10 +81,11 @@ (when ?state' {try.#Failure error} (let [[_ resolve] (the #obituary (representation self))] - (exec (io.run! - (do io.monad - [pending (..pending tail)] - (resolve [error state {.#Item head pending}]))) + (exec + (io.run! + (do io.monad + [pending (..pending tail)] + (resolve [error state {.#Item head pending}]))) (in []))) {try.#Success state'} @@ -170,7 +173,7 @@ (do async.monad [outcome (async.future (..mail! mail actor))] (when outcome - {try.#Success} + {try.#Success _} async {try.#Failure error} @@ -209,3 +212,40 @@ (at ! each try.maybe)) (in {.#None})))) channel))) + +... The following behavior and messages allow Lux's actors to behave like Clojure's agents. +... https://clojure.org/reference/agents + +(exception.def .public invalid) + +(def .public (validated ? it) + (All (_ state) + (-> (Predicate state) (Behavior state) + (Behavior state))) + (function (_ mail before actor) + (do (try.with async.monad) + [after (mail before actor)] + (if (? after) + (in after) + (async#in (exception.except ..invalid [])))))) + +(def .public state + (All (_ state) + (Message state state)) + (function (_ state self) + (async#in {try.#Success [state state]}))) + +(def .public (update $) + (All (_ state) + (-> (-> state state) + (Message state [state state]))) + (function (_ before self) + (let [after ($ before)] + (async#in {try.#Success [after [before after]]})))) + +(def .public (reset after) + (All (_ state) + (-> state + (Message state state))) + (function (_ before self) + (async#in {try.#Success [after before]}))) diff --git a/stdlib/source/library/lux/control/concurrency/agent.lux b/stdlib/source/library/lux/control/concurrency/agent.lux new file mode 100644 index 000000000..bf9608cb7 --- /dev/null +++ b/stdlib/source/library/lux/control/concurrency/agent.lux @@ -0,0 +1,25 @@ +(.require + [library + [lux (.except) + [control + ["[0]" try (.only Try)] + ["[0]" io (.only IO)]]]] + [// + ["[0]" async (.only Async)] + ["[0]" actor (.only Actor)] + ["[0]" frp (.only Channel')]]) + +(def .public Agent + Actor) + +(def .public (react! events handler agent) + (All (_ eventR eventW state) + (-> (Channel' eventR eventW) + (-> eventR state (Async (Try state))) + (Agent state) + (IO Any))) + (frp.subscribe! (function (_ event) + (actor.mail! (function (_ state self) + (handler event state)) + agent)) + events)) -- cgit v1.2.3