aboutsummaryrefslogtreecommitdiff
path: root/stdlib
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib')
-rw-r--r--stdlib/source/library/lux/control/concurrency/actor.lux54
-rw-r--r--stdlib/source/library/lux/control/concurrency/agent.lux25
-rw-r--r--stdlib/source/test/lux/control.lux2
-rw-r--r--stdlib/source/test/lux/control/concurrency/actor.lux60
-rw-r--r--stdlib/source/test/lux/control/concurrency/agent.lux49
5 files changed, 183 insertions, 7 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/actor.lux b/stdlib/source/library/lux/control/concurrency/actor.lux
index d4c50b896..7b430da17 100644
--- a/stdlib/source/library/lux/control/concurrency/actor.lux
+++ b/stdlib/source/library/lux/control/concurrency/actor.lux
@@ -7,7 +7,9 @@
["[0]" pipe]
["[0]" try (.only Try)]
["[0]" exception]
- ["[0]" io (.only IO io)]]
+ ["[0]" io (.only IO io)]
+ [function
+ [predicate (.only Predicate)]]]
[data
["[0]" bit]
["[0]" product]]
@@ -18,7 +20,7 @@
[primitive (.only primitive representation abstraction)]]]]]
[//
["[0]" atom (.only Atom atom)]
- ["[0]" async (.only Async Resolver)]
+ ["[0]" async (.only Async Resolver) (.use "[1]#[0]" monad)]
["[0]" frp (.only Channel Channel')]])
(exception.def .public poisoned)
@@ -79,10 +81,11 @@
(when ?state'
{try.#Failure error}
(let [[_ resolve] (the #obituary (representation self))]
- (exec (io.run!
- (do io.monad
- [pending (..pending tail)]
- (resolve [error state {.#Item head pending}])))
+ (exec
+ (io.run!
+ (do io.monad
+ [pending (..pending tail)]
+ (resolve [error state {.#Item head pending}])))
(in [])))
{try.#Success state'}
@@ -170,7 +173,7 @@
(do async.monad
[outcome (async.future (..mail! mail actor))]
(when outcome
- {try.#Success}
+ {try.#Success _}
async
{try.#Failure error}
@@ -209,3 +212,40 @@
(at ! each try.maybe))
(in {.#None}))))
channel)))
+
+... The following behavior and messages allow Lux's actors to behave like Clojure's agents.
+... https://clojure.org/reference/agents
+
+(exception.def .public invalid)
+
+(def .public (validated ? it)
+ (All (_ state)
+ (-> (Predicate state) (Behavior state)
+ (Behavior state)))
+ (function (_ mail before actor)
+ (do (try.with async.monad)
+ [after (mail before actor)]
+ (if (? after)
+ (in after)
+ (async#in (exception.except ..invalid []))))))
+
+(def .public state
+ (All (_ state)
+ (Message state state))
+ (function (_ state self)
+ (async#in {try.#Success [state state]})))
+
+(def .public (update $)
+ (All (_ state)
+ (-> (-> state state)
+ (Message state [state state])))
+ (function (_ before self)
+ (let [after ($ before)]
+ (async#in {try.#Success [after [before after]]}))))
+
+(def .public (reset after)
+ (All (_ state)
+ (-> state
+ (Message state state)))
+ (function (_ before self)
+ (async#in {try.#Success [after before]})))
diff --git a/stdlib/source/library/lux/control/concurrency/agent.lux b/stdlib/source/library/lux/control/concurrency/agent.lux
new file mode 100644
index 000000000..bf9608cb7
--- /dev/null
+++ b/stdlib/source/library/lux/control/concurrency/agent.lux
@@ -0,0 +1,25 @@
+(.require
+ [library
+ [lux (.except)
+ [control
+ ["[0]" try (.only Try)]
+ ["[0]" io (.only IO)]]]]
+ [//
+ ["[0]" async (.only Async)]
+ ["[0]" actor (.only Actor)]
+ ["[0]" frp (.only Channel')]])
+
+(def .public Agent
+ Actor)
+
+(def .public (react! events handler agent)
+ (All (_ eventR eventW state)
+ (-> (Channel' eventR eventW)
+ (-> eventR state (Async (Try state)))
+ (Agent state)
+ (IO Any)))
+ (frp.subscribe! (function (_ event)
+ (actor.mail! (function (_ state self)
+ (handler event state))
+ agent))
+ events))
diff --git a/stdlib/source/test/lux/control.lux b/stdlib/source/test/lux/control.lux
index c51399523..4eba3a2a7 100644
--- a/stdlib/source/test/lux/control.lux
+++ b/stdlib/source/test/lux/control.lux
@@ -7,6 +7,7 @@
["[1][0]" concatenative]
["[1][0]" concurrency
["[1]/[0]" actor]
+ ["[1]/[0]" agent]
["[1]/[0]" atom]
["[1]/[0]" frp]
["[1]/[0]" thread]
@@ -39,6 +40,7 @@
Test
(all _.and
/concurrency/actor.test
+ /concurrency/agent.test
/concurrency/atom.test
/concurrency/frp.test
/concurrency/thread.test
diff --git a/stdlib/source/test/lux/control/concurrency/actor.lux b/stdlib/source/test/lux/control/concurrency/actor.lux
index e8d3568fd..8353b2e8a 100644
--- a/stdlib/source/test/lux/control/concurrency/actor.lux
+++ b/stdlib/source/test/lux/control/concurrency/actor.lux
@@ -200,4 +200,64 @@
actual (async.future (atom.read! sink))]
(unit.coverage [/.Stop /.observe! /.obituary]
(at (list.equivalence n.equivalence) = expected (sequence.list actual))))))
+ (in (do async.monad
+ [actor (async.future (/.spawn! /.default initial_state))
+ actual (/.tell! /.state actor)]
+ (unit.coverage [/.state]
+ (<| (try.else false)
+ (do try.monad
+ [actual actual]
+ (in (same? initial_state actual)))))))
+ (in (do async.monad
+ [actor (async.future (/.spawn! /.default initial_state))
+ before,after (/.tell! (/.update ++) actor)
+ actual (/.tell! /.state actor)]
+ (unit.coverage [/.update]
+ (<| (try.else false)
+ (do try.monad
+ [[before after] before,after
+ actual actual]
+ (in (and (n.= (++ before) after)
+ (same? after actual))))))))
+ (in (do async.monad
+ [actor (async.future (/.spawn! /.default initial_state))
+ before,after (/.tell! (/.update ++) actor)
+ _ (/.tell! (/.reset initial_state) actor)
+ actual (/.tell! /.state actor)]
+ (unit.coverage [/.reset]
+ (<| (try.else false)
+ (do try.monad
+ [[before after] before,after
+ actual actual]
+ (in (and (n.= (++ before) after)
+ (same? initial_state before)
+ (same? initial_state actual))))))))
+ (in (do async.monad
+ [actor (async.future (/.spawn! (/.validated (n.< initial_state) /.default)
+ initial_state))
+ before,after (/.tell! (/.update --) actor)
+ actual (/.tell! /.state actor)]
+ (unit.coverage [/.validated]
+ (<| (try.else false)
+ (do try.monad
+ [[before after] before,after
+ actual actual]
+ (in (and (n.= (-- before) after)
+ (same? after actual))))))))
+ (in (do async.monad
+ [actor (async.future (/.spawn! (/.validated (n.< initial_state) /.default)
+ initial_state))
+ before,after (/.tell! (/.update ++) actor)
+ [cause_of_death state pending] (/.obituary actor)
+ actual (/.tell! /.state actor)]
+ (unit.coverage [/.invalid]
+ (when [before,after actual]
+ [{try.#Success [before after]}
+ {try.#Failure afterwards}]
+ (and (n.= (++ before) after)
+ (exception.match? /.invalid cause_of_death)
+ (exception.match? /.dead afterwards))
+
+ _
+ false))))
))))
diff --git a/stdlib/source/test/lux/control/concurrency/agent.lux b/stdlib/source/test/lux/control/concurrency/agent.lux
new file mode 100644
index 000000000..9627da9f9
--- /dev/null
+++ b/stdlib/source/test/lux/control/concurrency/agent.lux
@@ -0,0 +1,49 @@
+(.require
+ [library
+ [lux (.except)
+ [abstract
+ [monad (.only do)]]
+ [control
+ ["[0]" try]]
+ [math
+ ["[0]" random]
+ [number
+ ["n" nat]]]
+ [test
+ ["_" property (.only Test)]
+ ["[0]" unit]]]]
+ [\\library
+ ["[0]" / (.only)
+ [//
+ ["[0]" atom (.only Atom)]
+ ["[0]" async (.only Async Resolver) (.use "[1]#[0]" monad)]
+ ["[0]" frp]
+ ["[0]" actor]]]])
+
+(def .public test
+ Test
+ (do [! random.monad]
+ [left random.nat
+ right random.nat]
+ (<| (_.covering /._)
+ (_.for [/.Agent])
+ (all _.and
+ (in (do async.monad
+ [agent (async.future
+ (actor.spawn! actor.default 0))
+ _ (async.future
+ (/.react! (frp.sequential 0 (list left right))
+ (function (_ next current)
+ (async#in {try.#Success (n.+ next current)}))
+ agent))
+ _ (async.delay 1)
+ ?state (actor.tell! actor.state agent)]
+ (unit.coverage [/.react!]
+ (when ?state
+ {try.#Success actual}
+ (n.= (n.+ left right)
+ actual)
+
+ failure
+ false))))
+ ))))