diff options
author | Eduardo Julian | 2022-08-25 16:13:32 -0400 |
---|---|---|
committer | Eduardo Julian | 2022-08-25 16:13:32 -0400 |
commit | 156fac89df89669ee660bd075f516dd8d57abd19 (patch) | |
tree | 5f01f67d53b3313a27ec6048fe8f100a5a3b409d /stdlib/source/library | |
parent | dc78af618f175ffc5e6a653256ca6b27a260fe83 (diff) |
Added support for structured concurrency.
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/library/lux/control/concurrency/structured.lux | 337 | ||||
-rw-r--r-- | stdlib/source/library/lux/meta/compiler/version.lux | 5 |
2 files changed, 340 insertions, 2 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/structured.lux b/stdlib/source/library/lux/control/concurrency/structured.lux new file mode 100644 index 000000000..8c0aab11b --- /dev/null +++ b/stdlib/source/library/lux/control/concurrency/structured.lux @@ -0,0 +1,337 @@ +(.require + [library + [lux (.except Scope or and) + [abstract + [functor (.only Functor)] + ["[0]" monad (.only Monad do)]] + [control + ["[0]" maybe (.use "[1]#[0]" monad)] + ["[0]" io (.only IO) (.use "[1]#[0]" monad)]] + [data + [collection + ["[0]" dictionary (.only Dictionary)]]] + [math + [number + ["[0]" nat]]] + [meta + [type (.only sharing) + [primitive (.except)]]]]] + ["[0]" // + ["[1]" async (.use "[1]#[0]" monad)] + ["[0]" thread] + ["[0]" atom (.only Atom)]]) + +(primitive .public (Async value) + (Record + [#cancel! (IO Bit) + #async (//.Async (Maybe value))]) + + (def .public async + (All (_ value) + (-> (Async value) + (//.Async (Maybe value)))) + (|>> representation (the #async))) + + (def .public cancel! + (All (_ value) + (-> (Async value) (IO Bit))) + (|>> representation (the #cancel!))) + + (with_template [<name> <pattern>] + [(def .public <name> + (All (_ value) + (-> (Async value) + (IO Bit))) + (|>> representation + (the #async) + //.value + (io#each (function (_ it) + (when it + <pattern> + true + + _ + false)))))] + + [pending? {.#None}] + [cancelled? {.#Some {.#None}}] + [completed? {.#Some {.#Some _}}] + ) + + (def .public functor + (Functor Async) + (implementation + (def (each $ it) + (|> it + representation + (revised #async (//#each (maybe#each $))) + abstraction)))) + + (def .public monad + (Monad Async) + (implementation + (def functor ..functor) + + (def in + (|>> maybe#in + //#in + [#cancel! (io#in false) + #async] + abstraction)) + + (def (conjoint !!it) + (let [!!it (representation !!it)] + (abstraction + [#cancel! (the #cancel! !!it) + #async (do [! //.monad] + [?!it (the #async !!it)] + (when ?!it + {.#Some !it} + (the #async (representation !it)) + + {.#None} + (in {.#None})))]))))) + + (def .public (and left right) + (All (_ left right) + (-> (Async left) (Async right) + (Async (And left right)))) + (let [[read! write!] (sharing [left right] + (is [(Async left) (Async right)] + [left right]) + (is [(//.Async (Maybe (And left right))) + (//.Resolver (Maybe (And left right)))] + (//.async []))) + _ (io.run! (//.upon! (function (_ left) + (when left + {.#None} + (write! {.#None}) + + {.#Some left} + (//.upon! (function (_ right) + (when right + {.#None} + (write! {.#None}) + + {.#Some right} + (write! {.#Some [left right]}))) + (the #async (representation right))))) + (the #async (representation left))))] + (abstraction + [#cancel! (write! {.#None}) + #async read!]))) + + (def .public (or left right) + (All (_ left right) + (-> (Async left) (Async right) + (Async (Or left right)))) + (let [[left|right write!] (sharing [left right] + (is [(Async left) (Async right)] + [left right]) + (is [(//.Async (Maybe (Either left right))) + (//.Resolver (Maybe (Either left right)))] + (//.async [])))] + (with_expansions + [<sides> (with_template [<side> <other> <tag>] + [(io.run! (//.upon! (function (_ it) + (when it + {.#Some it} + (write! {.#Some {<tag> it}}) + + {.#None} + (//.upon! (function (_ it) + (when it + {.#Some it} + (io#in []) + + {.#None} + (write! {.#None}))) + (the #async (representation <other>))))) + (the #async (representation <side>))))] + + [left right .#Left] + [right left .#Right] + )] + (exec + <sides> + (abstraction + [#cancel! (write! {.#None}) + #async left|right]))))) + + (def .public (either left right) + (All (_ value) + (-> (Async value) (Async value) + (Async value))) + (let [[left||right write!] (sharing [value] + (is (Async value) + left) + (is [(//.Async (Maybe value)) + (//.Resolver (Maybe value))] + (//.async [])))] + (with_expansions + [<sides> (with_template [<side> <other>] + [(io.run! (//.upon! (function (_ it) + (when it + {.#Some it} + (write! {.#Some it}) + + {.#None} + (//.upon! (function (_ it) + (when it + {.#Some it} + (io#in []) + + {.#None} + (write! {.#None}))) + (the #async (representation <other>))))) + (the #async (representation <side>))))] + + [left right] + [right left] + )] + (exec + <sides> + (abstraction + [#cancel! (write! {.#None}) + #async left||right]))))) + + (type .public (Action value) + (-> (Async value) + (IO (Maybe value)))) + + (type ID + Nat) + + (type Pending + (Dictionary ID (Ex (_ value) (Async value)))) + + (primitive .public (Scope value) + (Record + [#pending (Atom [ID Pending]) + #itself (Async value)]) + + (def .public close! + (All (_ value) + (-> (Scope value) + (IO Bit))) + (|>> (representation Scope) + (the #itself) + (representation Async) + (the #cancel!))) + + (def .public closed? + (All (_ value) + (-> (Scope value) + (IO Bit))) + (|>> (representation Scope) + (the #itself) + cancelled?)) + + (def .public (with_scope body) + (All (_ value) + (-> (-> (Scope value) (Async value)) + (Async value))) + (let [[async resolve!] (sharing [value] + (is (-> (Scope value) (Async value)) + body) + (is [(//.Async (Maybe value)) (//.Resolver (Maybe value))] + (//.async []))) + pending (is (Atom [ID Pending]) + (atom.atom [0 (dictionary.empty nat.hash)])) + close! (do [! io.monad] + [... Cancel all pending tasks. + [next pending] (atom.read! pending) + _ (monad.each ! cancel! (dictionary.values pending))] + (resolve! {.#None})) + scope (<| (abstraction Scope) + [#pending pending + #itself (<| (abstraction Async) + [#cancel! close! + #async async])]) + it (body scope)] + (<| (abstraction Async) + [#cancel! close! + #async (exec + (|> (representation Async it) + (the #async) + (//.upon! (function (_ it) + (do io.monad + [_ (resolve! it)] + close!))) + io.run!) + async)]))) + + (def (un_register! scope id) + (All (_ value) + (-> (Scope value) ID (IO Any))) + (atom.update! (function (_ [next pending]) + (|> pending + (dictionary.lacks id) + [next])) + (|> scope representation (the #pending)))) + + (def (register! scope it) + (All (_ value) + (-> (Scope value) (Async value) (IO Any))) + (do io.monad + [[[next _] _] (atom.update! (function (_ [next pending]) + (|> pending + (dictionary.has next it) + [(++ next)])) + (|> scope representation (the #pending)))] + (|> it + (representation Async) + (the #async) + (//.upon! (function (_ _) + (un_register! scope next)))))) + + (def .public (schedule! scope milli_seconds action) + (All (_ value) + (-> (Scope value) Nat (Action value) + (Async value))) + (let [[async resolve!] (sharing [value] + (is (Action value) + action) + (is [(//.Async (Maybe value)) (//.Resolver (Maybe value))] + (//.async []))) + cancel! (resolve! {.#None}) + it (<| (abstraction Async) + [#cancel! cancel! + #async async])] + (exec + (<| io.run! + (is (IO Any)) + (do [! io.monad] + [? (closed? scope)] + (if ? + cancel! + (do ! + [_ (register! scope it)] + (<| (thread.schedule! milli_seconds) + (do ! + [_ (in []) + value (action it)] + (resolve! value))))))) + it))) + ) + ) + +(def .public (future scope it) + (All (_ value) + (-> (Scope value) (Action value) + (Async value))) + (..schedule! scope 0 it)) + +(def .public (after scope milli_seconds value) + (All (_ value) + (-> (Scope value) Nat value + (Async value))) + (..schedule! scope milli_seconds + (function (_ _) + (io#in {.#Some value})))) + +(def .public (delay scope milli_seconds) + (All (_ value) + (-> (Scope value) Nat + (Async Any))) + (..after scope milli_seconds [])) diff --git a/stdlib/source/library/lux/meta/compiler/version.lux b/stdlib/source/library/lux/meta/compiler/version.lux index fa67df166..e576a7c29 100644 --- a/stdlib/source/library/lux/meta/compiler/version.lux +++ b/stdlib/source/library/lux/meta/compiler/version.lux @@ -30,9 +30,10 @@ (def .public major (-> Version Nat) - (|>> ..next ..next ..level)) + (|>> ..next ..next)) -(def separator ".") +(def separator + ".") (def (padded value) (-> Nat Text) |