From 156fac89df89669ee660bd075f516dd8d57abd19 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Thu, 25 Aug 2022 16:13:32 -0400 Subject: Added support for structured concurrency. --- documentation/bookmark/concurrency/structured.md | 9 + .../library/lux/control/concurrency/structured.lux | 337 +++++++++++++++++++++ .../source/library/lux/meta/compiler/version.lux | 5 +- stdlib/source/test/lux/control.lux | 4 +- .../test/lux/control/concurrency/structured.lux | 240 +++++++++++++++ stdlib/source/test/lux/meta/compiler/version.lux | 6 +- stdlib/source/test/lux/meta/type/implicit.lux | 6 +- 7 files changed, 598 insertions(+), 9 deletions(-) create mode 100644 stdlib/source/library/lux/control/concurrency/structured.lux create mode 100644 stdlib/source/test/lux/control/concurrency/structured.lux diff --git a/documentation/bookmark/concurrency/structured.md b/documentation/bookmark/concurrency/structured.md index a3c68e0a4..a45a0de91 100644 --- a/documentation/bookmark/concurrency/structured.md +++ b/documentation/bookmark/concurrency/structured.md @@ -6,4 +6,13 @@ 0. [JEP 428: Structured Concurrency to Simplify Java Multithreaded Programming](https://www.infoq.com/news/2022/06/java-structured-concurrency/) 0. [Seamless, Fearless, and Structured Concurrency](https://verdagon.dev/blog/seamless-fearless-structured-concurrency) 0. [Handling user input with structured concurrency](https://dubroy.com/blog/handling-user-input-with-structured-concurrency/) +0. [Notes on structured concurrency, or: Go statement considered harmful](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/) +0. [Structured Concurrency](https://250bpm.com/blog:71/) +0. [Two Approaches to Structured Concurrency](https://250bpm.com/blog:139/) +0. [Some thoughts on asynchronous API design in a post-async/await world](https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/) +0. [Structured Concurrency Kickoff](https://trio.discourse.group/t/structured-concurrency-kickoff/55) +0. [Structured concurrency](https://elizarov.medium.com/structured-concurrency-722d765aa952) +0. [Structured concurrency in Python with AnyIO: How to improve your spaghetti asyncio code](https://mattwestcott.org/blog/structured-concurrency-in-python-with-anyio) +0. [Structured concurrency and pure functions](https://blog.softwaremill.com/structured-concurrency-and-pure-functions-92dd8ed1a9f2) +0. [Structured Concurrency](https://ericniebler.com/2020/11/08/structured-concurrency/) diff --git a/stdlib/source/library/lux/control/concurrency/structured.lux b/stdlib/source/library/lux/control/concurrency/structured.lux new file mode 100644 index 000000000..8c0aab11b --- /dev/null +++ b/stdlib/source/library/lux/control/concurrency/structured.lux @@ -0,0 +1,337 @@ +(.require + [library + [lux (.except Scope or and) + [abstract + [functor (.only Functor)] + ["[0]" monad (.only Monad do)]] + [control + ["[0]" maybe (.use "[1]#[0]" monad)] + ["[0]" io (.only IO) (.use "[1]#[0]" monad)]] + [data + [collection + ["[0]" dictionary (.only Dictionary)]]] + [math + [number + ["[0]" nat]]] + [meta + [type (.only sharing) + [primitive (.except)]]]]] + ["[0]" // + ["[1]" async (.use "[1]#[0]" monad)] + ["[0]" thread] + ["[0]" atom (.only Atom)]]) + +(primitive .public (Async value) + (Record + [#cancel! (IO Bit) + #async (//.Async (Maybe value))]) + + (def .public async + (All (_ value) + (-> (Async value) + (//.Async (Maybe value)))) + (|>> representation (the #async))) + + (def .public cancel! + (All (_ value) + (-> (Async value) (IO Bit))) + (|>> representation (the #cancel!))) + + (with_template [ ] + [(def .public + (All (_ value) + (-> (Async value) + (IO Bit))) + (|>> representation + (the #async) + //.value + (io#each (function (_ it) + (when it + + true + + _ + false)))))] + + [pending? {.#None}] + [cancelled? {.#Some {.#None}}] + [completed? {.#Some {.#Some _}}] + ) + + (def .public functor + (Functor Async) + (implementation + (def (each $ it) + (|> it + representation + (revised #async (//#each (maybe#each $))) + abstraction)))) + + (def .public monad + (Monad Async) + (implementation + (def functor ..functor) + + (def in + (|>> maybe#in + //#in + [#cancel! (io#in false) + #async] + abstraction)) + + (def (conjoint !!it) + (let [!!it (representation !!it)] + (abstraction + [#cancel! (the #cancel! !!it) + #async (do [! //.monad] + [?!it (the #async !!it)] + (when ?!it + {.#Some !it} + (the #async (representation !it)) + + {.#None} + (in {.#None})))]))))) + + (def .public (and left right) + (All (_ left right) + (-> (Async left) (Async right) + (Async (And left right)))) + (let [[read! write!] (sharing [left right] + (is [(Async left) (Async right)] + [left right]) + (is [(//.Async (Maybe (And left right))) + (//.Resolver (Maybe (And left right)))] + (//.async []))) + _ (io.run! (//.upon! (function (_ left) + (when left + {.#None} + (write! {.#None}) + + {.#Some left} + (//.upon! (function (_ right) + (when right + {.#None} + (write! {.#None}) + + {.#Some right} + (write! {.#Some [left right]}))) + (the #async (representation right))))) + (the #async (representation left))))] + (abstraction + [#cancel! (write! {.#None}) + #async read!]))) + + (def .public (or left right) + (All (_ left right) + (-> (Async left) (Async right) + (Async (Or left right)))) + (let [[left|right write!] (sharing [left right] + (is [(Async left) (Async right)] + [left right]) + (is [(//.Async (Maybe (Either left right))) + (//.Resolver (Maybe (Either left right)))] + (//.async [])))] + (with_expansions + [ (with_template [ ] + [(io.run! (//.upon! (function (_ it) + (when it + {.#Some it} + (write! {.#Some { it}}) + + {.#None} + (//.upon! (function (_ it) + (when it + {.#Some it} + (io#in []) + + {.#None} + (write! {.#None}))) + (the #async (representation ))))) + (the #async (representation ))))] + + [left right .#Left] + [right left .#Right] + )] + (exec + + (abstraction + [#cancel! (write! {.#None}) + #async left|right]))))) + + (def .public (either left right) + (All (_ value) + (-> (Async value) (Async value) + (Async value))) + (let [[left||right write!] (sharing [value] + (is (Async value) + left) + (is [(//.Async (Maybe value)) + (//.Resolver (Maybe value))] + (//.async [])))] + (with_expansions + [ (with_template [ ] + [(io.run! (//.upon! (function (_ it) + (when it + {.#Some it} + (write! {.#Some it}) + + {.#None} + (//.upon! (function (_ it) + (when it + {.#Some it} + (io#in []) + + {.#None} + (write! {.#None}))) + (the #async (representation ))))) + (the #async (representation ))))] + + [left right] + [right left] + )] + (exec + + (abstraction + [#cancel! (write! {.#None}) + #async left||right]))))) + + (type .public (Action value) + (-> (Async value) + (IO (Maybe value)))) + + (type ID + Nat) + + (type Pending + (Dictionary ID (Ex (_ value) (Async value)))) + + (primitive .public (Scope value) + (Record + [#pending (Atom [ID Pending]) + #itself (Async value)]) + + (def .public close! + (All (_ value) + (-> (Scope value) + (IO Bit))) + (|>> (representation Scope) + (the #itself) + (representation Async) + (the #cancel!))) + + (def .public closed? + (All (_ value) + (-> (Scope value) + (IO Bit))) + (|>> (representation Scope) + (the #itself) + cancelled?)) + + (def .public (with_scope body) + (All (_ value) + (-> (-> (Scope value) (Async value)) + (Async value))) + (let [[async resolve!] (sharing [value] + (is (-> (Scope value) (Async value)) + body) + (is [(//.Async (Maybe value)) (//.Resolver (Maybe value))] + (//.async []))) + pending (is (Atom [ID Pending]) + (atom.atom [0 (dictionary.empty nat.hash)])) + close! (do [! io.monad] + [... Cancel all pending tasks. + [next pending] (atom.read! pending) + _ (monad.each ! cancel! (dictionary.values pending))] + (resolve! {.#None})) + scope (<| (abstraction Scope) + [#pending pending + #itself (<| (abstraction Async) + [#cancel! close! + #async async])]) + it (body scope)] + (<| (abstraction Async) + [#cancel! close! + #async (exec + (|> (representation Async it) + (the #async) + (//.upon! (function (_ it) + (do io.monad + [_ (resolve! it)] + close!))) + io.run!) + async)]))) + + (def (un_register! scope id) + (All (_ value) + (-> (Scope value) ID (IO Any))) + (atom.update! (function (_ [next pending]) + (|> pending + (dictionary.lacks id) + [next])) + (|> scope representation (the #pending)))) + + (def (register! scope it) + (All (_ value) + (-> (Scope value) (Async value) (IO Any))) + (do io.monad + [[[next _] _] (atom.update! (function (_ [next pending]) + (|> pending + (dictionary.has next it) + [(++ next)])) + (|> scope representation (the #pending)))] + (|> it + (representation Async) + (the #async) + (//.upon! (function (_ _) + (un_register! scope next)))))) + + (def .public (schedule! scope milli_seconds action) + (All (_ value) + (-> (Scope value) Nat (Action value) + (Async value))) + (let [[async resolve!] (sharing [value] + (is (Action value) + action) + (is [(//.Async (Maybe value)) (//.Resolver (Maybe value))] + (//.async []))) + cancel! (resolve! {.#None}) + it (<| (abstraction Async) + [#cancel! cancel! + #async async])] + (exec + (<| io.run! + (is (IO Any)) + (do [! io.monad] + [? (closed? scope)] + (if ? + cancel! + (do ! + [_ (register! scope it)] + (<| (thread.schedule! milli_seconds) + (do ! + [_ (in []) + value (action it)] + (resolve! value))))))) + it))) + ) + ) + +(def .public (future scope it) + (All (_ value) + (-> (Scope value) (Action value) + (Async value))) + (..schedule! scope 0 it)) + +(def .public (after scope milli_seconds value) + (All (_ value) + (-> (Scope value) Nat value + (Async value))) + (..schedule! scope milli_seconds + (function (_ _) + (io#in {.#Some value})))) + +(def .public (delay scope milli_seconds) + (All (_ value) + (-> (Scope value) Nat + (Async Any))) + (..after scope milli_seconds [])) diff --git a/stdlib/source/library/lux/meta/compiler/version.lux b/stdlib/source/library/lux/meta/compiler/version.lux index fa67df166..e576a7c29 100644 --- a/stdlib/source/library/lux/meta/compiler/version.lux +++ b/stdlib/source/library/lux/meta/compiler/version.lux @@ -30,9 +30,10 @@ (def .public major (-> Version Nat) - (|>> ..next ..next ..level)) + (|>> ..next ..next)) -(def separator ".") +(def separator + ".") (def (padded value) (-> Nat Text) diff --git a/stdlib/source/test/lux/control.lux b/stdlib/source/test/lux/control.lux index 4eba3a2a7..18ab1d19d 100644 --- a/stdlib/source/test/lux/control.lux +++ b/stdlib/source/test/lux/control.lux @@ -16,7 +16,8 @@ ["[1]/[0]" stm] ["[1]/[0]" event] ["[1]/[0]" cps] - ["[1]/[0]" incremental]] + ["[1]/[0]" incremental] + ["[1]/[0]" structured]] ["[1][0]" continuation] ["[1][0]" exception] ["[1][0]" function] @@ -50,6 +51,7 @@ /concurrency/event.test /concurrency/cps.test /concurrency/incremental.test + /concurrency/structured.test )) (def security diff --git a/stdlib/source/test/lux/control/concurrency/structured.lux b/stdlib/source/test/lux/control/concurrency/structured.lux new file mode 100644 index 000000000..cd42dd4f0 --- /dev/null +++ b/stdlib/source/test/lux/control/concurrency/structured.lux @@ -0,0 +1,240 @@ +(.require + [library + [lux (.except) + [abstract + ["[0]" monad (.only do)] + [\\specification + ["$[0]" functor (.only Injection Comparison)] + ["$[0]" monad]]] + [control + ["[0]" maybe (.use "[1]#[0]" functor)] + ["[0]" try] + ["[0]" io (.use "[1]#[0]" monad)]] + [data + [collection + ["[0]" list (.use "[1]#[0]" functor)]]] + [math + ["[0]" random] + [number + ["n" nat]]] + [test + ["_" property (.only Test)] + ["[0]" unit]]]] + [\\library + ["[0]" / (.only) + [// + ["[0]" atom (.only Atom)] + ["[0]" async]]]]) + +(def injection + (Injection /.Async) + (at /.monad in)) + +(def comparison + (Comparison /.Async) + (function (_ == left right) + (io.run! + (do io.monad + [?left (async.value (/.async left)) + ?right (async.value (/.async right))] + (in (when [?left ?right] + [{.#Some {.#Some left}} + {.#Some {.#Some right}}] + (== left right) + + _ + false)))))) + +(def (action _) + (-> [] [(Atom Bit) (/.Action Any)]) + (let [completed? (is (Atom Bit) + (atom.atom false))] + [completed? + (function (_ it) + (do io.monad + [? (/.pending? it)] + (if ? + (do io.monad + [_ (atom.write! true completed?)] + (in {.#Some []})) + (io#in {.#Some []}))))])) + +(def .public test + Test + (do [! random.monad] + [short (at ! each (|>> (n.% 10) ++) random.nat) + long (at ! each (|>> (n.% 2) ++ (n.* 50)) random.nat) + leftE random.nat + rightE random.nat + + in_parallel (at ! each (|>> (n.% 10) (n.+ 2)) random.nat)] + (<| (_.covering /._) + (_.for [/.Async]) + (all _.and + (_.for [/.functor] + ($functor.spec ..injection ..comparison /.functor)) + (_.for [/.monad] + ($monad.spec ..injection ..comparison /.monad)) + + (in (do async.monad + [leftA (<| /.async + (at /.monad in leftE))] + (unit.coverage [/.async] + (|> leftA + (maybe#each (same? leftE)) + (maybe.else false))))) + (in (do async.monad + [? (<| /.async + /.with_scope + (function (_ scope)) + (/.schedule! scope short (function (_ _) (io#in {.#Some true}))))] + (unit.coverage [/.schedule!] + (maybe.else false ?)))) + (in (do async.monad + [_ (<| /.async + /.with_scope + (function (_ scope)) + (/.future scope (function (_ _) (io#in {.#Some true}))))] + (unit.coverage [/.future] + true))) + (in (do async.monad + [_ (<| /.async + /.with_scope + (function (_ scope)) + (/.after scope short []))] + (unit.coverage [/.after] + true))) + (in (do async.monad + [_ (<| /.async + /.with_scope + (function (_ scope)) + (/.delay scope short))] + (unit.coverage [/.delay] + true))) + (in (do async.monad + [.let [all_cancelled (is (Atom Nat) + (atom.atom 0)) + [done? done!] (is [(async.Async []) (async.Resolver [])] + (async.async [])) + scope (<| /.with_scope + (function (_ scope)) + (monad.all /.monad) + (list#each (function (_ _) + (/.schedule! scope long + (function (_ it) + (do [! io.monad] + [? (/.cancelled? it)] + (if ? + (do ! + [[before after] (atom.update! ++ all_cancelled) + _ (if (n.= in_parallel after) + (done! []) + (in false))] + (in {.#Some []})) + (io#in {.#Some []}))))))) + (list.repeated in_parallel [])) + _ (io.run! (/.cancel! scope))] + _ (/.async scope) + _ done? + all_cancelled (async.future (atom.read! all_cancelled))] + (unit.coverage [/.with_scope] + (n.= in_parallel all_cancelled)))) + + (_.coverage [/.pending?] + (io.run! (/.pending? (<| /.with_scope + (function (_ scope)) + (/.schedule! scope long (function (_ _) (io#in {.#Some []}))))))) + (_.coverage [/.completed?] + (io.run! (/.completed? (at /.monad in [])))) + (in (do async.monad + [.let [[done? done!] (is [(async.Async Bit) (async.Resolver Bit)] + (async.async [])) + it (<| /.with_scope + (function (_ scope)) + (/.schedule! scope long) + (function (_ it) + (do io.monad + [pre (/.cancel! it) + post (/.cancelled? it) + _ (done! (and pre post))] + (in {.#Some []}))))] + _ (/.async it) + ? done?] + (unit.coverage [/.cancel! /.cancelled?] + ?))) + (in (do async.monad + [.let [[done? done!] (is [(async.Async Bit) (async.Resolver Bit)] + (async.async [])) + + [@not_completed action] (..action []) + to_cancel (<| /.with_scope + (function (_ scope)) + (/.schedule! scope long) + (function (_ it) + (do [! io.monad] + [pre (/.cancel! it) + _ (done! pre)] + (in {.#None}))))] + cancelled! done? + .let [confirmed! (io.run! (/.cancelled? to_cancel))] + _ (/.async to_cancel) + not_completed! (async.future (io#each not (atom.read! @not_completed))) + + .let [[@completed action] (..action []) + to_complete (<| /.with_scope + (function (_ scope)) + (/.schedule! scope long action))] + _ (/.async to_complete) + completed! (async.future (atom.read! @completed))] + (unit.coverage [/.Action] + (and cancelled! + confirmed! + not_completed! + completed!)))) + + (in (do async.monad + [left&right (/.async (with /.monad + (/.and (in leftE) (in rightE))))] + (unit.coverage [/.and] + (<| (maybe.else false) + (do maybe.monad + [[leftA rightA] left&right] + (in (and (same? leftE leftA) + (same? rightE rightA)))))))) + (in (do [! async.monad] + [left (/.async (with /.monad + (/.or (in leftE) (in rightE)))) + right (let [left (<| /.with_scope + (function (_ scope)) + (/.schedule! scope long (function (_ _) (io#in {.#Some leftE}))))] + (do ! + [_ (async.future (/.cancel! left))] + (/.async (/.or left (at /.monad in rightE)))))] + (unit.coverage [/.or] + (when [left right] + [{.#Some {.#Left leftA}} + {.#Some {.#Right rightA}}] + (and (same? leftE leftA) + (same? rightE rightA)) + + _ + false)))) + (in (do [! async.monad] + [left (/.async (with /.monad + (/.either (in leftE) (in rightE)))) + right (let [left (<| /.with_scope + (function (_ scope)) + (/.schedule! scope long (function (_ _) (io#in {.#Some leftE}))))] + (do ! + [_ (async.future (/.cancel! left))] + (/.async (/.either left (at /.monad in rightE)))))] + (unit.coverage [/.either] + (when [left right] + [{.#Some leftA} + {.#Some rightA}] + (and (same? leftE leftA) + (same? rightE rightA)) + + _ + false)))) + )))) diff --git a/stdlib/source/test/lux/meta/compiler/version.lux b/stdlib/source/test/lux/meta/compiler/version.lux index 2129286ae..fd1df3bd7 100644 --- a/stdlib/source/test/lux/meta/compiler/version.lux +++ b/stdlib/source/test/lux/meta/compiler/version.lux @@ -6,11 +6,11 @@ [data ["[0]" bit (.use "[1]#[0]" equivalence)] ["[0]" text (.use "[1]#[0]" equivalence) - ["%" \\format (.only format)]]] + ["%" \\format]]] [math ["[0]" random (.only Random)] [number - ["n" nat]]] + ["[0]" nat]]] [test ["_" property (.only Test)]]]] [\\library @@ -29,7 +29,7 @@ that ..random] (`` (all _.and (_.coverage [/.format] - (bit#= (n.= this that) + (bit#= (nat.= this that) (text#= (/.format this) (/.format that)))) (,, (with_template [] [(_.coverage [] diff --git a/stdlib/source/test/lux/meta/type/implicit.lux b/stdlib/source/test/lux/meta/type/implicit.lux index 66daa4a32..233c121ca 100644 --- a/stdlib/source/test/lux/meta/type/implicit.lux +++ b/stdlib/source/test/lux/meta/type/implicit.lux @@ -35,13 +35,13 @@ left random.nat right random.nat] (all _.and - (_.coverage [/.a/an] + (_.coverage [/.a/an /.a /.an] (let [first_order! (let [(open "list#[0]") (list.equivalence n.equivalence)] (and (bit#= (at n.equivalence = left right) - (/.a/an = left right)) + (/.a = left right)) (list#= (at list.functor each ++ (enum.range n.enum start end)) - (/.a/an each ++ (enum.range n.enum start end))))) + (/.an each ++ (enum.range n.enum start end))))) second_order! (/.a/an = -- cgit v1.2.3