From 9671484b6cb3f3c56d6a3053a4a55b4634c14a89 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Mon, 22 Aug 2022 20:45:33 -0400 Subject: Added support for the agent model. --- .../library/lux/control/concurrency/actor.lux | 54 ++++++++++++++++--- .../library/lux/control/concurrency/agent.lux | 25 +++++++++ stdlib/source/test/lux/control.lux | 2 + .../source/test/lux/control/concurrency/actor.lux | 60 ++++++++++++++++++++++ .../source/test/lux/control/concurrency/agent.lux | 49 ++++++++++++++++++ 5 files changed, 183 insertions(+), 7 deletions(-) create mode 100644 stdlib/source/library/lux/control/concurrency/agent.lux create mode 100644 stdlib/source/test/lux/control/concurrency/agent.lux (limited to 'stdlib') diff --git a/stdlib/source/library/lux/control/concurrency/actor.lux b/stdlib/source/library/lux/control/concurrency/actor.lux index d4c50b896..7b430da17 100644 --- a/stdlib/source/library/lux/control/concurrency/actor.lux +++ b/stdlib/source/library/lux/control/concurrency/actor.lux @@ -7,7 +7,9 @@ ["[0]" pipe] ["[0]" try (.only Try)] ["[0]" exception] - ["[0]" io (.only IO io)]] + ["[0]" io (.only IO io)] + [function + [predicate (.only Predicate)]]] [data ["[0]" bit] ["[0]" product]] @@ -18,7 +20,7 @@ [primitive (.only primitive representation abstraction)]]]]] [// ["[0]" atom (.only Atom atom)] - ["[0]" async (.only Async Resolver)] + ["[0]" async (.only Async Resolver) (.use "[1]#[0]" monad)] ["[0]" frp (.only Channel Channel')]]) (exception.def .public poisoned) @@ -79,10 +81,11 @@ (when ?state' {try.#Failure error} (let [[_ resolve] (the #obituary (representation self))] - (exec (io.run! - (do io.monad - [pending (..pending tail)] - (resolve [error state {.#Item head pending}]))) + (exec + (io.run! + (do io.monad + [pending (..pending tail)] + (resolve [error state {.#Item head pending}]))) (in []))) {try.#Success state'} @@ -170,7 +173,7 @@ (do async.monad [outcome (async.future (..mail! mail actor))] (when outcome - {try.#Success} + {try.#Success _} async {try.#Failure error} @@ -209,3 +212,40 @@ (at ! each try.maybe)) (in {.#None})))) channel))) + +... The following behavior and messages allow Lux's actors to behave like Clojure's agents. +... https://clojure.org/reference/agents + +(exception.def .public invalid) + +(def .public (validated ? it) + (All (_ state) + (-> (Predicate state) (Behavior state) + (Behavior state))) + (function (_ mail before actor) + (do (try.with async.monad) + [after (mail before actor)] + (if (? after) + (in after) + (async#in (exception.except ..invalid [])))))) + +(def .public state + (All (_ state) + (Message state state)) + (function (_ state self) + (async#in {try.#Success [state state]}))) + +(def .public (update $) + (All (_ state) + (-> (-> state state) + (Message state [state state]))) + (function (_ before self) + (let [after ($ before)] + (async#in {try.#Success [after [before after]]})))) + +(def .public (reset after) + (All (_ state) + (-> state + (Message state state))) + (function (_ before self) + (async#in {try.#Success [after before]}))) diff --git a/stdlib/source/library/lux/control/concurrency/agent.lux b/stdlib/source/library/lux/control/concurrency/agent.lux new file mode 100644 index 000000000..bf9608cb7 --- /dev/null +++ b/stdlib/source/library/lux/control/concurrency/agent.lux @@ -0,0 +1,25 @@ +(.require + [library + [lux (.except) + [control + ["[0]" try (.only Try)] + ["[0]" io (.only IO)]]]] + [// + ["[0]" async (.only Async)] + ["[0]" actor (.only Actor)] + ["[0]" frp (.only Channel')]]) + +(def .public Agent + Actor) + +(def .public (react! events handler agent) + (All (_ eventR eventW state) + (-> (Channel' eventR eventW) + (-> eventR state (Async (Try state))) + (Agent state) + (IO Any))) + (frp.subscribe! (function (_ event) + (actor.mail! (function (_ state self) + (handler event state)) + agent)) + events)) diff --git a/stdlib/source/test/lux/control.lux b/stdlib/source/test/lux/control.lux index c51399523..4eba3a2a7 100644 --- a/stdlib/source/test/lux/control.lux +++ b/stdlib/source/test/lux/control.lux @@ -7,6 +7,7 @@ ["[1][0]" concatenative] ["[1][0]" concurrency ["[1]/[0]" actor] + ["[1]/[0]" agent] ["[1]/[0]" atom] ["[1]/[0]" frp] ["[1]/[0]" thread] @@ -39,6 +40,7 @@ Test (all _.and /concurrency/actor.test + /concurrency/agent.test /concurrency/atom.test /concurrency/frp.test /concurrency/thread.test diff --git a/stdlib/source/test/lux/control/concurrency/actor.lux b/stdlib/source/test/lux/control/concurrency/actor.lux index e8d3568fd..8353b2e8a 100644 --- a/stdlib/source/test/lux/control/concurrency/actor.lux +++ b/stdlib/source/test/lux/control/concurrency/actor.lux @@ -200,4 +200,64 @@ actual (async.future (atom.read! sink))] (unit.coverage [/.Stop /.observe! /.obituary] (at (list.equivalence n.equivalence) = expected (sequence.list actual)))))) + (in (do async.monad + [actor (async.future (/.spawn! /.default initial_state)) + actual (/.tell! /.state actor)] + (unit.coverage [/.state] + (<| (try.else false) + (do try.monad + [actual actual] + (in (same? initial_state actual))))))) + (in (do async.monad + [actor (async.future (/.spawn! /.default initial_state)) + before,after (/.tell! (/.update ++) actor) + actual (/.tell! /.state actor)] + (unit.coverage [/.update] + (<| (try.else false) + (do try.monad + [[before after] before,after + actual actual] + (in (and (n.= (++ before) after) + (same? after actual)))))))) + (in (do async.monad + [actor (async.future (/.spawn! /.default initial_state)) + before,after (/.tell! (/.update ++) actor) + _ (/.tell! (/.reset initial_state) actor) + actual (/.tell! /.state actor)] + (unit.coverage [/.reset] + (<| (try.else false) + (do try.monad + [[before after] before,after + actual actual] + (in (and (n.= (++ before) after) + (same? initial_state before) + (same? initial_state actual)))))))) + (in (do async.monad + [actor (async.future (/.spawn! (/.validated (n.< initial_state) /.default) + initial_state)) + before,after (/.tell! (/.update --) actor) + actual (/.tell! /.state actor)] + (unit.coverage [/.validated] + (<| (try.else false) + (do try.monad + [[before after] before,after + actual actual] + (in (and (n.= (-- before) after) + (same? after actual)))))))) + (in (do async.monad + [actor (async.future (/.spawn! (/.validated (n.< initial_state) /.default) + initial_state)) + before,after (/.tell! (/.update ++) actor) + [cause_of_death state pending] (/.obituary actor) + actual (/.tell! /.state actor)] + (unit.coverage [/.invalid] + (when [before,after actual] + [{try.#Success [before after]} + {try.#Failure afterwards}] + (and (n.= (++ before) after) + (exception.match? /.invalid cause_of_death) + (exception.match? /.dead afterwards)) + + _ + false)))) )))) diff --git a/stdlib/source/test/lux/control/concurrency/agent.lux b/stdlib/source/test/lux/control/concurrency/agent.lux new file mode 100644 index 000000000..9627da9f9 --- /dev/null +++ b/stdlib/source/test/lux/control/concurrency/agent.lux @@ -0,0 +1,49 @@ +(.require + [library + [lux (.except) + [abstract + [monad (.only do)]] + [control + ["[0]" try]] + [math + ["[0]" random] + [number + ["n" nat]]] + [test + ["_" property (.only Test)] + ["[0]" unit]]]] + [\\library + ["[0]" / (.only) + [// + ["[0]" atom (.only Atom)] + ["[0]" async (.only Async Resolver) (.use "[1]#[0]" monad)] + ["[0]" frp] + ["[0]" actor]]]]) + +(def .public test + Test + (do [! random.monad] + [left random.nat + right random.nat] + (<| (_.covering /._) + (_.for [/.Agent]) + (all _.and + (in (do async.monad + [agent (async.future + (actor.spawn! actor.default 0)) + _ (async.future + (/.react! (frp.sequential 0 (list left right)) + (function (_ next current) + (async#in {try.#Success (n.+ next current)})) + agent)) + _ (async.delay 1) + ?state (actor.tell! actor.state agent)] + (unit.coverage [/.react!] + (when ?state + {try.#Success actual} + (n.= (n.+ left right) + actual) + + failure + false)))) + )))) -- cgit v1.2.3