aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEduardo Julian2022-08-25 16:13:32 -0400
committerEduardo Julian2022-08-25 16:13:32 -0400
commit156fac89df89669ee660bd075f516dd8d57abd19 (patch)
tree5f01f67d53b3313a27ec6048fe8f100a5a3b409d
parentdc78af618f175ffc5e6a653256ca6b27a260fe83 (diff)
Added support for structured concurrency.
-rw-r--r--documentation/bookmark/concurrency/structured.md9
-rw-r--r--stdlib/source/library/lux/control/concurrency/structured.lux337
-rw-r--r--stdlib/source/library/lux/meta/compiler/version.lux5
-rw-r--r--stdlib/source/test/lux/control.lux4
-rw-r--r--stdlib/source/test/lux/control/concurrency/structured.lux240
-rw-r--r--stdlib/source/test/lux/meta/compiler/version.lux6
-rw-r--r--stdlib/source/test/lux/meta/type/implicit.lux6
7 files changed, 598 insertions, 9 deletions
diff --git a/documentation/bookmark/concurrency/structured.md b/documentation/bookmark/concurrency/structured.md
index a3c68e0a4..a45a0de91 100644
--- a/documentation/bookmark/concurrency/structured.md
+++ b/documentation/bookmark/concurrency/structured.md
@@ -6,4 +6,13 @@
0. [JEP 428: Structured Concurrency to Simplify Java Multithreaded Programming](https://www.infoq.com/news/2022/06/java-structured-concurrency/)
0. [Seamless, Fearless, and Structured Concurrency](https://verdagon.dev/blog/seamless-fearless-structured-concurrency)
0. [Handling user input with structured concurrency](https://dubroy.com/blog/handling-user-input-with-structured-concurrency/)
+0. [Notes on structured concurrency, or: Go statement considered harmful](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/)
+0. [Structured Concurrency](https://250bpm.com/blog:71/)
+0. [Two Approaches to Structured Concurrency](https://250bpm.com/blog:139/)
+0. [Some thoughts on asynchronous API design in a post-async/await world](https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/)
+0. [Structured Concurrency Kickoff](https://trio.discourse.group/t/structured-concurrency-kickoff/55)
+0. [Structured concurrency](https://elizarov.medium.com/structured-concurrency-722d765aa952)
+0. [Structured concurrency in Python with AnyIO: How to improve your spaghetti asyncio code](https://mattwestcott.org/blog/structured-concurrency-in-python-with-anyio)
+0. [Structured concurrency and pure functions](https://blog.softwaremill.com/structured-concurrency-and-pure-functions-92dd8ed1a9f2)
+0. [Structured Concurrency](https://ericniebler.com/2020/11/08/structured-concurrency/)
diff --git a/stdlib/source/library/lux/control/concurrency/structured.lux b/stdlib/source/library/lux/control/concurrency/structured.lux
new file mode 100644
index 000000000..8c0aab11b
--- /dev/null
+++ b/stdlib/source/library/lux/control/concurrency/structured.lux
@@ -0,0 +1,337 @@
+(.require
+ [library
+ [lux (.except Scope or and)
+ [abstract
+ [functor (.only Functor)]
+ ["[0]" monad (.only Monad do)]]
+ [control
+ ["[0]" maybe (.use "[1]#[0]" monad)]
+ ["[0]" io (.only IO) (.use "[1]#[0]" monad)]]
+ [data
+ [collection
+ ["[0]" dictionary (.only Dictionary)]]]
+ [math
+ [number
+ ["[0]" nat]]]
+ [meta
+ [type (.only sharing)
+ [primitive (.except)]]]]]
+ ["[0]" //
+ ["[1]" async (.use "[1]#[0]" monad)]
+ ["[0]" thread]
+ ["[0]" atom (.only Atom)]])
+
+(primitive .public (Async value)
+ (Record
+ [#cancel! (IO Bit)
+ #async (//.Async (Maybe value))])
+
+ (def .public async
+ (All (_ value)
+ (-> (Async value)
+ (//.Async (Maybe value))))
+ (|>> representation (the #async)))
+
+ (def .public cancel!
+ (All (_ value)
+ (-> (Async value) (IO Bit)))
+ (|>> representation (the #cancel!)))
+
+ (with_template [<name> <pattern>]
+ [(def .public <name>
+ (All (_ value)
+ (-> (Async value)
+ (IO Bit)))
+ (|>> representation
+ (the #async)
+ //.value
+ (io#each (function (_ it)
+ (when it
+ <pattern>
+ true
+
+ _
+ false)))))]
+
+ [pending? {.#None}]
+ [cancelled? {.#Some {.#None}}]
+ [completed? {.#Some {.#Some _}}]
+ )
+
+ (def .public functor
+ (Functor Async)
+ (implementation
+ (def (each $ it)
+ (|> it
+ representation
+ (revised #async (//#each (maybe#each $)))
+ abstraction))))
+
+ (def .public monad
+ (Monad Async)
+ (implementation
+ (def functor ..functor)
+
+ (def in
+ (|>> maybe#in
+ //#in
+ [#cancel! (io#in false)
+ #async]
+ abstraction))
+
+ (def (conjoint !!it)
+ (let [!!it (representation !!it)]
+ (abstraction
+ [#cancel! (the #cancel! !!it)
+ #async (do [! //.monad]
+ [?!it (the #async !!it)]
+ (when ?!it
+ {.#Some !it}
+ (the #async (representation !it))
+
+ {.#None}
+ (in {.#None})))])))))
+
+ (def .public (and left right)
+ (All (_ left right)
+ (-> (Async left) (Async right)
+ (Async (And left right))))
+ (let [[read! write!] (sharing [left right]
+ (is [(Async left) (Async right)]
+ [left right])
+ (is [(//.Async (Maybe (And left right)))
+ (//.Resolver (Maybe (And left right)))]
+ (//.async [])))
+ _ (io.run! (//.upon! (function (_ left)
+ (when left
+ {.#None}
+ (write! {.#None})
+
+ {.#Some left}
+ (//.upon! (function (_ right)
+ (when right
+ {.#None}
+ (write! {.#None})
+
+ {.#Some right}
+ (write! {.#Some [left right]})))
+ (the #async (representation right)))))
+ (the #async (representation left))))]
+ (abstraction
+ [#cancel! (write! {.#None})
+ #async read!])))
+
+ (def .public (or left right)
+ (All (_ left right)
+ (-> (Async left) (Async right)
+ (Async (Or left right))))
+ (let [[left|right write!] (sharing [left right]
+ (is [(Async left) (Async right)]
+ [left right])
+ (is [(//.Async (Maybe (Either left right)))
+ (//.Resolver (Maybe (Either left right)))]
+ (//.async [])))]
+ (with_expansions
+ [<sides> (with_template [<side> <other> <tag>]
+ [(io.run! (//.upon! (function (_ it)
+ (when it
+ {.#Some it}
+ (write! {.#Some {<tag> it}})
+
+ {.#None}
+ (//.upon! (function (_ it)
+ (when it
+ {.#Some it}
+ (io#in [])
+
+ {.#None}
+ (write! {.#None})))
+ (the #async (representation <other>)))))
+ (the #async (representation <side>))))]
+
+ [left right .#Left]
+ [right left .#Right]
+ )]
+ (exec
+ <sides>
+ (abstraction
+ [#cancel! (write! {.#None})
+ #async left|right])))))
+
+ (def .public (either left right)
+ (All (_ value)
+ (-> (Async value) (Async value)
+ (Async value)))
+ (let [[left||right write!] (sharing [value]
+ (is (Async value)
+ left)
+ (is [(//.Async (Maybe value))
+ (//.Resolver (Maybe value))]
+ (//.async [])))]
+ (with_expansions
+ [<sides> (with_template [<side> <other>]
+ [(io.run! (//.upon! (function (_ it)
+ (when it
+ {.#Some it}
+ (write! {.#Some it})
+
+ {.#None}
+ (//.upon! (function (_ it)
+ (when it
+ {.#Some it}
+ (io#in [])
+
+ {.#None}
+ (write! {.#None})))
+ (the #async (representation <other>)))))
+ (the #async (representation <side>))))]
+
+ [left right]
+ [right left]
+ )]
+ (exec
+ <sides>
+ (abstraction
+ [#cancel! (write! {.#None})
+ #async left||right])))))
+
+ (type .public (Action value)
+ (-> (Async value)
+ (IO (Maybe value))))
+
+ (type ID
+ Nat)
+
+ (type Pending
+ (Dictionary ID (Ex (_ value) (Async value))))
+
+ (primitive .public (Scope value)
+ (Record
+ [#pending (Atom [ID Pending])
+ #itself (Async value)])
+
+ (def .public close!
+ (All (_ value)
+ (-> (Scope value)
+ (IO Bit)))
+ (|>> (representation Scope)
+ (the #itself)
+ (representation Async)
+ (the #cancel!)))
+
+ (def .public closed?
+ (All (_ value)
+ (-> (Scope value)
+ (IO Bit)))
+ (|>> (representation Scope)
+ (the #itself)
+ cancelled?))
+
+ (def .public (with_scope body)
+ (All (_ value)
+ (-> (-> (Scope value) (Async value))
+ (Async value)))
+ (let [[async resolve!] (sharing [value]
+ (is (-> (Scope value) (Async value))
+ body)
+ (is [(//.Async (Maybe value)) (//.Resolver (Maybe value))]
+ (//.async [])))
+ pending (is (Atom [ID Pending])
+ (atom.atom [0 (dictionary.empty nat.hash)]))
+ close! (do [! io.monad]
+ [... Cancel all pending tasks.
+ [next pending] (atom.read! pending)
+ _ (monad.each ! cancel! (dictionary.values pending))]
+ (resolve! {.#None}))
+ scope (<| (abstraction Scope)
+ [#pending pending
+ #itself (<| (abstraction Async)
+ [#cancel! close!
+ #async async])])
+ it (body scope)]
+ (<| (abstraction Async)
+ [#cancel! close!
+ #async (exec
+ (|> (representation Async it)
+ (the #async)
+ (//.upon! (function (_ it)
+ (do io.monad
+ [_ (resolve! it)]
+ close!)))
+ io.run!)
+ async)])))
+
+ (def (un_register! scope id)
+ (All (_ value)
+ (-> (Scope value) ID (IO Any)))
+ (atom.update! (function (_ [next pending])
+ (|> pending
+ (dictionary.lacks id)
+ [next]))
+ (|> scope representation (the #pending))))
+
+ (def (register! scope it)
+ (All (_ value)
+ (-> (Scope value) (Async value) (IO Any)))
+ (do io.monad
+ [[[next _] _] (atom.update! (function (_ [next pending])
+ (|> pending
+ (dictionary.has next it)
+ [(++ next)]))
+ (|> scope representation (the #pending)))]
+ (|> it
+ (representation Async)
+ (the #async)
+ (//.upon! (function (_ _)
+ (un_register! scope next))))))
+
+ (def .public (schedule! scope milli_seconds action)
+ (All (_ value)
+ (-> (Scope value) Nat (Action value)
+ (Async value)))
+ (let [[async resolve!] (sharing [value]
+ (is (Action value)
+ action)
+ (is [(//.Async (Maybe value)) (//.Resolver (Maybe value))]
+ (//.async [])))
+ cancel! (resolve! {.#None})
+ it (<| (abstraction Async)
+ [#cancel! cancel!
+ #async async])]
+ (exec
+ (<| io.run!
+ (is (IO Any))
+ (do [! io.monad]
+ [? (closed? scope)]
+ (if ?
+ cancel!
+ (do !
+ [_ (register! scope it)]
+ (<| (thread.schedule! milli_seconds)
+ (do !
+ [_ (in [])
+ value (action it)]
+ (resolve! value)))))))
+ it)))
+ )
+ )
+
+(def .public (future scope it)
+ (All (_ value)
+ (-> (Scope value) (Action value)
+ (Async value)))
+ (..schedule! scope 0 it))
+
+(def .public (after scope milli_seconds value)
+ (All (_ value)
+ (-> (Scope value) Nat value
+ (Async value)))
+ (..schedule! scope milli_seconds
+ (function (_ _)
+ (io#in {.#Some value}))))
+
+(def .public (delay scope milli_seconds)
+ (All (_ value)
+ (-> (Scope value) Nat
+ (Async Any)))
+ (..after scope milli_seconds []))
diff --git a/stdlib/source/library/lux/meta/compiler/version.lux b/stdlib/source/library/lux/meta/compiler/version.lux
index fa67df166..e576a7c29 100644
--- a/stdlib/source/library/lux/meta/compiler/version.lux
+++ b/stdlib/source/library/lux/meta/compiler/version.lux
@@ -30,9 +30,10 @@
(def .public major
(-> Version Nat)
- (|>> ..next ..next ..level))
+ (|>> ..next ..next))
-(def separator ".")
+(def separator
+ ".")
(def (padded value)
(-> Nat Text)
diff --git a/stdlib/source/test/lux/control.lux b/stdlib/source/test/lux/control.lux
index 4eba3a2a7..18ab1d19d 100644
--- a/stdlib/source/test/lux/control.lux
+++ b/stdlib/source/test/lux/control.lux
@@ -16,7 +16,8 @@
["[1]/[0]" stm]
["[1]/[0]" event]
["[1]/[0]" cps]
- ["[1]/[0]" incremental]]
+ ["[1]/[0]" incremental]
+ ["[1]/[0]" structured]]
["[1][0]" continuation]
["[1][0]" exception]
["[1][0]" function]
@@ -50,6 +51,7 @@
/concurrency/event.test
/concurrency/cps.test
/concurrency/incremental.test
+ /concurrency/structured.test
))
(def security
diff --git a/stdlib/source/test/lux/control/concurrency/structured.lux b/stdlib/source/test/lux/control/concurrency/structured.lux
new file mode 100644
index 000000000..cd42dd4f0
--- /dev/null
+++ b/stdlib/source/test/lux/control/concurrency/structured.lux
@@ -0,0 +1,240 @@
+(.require
+ [library
+ [lux (.except)
+ [abstract
+ ["[0]" monad (.only do)]
+ [\\specification
+ ["$[0]" functor (.only Injection Comparison)]
+ ["$[0]" monad]]]
+ [control
+ ["[0]" maybe (.use "[1]#[0]" functor)]
+ ["[0]" try]
+ ["[0]" io (.use "[1]#[0]" monad)]]
+ [data
+ [collection
+ ["[0]" list (.use "[1]#[0]" functor)]]]
+ [math
+ ["[0]" random]
+ [number
+ ["n" nat]]]
+ [test
+ ["_" property (.only Test)]
+ ["[0]" unit]]]]
+ [\\library
+ ["[0]" / (.only)
+ [//
+ ["[0]" atom (.only Atom)]
+ ["[0]" async]]]])
+
+(def injection
+ (Injection /.Async)
+ (at /.monad in))
+
+(def comparison
+ (Comparison /.Async)
+ (function (_ == left right)
+ (io.run!
+ (do io.monad
+ [?left (async.value (/.async left))
+ ?right (async.value (/.async right))]
+ (in (when [?left ?right]
+ [{.#Some {.#Some left}}
+ {.#Some {.#Some right}}]
+ (== left right)
+
+ _
+ false))))))
+
+(def (action _)
+ (-> [] [(Atom Bit) (/.Action Any)])
+ (let [completed? (is (Atom Bit)
+ (atom.atom false))]
+ [completed?
+ (function (_ it)
+ (do io.monad
+ [? (/.pending? it)]
+ (if ?
+ (do io.monad
+ [_ (atom.write! true completed?)]
+ (in {.#Some []}))
+ (io#in {.#Some []}))))]))
+
+(def .public test
+ Test
+ (do [! random.monad]
+ [short (at ! each (|>> (n.% 10) ++) random.nat)
+ long (at ! each (|>> (n.% 2) ++ (n.* 50)) random.nat)
+ leftE random.nat
+ rightE random.nat
+
+ in_parallel (at ! each (|>> (n.% 10) (n.+ 2)) random.nat)]
+ (<| (_.covering /._)
+ (_.for [/.Async])
+ (all _.and
+ (_.for [/.functor]
+ ($functor.spec ..injection ..comparison /.functor))
+ (_.for [/.monad]
+ ($monad.spec ..injection ..comparison /.monad))
+
+ (in (do async.monad
+ [leftA (<| /.async
+ (at /.monad in leftE))]
+ (unit.coverage [/.async]
+ (|> leftA
+ (maybe#each (same? leftE))
+ (maybe.else false)))))
+ (in (do async.monad
+ [? (<| /.async
+ /.with_scope
+ (function (_ scope))
+ (/.schedule! scope short (function (_ _) (io#in {.#Some true}))))]
+ (unit.coverage [/.schedule!]
+ (maybe.else false ?))))
+ (in (do async.monad
+ [_ (<| /.async
+ /.with_scope
+ (function (_ scope))
+ (/.future scope (function (_ _) (io#in {.#Some true}))))]
+ (unit.coverage [/.future]
+ true)))
+ (in (do async.monad
+ [_ (<| /.async
+ /.with_scope
+ (function (_ scope))
+ (/.after scope short []))]
+ (unit.coverage [/.after]
+ true)))
+ (in (do async.monad
+ [_ (<| /.async
+ /.with_scope
+ (function (_ scope))
+ (/.delay scope short))]
+ (unit.coverage [/.delay]
+ true)))
+ (in (do async.monad
+ [.let [all_cancelled (is (Atom Nat)
+ (atom.atom 0))
+ [done? done!] (is [(async.Async []) (async.Resolver [])]
+ (async.async []))
+ scope (<| /.with_scope
+ (function (_ scope))
+ (monad.all /.monad)
+ (list#each (function (_ _)
+ (/.schedule! scope long
+ (function (_ it)
+ (do [! io.monad]
+ [? (/.cancelled? it)]
+ (if ?
+ (do !
+ [[before after] (atom.update! ++ all_cancelled)
+ _ (if (n.= in_parallel after)
+ (done! [])
+ (in false))]
+ (in {.#Some []}))
+ (io#in {.#Some []})))))))
+ (list.repeated in_parallel []))
+ _ (io.run! (/.cancel! scope))]
+ _ (/.async scope)
+ _ done?
+ all_cancelled (async.future (atom.read! all_cancelled))]
+ (unit.coverage [/.with_scope]
+ (n.= in_parallel all_cancelled))))
+
+ (_.coverage [/.pending?]
+ (io.run! (/.pending? (<| /.with_scope
+ (function (_ scope))
+ (/.schedule! scope long (function (_ _) (io#in {.#Some []})))))))
+ (_.coverage [/.completed?]
+ (io.run! (/.completed? (at /.monad in []))))
+ (in (do async.monad
+ [.let [[done? done!] (is [(async.Async Bit) (async.Resolver Bit)]
+ (async.async []))
+ it (<| /.with_scope
+ (function (_ scope))
+ (/.schedule! scope long)
+ (function (_ it)
+ (do io.monad
+ [pre (/.cancel! it)
+ post (/.cancelled? it)
+ _ (done! (and pre post))]
+ (in {.#Some []}))))]
+ _ (/.async it)
+ ? done?]
+ (unit.coverage [/.cancel! /.cancelled?]
+ ?)))
+ (in (do async.monad
+ [.let [[done? done!] (is [(async.Async Bit) (async.Resolver Bit)]
+ (async.async []))
+
+ [@not_completed action] (..action [])
+ to_cancel (<| /.with_scope
+ (function (_ scope))
+ (/.schedule! scope long)
+ (function (_ it)
+ (do [! io.monad]
+ [pre (/.cancel! it)
+ _ (done! pre)]
+ (in {.#None}))))]
+ cancelled! done?
+ .let [confirmed! (io.run! (/.cancelled? to_cancel))]
+ _ (/.async to_cancel)
+ not_completed! (async.future (io#each not (atom.read! @not_completed)))
+
+ .let [[@completed action] (..action [])
+ to_complete (<| /.with_scope
+ (function (_ scope))
+ (/.schedule! scope long action))]
+ _ (/.async to_complete)
+ completed! (async.future (atom.read! @completed))]
+ (unit.coverage [/.Action]
+ (and cancelled!
+ confirmed!
+ not_completed!
+ completed!))))
+
+ (in (do async.monad
+ [left&right (/.async (with /.monad
+ (/.and (in leftE) (in rightE))))]
+ (unit.coverage [/.and]
+ (<| (maybe.else false)
+ (do maybe.monad
+ [[leftA rightA] left&right]
+ (in (and (same? leftE leftA)
+ (same? rightE rightA))))))))
+ (in (do [! async.monad]
+ [left (/.async (with /.monad
+ (/.or (in leftE) (in rightE))))
+ right (let [left (<| /.with_scope
+ (function (_ scope))
+ (/.schedule! scope long (function (_ _) (io#in {.#Some leftE}))))]
+ (do !
+ [_ (async.future (/.cancel! left))]
+ (/.async (/.or left (at /.monad in rightE)))))]
+ (unit.coverage [/.or]
+ (when [left right]
+ [{.#Some {.#Left leftA}}
+ {.#Some {.#Right rightA}}]
+ (and (same? leftE leftA)
+ (same? rightE rightA))
+
+ _
+ false))))
+ (in (do [! async.monad]
+ [left (/.async (with /.monad
+ (/.either (in leftE) (in rightE))))
+ right (let [left (<| /.with_scope
+ (function (_ scope))
+ (/.schedule! scope long (function (_ _) (io#in {.#Some leftE}))))]
+ (do !
+ [_ (async.future (/.cancel! left))]
+ (/.async (/.either left (at /.monad in rightE)))))]
+ (unit.coverage [/.either]
+ (when [left right]
+ [{.#Some leftA}
+ {.#Some rightA}]
+ (and (same? leftE leftA)
+ (same? rightE rightA))
+
+ _
+ false))))
+ ))))
diff --git a/stdlib/source/test/lux/meta/compiler/version.lux b/stdlib/source/test/lux/meta/compiler/version.lux
index 2129286ae..fd1df3bd7 100644
--- a/stdlib/source/test/lux/meta/compiler/version.lux
+++ b/stdlib/source/test/lux/meta/compiler/version.lux
@@ -6,11 +6,11 @@
[data
["[0]" bit (.use "[1]#[0]" equivalence)]
["[0]" text (.use "[1]#[0]" equivalence)
- ["%" \\format (.only format)]]]
+ ["%" \\format]]]
[math
["[0]" random (.only Random)]
[number
- ["n" nat]]]
+ ["[0]" nat]]]
[test
["_" property (.only Test)]]]]
[\\library
@@ -29,7 +29,7 @@
that ..random]
(`` (all _.and
(_.coverage [/.format]
- (bit#= (n.= this that)
+ (bit#= (nat.= this that)
(text#= (/.format this) (/.format that))))
(,, (with_template [<level>]
[(_.coverage [<level>]
diff --git a/stdlib/source/test/lux/meta/type/implicit.lux b/stdlib/source/test/lux/meta/type/implicit.lux
index 66daa4a32..233c121ca 100644
--- a/stdlib/source/test/lux/meta/type/implicit.lux
+++ b/stdlib/source/test/lux/meta/type/implicit.lux
@@ -35,13 +35,13 @@
left random.nat
right random.nat]
(all _.and
- (_.coverage [/.a/an]
+ (_.coverage [/.a/an /.a /.an]
(let [first_order!
(let [(open "list#[0]") (list.equivalence n.equivalence)]
(and (bit#= (at n.equivalence = left right)
- (/.a/an = left right))
+ (/.a = left right))
(list#= (at list.functor each ++ (enum.range n.enum start end))
- (/.a/an each ++ (enum.range n.enum start end)))))
+ (/.an each ++ (enum.range n.enum start end)))))
second_order!
(/.a/an =