aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/library
diff options
context:
space:
mode:
authorEduardo Julian2022-08-18 21:34:21 -0400
committerEduardo Julian2022-08-18 21:34:21 -0400
commit81b6e0d7038a99c66456033c8285f740a3b0c719 (patch)
tree4d09e45791f19bd40170260502ebc958b07ee1af /stdlib/source/library
parentecda0d219cf7dc25bd9b0b76815a8880e20232c2 (diff)
Added support for Communicating Sequential Processes (CSP).
Diffstat (limited to 'stdlib/source/library')
-rw-r--r--stdlib/source/library/lux/control/concurrency/cps.lux76
1 files changed, 76 insertions, 0 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/cps.lux b/stdlib/source/library/lux/control/concurrency/cps.lux
new file mode 100644
index 000000000..f8cd41a77
--- /dev/null
+++ b/stdlib/source/library/lux/control/concurrency/cps.lux
@@ -0,0 +1,76 @@
+(.require
+ [library
+ [lux (.except try)
+ [abstract
+ [functor (.only Functor)]
+ [monad (.only Monad do)]]
+ [control
+ ["[0]" try (.only Try) (.use "[1]#[0]" monad)]
+ ["[0]" exception (.only Exception)]]]]
+ [//
+ ["[0]" async (.only Async) (.use "[1]#[0]" monad)]
+ ["[0]" frp]])
+
+(type .public (Process a)
+ (Async (Try a)))
+
+(type .public Channel' frp.Channel')
+(type .public Channel frp.Channel)
+(type .public Sink frp.Sink)
+
+(def .public channel
+ (All (_ a) (-> Any [(Channel a) (Sink a)]))
+ frp.channel)
+
+(def .public functor
+ (Functor Process)
+ (implementation
+ (def (each $)
+ (async#each (try#each $)))))
+
+(def .public monad
+ (Monad Process)
+ (implementation
+ (def functor ..functor)
+ (def in (|>> try#in async#in))
+ (def (conjoint atatx)
+ (do async.monad
+ [tatx atatx]
+ (when tatx
+ {try.#Success atx}
+ atx
+
+ {try.#Failure error}
+ (in {try.#Failure error}))))))
+
+(exception.def .public channel_has_been_closed)
+
+(def .public (read it)
+ (All (_ r w)
+ (-> (Channel' r w) (Process [r (Channel' r w)])))
+ (let [[output resolver] (async.async [])]
+ (exec
+ (async.future
+ (async.upon! (function (_ head,tail)
+ (resolver (when head,tail
+ {.#Some [head tail]}
+ {try.#Success [head tail]}
+
+ {.#None}
+ (exception.except ..channel_has_been_closed []))))
+ it))
+ output)))
+
+(def .public (write value sink)
+ (All (_ w)
+ (-> w (Sink w) (Process Any)))
+ (async.future (at sink feed value)))
+
+(def .public (close sink)
+ (All (_ w)
+ (-> (Sink w) (Process Any)))
+ (async.future (at sink close)))
+
+(def .public try
+ (All (_ a) (-> (Process a) (Process (Try a))))
+ (async#each (|>> {try.#Success})))