From 156fac89df89669ee660bd075f516dd8d57abd19 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Thu, 25 Aug 2022 16:13:32 -0400 Subject: Added support for structured concurrency. --- .../library/lux/control/concurrency/structured.lux | 337 +++++++++++++++++++++ .../source/library/lux/meta/compiler/version.lux | 5 +- 2 files changed, 340 insertions(+), 2 deletions(-) create mode 100644 stdlib/source/library/lux/control/concurrency/structured.lux (limited to 'stdlib/source/library') diff --git a/stdlib/source/library/lux/control/concurrency/structured.lux b/stdlib/source/library/lux/control/concurrency/structured.lux new file mode 100644 index 000000000..8c0aab11b --- /dev/null +++ b/stdlib/source/library/lux/control/concurrency/structured.lux @@ -0,0 +1,337 @@ +(.require + [library + [lux (.except Scope or and) + [abstract + [functor (.only Functor)] + ["[0]" monad (.only Monad do)]] + [control + ["[0]" maybe (.use "[1]#[0]" monad)] + ["[0]" io (.only IO) (.use "[1]#[0]" monad)]] + [data + [collection + ["[0]" dictionary (.only Dictionary)]]] + [math + [number + ["[0]" nat]]] + [meta + [type (.only sharing) + [primitive (.except)]]]]] + ["[0]" // + ["[1]" async (.use "[1]#[0]" monad)] + ["[0]" thread] + ["[0]" atom (.only Atom)]]) + +(primitive .public (Async value) + (Record + [#cancel! (IO Bit) + #async (//.Async (Maybe value))]) + + (def .public async + (All (_ value) + (-> (Async value) + (//.Async (Maybe value)))) + (|>> representation (the #async))) + + (def .public cancel! + (All (_ value) + (-> (Async value) (IO Bit))) + (|>> representation (the #cancel!))) + + (with_template [ ] + [(def .public + (All (_ value) + (-> (Async value) + (IO Bit))) + (|>> representation + (the #async) + //.value + (io#each (function (_ it) + (when it + + true + + _ + false)))))] + + [pending? {.#None}] + [cancelled? {.#Some {.#None}}] + [completed? {.#Some {.#Some _}}] + ) + + (def .public functor + (Functor Async) + (implementation + (def (each $ it) + (|> it + representation + (revised #async (//#each (maybe#each $))) + abstraction)))) + + (def .public monad + (Monad Async) + (implementation + (def functor ..functor) + + (def in + (|>> maybe#in + //#in + [#cancel! (io#in false) + #async] + abstraction)) + + (def (conjoint !!it) + (let [!!it (representation !!it)] + (abstraction + [#cancel! (the #cancel! !!it) + #async (do [! //.monad] + [?!it (the #async !!it)] + (when ?!it + {.#Some !it} + (the #async (representation !it)) + + {.#None} + (in {.#None})))]))))) + + (def .public (and left right) + (All (_ left right) + (-> (Async left) (Async right) + (Async (And left right)))) + (let [[read! write!] (sharing [left right] + (is [(Async left) (Async right)] + [left right]) + (is [(//.Async (Maybe (And left right))) + (//.Resolver (Maybe (And left right)))] + (//.async []))) + _ (io.run! (//.upon! (function (_ left) + (when left + {.#None} + (write! {.#None}) + + {.#Some left} + (//.upon! (function (_ right) + (when right + {.#None} + (write! {.#None}) + + {.#Some right} + (write! {.#Some [left right]}))) + (the #async (representation right))))) + (the #async (representation left))))] + (abstraction + [#cancel! (write! {.#None}) + #async read!]))) + + (def .public (or left right) + (All (_ left right) + (-> (Async left) (Async right) + (Async (Or left right)))) + (let [[left|right write!] (sharing [left right] + (is [(Async left) (Async right)] + [left right]) + (is [(//.Async (Maybe (Either left right))) + (//.Resolver (Maybe (Either left right)))] + (//.async [])))] + (with_expansions + [ (with_template [ ] + [(io.run! (//.upon! (function (_ it) + (when it + {.#Some it} + (write! {.#Some { it}}) + + {.#None} + (//.upon! (function (_ it) + (when it + {.#Some it} + (io#in []) + + {.#None} + (write! {.#None}))) + (the #async (representation ))))) + (the #async (representation ))))] + + [left right .#Left] + [right left .#Right] + )] + (exec + + (abstraction + [#cancel! (write! {.#None}) + #async left|right]))))) + + (def .public (either left right) + (All (_ value) + (-> (Async value) (Async value) + (Async value))) + (let [[left||right write!] (sharing [value] + (is (Async value) + left) + (is [(//.Async (Maybe value)) + (//.Resolver (Maybe value))] + (//.async [])))] + (with_expansions + [ (with_template [ ] + [(io.run! (//.upon! (function (_ it) + (when it + {.#Some it} + (write! {.#Some it}) + + {.#None} + (//.upon! (function (_ it) + (when it + {.#Some it} + (io#in []) + + {.#None} + (write! {.#None}))) + (the #async (representation ))))) + (the #async (representation ))))] + + [left right] + [right left] + )] + (exec + + (abstraction + [#cancel! (write! {.#None}) + #async left||right]))))) + + (type .public (Action value) + (-> (Async value) + (IO (Maybe value)))) + + (type ID + Nat) + + (type Pending + (Dictionary ID (Ex (_ value) (Async value)))) + + (primitive .public (Scope value) + (Record + [#pending (Atom [ID Pending]) + #itself (Async value)]) + + (def .public close! + (All (_ value) + (-> (Scope value) + (IO Bit))) + (|>> (representation Scope) + (the #itself) + (representation Async) + (the #cancel!))) + + (def .public closed? + (All (_ value) + (-> (Scope value) + (IO Bit))) + (|>> (representation Scope) + (the #itself) + cancelled?)) + + (def .public (with_scope body) + (All (_ value) + (-> (-> (Scope value) (Async value)) + (Async value))) + (let [[async resolve!] (sharing [value] + (is (-> (Scope value) (Async value)) + body) + (is [(//.Async (Maybe value)) (//.Resolver (Maybe value))] + (//.async []))) + pending (is (Atom [ID Pending]) + (atom.atom [0 (dictionary.empty nat.hash)])) + close! (do [! io.monad] + [... Cancel all pending tasks. + [next pending] (atom.read! pending) + _ (monad.each ! cancel! (dictionary.values pending))] + (resolve! {.#None})) + scope (<| (abstraction Scope) + [#pending pending + #itself (<| (abstraction Async) + [#cancel! close! + #async async])]) + it (body scope)] + (<| (abstraction Async) + [#cancel! close! + #async (exec + (|> (representation Async it) + (the #async) + (//.upon! (function (_ it) + (do io.monad + [_ (resolve! it)] + close!))) + io.run!) + async)]))) + + (def (un_register! scope id) + (All (_ value) + (-> (Scope value) ID (IO Any))) + (atom.update! (function (_ [next pending]) + (|> pending + (dictionary.lacks id) + [next])) + (|> scope representation (the #pending)))) + + (def (register! scope it) + (All (_ value) + (-> (Scope value) (Async value) (IO Any))) + (do io.monad + [[[next _] _] (atom.update! (function (_ [next pending]) + (|> pending + (dictionary.has next it) + [(++ next)])) + (|> scope representation (the #pending)))] + (|> it + (representation Async) + (the #async) + (//.upon! (function (_ _) + (un_register! scope next)))))) + + (def .public (schedule! scope milli_seconds action) + (All (_ value) + (-> (Scope value) Nat (Action value) + (Async value))) + (let [[async resolve!] (sharing [value] + (is (Action value) + action) + (is [(//.Async (Maybe value)) (//.Resolver (Maybe value))] + (//.async []))) + cancel! (resolve! {.#None}) + it (<| (abstraction Async) + [#cancel! cancel! + #async async])] + (exec + (<| io.run! + (is (IO Any)) + (do [! io.monad] + [? (closed? scope)] + (if ? + cancel! + (do ! + [_ (register! scope it)] + (<| (thread.schedule! milli_seconds) + (do ! + [_ (in []) + value (action it)] + (resolve! value))))))) + it))) + ) + ) + +(def .public (future scope it) + (All (_ value) + (-> (Scope value) (Action value) + (Async value))) + (..schedule! scope 0 it)) + +(def .public (after scope milli_seconds value) + (All (_ value) + (-> (Scope value) Nat value + (Async value))) + (..schedule! scope milli_seconds + (function (_ _) + (io#in {.#Some value})))) + +(def .public (delay scope milli_seconds) + (All (_ value) + (-> (Scope value) Nat + (Async Any))) + (..after scope milli_seconds [])) diff --git a/stdlib/source/library/lux/meta/compiler/version.lux b/stdlib/source/library/lux/meta/compiler/version.lux index fa67df166..e576a7c29 100644 --- a/stdlib/source/library/lux/meta/compiler/version.lux +++ b/stdlib/source/library/lux/meta/compiler/version.lux @@ -30,9 +30,10 @@ (def .public major (-> Version Nat) - (|>> ..next ..next ..level)) + (|>> ..next ..next)) -(def separator ".") +(def separator + ".") (def (padded value) (-> Nat Text) -- cgit v1.2.3