diff options
author | Eduardo Julian | 2022-08-18 21:34:21 -0400 |
---|---|---|
committer | Eduardo Julian | 2022-08-18 21:34:21 -0400 |
commit | 81b6e0d7038a99c66456033c8285f740a3b0c719 (patch) | |
tree | 4d09e45791f19bd40170260502ebc958b07ee1af /stdlib/source/library | |
parent | ecda0d219cf7dc25bd9b0b76815a8880e20232c2 (diff) |
Added support for Communicating Sequential Processes (CSP).
Diffstat (limited to 'stdlib/source/library')
-rw-r--r-- | stdlib/source/library/lux/control/concurrency/cps.lux | 76 |
1 files changed, 76 insertions, 0 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/cps.lux b/stdlib/source/library/lux/control/concurrency/cps.lux new file mode 100644 index 000000000..f8cd41a77 --- /dev/null +++ b/stdlib/source/library/lux/control/concurrency/cps.lux @@ -0,0 +1,76 @@ +(.require + [library + [lux (.except try) + [abstract + [functor (.only Functor)] + [monad (.only Monad do)]] + [control + ["[0]" try (.only Try) (.use "[1]#[0]" monad)] + ["[0]" exception (.only Exception)]]]] + [// + ["[0]" async (.only Async) (.use "[1]#[0]" monad)] + ["[0]" frp]]) + +(type .public (Process a) + (Async (Try a))) + +(type .public Channel' frp.Channel') +(type .public Channel frp.Channel) +(type .public Sink frp.Sink) + +(def .public channel + (All (_ a) (-> Any [(Channel a) (Sink a)])) + frp.channel) + +(def .public functor + (Functor Process) + (implementation + (def (each $) + (async#each (try#each $))))) + +(def .public monad + (Monad Process) + (implementation + (def functor ..functor) + (def in (|>> try#in async#in)) + (def (conjoint atatx) + (do async.monad + [tatx atatx] + (when tatx + {try.#Success atx} + atx + + {try.#Failure error} + (in {try.#Failure error})))))) + +(exception.def .public channel_has_been_closed) + +(def .public (read it) + (All (_ r w) + (-> (Channel' r w) (Process [r (Channel' r w)]))) + (let [[output resolver] (async.async [])] + (exec + (async.future + (async.upon! (function (_ head,tail) + (resolver (when head,tail + {.#Some [head tail]} + {try.#Success [head tail]} + + {.#None} + (exception.except ..channel_has_been_closed [])))) + it)) + output))) + +(def .public (write value sink) + (All (_ w) + (-> w (Sink w) (Process Any))) + (async.future (at sink feed value))) + +(def .public (close sink) + (All (_ w) + (-> (Sink w) (Process Any))) + (async.future (at sink close))) + +(def .public try + (All (_ a) (-> (Process a) (Process (Try a)))) + (async#each (|>> {try.#Success}))) |