aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/library
diff options
context:
space:
mode:
authorEduardo Julian2022-08-25 16:13:32 -0400
committerEduardo Julian2022-08-25 16:13:32 -0400
commit156fac89df89669ee660bd075f516dd8d57abd19 (patch)
tree5f01f67d53b3313a27ec6048fe8f100a5a3b409d /stdlib/source/library
parentdc78af618f175ffc5e6a653256ca6b27a260fe83 (diff)
Added support for structured concurrency.
Diffstat (limited to 'stdlib/source/library')
-rw-r--r--stdlib/source/library/lux/control/concurrency/structured.lux337
-rw-r--r--stdlib/source/library/lux/meta/compiler/version.lux5
2 files changed, 340 insertions, 2 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/structured.lux b/stdlib/source/library/lux/control/concurrency/structured.lux
new file mode 100644
index 000000000..8c0aab11b
--- /dev/null
+++ b/stdlib/source/library/lux/control/concurrency/structured.lux
@@ -0,0 +1,337 @@
+(.require
+ [library
+ [lux (.except Scope or and)
+ [abstract
+ [functor (.only Functor)]
+ ["[0]" monad (.only Monad do)]]
+ [control
+ ["[0]" maybe (.use "[1]#[0]" monad)]
+ ["[0]" io (.only IO) (.use "[1]#[0]" monad)]]
+ [data
+ [collection
+ ["[0]" dictionary (.only Dictionary)]]]
+ [math
+ [number
+ ["[0]" nat]]]
+ [meta
+ [type (.only sharing)
+ [primitive (.except)]]]]]
+ ["[0]" //
+ ["[1]" async (.use "[1]#[0]" monad)]
+ ["[0]" thread]
+ ["[0]" atom (.only Atom)]])
+
+(primitive .public (Async value)
+ (Record
+ [#cancel! (IO Bit)
+ #async (//.Async (Maybe value))])
+
+ (def .public async
+ (All (_ value)
+ (-> (Async value)
+ (//.Async (Maybe value))))
+ (|>> representation (the #async)))
+
+ (def .public cancel!
+ (All (_ value)
+ (-> (Async value) (IO Bit)))
+ (|>> representation (the #cancel!)))
+
+ (with_template [<name> <pattern>]
+ [(def .public <name>
+ (All (_ value)
+ (-> (Async value)
+ (IO Bit)))
+ (|>> representation
+ (the #async)
+ //.value
+ (io#each (function (_ it)
+ (when it
+ <pattern>
+ true
+
+ _
+ false)))))]
+
+ [pending? {.#None}]
+ [cancelled? {.#Some {.#None}}]
+ [completed? {.#Some {.#Some _}}]
+ )
+
+ (def .public functor
+ (Functor Async)
+ (implementation
+ (def (each $ it)
+ (|> it
+ representation
+ (revised #async (//#each (maybe#each $)))
+ abstraction))))
+
+ (def .public monad
+ (Monad Async)
+ (implementation
+ (def functor ..functor)
+
+ (def in
+ (|>> maybe#in
+ //#in
+ [#cancel! (io#in false)
+ #async]
+ abstraction))
+
+ (def (conjoint !!it)
+ (let [!!it (representation !!it)]
+ (abstraction
+ [#cancel! (the #cancel! !!it)
+ #async (do [! //.monad]
+ [?!it (the #async !!it)]
+ (when ?!it
+ {.#Some !it}
+ (the #async (representation !it))
+
+ {.#None}
+ (in {.#None})))])))))
+
+ (def .public (and left right)
+ (All (_ left right)
+ (-> (Async left) (Async right)
+ (Async (And left right))))
+ (let [[read! write!] (sharing [left right]
+ (is [(Async left) (Async right)]
+ [left right])
+ (is [(//.Async (Maybe (And left right)))
+ (//.Resolver (Maybe (And left right)))]
+ (//.async [])))
+ _ (io.run! (//.upon! (function (_ left)
+ (when left
+ {.#None}
+ (write! {.#None})
+
+ {.#Some left}
+ (//.upon! (function (_ right)
+ (when right
+ {.#None}
+ (write! {.#None})
+
+ {.#Some right}
+ (write! {.#Some [left right]})))
+ (the #async (representation right)))))
+ (the #async (representation left))))]
+ (abstraction
+ [#cancel! (write! {.#None})
+ #async read!])))
+
+ (def .public (or left right)
+ (All (_ left right)
+ (-> (Async left) (Async right)
+ (Async (Or left right))))
+ (let [[left|right write!] (sharing [left right]
+ (is [(Async left) (Async right)]
+ [left right])
+ (is [(//.Async (Maybe (Either left right)))
+ (//.Resolver (Maybe (Either left right)))]
+ (//.async [])))]
+ (with_expansions
+ [<sides> (with_template [<side> <other> <tag>]
+ [(io.run! (//.upon! (function (_ it)
+ (when it
+ {.#Some it}
+ (write! {.#Some {<tag> it}})
+
+ {.#None}
+ (//.upon! (function (_ it)
+ (when it
+ {.#Some it}
+ (io#in [])
+
+ {.#None}
+ (write! {.#None})))
+ (the #async (representation <other>)))))
+ (the #async (representation <side>))))]
+
+ [left right .#Left]
+ [right left .#Right]
+ )]
+ (exec
+ <sides>
+ (abstraction
+ [#cancel! (write! {.#None})
+ #async left|right])))))
+
+ (def .public (either left right)
+ (All (_ value)
+ (-> (Async value) (Async value)
+ (Async value)))
+ (let [[left||right write!] (sharing [value]
+ (is (Async value)
+ left)
+ (is [(//.Async (Maybe value))
+ (//.Resolver (Maybe value))]
+ (//.async [])))]
+ (with_expansions
+ [<sides> (with_template [<side> <other>]
+ [(io.run! (//.upon! (function (_ it)
+ (when it
+ {.#Some it}
+ (write! {.#Some it})
+
+ {.#None}
+ (//.upon! (function (_ it)
+ (when it
+ {.#Some it}
+ (io#in [])
+
+ {.#None}
+ (write! {.#None})))
+ (the #async (representation <other>)))))
+ (the #async (representation <side>))))]
+
+ [left right]
+ [right left]
+ )]
+ (exec
+ <sides>
+ (abstraction
+ [#cancel! (write! {.#None})
+ #async left||right])))))
+
+ (type .public (Action value)
+ (-> (Async value)
+ (IO (Maybe value))))
+
+ (type ID
+ Nat)
+
+ (type Pending
+ (Dictionary ID (Ex (_ value) (Async value))))
+
+ (primitive .public (Scope value)
+ (Record
+ [#pending (Atom [ID Pending])
+ #itself (Async value)])
+
+ (def .public close!
+ (All (_ value)
+ (-> (Scope value)
+ (IO Bit)))
+ (|>> (representation Scope)
+ (the #itself)
+ (representation Async)
+ (the #cancel!)))
+
+ (def .public closed?
+ (All (_ value)
+ (-> (Scope value)
+ (IO Bit)))
+ (|>> (representation Scope)
+ (the #itself)
+ cancelled?))
+
+ (def .public (with_scope body)
+ (All (_ value)
+ (-> (-> (Scope value) (Async value))
+ (Async value)))
+ (let [[async resolve!] (sharing [value]
+ (is (-> (Scope value) (Async value))
+ body)
+ (is [(//.Async (Maybe value)) (//.Resolver (Maybe value))]
+ (//.async [])))
+ pending (is (Atom [ID Pending])
+ (atom.atom [0 (dictionary.empty nat.hash)]))
+ close! (do [! io.monad]
+ [... Cancel all pending tasks.
+ [next pending] (atom.read! pending)
+ _ (monad.each ! cancel! (dictionary.values pending))]
+ (resolve! {.#None}))
+ scope (<| (abstraction Scope)
+ [#pending pending
+ #itself (<| (abstraction Async)
+ [#cancel! close!
+ #async async])])
+ it (body scope)]
+ (<| (abstraction Async)
+ [#cancel! close!
+ #async (exec
+ (|> (representation Async it)
+ (the #async)
+ (//.upon! (function (_ it)
+ (do io.monad
+ [_ (resolve! it)]
+ close!)))
+ io.run!)
+ async)])))
+
+ (def (un_register! scope id)
+ (All (_ value)
+ (-> (Scope value) ID (IO Any)))
+ (atom.update! (function (_ [next pending])
+ (|> pending
+ (dictionary.lacks id)
+ [next]))
+ (|> scope representation (the #pending))))
+
+ (def (register! scope it)
+ (All (_ value)
+ (-> (Scope value) (Async value) (IO Any)))
+ (do io.monad
+ [[[next _] _] (atom.update! (function (_ [next pending])
+ (|> pending
+ (dictionary.has next it)
+ [(++ next)]))
+ (|> scope representation (the #pending)))]
+ (|> it
+ (representation Async)
+ (the #async)
+ (//.upon! (function (_ _)
+ (un_register! scope next))))))
+
+ (def .public (schedule! scope milli_seconds action)
+ (All (_ value)
+ (-> (Scope value) Nat (Action value)
+ (Async value)))
+ (let [[async resolve!] (sharing [value]
+ (is (Action value)
+ action)
+ (is [(//.Async (Maybe value)) (//.Resolver (Maybe value))]
+ (//.async [])))
+ cancel! (resolve! {.#None})
+ it (<| (abstraction Async)
+ [#cancel! cancel!
+ #async async])]
+ (exec
+ (<| io.run!
+ (is (IO Any))
+ (do [! io.monad]
+ [? (closed? scope)]
+ (if ?
+ cancel!
+ (do !
+ [_ (register! scope it)]
+ (<| (thread.schedule! milli_seconds)
+ (do !
+ [_ (in [])
+ value (action it)]
+ (resolve! value)))))))
+ it)))
+ )
+ )
+
+(def .public (future scope it)
+ (All (_ value)
+ (-> (Scope value) (Action value)
+ (Async value)))
+ (..schedule! scope 0 it))
+
+(def .public (after scope milli_seconds value)
+ (All (_ value)
+ (-> (Scope value) Nat value
+ (Async value)))
+ (..schedule! scope milli_seconds
+ (function (_ _)
+ (io#in {.#Some value}))))
+
+(def .public (delay scope milli_seconds)
+ (All (_ value)
+ (-> (Scope value) Nat
+ (Async Any)))
+ (..after scope milli_seconds []))
diff --git a/stdlib/source/library/lux/meta/compiler/version.lux b/stdlib/source/library/lux/meta/compiler/version.lux
index fa67df166..e576a7c29 100644
--- a/stdlib/source/library/lux/meta/compiler/version.lux
+++ b/stdlib/source/library/lux/meta/compiler/version.lux
@@ -30,9 +30,10 @@
(def .public major
(-> Version Nat)
- (|>> ..next ..next ..level))
+ (|>> ..next ..next))
-(def separator ".")
+(def separator
+ ".")
(def (padded value)
(-> Nat Text)