From f6e280bd4ab41d12083c0eef2c823ad3962d6a04 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Sat, 15 Dec 2018 12:39:15 -0400 Subject: Moved the "lux/concurrency" modules under "lux/control". --- stdlib/source/lux/cli.lux | 52 +-- stdlib/source/lux/concurrency/actor.lux | 377 --------------------- stdlib/source/lux/concurrency/atom.lux | 59 ---- stdlib/source/lux/concurrency/frp.lux | 132 -------- stdlib/source/lux/concurrency/process.lux | 110 ------ stdlib/source/lux/concurrency/promise.lux | 174 ---------- stdlib/source/lux/concurrency/semaphore.lux | 149 -------- stdlib/source/lux/concurrency/stm.lux | 245 ------------- stdlib/source/lux/concurrency/task.lux | 82 ----- stdlib/source/lux/control/concurrency/actor.lux | 377 +++++++++++++++++++++ stdlib/source/lux/control/concurrency/atom.lux | 59 ++++ stdlib/source/lux/control/concurrency/frp.lux | 132 ++++++++ stdlib/source/lux/control/concurrency/process.lux | 110 ++++++ stdlib/source/lux/control/concurrency/promise.lux | 174 ++++++++++ .../source/lux/control/concurrency/semaphore.lux | 149 ++++++++ stdlib/source/lux/control/concurrency/stm.lux | 245 +++++++++++++ stdlib/source/lux/control/concurrency/task.lux | 82 +++++ stdlib/source/lux/test.lux | 8 +- stdlib/source/lux/world/console.lux | 4 +- stdlib/source/lux/world/file.lux | 4 +- stdlib/source/lux/world/net/tcp.jvm.lux | 8 +- stdlib/source/lux/world/net/udp.jvm.lux | 6 +- stdlib/test/test/lux/concurrency/actor.lux | 75 ---- stdlib/test/test/lux/concurrency/atom.lux | 34 -- stdlib/test/test/lux/concurrency/frp.lux | 122 ------- stdlib/test/test/lux/concurrency/promise.lux | 72 ---- stdlib/test/test/lux/concurrency/semaphore.lux | 143 -------- stdlib/test/test/lux/concurrency/stm.lux | 77 ----- stdlib/test/test/lux/control/concurrency/actor.lux | 75 ++++ stdlib/test/test/lux/control/concurrency/atom.lux | 34 ++ stdlib/test/test/lux/control/concurrency/frp.lux | 122 +++++++ .../test/test/lux/control/concurrency/promise.lux | 72 ++++ .../test/lux/control/concurrency/semaphore.lux | 143 ++++++++ stdlib/test/test/lux/control/concurrency/stm.lux | 77 +++++ 34 files changed, 1892 insertions(+), 1892 deletions(-) delete mode 100644 stdlib/source/lux/concurrency/actor.lux delete mode 100644 stdlib/source/lux/concurrency/atom.lux delete mode 100644 stdlib/source/lux/concurrency/frp.lux delete mode 100644 stdlib/source/lux/concurrency/process.lux delete mode 100644 stdlib/source/lux/concurrency/promise.lux delete mode 100644 stdlib/source/lux/concurrency/semaphore.lux delete mode 100644 stdlib/source/lux/concurrency/stm.lux delete mode 100644 stdlib/source/lux/concurrency/task.lux create mode 100644 stdlib/source/lux/control/concurrency/actor.lux create mode 100644 stdlib/source/lux/control/concurrency/atom.lux create mode 100644 stdlib/source/lux/control/concurrency/frp.lux create mode 100644 stdlib/source/lux/control/concurrency/process.lux create mode 100644 stdlib/source/lux/control/concurrency/promise.lux create mode 100644 stdlib/source/lux/control/concurrency/semaphore.lux create mode 100644 stdlib/source/lux/control/concurrency/stm.lux create mode 100644 stdlib/source/lux/control/concurrency/task.lux delete mode 100644 stdlib/test/test/lux/concurrency/actor.lux delete mode 100644 stdlib/test/test/lux/concurrency/atom.lux delete mode 100644 stdlib/test/test/lux/concurrency/frp.lux delete mode 100644 stdlib/test/test/lux/concurrency/promise.lux delete mode 100644 stdlib/test/test/lux/concurrency/semaphore.lux delete mode 100644 stdlib/test/test/lux/concurrency/stm.lux create mode 100644 stdlib/test/test/lux/control/concurrency/actor.lux create mode 100644 stdlib/test/test/lux/control/concurrency/atom.lux create mode 100644 stdlib/test/test/lux/control/concurrency/frp.lux create mode 100644 stdlib/test/test/lux/control/concurrency/promise.lux create mode 100644 stdlib/test/test/lux/control/concurrency/semaphore.lux create mode 100644 stdlib/test/test/lux/control/concurrency/stm.lux diff --git a/stdlib/source/lux/cli.lux b/stdlib/source/lux/cli.lux index 0ac9ff0bd..34514b5b9 100644 --- a/stdlib/source/lux/cli.lux +++ b/stdlib/source/lux/cli.lux @@ -2,22 +2,22 @@ [lux #* [control monad - ["p" parser]] + ["p" parser] + [concurrency + ["." process]]] [data [collection [list ("list/." Monoid Monad)]] ["." text ("text/." Equivalence) format] - ["E" error]] + ["." error (#+ Error)]] [macro (#+ with-gensyms) ["." code] ["s" syntax (#+ syntax: Syntax)]] [platform [compiler ["." host]]] - ["." io] - [concurrency - ["." process]]]) + ["." io]]) ## [Types] (type: #export (CLI a) @@ -26,18 +26,18 @@ ## [Combinators] (def: #export (run inputs parser) - (All [a] (-> (List Text) (CLI a) (E.Error a))) + (All [a] (-> (List Text) (CLI a) (Error a))) (case (p.run inputs parser) - (#E.Success [remaining output]) + (#error.Success [remaining output]) (case remaining #.Nil - (#E.Success output) + (#error.Success output) _ - (#E.Error (format "Remaining CLI inputs: " (text.join-with " " remaining)))) + (#error.Error (format "Remaining CLI inputs: " (text.join-with " " remaining)))) - (#E.Error error) - (#E.Error error))) + (#error.Error error) + (#error.Error error))) (def: #export any {#.doc "Just returns the next input without applying any logic."} @@ -45,16 +45,16 @@ (function (_ inputs) (case inputs (#.Cons arg inputs') - (#E.Success [inputs' arg]) + (#error.Success [inputs' arg]) _ - (#E.Error "Cannot parse empty arguments.")))) + (#error.Error "Cannot parse empty arguments.")))) (def: #export (parse parser) {#.doc "Parses the next input with a parsing function."} - (All [a] (-> (-> Text (E.Error a)) (CLI a))) + (All [a] (-> (-> Text (Error a)) (CLI a))) (function (_ inputs) - (do E.Monad + (do error.Monad [[remaining raw] (any inputs) output (parser raw)] (wrap [remaining output])))) @@ -63,11 +63,11 @@ {#.doc "Checks that a token is in the inputs."} (-> Text (CLI Any)) (function (_ inputs) - (do E.Monad + (do error.Monad [[remaining raw] (any inputs)] (if (text/= reference raw) (wrap [remaining []]) - (E.fail (format "Missing token: '" reference "'")))))) + (error.fail (format "Missing token: '" reference "'")))))) (def: #export (somewhere cli) {#.doc "Given a parser, tries to parse it somewhere in the inputs (i.e. not necessarily parsing the immediate inputs)."} @@ -75,16 +75,16 @@ (function (_ inputs) (loop [immediate inputs] (case (p.run immediate cli) - (#E.Success [remaining output]) - (#E.Success [remaining output]) + (#error.Success [remaining output]) + (#error.Success [remaining output]) - (#E.Error error) + (#error.Error error) (case immediate #.Nil - (#E.Error error) + (#error.Error error) (#.Cons to-omit immediate') - (do E.Monad + (do error.Monad [[remaining output] (recur immediate')] (wrap [(#.Cons to-omit remaining) output]))))))) @@ -94,8 +94,8 @@ (CLI Any) (function (_ inputs) (case inputs - #.Nil (#E.Success [inputs []]) - _ (#E.Error (format "Unknown parameters: " (text.join-with " " inputs)))))) + #.Nil (#error.Success [inputs []]) + _ (#error.Error (format "Unknown parameters: " (text.join-with " " inputs)))))) (def: #export (named name value) (All [a] (-> Text (CLI a) (CLI a))) @@ -168,10 +168,10 @@ (` process.run!)))))] ((~' wrap) (~ g!output)))))) (~ g!args)) - (#E.Success [(~ g!_) (~ g!output)]) + (#error.Success [(~ g!_) (~ g!output)]) (~ g!output) - (#E.Error (~ g!message)) + (#error.Error (~ g!message)) (.error! (~ g!message)) )))) ))) diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux deleted file mode 100644 index 0af0d09f9..000000000 --- a/stdlib/source/lux/concurrency/actor.lux +++ /dev/null @@ -1,377 +0,0 @@ -(.module: {#.doc "The actor model of concurrency."} - [lux #* - [control monad - ["p" parser] - ["ex" exception (#+ exception:)]] - ["." io (#- run) ("io/." Monad)] - [data - ["." product] - ["e" error] - [text - format] - [collection - ["." list ("list/." Monoid Monad Fold)]]] - ["." macro (#+ with-gensyms Monad) - ["." code] - ["s" syntax (#+ syntax: Syntax)] - [syntax - ["cs" common] - [common - ["csr" reader] - ["csw" writer]]]] - [type - abstract]] - [// - ["." atom (#+ Atom atom)] - ["." promise (#+ Promise promise) ("promise/." Monad)] - ["." task (#+ Task)]]) - -(exception: #export poisoned) - -(exception: #export (dead {actor-name Text} - {message-name Text}) - (ex.report ["Actor" actor-name] - ["Message" message-name])) - -## [Types] -(with-expansions - [ (as-is (-> s (Actor s) (Task s))) - (as-is [Text s (List )]) - (as-is (Rec Mailbox (Promise [ Mailbox])))] - - (def: (obituary mailbox) - (All [a] (-> (Rec Mailbox (Promise [a Mailbox])) (List a))) - (case (promise.poll mailbox) - (#.Some [head tail]) - (#.Cons head (obituary tail)) - - #.None - #.Nil)) - - (abstract: #export (Actor s) - {#.doc "An actor, defined as all the necessities it requires."} - {#mailbox (Atom ) - #obituary (Promise )} - - ## TODO: Delete after new-luxc becomes the new standard compiler. - (def: (actor mailbox obituary) - (All [s] (-> (Atom ) (Promise ) (Actor s))) - (:abstraction {#mailbox mailbox - #obituary obituary})) - - (type: #export (Message s) - ) - - (type: #export (Obituary s) - ) - - (type: #export (Behavior s) - {#.doc "An actor's behavior when messages are received."} - {#handle (-> (Message s) s (Actor s) (Task s)) - #end (-> Text s (Promise Any))}) - - (def: #export (spawn behavior init) - {#.doc "Given a behavior and initial state, spawns an actor and returns it."} - (All [s] (-> (Behavior s) s (IO (Actor s)))) - (io (let [[handle end] behavior - self (actor (atom (promise #.None)) - (promise #.None)) - process (loop [state init - |mailbox| (io.run (atom.read (get@ #mailbox (:representation self))))] - (do promise.Monad - [[head tail] |mailbox| - ?state' (handle head state self)] - (case ?state' - (#e.Error error) - (do @ - [_ (end error state)] - (exec (io.run (promise.resolve [error state (#.Cons head (obituary tail))] - (get@ #obituary (:representation self)))) - (wrap []))) - - (#e.Success state') - (recur state' tail))))] - self))) - - (def: #export (alive? actor) - (All [s] (-> (Actor s) Bit)) - (case (promise.poll (get@ #obituary (:representation actor))) - #.None - #1 - - _ - #0)) - - (def: #export (send message actor) - {#.doc "Communicate with an actor through message passing."} - (All [s] (-> (Message s) (Actor s) (IO Bit))) - (if (alive? actor) - (let [entry [message (promise #.None)]] - (do Monad - [|mailbox| (atom.read (get@ #mailbox (:representation actor)))] - (loop [|mailbox| |mailbox|] - (case (promise.poll |mailbox|) - #.None - (do @ - [resolved? (promise.resolve entry |mailbox|)] - (if resolved? - (do @ - [_ (atom.write (product.right entry) (get@ #mailbox (:representation actor)))] - (wrap #1)) - (recur |mailbox|))) - - (#.Some [_ |mailbox|']) - (recur |mailbox|'))))) - (io/wrap #0))) - )) - -## [Values] -(def: (default-handle message state self) - (All [s] (-> (Message s) s (Actor s) (Task s))) - (message state self)) - -(def: (default-end cause state) - (All [s] (-> Text s (Promise Any))) - (promise/wrap [])) - -(def: #export default-behavior - (All [s] (Behavior s)) - {#handle default-handle - #end default-end}) - -(def: #export (poison actor) - {#.doc (doc "Kills the actor by sending a message that will kill it upon processing," - "but allows the actor to handle previous messages.")} - (All [s] (-> (Actor s) (IO Bit))) - (send (function (_ state self) - (task.throw poisoned [])) - actor)) - -## [Syntax] -(do-template [ ] - [(def: #export ( name) - (-> Name cs.Annotations cs.Annotations) - (|>> (#.Cons [(name-of ) - (code.tag name)]))) - - (def: #export ( name) - (-> Name (Meta Name)) - (do Monad - [[_ annotations _] (macro.find-def name)] - (case (macro.get-tag-ann (name-of ) annotations) - (#.Some actor-name) - (wrap actor-name) - - _ - (macro.fail (format "Definition is not " ".")))))] - - [with-actor resolve-actor #..actor "an actor"] - [with-message resolve-message #..message "a message"] - ) - -(def: actor-decl^ - (Syntax [Text (List Text)]) - (p.either (s.form (p.and s.local-identifier (p.some s.local-identifier))) - (p.and s.local-identifier (:: p.Monad wrap (list))))) - -(do-template [ ] - [(def: #export - (-> Text Text) - (|>> (format "@")))] - - [state-name "State"] - [behavior-name "Behavior"] - [new-name "new"] - ) - -(type: HandleC - [[Text Text Text] Code]) - -(type: StopC - [[Text Text] Code]) - -(type: BehaviorC - [(Maybe HandleC) (Maybe StopC)]) - -(def: behavior^ - (s.Syntax BehaviorC) - (let [handle-args ($_ p.and s.local-identifier s.local-identifier s.local-identifier) - stop-args ($_ p.and s.local-identifier s.local-identifier)] - (p.and (p.maybe (s.form (p.and (s.form (p.after (s.this (' handle)) handle-args)) - s.any))) - (p.maybe (s.form (p.and (s.form (p.after (s.this (' stop)) stop-args)) - s.any)))))) - -(syntax: #export (actor: - {export csr.export} - {[_name _vars] actor-decl^} - {annotations (p.default cs.empty-annotations csr.annotations)} - state-type - {[?handle ?stop] behavior^}) - {#.doc (doc "Defines an actor, with its behavior and internal state." - (actor: #export Counter - Nat - - ((stop cause state) - (:: promise.Monad wrap - (log! (if (ex.match? ..poisoned cause) - (format "Counter was poisoned: " (%n state)) - cause))))) - - (actor: #export (Stack a) - (List a) - - ((handle message state self) - (do task.Monad - [#let [_ (log! "BEFORE")] - output (message state self) - #let [_ (log! "AFTER")]] - (wrap output)))))} - (with-gensyms [g!_ g!init] - (do @ - [module macro.current-module-name - #let [g!type (code.local-identifier (state-name _name)) - g!behavior (code.local-identifier (behavior-name _name)) - g!actor (code.local-identifier _name) - g!new (code.local-identifier (new-name _name)) - g!vars (list/map code.local-identifier _vars)]] - (wrap (list (` (type: (~+ (csw.export export)) ((~ g!type) (~+ g!vars)) - (~ state-type))) - (` (type: (~+ (csw.export export)) ((~ g!actor) (~+ g!vars)) - (~ (|> annotations - (with-actor [module _name]) - csw.annotations)) - (..Actor ((~ g!type) (~+ g!vars))))) - (` (def: (~+ (csw.export export)) (~ g!behavior) - (All [(~+ g!vars)] - (..Behavior ((~ g!type) (~+ g!vars)))) - {#..handle (~ (case ?handle - #.None - (` (~! ..default-handle)) - - (#.Some [[messageN stateN selfN] bodyC]) - (` (function ((~ g!_) - (~ (code.local-identifier messageN)) - (~ (code.local-identifier stateN)) - (~ (code.local-identifier selfN))) - (do task.Monad - [] - (~ bodyC)))))) - #..end (~ (case ?stop - #.None - (` (~! ..default-end)) - - (#.Some [[causeN stateN] bodyC]) - (` (function ((~ g!_) - (~ (code.local-identifier causeN)) - (~ (code.local-identifier stateN))) - (do promise.Monad - [] - (~ bodyC))))))})) - (` (def: (~+ (csw.export export)) ((~ g!new) (~ g!init)) - (All [(~+ g!vars)] - (-> ((~ g!type) (~+ g!vars)) (io.IO ((~ g!actor) (~+ g!vars))))) - (..spawn (~ g!behavior) (~ g!init)))))) - ))) - -(type: Signature - {#vars (List Text) - #name Text - #inputs (List cs.Typed-Input) - #state Text - #self Text - #output Code}) - -(def: signature^ - (s.Syntax Signature) - (s.form ($_ p.and - (p.default (list) (s.tuple (p.some s.local-identifier))) - s.local-identifier - (p.some csr.typed-input) - s.local-identifier - s.local-identifier - s.any))) - -(def: reference^ - (s.Syntax [Name (List Text)]) - (p.either (s.form (p.and s.identifier (p.some s.local-identifier))) - (p.and s.identifier (:: p.Monad wrap (list))))) - -(syntax: #export (message: - {export csr.export} - {[actor-name actor-vars] reference^} - {signature signature^} - {annotations (p.default cs.empty-annotations csr.annotations)} - body) - {#.doc (doc "A message can access the actor's state through the state parameter." - "A message can also access the actor itself through the self parameter." - "A message's output must be a task containing a 2-tuple with the updated state and a return value." - "A message may succeed or fail (in case of failure, the actor dies)." - - (message: #export Counter - (count! [increment Nat] state self Nat) - (let [state' (n/+ increment state)] - (task.return [state' state']))) - - (message: #export (Stack a) - (push [value a] state self (List a)) - (let [state' (#.Cons value state)] - (task.return [state' state']))))} - (with-gensyms [g!_ g!return g!error g!task g!sent?] - (do @ - [current-module macro.current-module-name - actor-name (resolve-actor actor-name) - #let [message-name [current-module (get@ #name signature)] - g!type (code.identifier (product.both id state-name actor-name)) - g!message (code.local-identifier (get@ #name signature)) - g!actor-vars (list/map code.local-identifier actor-vars) - actorC (` ((~ (code.identifier actor-name)) (~+ g!actor-vars))) - g!all-vars (|> (get@ #vars signature) (list/map code.local-identifier) (list/compose g!actor-vars)) - g!inputsC (|> (get@ #inputs signature) (list/map product.left)) - g!inputsT (|> (get@ #inputs signature) (list/map product.right)) - g!state (|> signature (get@ #state) code.local-identifier) - g!self (|> signature (get@ #self) code.local-identifier) - g!actor-refs (: (List Code) - (if (list.empty? actor-vars) - (list) - (|> actor-vars list.size list.indices (list/map (|>> code.nat (~) ($) (`)))))) - ref-replacements (|> (if (list.empty? actor-vars) - (list) - (|> actor-vars list.size list.indices (list/map (|>> code.nat (~) ($) (`))))) - (: (List Code)) - (list.zip2 g!all-vars) - (: (List [Code Code]))) - g!outputT (list/fold (function (_ [g!var g!ref] outputT) - (code.replace g!var g!ref outputT)) - (get@ #output signature) - ref-replacements)]] - (wrap (list (` (def: (~+ (csw.export export)) ((~ g!message) (~+ g!inputsC) (~ g!self)) - (~ (|> annotations - (with-message actor-name) - csw.annotations)) - (All [(~+ g!all-vars)] (-> (~+ g!inputsT) (~ actorC) (Task (~ (get@ #output signature))))) - (let [(~ g!task) (task.task (~ g!outputT))] - (io.run (do io.Monad - [(~ g!sent?) (..send (function ((~ g!_) (~ g!state) (~ g!self)) - (do promise.Monad - [(~ g!return) (: (Task [((~ g!type) (~+ g!actor-refs)) - (~ g!outputT)]) - (do task.Monad - [] - (~ body)))] - (case (~ g!return) - (#.Right [(~ g!state) (~ g!return)]) - (exec (io.run (promise.resolve (#.Right (~ g!return)) (~ g!task))) - (task.return (~ g!state))) - - (#.Left (~ g!error)) - (exec (io.run (promise.resolve (#.Left (~ g!error)) (~ g!task))) - (task.fail (~ g!error)))) - )) - (~ g!self))] - (if (~ g!sent?) - ((~' wrap) (~ g!task)) - ((~' wrap) (task.throw ..dead [(~ (code.text (%name actor-name))) - (~ (code.text (%name message-name)))])))))))) - )) - ))) diff --git a/stdlib/source/lux/concurrency/atom.lux b/stdlib/source/lux/concurrency/atom.lux deleted file mode 100644 index b1692b6e3..000000000 --- a/stdlib/source/lux/concurrency/atom.lux +++ /dev/null @@ -1,59 +0,0 @@ -(.module: - [lux #* - [control - [monad (#+ do)]] - ["." function] - ["." io (#- run)] - [type - abstract] - [platform - [compiler - ["." host]]] - [host (#+ import:)]]) - -(`` (for {(~~ (static host.jvm)) - (import: (java/util/concurrent/atomic/AtomicReference a) - (new [a]) - (get [] a) - (compareAndSet [a a] boolean))})) - -(`` (abstract: #export (Atom a) - {#.doc "Atomic references that are safe to mutate concurrently."} - - (for {(~~ (static host.jvm)) - (AtomicReference a)}) - - (def: #export (atom value) - (All [a] (-> a (Atom a))) - (:abstraction (for {(~~ (static host.jvm)) - (AtomicReference::new value)}))) - - (def: #export (read atom) - (All [a] (-> (Atom a) (IO a))) - (io (for {(~~ (static host.jvm)) - (AtomicReference::get (:representation atom))}))) - - (def: #export (compare-and-swap current new atom) - {#.doc (doc "Only mutates an atom if you can present it's current value." - "That guarantees that atom was not updated since you last read from it.")} - (All [a] (-> a a (Atom a) (IO Bit))) - (io (AtomicReference::compareAndSet current new (:representation atom)))) - )) - -(def: #export (update f atom) - {#.doc (doc "Updates an atom by applying a function to its current value." - "If it fails to update it (because some other process wrote to it first), it will retry until it succeeds." - "The retries will be done with the new values of the atom, as they show up.")} - (All [a] (-> (-> a a) (Atom a) (IO a))) - (loop [_ []] - (do io.Monad - [old (read atom) - #let [new (f old)] - swapped? (compare-and-swap old new atom)] - (if swapped? - (wrap new) - (recur []))))) - -(def: #export (write value atom) - (All [a] (-> a (Atom a) (IO Any))) - (update (function.constant value) atom)) diff --git a/stdlib/source/lux/concurrency/frp.lux b/stdlib/source/lux/concurrency/frp.lux deleted file mode 100644 index 8db54f28f..000000000 --- a/stdlib/source/lux/concurrency/frp.lux +++ /dev/null @@ -1,132 +0,0 @@ -(.module: - [lux #* - [control - [functor (#+ Functor)] - [apply (#+ Apply)] - ["." monad (#+ do Monad)]] - ["." io (#+ IO io)] - [data - [collection - [list ("list/." Monoid)]]] - [type (#+ :share) - abstract]] - [// - ["." atom (#+ Atom atom)] - ["." promise (#+ Promise)]]) - -(abstract: #export (Channel a) - {#.doc "An asynchronous channel to distribute values."} - (Atom (List (-> a (IO Any)))) - - (def: #export (channel _) - (All [a] (-> Any (Channel a))) - (:abstraction (atom (list)))) - - (def: #export (listen listener channel) - (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any))) - ## TODO: Simplify when possible. - (do io.Monad - [_ (atom.update (|>> (#.Cons listener)) - (:representation channel))] - (wrap []))) - - (def: #export (publish channel value) - {#.doc "Publish to a channel."} - (All [a] (-> (Channel a) a (IO Any))) - (do io.Monad - [listeners (atom.read (:representation channel))] - (monad.map @ (function (_ listener) (listener value)) listeners))) - ) - -(def: #export (filter predicate input) - (All [a] (-> (-> a Bit) (Channel a) (Channel a))) - (let [output (channel [])] - (exec (io.run (listen (function (_ value) - (if (predicate value) - (publish output value) - (io []))) - input)) - output))) - -(def: #export (pipe output input) - {#.doc "Copy/pipe the contents of a channel on to another."} - (All [a] (-> (Channel a) (Channel a) (IO Any))) - (listen (publish output) - input)) - -(def: #export (merge inputs) - {#.doc "Fuse all the elements in a list of channels by piping them onto a new output channel."} - (All [a] (-> (List (Channel a)) (IO (Channel a)))) - (let [output (channel [])] - (do io.Monad - [_ (monad.map @ (pipe output) inputs)] - (wrap output)))) - -(def: #export (from-promise promise) - (All [a] (-> (Promise a) (Channel a))) - (let [output (channel [])] - (exec (promise.await (publish output) promise) - output))) - -(def: #export (poll time action) - (All [a] (-> Nat (IO a) (Channel a))) - (let [output (channel [])] - (exec (io.run - (loop [_ []] - (do io.Monad - [value action - _ (publish output value)] - (wrap (promise.await recur (promise.wait time)))))) - output))) - -(def: #export (periodic time) - (-> Nat (Channel Any)) - (let [output (channel [])] - (exec (io.run - (loop [_ []] - (do io.Monad - [_ (publish output [])] - (wrap (promise.await recur (promise.wait time)))))) - output))) - -(def: #export (iterate f init) - (All [a] (-> (-> a (Promise a)) a (Channel a))) - (let [output (channel [])] - (exec (io.run - (loop [zero init] - (do io.Monad - [_ (publish output zero)] - (wrap (promise.await recur (f zero)))))) - output))) - -(structure: #export _ (Functor Channel) - (def: (map f input) - (let [output (channel [])] - (exec (io.run (listen (|>> f (publish output)) - input)) - output)))) - -(structure: #export _ (Apply Channel) - (def: functor Functor) - - (def: (apply ff fa) - (let [output (channel [])] - (exec (io.run (listen (function (_ f) - (listen (|>> f (publish output)) - fa)) - ff)) - output)))) - -(structure: #export _ (Monad Channel) - (def: functor Functor) - - (def: (wrap a) - (let [output (channel [])] - (exec (io.run (publish output a)) - output))) - - (def: (join mma) - (let [output (channel [])] - (exec (io.run (listen (listen (publish output)) - mma)) - output)))) diff --git a/stdlib/source/lux/concurrency/process.lux b/stdlib/source/lux/concurrency/process.lux deleted file mode 100644 index a67734747..000000000 --- a/stdlib/source/lux/concurrency/process.lux +++ /dev/null @@ -1,110 +0,0 @@ -(.module: - [lux #* - [control - ["." monad (#+ do)] - ["ex" exception (#+ exception:)]] - [data - [collection - ["." list]]] - [platform - [compiler - ["." host]]] - ["." io (#+ IO io)] - [host (#+ import: object)]] - [// - ["." atom (#+ Atom)]]) - -(`` (for {(~~ (static host.jvm)) - (as-is (import: java/lang/Object) - - (import: java/lang/Runtime - (#static getRuntime [] Runtime) - (availableProcessors [] int)) - - (import: java/lang/Runnable) - - (import: java/util/concurrent/TimeUnit - (#enum MILLISECONDS)) - - (import: java/util/concurrent/Executor - (execute [Runnable] #io void)) - - (import: (java/util/concurrent/ScheduledFuture a)) - - (import: java/util/concurrent/ScheduledThreadPoolExecutor - (new [int]) - (schedule [Runnable long TimeUnit] #io (ScheduledFuture Object))))} - - ## Default - (type: Process - {#creation Nat - #delay Nat - #action (IO Any)}) - )) - -(def: #export parallelism - Nat - (`` (for {(~~ (static host.jvm)) - (|> (Runtime::getRuntime) - (Runtime::availableProcessors) - .nat)} - - ## Default - 1))) - -(def: runner - (`` (for {(~~ (static host.jvm)) - (ScheduledThreadPoolExecutor::new (.int ..parallelism))} - - ## Default - (: (Atom (List Process)) - (atom.atom (list)))))) - -(def: #export (schedule milli-seconds action) - (-> Nat (IO Any) (IO Any)) - (`` (for {(~~ (static host.jvm)) - (let [runnable (object [] [Runnable] - [] - (Runnable [] (run) void - (io.run action)))] - (case milli-seconds - 0 (Executor::execute runnable runner) - _ (ScheduledThreadPoolExecutor::schedule runnable (.int milli-seconds) TimeUnit::MILLISECONDS - runner)))} - - ## Default - (atom.update (|>> (#.Cons {#creation ("lux io current-time") - #delay milli-seconds - #action action})) - runner)))) - -(`` (for {(~~ (static host.jvm)) - (as-is)} - - ## Default - (as-is (exception: #export (cannot-continue-running-processes) "") - - (def: #export run! - (IO Any) - (loop [_ []] - (do io.Monad - [processes (atom.read runner)] - (case processes - ## And... we're done! - #.Nil - (wrap []) - - _ - (do @ - [#let [now ("lux io current-time") - [ready pending] (list.partition (function (_ process) - (|> (get@ #creation process) - (n/+ (get@ #delay process)) - (n/<= now))) - processes)] - swapped? (atom.compare-and-swap! processes pending runner)] - (if swapped? - (monad.seq @ ready) - (error! (ex.construct cannot-continue-running-processes [])))) - )))) - ))) diff --git a/stdlib/source/lux/concurrency/promise.lux b/stdlib/source/lux/concurrency/promise.lux deleted file mode 100644 index 1a471022f..000000000 --- a/stdlib/source/lux/concurrency/promise.lux +++ /dev/null @@ -1,174 +0,0 @@ -(.module: - [lux (#- and or) - [control - [functor (#+ Functor)] - [apply (#+ Apply)] - ["." monad (#+ do Monad)]] - [data - ["." product]] - ["." function] - [type - abstract] - ["." io (#+ IO io)]] - [// - ["." process] - ["." atom (#+ Atom atom)]]) - -(abstract: #export (Promise a) - {#.doc "Represents values produced by asynchronous computations (unlike IO, which is synchronous)."} - (Atom [(Maybe a) (List (-> a (IO Any)))]) - - (def: #export (promise ?value) - (All [a] (-> (Maybe a) (Promise a))) - (:abstraction (atom [?value (list)]))) - - (def: #export (poll (^:representation promise)) - {#.doc "Polls a promise's value."} - (All [a] (-> (Promise a) (Maybe a))) - (|> (atom.read promise) - io.run - product.left)) - - (def: #export (resolve value (^:representation promise)) - {#.doc "Sets an promise's value if it has not been done yet."} - (All [a] (-> a (Promise a) (IO Bit))) - (do io.Monad - [(^@ old [_value _observers]) (atom.read promise)] - (case _value - (#.Some _) - (wrap #0) - - #.None - (do @ - [#let [new [(#.Some value) #.None]] - succeeded? (atom.compare-and-swap old new promise)] - (if succeeded? - (do @ - [_ (monad.map @ (function (_ f) (f value)) - _observers)] - (wrap #1)) - (resolve value (:abstraction promise))))))) - - (def: #export (await f (^:representation promise)) - (All [a] (-> (-> a (IO Any)) (Promise a) Any)) - (let [(^@ old [_value _observers]) (io.run (atom.read promise))] - (case _value - (#.Some value) - (io.run (f value)) - - #.None - (let [new [_value (#.Cons f _observers)]] - (if (io.run (atom.compare-and-swap old new promise)) - [] - (await f (:abstraction promise))))))) - ) - -(def: #export (resolved? promise) - {#.doc "Checks whether a promise's value has already been resolved."} - (All [a] (-> (Promise a) Bit)) - (case (poll promise) - #.None - #0 - - (#.Some _) - #1)) - -(structure: #export _ (Functor Promise) - (def: (map f fa) - (let [fb (promise #.None)] - (exec (await (function (_ a) (resolve (f a) fb)) - fa) - fb)))) - -(structure: #export _ (Apply Promise) - (def: functor Functor) - - (def: (apply ff fa) - (let [fb (promise #.None)] - (exec (await (function (_ f) - (io (await (function (_ a) (resolve (f a) fb)) - fa))) - ff) - fb)))) - -(structure: #export _ (Monad Promise) - (def: functor Functor) - - (def: (wrap a) - (promise (#.Some a))) - - (def: (join mma) - (let [ma (promise #.None)] - (exec (await (function (_ ma') - (io (await (function (_ a') (resolve a' ma)) - ma'))) - mma) - ma)))) - -(def: #export (and left right) - {#.doc "Sequencing combinator."} - (All [a b] (-> (Promise a) (Promise b) (Promise [a b]))) - (do Monad - [a left - b right] - (wrap [a b]))) - -(def: #export (or left right) - {#.doc "Heterogeneous alternative combinator."} - (All [a b] (-> (Promise a) (Promise b) (Promise (| a b)))) - (let [a|b (promise #.None)] - (with-expansions - [ (do-template [ ] - [(await (function (_ value) (resolve ( value) a|b)) - )] - - [left #.Left] - [right #.Right] - )] - (exec - a|b)))) - -(def: #export (either left right) - {#.doc "Homogeneous alternative combinator."} - (All [a] (-> (Promise a) (Promise a) (Promise a))) - (let [left||right (promise #.None)] - (`` (exec (~~ (do-template [] - [(await (function (_ value) (resolve value left||right)) - )] - - [left] - [right])) - left||right)))) - -(def: #export (schedule millis-delay computation) - {#.doc (doc "Runs an I/O computation on its own process (after a specified delay)." - "Returns a Promise that will eventually host its result.")} - (All [a] (-> Nat (IO a) (Promise a))) - (let [!out (promise #.None)] - (exec (|> (do io.Monad - [value computation] - (resolve value !out)) - (process.schedule millis-delay) - io.run) - !out))) - -(def: #export future - {#.doc (doc "Runs an I/O computation on its own process." - "Returns a Promise that will eventually host its result.")} - (All [a] (-> (IO a) (Promise a))) - (schedule 0)) - -(def: #export (delay time-millis value) - {#.doc "Delivers a value after a certain period has passed."} - (All [a] (-> Nat a (Promise a))) - (schedule time-millis (io value))) - -(def: #export (wait time-millis) - {#.doc "Returns a promise that will be resolved after the specified amount of milliseconds."} - (-> Nat (Promise Any)) - (delay time-millis [])) - -(def: #export (time-out time-millis promise) - {#.doc "Wait for a promise to be resolved within the specified amount of milliseconds."} - (All [a] (-> Nat (Promise a) (Promise (Maybe a)))) - (..or (wait time-millis) promise)) diff --git a/stdlib/source/lux/concurrency/semaphore.lux b/stdlib/source/lux/concurrency/semaphore.lux deleted file mode 100644 index 7b749ea60..000000000 --- a/stdlib/source/lux/concurrency/semaphore.lux +++ /dev/null @@ -1,149 +0,0 @@ -(.module: - [lux #* - [control [monad (#+ do)]] - [concurrency - ["." atom (#+ Atom)] - ["." promise (#+ Promise)]] - ["." io (#+ IO)] - [type - abstract - ["." refinement]]]) - -(type: State - {#open-positions Nat - #waiting-list (List (Promise Any))}) - -(abstract: #export Semaphore - {#.doc "A tool for controlling access to resources by multiple concurrent processes."} - - (Atom State) - - (def: #export (semaphore init-open-positions) - (-> Nat Semaphore) - (:abstraction (atom.atom {#open-positions init-open-positions - #waiting-list (list)}))) - - (def: #export (wait semaphore) - (Ex [k] (-> Semaphore (Promise Any))) - (let [semaphore (:representation semaphore)] - (io.run - (loop [signal (: (Promise Any) - (promise.promise #.None))] - (do io.Monad - [state (atom.read semaphore) - #let [[ready? state'] (: [Bit State] - (case (get@ #open-positions state) - 0 [#0 (update@ #waiting-list (|>> (#.Cons signal)) - state)] - _ [#1 (update@ #open-positions dec - state)]))] - success? (atom.compare-and-swap state state' semaphore) - _ (if ready? - (promise.resolve [] signal) - (wrap #0))] - (if success? - (wrap signal) - (recur signal))))))) - - (def: #export (signal semaphore) - (Ex [k] (-> Semaphore (Promise Any))) - (let [semaphore (:representation semaphore)] - (promise.future - (loop [_ []] - (do io.Monad - [state (atom.read semaphore) - #let [[?signal state'] (: [(Maybe (Promise Any)) State] - (case (get@ #waiting-list state) - #.Nil - [#.None (update@ #open-positions inc state)] - - (#.Cons head tail) - [(#.Some head) (set@ #waiting-list tail state)]))] - success? (atom.compare-and-swap state state' semaphore)] - (if success? - (do @ - [_ (case ?signal - #.None - (wrap #1) - - (#.Some signal) - (promise.resolve [] signal))] - (wrap [])) - (recur []))))))) - ) - -(abstract: #export Mutex - {#.doc "A mutual-exclusion lock that can only be acquired by one process at a time."} - - Semaphore - - (def: #export (mutex _) - (-> Any Mutex) - (:abstraction (semaphore 1))) - - (def: (acquire mutex) - (-> Mutex (Promise Any)) - (wait (:representation mutex))) - - (def: (release mutex) - (-> Mutex (Promise Any)) - (signal (:representation mutex))) - - (def: #export (synchronize mutex procedure) - (All [a] (-> Mutex (IO (Promise a)) (Promise a))) - (do promise.Monad - [_ (acquire mutex) - output (io.run procedure) - _ (release mutex)] - (wrap output))) - ) - -(def: #export limit (refinement.refinement (n/> 0))) -(`` (type: #export Limit (~~ (refinement.type limit)))) - -(abstract: #export Barrier - {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."} - - {#limit Limit - #count (Atom Nat) - #start-turnstile Semaphore - #end-turnstile Semaphore} - - (def: #export (barrier limit) - (-> Limit Barrier) - (:abstraction {#limit limit - #count (atom.atom 0) - #start-turnstile (semaphore 0) - #end-turnstile (semaphore 0)})) - - (def: (un-block times turnstile) - (-> Nat Semaphore (Promise Any)) - (loop [step 0] - (if (n/< times step) - (do promise.Monad - [_ (signal turnstile)] - (recur (inc step))) - (:: promise.Monad wrap [])))) - - (do-template [ ] - [(def: ( (^:representation barrier)) - (-> Barrier (Promise Any)) - (do promise.Monad - [#let [limit (refinement.un-refine (get@ #limit barrier)) - goal - count (io.run (atom.update (get@ #count barrier)))] - _ (if (n/= goal count) - (un-block limit (get@ barrier)) - (wrap []))] - (wait (get@ barrier))))] - - [start inc limit #start-turnstile] - [end dec 0 #end-turnstile] - ) - - (def: #export (block barrier) - (-> Barrier (Promise Any)) - (do promise.Monad - [_ (start barrier)] - (end barrier))) - ) diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux deleted file mode 100644 index 648d86d95..000000000 --- a/stdlib/source/lux/concurrency/stm.lux +++ /dev/null @@ -1,245 +0,0 @@ -(.module: - [lux #* - [control - [functor (#+ Functor)] - [apply (#+ Apply)] - ["." monad (#+ do Monad)]] - ["." io (#+ IO io)] - [data - ["." product] - ["." maybe] - [collection - ["." list ("list/." Functor Fold)]]] - [concurrency - ["." atom (#+ Atom atom)] - ["." promise (#+ Promise promise)] - ["." frp ("frp/." Functor)]] - [type - abstract]]) - -(type: #export (Observer a) - (-> a (IO Any))) - -(abstract: #export (Var a) - {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} - - (Atom [a (List (Observer a))]) - - (def: #export (var value) - {#.doc "Creates a new STM var, with a default value."} - (All [a] (-> a (Var a))) - (:abstraction (atom.atom [value (list)]))) - - (def: read!! - (All [a] (-> (Var a) a)) - (|>> :representation atom.read io.run product.left)) - - (def: #export (read! (^:representation var)) - {#.doc "Reads var immediately, without going through a transaction."} - (All [a] (-> (Var a) (IO a))) - (|> var - atom.read - (:: io.Functor map product.left))) - - (def: (write! new-value (^:representation var)) - (All [a] (-> a (Var a) (IO Any))) - (do io.Monad - [(^@ old [_value _observers]) (atom.read var) - succeeded? (atom.compare-and-swap old [new-value _observers] var)] - (if succeeded? - (do @ - [_ (monad.map @ (function (_ f) (f new-value)) _observers)] - (wrap [])) - (write! new-value (:abstraction var))))) - - ## TODO: Remove when possible - (def: (helper|follow var) - (All [a] (-> (Var a) (frp.Channel a))) - (frp.channel [])) - (def: #export (follow target) - {#.doc "Creates a channel that will receive all changes to the value of the given var."} - (All [a] (-> (Var a) (IO (frp.Channel a)))) - (do io.Monad - [#let [channel (helper|follow target) - target (:representation target)] - _ (atom.update (function (_ [value observers]) - [value (#.Cons (frp.publish channel) observers)]) - target)] - (wrap channel))) - ) - -(type: (Tx-Frame a) - {#var (Var a) - #original a - #current a}) - -(type: Tx - (List (Ex [a] (Tx-Frame a)))) - -(type: #export (STM a) - {#.doc "A computation which updates a transaction and produces a value."} - (-> Tx [Tx a])) - -(def: (find-var-value var tx) - (All [a] (-> (Var a) Tx (Maybe a))) - (|> tx - (list.find (function (_ [_var _original _current]) - (is? (:coerce (Var Any) var) - (:coerce (Var Any) _var)))) - (:: maybe.Monad map (function (_ [_var _original _current]) - _current)) - (:assume) - )) - -(def: #export (read var) - (All [a] (-> (Var a) (STM a))) - (function (_ tx) - (case (find-var-value var tx) - (#.Some value) - [tx value] - - #.None - (let [value (read!! var)] - [(#.Cons [var value value] tx) - value])))) - -(def: (update-tx-value var value tx) - (All [a] (-> (Var a) a Tx Tx)) - (case tx - #.Nil - #.Nil - - (#.Cons [_var _original _current] tx') - (if (is? (:coerce (Var Any) var) - (:coerce (Var Any) _var)) - (#.Cons {#var (:coerce (Var Any) _var) - #original (:coerce Any _original) - #current (:coerce Any value)} - tx') - (#.Cons {#var _var - #original _original - #current _current} - (update-tx-value var value tx'))) - )) - -(def: #export (write value var) - {#.doc "Writes value to var."} - (All [a] (-> a (Var a) (STM Any))) - (function (_ tx) - (case (find-var-value var tx) - (#.Some _) - [(update-tx-value var value tx) - []] - - #.None - [(#.Cons [var (read!! var) value] tx) - []]))) - -(structure: #export _ (Functor STM) - (def: (map f fa) - (function (_ tx) - (let [[tx' a] (fa tx)] - [tx' (f a)])))) - -(structure: #export _ (Apply STM) - (def: functor Functor) - - (def: (apply ff fa) - (function (_ tx) - (let [[tx' f] (ff tx) - [tx'' a] (fa tx')] - [tx'' (f a)])))) - -(structure: #export _ (Monad STM) - (def: functor Functor) - - (def: (wrap a) - (function (_ tx) [tx a])) - - (def: (join mma) - (function (_ tx) - (let [[tx' ma] (mma tx)] - (ma tx'))))) - -(def: #export (update f var) - {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} - (All [a] (-> (-> a a) (Var a) (STM [a a]))) - (do Monad - [a (read var) - #let [a' (f a)] - _ (write a' var)] - (wrap [a a']))) - -(def: (can-commit? tx) - (-> Tx Bit) - (list.every? (function (_ [_var _original _current]) - (is? _original (read!! _var))) - tx)) - -(def: (commit-var! [_var _original _current]) - (-> (Ex [a] (Tx-Frame a)) Any) - (if (is? _original _current) - [] - (io.run (write! _current _var)))) - -(def: fresh-tx Tx (list)) - -(type: Commit (Ex [a] [(STM a) (Promise a)])) - -(def: pending-commits - (Atom (Rec Commits (Promise [Commit Commits]))) - (atom (promise #.None))) - -(def: commit-processor-flag - (Atom Bit) - (atom #0)) - -(def: (issue-commit commit) - (-> Commit (IO Any)) - (let [entry [commit (promise #.None)]] - (loop [|commits| (io.run (atom.read pending-commits))] - (case (promise.poll |commits|) - #.None - (do io.Monad - [resolved? (promise.resolve entry |commits|)] - (if resolved? - (atom.write (product.right entry) pending-commits) - (recur |commits|))) - - (#.Some [head tail]) - (recur tail))))) - -(def: (process-commit [stm-proc output]) - (-> [(STM Any) (Promise Any)] Any) - (let [[finished-tx value] (stm-proc fresh-tx)] - (io.run (if (can-commit? finished-tx) - (exec (list/map commit-var! finished-tx) - (promise.resolve value output)) - (issue-commit [stm-proc output]))))) - -(def: init-processor! - (IO Any) - (do io.Monad - [flag (atom.read commit-processor-flag)] - (if flag - (wrap []) - (do @ - [was-first? (atom.compare-and-swap flag #1 commit-processor-flag)] - (if was-first? - (exec (|> (io.run (atom.read pending-commits)) - (promise.await (function (recur [head tail]) - (io (exec (process-commit (:coerce [(STM Any) (Promise Any)] head)) - (promise.await recur tail)))))) - (wrap [])) - (wrap []))) - ))) - -(def: #export (commit stm-proc) - {#.doc (doc "Commits a transaction and returns its result (asynchronously)." - "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first." - "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")} - (All [a] (-> (STM a) (Promise a))) - (let [output (promise #.None)] - (exec (io.run init-processor!) - (io.run (issue-commit [stm-proc output])) - output))) diff --git a/stdlib/source/lux/concurrency/task.lux b/stdlib/source/lux/concurrency/task.lux deleted file mode 100644 index c03ab7647..000000000 --- a/stdlib/source/lux/concurrency/task.lux +++ /dev/null @@ -1,82 +0,0 @@ -(.module: - [lux #* - [control - [functor (#+ Functor)] - [apply (#+ Apply)] - [monad (#+ Monad do)] - ["ex" exception (#+ Exception)]] - [data - ["." error (#+ Error)]] - ["." macro - ["s" syntax (#+ syntax: Syntax)]]] - [// - ["." promise (#+ Promise)]]) - -(type: #export (Task a) - (Promise (Error a))) - -(def: #export (fail error) - (All [a] (-> Text (Task a))) - (:: promise.Monad wrap (#error.Error error))) - -(def: #export (throw exception message) - (All [e a] (-> (Exception e) e (Task a))) - (:: promise.Monad wrap - (ex.throw exception message))) - -(def: #export (return value) - (All [a] (-> a (Task a))) - (:: promise.Monad wrap (#error.Success value))) - -(def: #export (try computation) - (All [a] (-> (Task a) (Task (Error a)))) - (:: promise.Functor map (|>> #error.Success) computation)) - -(structure: #export _ (Functor Task) - (def: (map f fa) - (:: promise.Functor map - (function (_ fa') - (case fa' - (#error.Error error) - (#error.Error error) - - (#error.Success a) - (#error.Success (f a)))) - fa))) - -(structure: #export _ (Apply Task) - (def: functor Functor) - - (def: (apply ff fa) - (do promise.Monad - [ff' ff - fa' fa] - (wrap (do error.Monad - [f ff' - a fa'] - (wrap (f a))))))) - -(structure: #export _ (Monad Task) - (def: functor Functor) - - (def: wrap return) - - (def: (join mma) - (do promise.Monad - [mma' mma] - (case mma' - (#error.Error error) - (wrap (#error.Error error)) - - (#error.Success ma) - ma)))) - -(syntax: #export (task {type s.any}) - {#.doc (doc "Makes an uninitialized Task (in this example, of Any)." - (task Any))} - (wrap (list (` (: (..Task (~ type)) - (promise.promise #.None)))))) - -(def: #export (from-promise promise) - (All [a] (-> (Promise a) (Task a))) - (:: promise.Functor map (|>> #error.Success) promise)) diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux new file mode 100644 index 000000000..0af0d09f9 --- /dev/null +++ b/stdlib/source/lux/control/concurrency/actor.lux @@ -0,0 +1,377 @@ +(.module: {#.doc "The actor model of concurrency."} + [lux #* + [control monad + ["p" parser] + ["ex" exception (#+ exception:)]] + ["." io (#- run) ("io/." Monad)] + [data + ["." product] + ["e" error] + [text + format] + [collection + ["." list ("list/." Monoid Monad Fold)]]] + ["." macro (#+ with-gensyms Monad) + ["." code] + ["s" syntax (#+ syntax: Syntax)] + [syntax + ["cs" common] + [common + ["csr" reader] + ["csw" writer]]]] + [type + abstract]] + [// + ["." atom (#+ Atom atom)] + ["." promise (#+ Promise promise) ("promise/." Monad)] + ["." task (#+ Task)]]) + +(exception: #export poisoned) + +(exception: #export (dead {actor-name Text} + {message-name Text}) + (ex.report ["Actor" actor-name] + ["Message" message-name])) + +## [Types] +(with-expansions + [ (as-is (-> s (Actor s) (Task s))) + (as-is [Text s (List )]) + (as-is (Rec Mailbox (Promise [ Mailbox])))] + + (def: (obituary mailbox) + (All [a] (-> (Rec Mailbox (Promise [a Mailbox])) (List a))) + (case (promise.poll mailbox) + (#.Some [head tail]) + (#.Cons head (obituary tail)) + + #.None + #.Nil)) + + (abstract: #export (Actor s) + {#.doc "An actor, defined as all the necessities it requires."} + {#mailbox (Atom ) + #obituary (Promise )} + + ## TODO: Delete after new-luxc becomes the new standard compiler. + (def: (actor mailbox obituary) + (All [s] (-> (Atom ) (Promise ) (Actor s))) + (:abstraction {#mailbox mailbox + #obituary obituary})) + + (type: #export (Message s) + ) + + (type: #export (Obituary s) + ) + + (type: #export (Behavior s) + {#.doc "An actor's behavior when messages are received."} + {#handle (-> (Message s) s (Actor s) (Task s)) + #end (-> Text s (Promise Any))}) + + (def: #export (spawn behavior init) + {#.doc "Given a behavior and initial state, spawns an actor and returns it."} + (All [s] (-> (Behavior s) s (IO (Actor s)))) + (io (let [[handle end] behavior + self (actor (atom (promise #.None)) + (promise #.None)) + process (loop [state init + |mailbox| (io.run (atom.read (get@ #mailbox (:representation self))))] + (do promise.Monad + [[head tail] |mailbox| + ?state' (handle head state self)] + (case ?state' + (#e.Error error) + (do @ + [_ (end error state)] + (exec (io.run (promise.resolve [error state (#.Cons head (obituary tail))] + (get@ #obituary (:representation self)))) + (wrap []))) + + (#e.Success state') + (recur state' tail))))] + self))) + + (def: #export (alive? actor) + (All [s] (-> (Actor s) Bit)) + (case (promise.poll (get@ #obituary (:representation actor))) + #.None + #1 + + _ + #0)) + + (def: #export (send message actor) + {#.doc "Communicate with an actor through message passing."} + (All [s] (-> (Message s) (Actor s) (IO Bit))) + (if (alive? actor) + (let [entry [message (promise #.None)]] + (do Monad + [|mailbox| (atom.read (get@ #mailbox (:representation actor)))] + (loop [|mailbox| |mailbox|] + (case (promise.poll |mailbox|) + #.None + (do @ + [resolved? (promise.resolve entry |mailbox|)] + (if resolved? + (do @ + [_ (atom.write (product.right entry) (get@ #mailbox (:representation actor)))] + (wrap #1)) + (recur |mailbox|))) + + (#.Some [_ |mailbox|']) + (recur |mailbox|'))))) + (io/wrap #0))) + )) + +## [Values] +(def: (default-handle message state self) + (All [s] (-> (Message s) s (Actor s) (Task s))) + (message state self)) + +(def: (default-end cause state) + (All [s] (-> Text s (Promise Any))) + (promise/wrap [])) + +(def: #export default-behavior + (All [s] (Behavior s)) + {#handle default-handle + #end default-end}) + +(def: #export (poison actor) + {#.doc (doc "Kills the actor by sending a message that will kill it upon processing," + "but allows the actor to handle previous messages.")} + (All [s] (-> (Actor s) (IO Bit))) + (send (function (_ state self) + (task.throw poisoned [])) + actor)) + +## [Syntax] +(do-template [ ] + [(def: #export ( name) + (-> Name cs.Annotations cs.Annotations) + (|>> (#.Cons [(name-of ) + (code.tag name)]))) + + (def: #export ( name) + (-> Name (Meta Name)) + (do Monad + [[_ annotations _] (macro.find-def name)] + (case (macro.get-tag-ann (name-of ) annotations) + (#.Some actor-name) + (wrap actor-name) + + _ + (macro.fail (format "Definition is not " ".")))))] + + [with-actor resolve-actor #..actor "an actor"] + [with-message resolve-message #..message "a message"] + ) + +(def: actor-decl^ + (Syntax [Text (List Text)]) + (p.either (s.form (p.and s.local-identifier (p.some s.local-identifier))) + (p.and s.local-identifier (:: p.Monad wrap (list))))) + +(do-template [ ] + [(def: #export + (-> Text Text) + (|>> (format "@")))] + + [state-name "State"] + [behavior-name "Behavior"] + [new-name "new"] + ) + +(type: HandleC + [[Text Text Text] Code]) + +(type: StopC + [[Text Text] Code]) + +(type: BehaviorC + [(Maybe HandleC) (Maybe StopC)]) + +(def: behavior^ + (s.Syntax BehaviorC) + (let [handle-args ($_ p.and s.local-identifier s.local-identifier s.local-identifier) + stop-args ($_ p.and s.local-identifier s.local-identifier)] + (p.and (p.maybe (s.form (p.and (s.form (p.after (s.this (' handle)) handle-args)) + s.any))) + (p.maybe (s.form (p.and (s.form (p.after (s.this (' stop)) stop-args)) + s.any)))))) + +(syntax: #export (actor: + {export csr.export} + {[_name _vars] actor-decl^} + {annotations (p.default cs.empty-annotations csr.annotations)} + state-type + {[?handle ?stop] behavior^}) + {#.doc (doc "Defines an actor, with its behavior and internal state." + (actor: #export Counter + Nat + + ((stop cause state) + (:: promise.Monad wrap + (log! (if (ex.match? ..poisoned cause) + (format "Counter was poisoned: " (%n state)) + cause))))) + + (actor: #export (Stack a) + (List a) + + ((handle message state self) + (do task.Monad + [#let [_ (log! "BEFORE")] + output (message state self) + #let [_ (log! "AFTER")]] + (wrap output)))))} + (with-gensyms [g!_ g!init] + (do @ + [module macro.current-module-name + #let [g!type (code.local-identifier (state-name _name)) + g!behavior (code.local-identifier (behavior-name _name)) + g!actor (code.local-identifier _name) + g!new (code.local-identifier (new-name _name)) + g!vars (list/map code.local-identifier _vars)]] + (wrap (list (` (type: (~+ (csw.export export)) ((~ g!type) (~+ g!vars)) + (~ state-type))) + (` (type: (~+ (csw.export export)) ((~ g!actor) (~+ g!vars)) + (~ (|> annotations + (with-actor [module _name]) + csw.annotations)) + (..Actor ((~ g!type) (~+ g!vars))))) + (` (def: (~+ (csw.export export)) (~ g!behavior) + (All [(~+ g!vars)] + (..Behavior ((~ g!type) (~+ g!vars)))) + {#..handle (~ (case ?handle + #.None + (` (~! ..default-handle)) + + (#.Some [[messageN stateN selfN] bodyC]) + (` (function ((~ g!_) + (~ (code.local-identifier messageN)) + (~ (code.local-identifier stateN)) + (~ (code.local-identifier selfN))) + (do task.Monad + [] + (~ bodyC)))))) + #..end (~ (case ?stop + #.None + (` (~! ..default-end)) + + (#.Some [[causeN stateN] bodyC]) + (` (function ((~ g!_) + (~ (code.local-identifier causeN)) + (~ (code.local-identifier stateN))) + (do promise.Monad + [] + (~ bodyC))))))})) + (` (def: (~+ (csw.export export)) ((~ g!new) (~ g!init)) + (All [(~+ g!vars)] + (-> ((~ g!type) (~+ g!vars)) (io.IO ((~ g!actor) (~+ g!vars))))) + (..spawn (~ g!behavior) (~ g!init)))))) + ))) + +(type: Signature + {#vars (List Text) + #name Text + #inputs (List cs.Typed-Input) + #state Text + #self Text + #output Code}) + +(def: signature^ + (s.Syntax Signature) + (s.form ($_ p.and + (p.default (list) (s.tuple (p.some s.local-identifier))) + s.local-identifier + (p.some csr.typed-input) + s.local-identifier + s.local-identifier + s.any))) + +(def: reference^ + (s.Syntax [Name (List Text)]) + (p.either (s.form (p.and s.identifier (p.some s.local-identifier))) + (p.and s.identifier (:: p.Monad wrap (list))))) + +(syntax: #export (message: + {export csr.export} + {[actor-name actor-vars] reference^} + {signature signature^} + {annotations (p.default cs.empty-annotations csr.annotations)} + body) + {#.doc (doc "A message can access the actor's state through the state parameter." + "A message can also access the actor itself through the self parameter." + "A message's output must be a task containing a 2-tuple with the updated state and a return value." + "A message may succeed or fail (in case of failure, the actor dies)." + + (message: #export Counter + (count! [increment Nat] state self Nat) + (let [state' (n/+ increment state)] + (task.return [state' state']))) + + (message: #export (Stack a) + (push [value a] state self (List a)) + (let [state' (#.Cons value state)] + (task.return [state' state']))))} + (with-gensyms [g!_ g!return g!error g!task g!sent?] + (do @ + [current-module macro.current-module-name + actor-name (resolve-actor actor-name) + #let [message-name [current-module (get@ #name signature)] + g!type (code.identifier (product.both id state-name actor-name)) + g!message (code.local-identifier (get@ #name signature)) + g!actor-vars (list/map code.local-identifier actor-vars) + actorC (` ((~ (code.identifier actor-name)) (~+ g!actor-vars))) + g!all-vars (|> (get@ #vars signature) (list/map code.local-identifier) (list/compose g!actor-vars)) + g!inputsC (|> (get@ #inputs signature) (list/map product.left)) + g!inputsT (|> (get@ #inputs signature) (list/map product.right)) + g!state (|> signature (get@ #state) code.local-identifier) + g!self (|> signature (get@ #self) code.local-identifier) + g!actor-refs (: (List Code) + (if (list.empty? actor-vars) + (list) + (|> actor-vars list.size list.indices (list/map (|>> code.nat (~) ($) (`)))))) + ref-replacements (|> (if (list.empty? actor-vars) + (list) + (|> actor-vars list.size list.indices (list/map (|>> code.nat (~) ($) (`))))) + (: (List Code)) + (list.zip2 g!all-vars) + (: (List [Code Code]))) + g!outputT (list/fold (function (_ [g!var g!ref] outputT) + (code.replace g!var g!ref outputT)) + (get@ #output signature) + ref-replacements)]] + (wrap (list (` (def: (~+ (csw.export export)) ((~ g!message) (~+ g!inputsC) (~ g!self)) + (~ (|> annotations + (with-message actor-name) + csw.annotations)) + (All [(~+ g!all-vars)] (-> (~+ g!inputsT) (~ actorC) (Task (~ (get@ #output signature))))) + (let [(~ g!task) (task.task (~ g!outputT))] + (io.run (do io.Monad + [(~ g!sent?) (..send (function ((~ g!_) (~ g!state) (~ g!self)) + (do promise.Monad + [(~ g!return) (: (Task [((~ g!type) (~+ g!actor-refs)) + (~ g!outputT)]) + (do task.Monad + [] + (~ body)))] + (case (~ g!return) + (#.Right [(~ g!state) (~ g!return)]) + (exec (io.run (promise.resolve (#.Right (~ g!return)) (~ g!task))) + (task.return (~ g!state))) + + (#.Left (~ g!error)) + (exec (io.run (promise.resolve (#.Left (~ g!error)) (~ g!task))) + (task.fail (~ g!error)))) + )) + (~ g!self))] + (if (~ g!sent?) + ((~' wrap) (~ g!task)) + ((~' wrap) (task.throw ..dead [(~ (code.text (%name actor-name))) + (~ (code.text (%name message-name)))])))))))) + )) + ))) diff --git a/stdlib/source/lux/control/concurrency/atom.lux b/stdlib/source/lux/control/concurrency/atom.lux new file mode 100644 index 000000000..b1692b6e3 --- /dev/null +++ b/stdlib/source/lux/control/concurrency/atom.lux @@ -0,0 +1,59 @@ +(.module: + [lux #* + [control + [monad (#+ do)]] + ["." function] + ["." io (#- run)] + [type + abstract] + [platform + [compiler + ["." host]]] + [host (#+ import:)]]) + +(`` (for {(~~ (static host.jvm)) + (import: (java/util/concurrent/atomic/AtomicReference a) + (new [a]) + (get [] a) + (compareAndSet [a a] boolean))})) + +(`` (abstract: #export (Atom a) + {#.doc "Atomic references that are safe to mutate concurrently."} + + (for {(~~ (static host.jvm)) + (AtomicReference a)}) + + (def: #export (atom value) + (All [a] (-> a (Atom a))) + (:abstraction (for {(~~ (static host.jvm)) + (AtomicReference::new value)}))) + + (def: #export (read atom) + (All [a] (-> (Atom a) (IO a))) + (io (for {(~~ (static host.jvm)) + (AtomicReference::get (:representation atom))}))) + + (def: #export (compare-and-swap current new atom) + {#.doc (doc "Only mutates an atom if you can present it's current value." + "That guarantees that atom was not updated since you last read from it.")} + (All [a] (-> a a (Atom a) (IO Bit))) + (io (AtomicReference::compareAndSet current new (:representation atom)))) + )) + +(def: #export (update f atom) + {#.doc (doc "Updates an atom by applying a function to its current value." + "If it fails to update it (because some other process wrote to it first), it will retry until it succeeds." + "The retries will be done with the new values of the atom, as they show up.")} + (All [a] (-> (-> a a) (Atom a) (IO a))) + (loop [_ []] + (do io.Monad + [old (read atom) + #let [new (f old)] + swapped? (compare-and-swap old new atom)] + (if swapped? + (wrap new) + (recur []))))) + +(def: #export (write value atom) + (All [a] (-> a (Atom a) (IO Any))) + (update (function.constant value) atom)) diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux new file mode 100644 index 000000000..8db54f28f --- /dev/null +++ b/stdlib/source/lux/control/concurrency/frp.lux @@ -0,0 +1,132 @@ +(.module: + [lux #* + [control + [functor (#+ Functor)] + [apply (#+ Apply)] + ["." monad (#+ do Monad)]] + ["." io (#+ IO io)] + [data + [collection + [list ("list/." Monoid)]]] + [type (#+ :share) + abstract]] + [// + ["." atom (#+ Atom atom)] + ["." promise (#+ Promise)]]) + +(abstract: #export (Channel a) + {#.doc "An asynchronous channel to distribute values."} + (Atom (List (-> a (IO Any)))) + + (def: #export (channel _) + (All [a] (-> Any (Channel a))) + (:abstraction (atom (list)))) + + (def: #export (listen listener channel) + (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any))) + ## TODO: Simplify when possible. + (do io.Monad + [_ (atom.update (|>> (#.Cons listener)) + (:representation channel))] + (wrap []))) + + (def: #export (publish channel value) + {#.doc "Publish to a channel."} + (All [a] (-> (Channel a) a (IO Any))) + (do io.Monad + [listeners (atom.read (:representation channel))] + (monad.map @ (function (_ listener) (listener value)) listeners))) + ) + +(def: #export (filter predicate input) + (All [a] (-> (-> a Bit) (Channel a) (Channel a))) + (let [output (channel [])] + (exec (io.run (listen (function (_ value) + (if (predicate value) + (publish output value) + (io []))) + input)) + output))) + +(def: #export (pipe output input) + {#.doc "Copy/pipe the contents of a channel on to another."} + (All [a] (-> (Channel a) (Channel a) (IO Any))) + (listen (publish output) + input)) + +(def: #export (merge inputs) + {#.doc "Fuse all the elements in a list of channels by piping them onto a new output channel."} + (All [a] (-> (List (Channel a)) (IO (Channel a)))) + (let [output (channel [])] + (do io.Monad + [_ (monad.map @ (pipe output) inputs)] + (wrap output)))) + +(def: #export (from-promise promise) + (All [a] (-> (Promise a) (Channel a))) + (let [output (channel [])] + (exec (promise.await (publish output) promise) + output))) + +(def: #export (poll time action) + (All [a] (-> Nat (IO a) (Channel a))) + (let [output (channel [])] + (exec (io.run + (loop [_ []] + (do io.Monad + [value action + _ (publish output value)] + (wrap (promise.await recur (promise.wait time)))))) + output))) + +(def: #export (periodic time) + (-> Nat (Channel Any)) + (let [output (channel [])] + (exec (io.run + (loop [_ []] + (do io.Monad + [_ (publish output [])] + (wrap (promise.await recur (promise.wait time)))))) + output))) + +(def: #export (iterate f init) + (All [a] (-> (-> a (Promise a)) a (Channel a))) + (let [output (channel [])] + (exec (io.run + (loop [zero init] + (do io.Monad + [_ (publish output zero)] + (wrap (promise.await recur (f zero)))))) + output))) + +(structure: #export _ (Functor Channel) + (def: (map f input) + (let [output (channel [])] + (exec (io.run (listen (|>> f (publish output)) + input)) + output)))) + +(structure: #export _ (Apply Channel) + (def: functor Functor) + + (def: (apply ff fa) + (let [output (channel [])] + (exec (io.run (listen (function (_ f) + (listen (|>> f (publish output)) + fa)) + ff)) + output)))) + +(structure: #export _ (Monad Channel) + (def: functor Functor) + + (def: (wrap a) + (let [output (channel [])] + (exec (io.run (publish output a)) + output))) + + (def: (join mma) + (let [output (channel [])] + (exec (io.run (listen (listen (publish output)) + mma)) + output)))) diff --git a/stdlib/source/lux/control/concurrency/process.lux b/stdlib/source/lux/control/concurrency/process.lux new file mode 100644 index 000000000..a67734747 --- /dev/null +++ b/stdlib/source/lux/control/concurrency/process.lux @@ -0,0 +1,110 @@ +(.module: + [lux #* + [control + ["." monad (#+ do)] + ["ex" exception (#+ exception:)]] + [data + [collection + ["." list]]] + [platform + [compiler + ["." host]]] + ["." io (#+ IO io)] + [host (#+ import: object)]] + [// + ["." atom (#+ Atom)]]) + +(`` (for {(~~ (static host.jvm)) + (as-is (import: java/lang/Object) + + (import: java/lang/Runtime + (#static getRuntime [] Runtime) + (availableProcessors [] int)) + + (import: java/lang/Runnable) + + (import: java/util/concurrent/TimeUnit + (#enum MILLISECONDS)) + + (import: java/util/concurrent/Executor + (execute [Runnable] #io void)) + + (import: (java/util/concurrent/ScheduledFuture a)) + + (import: java/util/concurrent/ScheduledThreadPoolExecutor + (new [int]) + (schedule [Runnable long TimeUnit] #io (ScheduledFuture Object))))} + + ## Default + (type: Process + {#creation Nat + #delay Nat + #action (IO Any)}) + )) + +(def: #export parallelism + Nat + (`` (for {(~~ (static host.jvm)) + (|> (Runtime::getRuntime) + (Runtime::availableProcessors) + .nat)} + + ## Default + 1))) + +(def: runner + (`` (for {(~~ (static host.jvm)) + (ScheduledThreadPoolExecutor::new (.int ..parallelism))} + + ## Default + (: (Atom (List Process)) + (atom.atom (list)))))) + +(def: #export (schedule milli-seconds action) + (-> Nat (IO Any) (IO Any)) + (`` (for {(~~ (static host.jvm)) + (let [runnable (object [] [Runnable] + [] + (Runnable [] (run) void + (io.run action)))] + (case milli-seconds + 0 (Executor::execute runnable runner) + _ (ScheduledThreadPoolExecutor::schedule runnable (.int milli-seconds) TimeUnit::MILLISECONDS + runner)))} + + ## Default + (atom.update (|>> (#.Cons {#creation ("lux io current-time") + #delay milli-seconds + #action action})) + runner)))) + +(`` (for {(~~ (static host.jvm)) + (as-is)} + + ## Default + (as-is (exception: #export (cannot-continue-running-processes) "") + + (def: #export run! + (IO Any) + (loop [_ []] + (do io.Monad + [processes (atom.read runner)] + (case processes + ## And... we're done! + #.Nil + (wrap []) + + _ + (do @ + [#let [now ("lux io current-time") + [ready pending] (list.partition (function (_ process) + (|> (get@ #creation process) + (n/+ (get@ #delay process)) + (n/<= now))) + processes)] + swapped? (atom.compare-and-swap! processes pending runner)] + (if swapped? + (monad.seq @ ready) + (error! (ex.construct cannot-continue-running-processes [])))) + )))) + ))) diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux new file mode 100644 index 000000000..1a471022f --- /dev/null +++ b/stdlib/source/lux/control/concurrency/promise.lux @@ -0,0 +1,174 @@ +(.module: + [lux (#- and or) + [control + [functor (#+ Functor)] + [apply (#+ Apply)] + ["." monad (#+ do Monad)]] + [data + ["." product]] + ["." function] + [type + abstract] + ["." io (#+ IO io)]] + [// + ["." process] + ["." atom (#+ Atom atom)]]) + +(abstract: #export (Promise a) + {#.doc "Represents values produced by asynchronous computations (unlike IO, which is synchronous)."} + (Atom [(Maybe a) (List (-> a (IO Any)))]) + + (def: #export (promise ?value) + (All [a] (-> (Maybe a) (Promise a))) + (:abstraction (atom [?value (list)]))) + + (def: #export (poll (^:representation promise)) + {#.doc "Polls a promise's value."} + (All [a] (-> (Promise a) (Maybe a))) + (|> (atom.read promise) + io.run + product.left)) + + (def: #export (resolve value (^:representation promise)) + {#.doc "Sets an promise's value if it has not been done yet."} + (All [a] (-> a (Promise a) (IO Bit))) + (do io.Monad + [(^@ old [_value _observers]) (atom.read promise)] + (case _value + (#.Some _) + (wrap #0) + + #.None + (do @ + [#let [new [(#.Some value) #.None]] + succeeded? (atom.compare-and-swap old new promise)] + (if succeeded? + (do @ + [_ (monad.map @ (function (_ f) (f value)) + _observers)] + (wrap #1)) + (resolve value (:abstraction promise))))))) + + (def: #export (await f (^:representation promise)) + (All [a] (-> (-> a (IO Any)) (Promise a) Any)) + (let [(^@ old [_value _observers]) (io.run (atom.read promise))] + (case _value + (#.Some value) + (io.run (f value)) + + #.None + (let [new [_value (#.Cons f _observers)]] + (if (io.run (atom.compare-and-swap old new promise)) + [] + (await f (:abstraction promise))))))) + ) + +(def: #export (resolved? promise) + {#.doc "Checks whether a promise's value has already been resolved."} + (All [a] (-> (Promise a) Bit)) + (case (poll promise) + #.None + #0 + + (#.Some _) + #1)) + +(structure: #export _ (Functor Promise) + (def: (map f fa) + (let [fb (promise #.None)] + (exec (await (function (_ a) (resolve (f a) fb)) + fa) + fb)))) + +(structure: #export _ (Apply Promise) + (def: functor Functor) + + (def: (apply ff fa) + (let [fb (promise #.None)] + (exec (await (function (_ f) + (io (await (function (_ a) (resolve (f a) fb)) + fa))) + ff) + fb)))) + +(structure: #export _ (Monad Promise) + (def: functor Functor) + + (def: (wrap a) + (promise (#.Some a))) + + (def: (join mma) + (let [ma (promise #.None)] + (exec (await (function (_ ma') + (io (await (function (_ a') (resolve a' ma)) + ma'))) + mma) + ma)))) + +(def: #export (and left right) + {#.doc "Sequencing combinator."} + (All [a b] (-> (Promise a) (Promise b) (Promise [a b]))) + (do Monad + [a left + b right] + (wrap [a b]))) + +(def: #export (or left right) + {#.doc "Heterogeneous alternative combinator."} + (All [a b] (-> (Promise a) (Promise b) (Promise (| a b)))) + (let [a|b (promise #.None)] + (with-expansions + [ (do-template [ ] + [(await (function (_ value) (resolve ( value) a|b)) + )] + + [left #.Left] + [right #.Right] + )] + (exec + a|b)))) + +(def: #export (either left right) + {#.doc "Homogeneous alternative combinator."} + (All [a] (-> (Promise a) (Promise a) (Promise a))) + (let [left||right (promise #.None)] + (`` (exec (~~ (do-template [] + [(await (function (_ value) (resolve value left||right)) + )] + + [left] + [right])) + left||right)))) + +(def: #export (schedule millis-delay computation) + {#.doc (doc "Runs an I/O computation on its own process (after a specified delay)." + "Returns a Promise that will eventually host its result.")} + (All [a] (-> Nat (IO a) (Promise a))) + (let [!out (promise #.None)] + (exec (|> (do io.Monad + [value computation] + (resolve value !out)) + (process.schedule millis-delay) + io.run) + !out))) + +(def: #export future + {#.doc (doc "Runs an I/O computation on its own process." + "Returns a Promise that will eventually host its result.")} + (All [a] (-> (IO a) (Promise a))) + (schedule 0)) + +(def: #export (delay time-millis value) + {#.doc "Delivers a value after a certain period has passed."} + (All [a] (-> Nat a (Promise a))) + (schedule time-millis (io value))) + +(def: #export (wait time-millis) + {#.doc "Returns a promise that will be resolved after the specified amount of milliseconds."} + (-> Nat (Promise Any)) + (delay time-millis [])) + +(def: #export (time-out time-millis promise) + {#.doc "Wait for a promise to be resolved within the specified amount of milliseconds."} + (All [a] (-> Nat (Promise a) (Promise (Maybe a)))) + (..or (wait time-millis) promise)) diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux new file mode 100644 index 000000000..46762ecf3 --- /dev/null +++ b/stdlib/source/lux/control/concurrency/semaphore.lux @@ -0,0 +1,149 @@ +(.module: + [lux #* + [control [monad (#+ do)]] + ["." io (#+ IO)] + [type + abstract + ["." refinement]]] + [// + ["." atom (#+ Atom)] + ["." promise (#+ Promise)]]) + +(type: State + {#open-positions Nat + #waiting-list (List (Promise Any))}) + +(abstract: #export Semaphore + {#.doc "A tool for controlling access to resources by multiple concurrent processes."} + + (Atom State) + + (def: #export (semaphore init-open-positions) + (-> Nat Semaphore) + (:abstraction (atom.atom {#open-positions init-open-positions + #waiting-list (list)}))) + + (def: #export (wait semaphore) + (Ex [k] (-> Semaphore (Promise Any))) + (let [semaphore (:representation semaphore)] + (io.run + (loop [signal (: (Promise Any) + (promise.promise #.None))] + (do io.Monad + [state (atom.read semaphore) + #let [[ready? state'] (: [Bit State] + (case (get@ #open-positions state) + 0 [#0 (update@ #waiting-list (|>> (#.Cons signal)) + state)] + _ [#1 (update@ #open-positions dec + state)]))] + success? (atom.compare-and-swap state state' semaphore) + _ (if ready? + (promise.resolve [] signal) + (wrap #0))] + (if success? + (wrap signal) + (recur signal))))))) + + (def: #export (signal semaphore) + (Ex [k] (-> Semaphore (Promise Any))) + (let [semaphore (:representation semaphore)] + (promise.future + (loop [_ []] + (do io.Monad + [state (atom.read semaphore) + #let [[?signal state'] (: [(Maybe (Promise Any)) State] + (case (get@ #waiting-list state) + #.Nil + [#.None (update@ #open-positions inc state)] + + (#.Cons head tail) + [(#.Some head) (set@ #waiting-list tail state)]))] + success? (atom.compare-and-swap state state' semaphore)] + (if success? + (do @ + [_ (case ?signal + #.None + (wrap #1) + + (#.Some signal) + (promise.resolve [] signal))] + (wrap [])) + (recur []))))))) + ) + +(abstract: #export Mutex + {#.doc "A mutual-exclusion lock that can only be acquired by one process at a time."} + + Semaphore + + (def: #export (mutex _) + (-> Any Mutex) + (:abstraction (semaphore 1))) + + (def: acquire + (-> Mutex (Promise Any)) + (|>> :representation wait)) + + (def: release + (-> Mutex (Promise Any)) + (|>> :representation signal)) + + (def: #export (synchronize mutex procedure) + (All [a] (-> Mutex (IO (Promise a)) (Promise a))) + (do promise.Monad + [_ (acquire mutex) + output (io.run procedure) + _ (release mutex)] + (wrap output))) + ) + +(def: #export limit (refinement.refinement (n/> 0))) +(`` (type: #export Limit (~~ (refinement.type limit)))) + +(abstract: #export Barrier + {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."} + + {#limit Limit + #count (Atom Nat) + #start-turnstile Semaphore + #end-turnstile Semaphore} + + (def: #export (barrier limit) + (-> Limit Barrier) + (:abstraction {#limit limit + #count (atom.atom 0) + #start-turnstile (semaphore 0) + #end-turnstile (semaphore 0)})) + + (def: (un-block times turnstile) + (-> Nat Semaphore (Promise Any)) + (loop [step 0] + (if (n/< times step) + (do promise.Monad + [_ (signal turnstile)] + (recur (inc step))) + (:: promise.Monad wrap [])))) + + (do-template [ ] + [(def: ( (^:representation barrier)) + (-> Barrier (Promise Any)) + (do promise.Monad + [#let [limit (refinement.un-refine (get@ #limit barrier)) + goal + count (io.run (atom.update (get@ #count barrier)))] + _ (if (n/= goal count) + (un-block limit (get@ barrier)) + (wrap []))] + (wait (get@ barrier))))] + + [start inc limit #start-turnstile] + [end dec 0 #end-turnstile] + ) + + (def: #export (block barrier) + (-> Barrier (Promise Any)) + (do promise.Monad + [_ (start barrier)] + (end barrier))) + ) diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux new file mode 100644 index 000000000..3203b2d52 --- /dev/null +++ b/stdlib/source/lux/control/concurrency/stm.lux @@ -0,0 +1,245 @@ +(.module: + [lux #* + [control + [functor (#+ Functor)] + [apply (#+ Apply)] + ["." monad (#+ do Monad)]] + ["." io (#+ IO io)] + [data + ["." product] + ["." maybe] + [collection + ["." list ("list/." Functor Fold)]]] + [type + abstract]] + [// + ["." atom (#+ Atom atom)] + ["." promise (#+ Promise promise)] + ["." frp ("frp/." Functor)]]) + +(type: #export (Observer a) + (-> a (IO Any))) + +(abstract: #export (Var a) + {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} + + (Atom [a (List (Observer a))]) + + (def: #export (var value) + {#.doc "Creates a new STM var, with a default value."} + (All [a] (-> a (Var a))) + (:abstraction (atom.atom [value (list)]))) + + (def: read!! + (All [a] (-> (Var a) a)) + (|>> :representation atom.read io.run product.left)) + + (def: #export (read! (^:representation var)) + {#.doc "Reads var immediately, without going through a transaction."} + (All [a] (-> (Var a) (IO a))) + (|> var + atom.read + (:: io.Functor map product.left))) + + (def: (write! new-value (^:representation var)) + (All [a] (-> a (Var a) (IO Any))) + (do io.Monad + [(^@ old [_value _observers]) (atom.read var) + succeeded? (atom.compare-and-swap old [new-value _observers] var)] + (if succeeded? + (do @ + [_ (monad.map @ (function (_ f) (f new-value)) _observers)] + (wrap [])) + (write! new-value (:abstraction var))))) + + ## TODO: Remove when possible + (def: (helper|follow var) + (All [a] (-> (Var a) (frp.Channel a))) + (frp.channel [])) + (def: #export (follow target) + {#.doc "Creates a channel that will receive all changes to the value of the given var."} + (All [a] (-> (Var a) (IO (frp.Channel a)))) + (do io.Monad + [#let [channel (helper|follow target) + target (:representation target)] + _ (atom.update (function (_ [value observers]) + [value (#.Cons (frp.publish channel) observers)]) + target)] + (wrap channel))) + ) + +(type: (Tx-Frame a) + {#var (Var a) + #original a + #current a}) + +(type: Tx + (List (Ex [a] (Tx-Frame a)))) + +(type: #export (STM a) + {#.doc "A computation which updates a transaction and produces a value."} + (-> Tx [Tx a])) + +(def: (find-var-value var tx) + (All [a] (-> (Var a) Tx (Maybe a))) + (|> tx + (list.find (function (_ [_var _original _current]) + (is? (:coerce (Var Any) var) + (:coerce (Var Any) _var)))) + (:: maybe.Monad map (function (_ [_var _original _current]) + _current)) + (:assume) + )) + +(def: #export (read var) + (All [a] (-> (Var a) (STM a))) + (function (_ tx) + (case (find-var-value var tx) + (#.Some value) + [tx value] + + #.None + (let [value (read!! var)] + [(#.Cons [var value value] tx) + value])))) + +(def: (update-tx-value var value tx) + (All [a] (-> (Var a) a Tx Tx)) + (case tx + #.Nil + #.Nil + + (#.Cons [_var _original _current] tx') + (if (is? (:coerce (Var Any) var) + (:coerce (Var Any) _var)) + (#.Cons {#var (:coerce (Var Any) _var) + #original (:coerce Any _original) + #current (:coerce Any value)} + tx') + (#.Cons {#var _var + #original _original + #current _current} + (update-tx-value var value tx'))) + )) + +(def: #export (write value var) + {#.doc "Writes value to var."} + (All [a] (-> a (Var a) (STM Any))) + (function (_ tx) + (case (find-var-value var tx) + (#.Some _) + [(update-tx-value var value tx) + []] + + #.None + [(#.Cons [var (read!! var) value] tx) + []]))) + +(structure: #export _ (Functor STM) + (def: (map f fa) + (function (_ tx) + (let [[tx' a] (fa tx)] + [tx' (f a)])))) + +(structure: #export _ (Apply STM) + (def: functor Functor) + + (def: (apply ff fa) + (function (_ tx) + (let [[tx' f] (ff tx) + [tx'' a] (fa tx')] + [tx'' (f a)])))) + +(structure: #export _ (Monad STM) + (def: functor Functor) + + (def: (wrap a) + (function (_ tx) [tx a])) + + (def: (join mma) + (function (_ tx) + (let [[tx' ma] (mma tx)] + (ma tx'))))) + +(def: #export (update f var) + {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} + (All [a] (-> (-> a a) (Var a) (STM [a a]))) + (do Monad + [a (read var) + #let [a' (f a)] + _ (write a' var)] + (wrap [a a']))) + +(def: (can-commit? tx) + (-> Tx Bit) + (list.every? (function (_ [_var _original _current]) + (is? _original (read!! _var))) + tx)) + +(def: (commit-var! [_var _original _current]) + (-> (Ex [a] (Tx-Frame a)) Any) + (if (is? _original _current) + [] + (io.run (write! _current _var)))) + +(def: fresh-tx Tx (list)) + +(type: Commit (Ex [a] [(STM a) (Promise a)])) + +(def: pending-commits + (Atom (Rec Commits (Promise [Commit Commits]))) + (atom (promise #.None))) + +(def: commit-processor-flag + (Atom Bit) + (atom #0)) + +(def: (issue-commit commit) + (-> Commit (IO Any)) + (let [entry [commit (promise #.None)]] + (loop [|commits| (io.run (atom.read pending-commits))] + (case (promise.poll |commits|) + #.None + (do io.Monad + [resolved? (promise.resolve entry |commits|)] + (if resolved? + (atom.write (product.right entry) pending-commits) + (recur |commits|))) + + (#.Some [head tail]) + (recur tail))))) + +(def: (process-commit [stm-proc output]) + (-> [(STM Any) (Promise Any)] Any) + (let [[finished-tx value] (stm-proc fresh-tx)] + (io.run (if (can-commit? finished-tx) + (exec (list/map commit-var! finished-tx) + (promise.resolve value output)) + (issue-commit [stm-proc output]))))) + +(def: init-processor! + (IO Any) + (do io.Monad + [flag (atom.read commit-processor-flag)] + (if flag + (wrap []) + (do @ + [was-first? (atom.compare-and-swap flag #1 commit-processor-flag)] + (if was-first? + (exec (|> (io.run (atom.read pending-commits)) + (promise.await (function (recur [head tail]) + (io (exec (process-commit (:coerce [(STM Any) (Promise Any)] head)) + (promise.await recur tail)))))) + (wrap [])) + (wrap []))) + ))) + +(def: #export (commit stm-proc) + {#.doc (doc "Commits a transaction and returns its result (asynchronously)." + "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first." + "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")} + (All [a] (-> (STM a) (Promise a))) + (let [output (promise #.None)] + (exec (io.run init-processor!) + (io.run (issue-commit [stm-proc output])) + output))) diff --git a/stdlib/source/lux/control/concurrency/task.lux b/stdlib/source/lux/control/concurrency/task.lux new file mode 100644 index 000000000..c03ab7647 --- /dev/null +++ b/stdlib/source/lux/control/concurrency/task.lux @@ -0,0 +1,82 @@ +(.module: + [lux #* + [control + [functor (#+ Functor)] + [apply (#+ Apply)] + [monad (#+ Monad do)] + ["ex" exception (#+ Exception)]] + [data + ["." error (#+ Error)]] + ["." macro + ["s" syntax (#+ syntax: Syntax)]]] + [// + ["." promise (#+ Promise)]]) + +(type: #export (Task a) + (Promise (Error a))) + +(def: #export (fail error) + (All [a] (-> Text (Task a))) + (:: promise.Monad wrap (#error.Error error))) + +(def: #export (throw exception message) + (All [e a] (-> (Exception e) e (Task a))) + (:: promise.Monad wrap + (ex.throw exception message))) + +(def: #export (return value) + (All [a] (-> a (Task a))) + (:: promise.Monad wrap (#error.Success value))) + +(def: #export (try computation) + (All [a] (-> (Task a) (Task (Error a)))) + (:: promise.Functor map (|>> #error.Success) computation)) + +(structure: #export _ (Functor Task) + (def: (map f fa) + (:: promise.Functor map + (function (_ fa') + (case fa' + (#error.Error error) + (#error.Error error) + + (#error.Success a) + (#error.Success (f a)))) + fa))) + +(structure: #export _ (Apply Task) + (def: functor Functor) + + (def: (apply ff fa) + (do promise.Monad + [ff' ff + fa' fa] + (wrap (do error.Monad + [f ff' + a fa'] + (wrap (f a))))))) + +(structure: #export _ (Monad Task) + (def: functor Functor) + + (def: wrap return) + + (def: (join mma) + (do promise.Monad + [mma' mma] + (case mma' + (#error.Error error) + (wrap (#error.Error error)) + + (#error.Success ma) + ma)))) + +(syntax: #export (task {type s.any}) + {#.doc (doc "Makes an uninitialized Task (in this example, of Any)." + (task Any))} + (wrap (list (` (: (..Task (~ type)) + (promise.promise #.None)))))) + +(def: #export (from-promise promise) + (All [a] (-> (Promise a) (Task a))) + (:: promise.Functor map (|>> #error.Success) promise)) diff --git a/stdlib/source/lux/test.lux b/stdlib/source/lux/test.lux index b928b1860..ea4e9b6de 100644 --- a/stdlib/source/lux/test.lux +++ b/stdlib/source/lux/test.lux @@ -2,7 +2,10 @@ [lux #* [control ["." monad (#+ do Monad)] - ["p" parser]] + ["p" parser] + [concurrency + ["." process] + ["." promise (#+ Promise)]]] [data ["." product] ["." maybe] @@ -19,9 +22,6 @@ ["." macro (#+ with-gensyms) ["s" syntax (#+ syntax: Syntax)] ["." code]] - [concurrency - ["." process] - ["." promise (#+ Promise)]] ["." io (#+ IO io)]]) ## [Types] diff --git a/stdlib/source/lux/world/console.lux b/stdlib/source/lux/world/console.lux index 209063dfd..5c0aff910 100644 --- a/stdlib/source/lux/world/console.lux +++ b/stdlib/source/lux/world/console.lux @@ -3,6 +3,8 @@ [control [monad (#+ do)] ["ex" exception (#+ exception:)] + [concurrency + ["." promise (#+ Promise)]] [security ["." taint (#+ Dirty taint)] [capability (#+ Capability)]]] @@ -11,8 +13,6 @@ ["." text format]] ["." io (#+ IO Process io)] - [concurrency - ["." promise (#+ Promise)]] [host (#+ import:)] [platform [compiler diff --git a/stdlib/source/lux/world/file.lux b/stdlib/source/lux/world/file.lux index e0975799d..ac033fd89 100644 --- a/stdlib/source/lux/world/file.lux +++ b/stdlib/source/lux/world/file.lux @@ -3,6 +3,8 @@ [control ["." monad (#+ Monad do)] ["ex" exception (#+ Exception exception:)] + [concurrency + ["." promise (#+ Promise)]] [security ["." taint (#+ Dirty taint)] ["." capability (#+ Capability)]]] @@ -20,8 +22,6 @@ [world ["." binary (#+ Binary)]] ["." io (#+ IO) ("io/." Functor)] - [concurrency - ["." promise (#+ Promise)]] [host (#+ import:)] [platform [compiler diff --git a/stdlib/source/lux/world/net/tcp.jvm.lux b/stdlib/source/lux/world/net/tcp.jvm.lux index add7427cb..f9bde2e2c 100644 --- a/stdlib/source/lux/world/net/tcp.jvm.lux +++ b/stdlib/source/lux/world/net/tcp.jvm.lux @@ -2,12 +2,12 @@ [lux #* [control monad + [concurrency + ["." promise (#+ Promise promise)] + [task (#+ Task)] + ["." frp]] [security ["." taint (#+ Dirty taint)]]] - [concurrency - ["." promise (#+ Promise promise)] - [task (#+ Task)] - ["." frp]] [data ["." error (#+ Error)]] [world diff --git a/stdlib/source/lux/world/net/udp.jvm.lux b/stdlib/source/lux/world/net/udp.jvm.lux index f27ca1c5e..3e9015b56 100644 --- a/stdlib/source/lux/world/net/udp.jvm.lux +++ b/stdlib/source/lux/world/net/udp.jvm.lux @@ -3,11 +3,11 @@ [control monad ["ex" exception (#+ exception:)] + [concurrency + ["." promise (#+ Promise)] + [task (#+ Task)]] [security ["." taint (#+ Dirty taint)]]] - [concurrency - ["." promise (#+ Promise)] - [task (#+ Task)]] [data ["." error (#+ Error)] ["." maybe] diff --git a/stdlib/test/test/lux/concurrency/actor.lux b/stdlib/test/test/lux/concurrency/actor.lux deleted file mode 100644 index a43845380..000000000 --- a/stdlib/test/test/lux/concurrency/actor.lux +++ /dev/null @@ -1,75 +0,0 @@ -(.module: - [lux #* - ["." io (#+ IO io)] - [control - ["M" monad (#+ do Monad)] - ["ex" exception]] - [data - ["e" error] - [text - format]] - [concurrency - ["P" promise ("promise/." Monad)] - ["T" task] - ["&" actor (#+ actor: message:)]]] - lux/test) - -(actor: Counter - Nat - - ((handle message state self) - (do T.Monad - [#let [_ (log! "BEFORE")] - output (message state self) - #let [_ (log! "AFTER")]] - (wrap output))) - - ((stop cause state) - (promise/wrap (log! (if (ex.match? &.poisoned cause) - (format "Counter was poisoned: " (%n state)) - cause))))) - -(message: #export Counter - (count! {increment Nat} state self Nat) - (let [state' (n/+ increment state)] - (T.return [state' state']))) - -(context: "Actors" - ($_ seq - (test "Can check if an actor is alive." - (io.run (do io.Monad - [counter (new@Counter 0)] - (wrap (&.alive? counter))))) - - (test "Can poison actors." - (io.run (do io.Monad - [counter (new@Counter 0) - poisoned? (&.poison counter)] - (wrap (and poisoned? - (not (&.alive? counter))))))) - - (test "Cannot poison an already dead actor." - (io.run (do io.Monad - [counter (new@Counter 0) - first-time (&.poison counter) - second-time (&.poison counter)] - (wrap (and first-time - (not second-time)))))) - - (wrap (do P.Monad - [result (do T.Monad - [#let [counter (io.run (new@Counter 0))] - output-1 (count! 1 counter) - output-2 (count! 1 counter) - output-3 (count! 1 counter)] - (wrap (and (n/= 1 output-1) - (n/= 2 output-2) - (n/= 3 output-3))))] - (assert "Can send messages to actors." - (case result - (#e.Success outcome) - outcome - - (#e.Error error) - #0)))) - )) diff --git a/stdlib/test/test/lux/concurrency/atom.lux b/stdlib/test/test/lux/concurrency/atom.lux deleted file mode 100644 index a10edcae7..000000000 --- a/stdlib/test/test/lux/concurrency/atom.lux +++ /dev/null @@ -1,34 +0,0 @@ -(.module: - [lux #* - ["." io] - [control - ["M" monad (#+ do Monad)]] - [concurrency - ["&" atom]] - [math - ["r" random]]] - lux/test) - -(context: "Atoms" - (<| (times 100) - (do @ - [value r.nat - swap-value r.nat - set-value r.nat - #let [box (&.atom value)]] - ($_ seq - (test "Can obtain the value of an atom." - (n/= value (io.run (&.read box)))) - - (test "Can swap the value of an atom." - (and (io.run (&.compare-and-swap value swap-value box)) - (n/= swap-value (io.run (&.read box))))) - - (test "Can update the value of an atom." - (exec (io.run (&.update inc box)) - (n/= (inc swap-value) (io.run (&.read box))))) - - (test "Can immediately set the value of an atom." - (exec (io.run (&.write set-value box)) - (n/= set-value (io.run (&.read box))))) - )))) diff --git a/stdlib/test/test/lux/concurrency/frp.lux b/stdlib/test/test/lux/concurrency/frp.lux deleted file mode 100644 index 46db40889..000000000 --- a/stdlib/test/test/lux/concurrency/frp.lux +++ /dev/null @@ -1,122 +0,0 @@ -(.module: - [lux #* - ["." io (#+ IO io)] - [control - ["." monad (#+ do Monad)]] - [data - ["." number] - [collection - ["." list]]] - [concurrency - ["." promise ("promise/." Monad)] - ["." frp (#+ Channel)] - ["." atom (#+ Atom atom)]]] - lux/test) - -(def: (write! values channel) - (All [a] (-> (List a) (Channel a) (IO Any))) - (do io.Monad - [_ (monad.map @ (frp.publish channel) values)] - (wrap []))) - -(def: (read! channel) - (All [a] (-> (Channel a) (IO (Atom (List a))))) - (do io.Monad - [#let [output (atom (list))] - _ (frp.listen (function (_ value) - ## TODO: Simplify when possible. - (do @ - [_ (atom.update (|>> (#.Cons value)) output)] - (wrap []))) - channel)] - (wrap output))) - -(context: "FRP" - (let [(^open "list/.") (list.Equivalence number.Equivalence)] - ($_ seq - (wrap (do promise.Monad - [#let [values (list +0 +1 +2 +3 +4 +5)] - output (promise.future - (do io.Monad - [#let [input (: (Channel Int) (frp.channel []))] - output (read! input) - _ (write! values input)] - (wrap output))) - _ (promise.wait 100) - output (promise.future (atom.read output))] - (assert "Can pipe one channel into another." - (list/= values - (list.reverse output))))) - - (wrap (do promise.Monad - [output (promise.future - (do io.Monad - [#let [input (: (Channel Int) (frp.channel [])) - elems (frp.filter i/even? input)] - output (read! elems) - _ (write! (list +0 +1 +2 +3 +4 +5) input)] - (wrap output))) - _ (promise.wait 100) - output (promise.future (atom.read output))] - (assert "Can filter a channel's elements." - (list/= (list +0 +2 +4) - (list.reverse output))))) - - (wrap (do promise.Monad - [output (promise.future - (do io.Monad - [#let [left (: (Channel Int) (frp.channel [])) - right (: (Channel Int) (frp.channel []))] - merged (frp.merge (list left right)) - output (read! merged) - _ (write! (list +0 +1 +2 +3 +4 +5) left) - _ (write! (list +0 -1 -2 -3 -4 -5) right)] - (wrap output))) - _ (promise.wait 100) - output (promise.future (atom.read output))] - (assert "Can merge channels." - (list/= (list +0 +1 +2 +3 +4 +5 +0 -1 -2 -3 -4 -5) - (list.reverse output))))) - - (wrap (do promise.Monad - [output (promise.future - (do io.Monad - [#let [inputs (: (Channel Int) (frp.channel [])) - mapped (:: frp.Functor map inc inputs)] - output (read! mapped) - _ (write! (list +0 +1 +2 +3 +4 +5) inputs)] - (wrap output))) - _ (promise.wait 100) - output (promise.future (atom.read output))] - (assert "Functor goes over every element in a channel." - (list/= (list +1 +2 +3 +4 +5 +6) - (list.reverse output))))) - - (wrap (do promise.Monad - [output (promise.future - (do io.Monad - [#let [>f< (: (Channel (-> Int Int)) (frp.channel [])) - >a< (: (Channel Int) (frp.channel []))] - output (read! (let [(^open ".") frp.Apply] - (apply >f< >a<))) - _ (write! (list inc) >f<) - _ (write! (list +12345) >a<)] - (wrap output))) - _ (promise.wait 100) - output (promise.future (atom.read output))] - (assert "Apply works over all channel values." - (list/= (list +12346) - (list.reverse output))))) - - (wrap (do promise.Monad - [output (promise.future - (read! (do frp.Monad - [f (frp.from-promise (promise.delay 100 inc)) - a (frp.from-promise (promise.delay 200 +12345))] - (frp.from-promise (promise.delay 300 (f a)))))) - _ (promise.wait 700) - output (promise.future (atom.read output))] - (assert "Valid monad." - (list/= (list +12346) - (list.reverse output))))) - ))) diff --git a/stdlib/test/test/lux/concurrency/promise.lux b/stdlib/test/test/lux/concurrency/promise.lux deleted file mode 100644 index e857d0708..000000000 --- a/stdlib/test/test/lux/concurrency/promise.lux +++ /dev/null @@ -1,72 +0,0 @@ -(.module: - [lux #* - ["." io] - [control - ["M" monad (#+ do Monad)] - pipe] - [concurrency - ["&" promise ("&/." Monad)]] - [math - ["r" random]]] - lux/test) - -(context: "Promises" - ($_ seq - (wrap (do &.Monad - [running? (&.future (io.io #1))] - (assert "Can run IO actions in separate threads." - running?))) - - (wrap (do &.Monad - [_ (&.wait 500)] - (assert "Can wait for a specified amount of time." - #1))) - - (wrap (do &.Monad - [[left right] (&.and (&.future (io.io #1)) - (&.future (io.io #0)))] - (assert "Can combine promises sequentially." - (and left (not right))))) - - (wrap (do &.Monad - [?left (&.or (&.delay 100 #1) - (&.delay 200 #0)) - ?right (&.or (&.delay 200 #1) - (&.delay 100 #0))] - (assert "Can combine promises alternatively." - (case [?left ?right] - [(#.Left #1) (#.Right #0)] - #1 - - _ - #0)))) - - (wrap (do &.Monad - [?left (&.either (&.delay 100 #1) - (&.delay 200 #0)) - ?right (&.either (&.delay 200 #1) - (&.delay 100 #0))] - (assert "Can combine promises alternatively [Part 2]." - (and ?left (not ?right))))) - - (test "Can poll a promise for its value." - (and (|> (&.poll (&/wrap #1)) - (case> (#.Some #1) #1 _ #0)) - (|> (&.poll (&.delay 200 #1)) - (case> #.None #1 _ #0)))) - - (test "Cannot re-resolve a resolved promise." - (and (not (io.run (&.resolve #0 (&/wrap #1)))) - (io.run (&.resolve #1 (: (&.Promise Bit) (&.promise #.None)))))) - - (wrap (do &.Monad - [?none (&.time-out 100 (&.delay 200 #1)) - ?some (&.time-out 200 (&.delay 100 #1))] - (assert "Can establish maximum waiting times for promises to be fulfilled." - (case [?none ?some] - [#.None (#.Some #1)] - #1 - - _ - #0)))) - )) diff --git a/stdlib/test/test/lux/concurrency/semaphore.lux b/stdlib/test/test/lux/concurrency/semaphore.lux deleted file mode 100644 index f309fcd0c..000000000 --- a/stdlib/test/test/lux/concurrency/semaphore.lux +++ /dev/null @@ -1,143 +0,0 @@ -(.module: - [lux #* - [control - ["." monad (#+ do)]] - [data - ["." maybe] - ["." text ("text/." Equivalence Monoid) - format] - [collection - ["." list ("list/." Functor)]]] - [concurrency - ["/" semaphore] - ["." promise (#+ Promise)] - ["." atom (#+ Atom)]] - ["." io] - [math - ["r" random]]] - lux/test) - -(def: (wait-many-times times semaphore) - (-> Nat /.Semaphore (Promise Any)) - (loop [steps times] - (if (n/> 0 steps) - (do promise.Monad - [_ (/.wait semaphore)] - (recur (dec steps))) - (:: promise.Monad wrap [])))) - -(context: "Semaphore." - (<| (times 100) - (do @ - [open-positions (|> r.nat (:: @ map (|>> (n/% 10) (n/max 1))))] - ($_ seq - (let [semaphore (/.semaphore open-positions)] - (wrap (do promise.Monad - [_ (wait-many-times open-positions semaphore)] - (assert "Can wait on a semaphore up to the number of open positions without blocking." - #1)))) - (let [semaphore (/.semaphore open-positions)] - (wrap (do promise.Monad - [result (<| (promise.time-out 100) - (wait-many-times (inc open-positions) semaphore))] - (assert "Waiting on a semaphore more than the number of open positions blocks the process." - (case result - (#.Some _) - #0 - - #.None - #1))))) - (let [semaphore (/.semaphore open-positions)] - (wrap (do promise.Monad - [_ (: (Promise Any) - (loop [steps (n/* 2 open-positions)] - (if (n/> 0 steps) - (do @ - [_ (/.wait semaphore) - _ (/.signal semaphore)] - (recur (dec steps))) - (wrap []))))] - (assert "Signaling a semaphore replenishes its open positions." - #1)))) - (let [semaphore (/.semaphore open-positions)] - (wrap (do promise.Monad - [#let [resource (atom.atom "") - blocked (do @ - [_ (wait-many-times open-positions semaphore) - _ (/.wait semaphore) - #let [_ (io.run (atom.update (|>> (format "B")) - resource))]] - (wrap []))] - _ (promise.wait 100) - _ (exec (io.run (atom.update (|>> (format "A")) - resource)) - (/.signal semaphore)) - _ blocked] - (assert "A blocked process can be un-blocked by a signal somewhere else." - (text/= "BA" - (io.run (atom.read resource))))))) - )))) - -(context: "Mutex." - (<| (times 100) - (do @ - [repetitions (|> r.nat (:: @ map (|>> (n/% 100) (n/max 10))))] - ($_ seq - (let [mutex (/.mutex [])] - (wrap (do promise.Monad - [#let [resource (atom.atom "") - expected-As (text.join-with "" (list.repeat repetitions "A")) - expected-Bs (text.join-with "" (list.repeat repetitions "B")) - processA (<| (/.synchronize mutex) - io.io - promise.future - (do io.Monad - [_ (<| (monad.seq @) - (list.repeat repetitions) - (atom.update (|>> (format "A")) resource))] - (wrap []))) - processB (<| (/.synchronize mutex) - io.io - promise.future - (do io.Monad - [_ (<| (monad.seq @) - (list.repeat repetitions) - (atom.update (|>> (format "B")) resource))] - (wrap [])))] - _ processA - _ processB - #let [outcome (io.run (atom.read resource))]] - (assert "Mutexes only allow one process to execute at a time." - (or (text/= (format expected-As expected-Bs) - outcome) - (text/= (format expected-Bs expected-As) - outcome)))))) - )))) - -(def: (waiter resource barrier id) - (-> (Atom Text) /.Barrier Nat (Promise Any)) - (do promise.Monad - [_ (/.block barrier) - #let [_ (io.run (atom.update (|>> (format (%n id))) resource))]] - (wrap []))) - -(context: "Barrier." - (let [limit 10 - barrier (/.barrier (maybe.assume (/.limit limit))) - resource (atom.atom "")] - ($_ seq - (wrap (do promise.Monad - [#let [ids (list.n/range 0 (dec limit)) - waiters (list/map (function (_ id) - (let [process (waiter resource barrier id)] - (exec (io.run (atom.update (|>> (format "_")) resource)) - process))) - ids)] - _ (monad.seq @ waiters) - #let [outcome (io.run (atom.read resource))]] - (assert "A barrier can stop all processes from acting, until an amount of waiting processes is reached, and then the barrier is un-blocked for all." - (and (text.ends-with? "__________" outcome) - (list.every? (function (_ id) - (text.contains? (%n id) outcome)) - ids) - ))))))) diff --git a/stdlib/test/test/lux/concurrency/stm.lux b/stdlib/test/test/lux/concurrency/stm.lux deleted file mode 100644 index ee84f193e..000000000 --- a/stdlib/test/test/lux/concurrency/stm.lux +++ /dev/null @@ -1,77 +0,0 @@ -(.module: - [lux #* - ["." io (#+ IO)] - [control - ["M" monad (#+ do Monad)]] - [data - ["." number] - [collection - ["." list ("list/." Functor)]]] - [concurrency - ["." atom (#+ Atom atom)] - ["&" stm] - ["." process] - ["." promise] - ["." frp (#+ Channel)]] - [math - ["r" random]]] - lux/test) - -(def: (read! channel) - (All [a] (-> (Channel a) (IO (Atom (List a))))) - (do io.Monad - [#let [output (atom (list))] - _ (frp.listen (function (_ value) - ## TODO: Simplify when possible. - (do @ - [_ (atom.update (|>> (#.Cons value)) output)] - (wrap []))) - channel)] - (wrap output))) - -(def: iterations-per-process Nat 100) - -(context: "STM" - ($_ seq - (wrap (do promise.Monad - [output (&.commit (&.read (&.var 0)))] - (assert "Can read STM vars." - (n/= 0 output)))) - (wrap (do promise.Monad - [#let [_var (&.var 0)] - output (&.commit (do &.Monad - [_ (&.write 5 _var)] - (&.read _var)))] - (assert "Can write STM vars." - (n/= 5 output)))) - (wrap (do promise.Monad - [#let [_var (&.var 5)] - output (&.commit (do &.Monad - [_ (&.update (n/* 3) _var)] - (&.read _var)))] - (assert "Can update STM vars." - (n/= 15 output)))) - (wrap (do promise.Monad - [#let [_var (&.var 0) - changes (io.run (read! (io.run (&.follow _var))))] - _ (&.commit (&.write 5 _var)) - _ (&.commit (&.update (n/* 3) _var)) - changes (promise.future (atom.read changes))] - (assert "Can follow all the changes to STM vars." - (:: (list.Equivalence number.Equivalence) = - (list 5 15) - (list.reverse changes))))) - (wrap (let [_concurrency-var (&.var 0)] - (do promise.Monad - [_ (|> process.parallelism - (list.n/range 1) - (list/map (function (_ _) - (|> iterations-per-process - (list.n/range 1) - (M.map @ (function (_ _) (&.commit (&.update inc _concurrency-var))))))) - (M.seq @)) - last-val (&.commit (&.read _concurrency-var))] - (assert "Can modify STM vars concurrently from multiple threads." - (|> process.parallelism - (n/* iterations-per-process) - (n/= last-val)))))))) diff --git a/stdlib/test/test/lux/control/concurrency/actor.lux b/stdlib/test/test/lux/control/concurrency/actor.lux new file mode 100644 index 000000000..90c3d6dd4 --- /dev/null +++ b/stdlib/test/test/lux/control/concurrency/actor.lux @@ -0,0 +1,75 @@ +(.module: + [lux #* + ["." io (#+ IO io)] + [control + ["M" monad (#+ do Monad)] + ["ex" exception] + [concurrency + ["P" promise ("promise/." Monad)] + ["T" task] + ["&" actor (#+ actor: message:)]]] + [data + ["e" error] + [text + format]]] + lux/test) + +(actor: Counter + Nat + + ((handle message state self) + (do T.Monad + [#let [_ (log! "BEFORE")] + output (message state self) + #let [_ (log! "AFTER")]] + (wrap output))) + + ((stop cause state) + (promise/wrap (log! (if (ex.match? &.poisoned cause) + (format "Counter was poisoned: " (%n state)) + cause))))) + +(message: #export Counter + (count! {increment Nat} state self Nat) + (let [state' (n/+ increment state)] + (T.return [state' state']))) + +(context: "Actors" + ($_ seq + (test "Can check if an actor is alive." + (io.run (do io.Monad + [counter (new@Counter 0)] + (wrap (&.alive? counter))))) + + (test "Can poison actors." + (io.run (do io.Monad + [counter (new@Counter 0) + poisoned? (&.poison counter)] + (wrap (and poisoned? + (not (&.alive? counter))))))) + + (test "Cannot poison an already dead actor." + (io.run (do io.Monad + [counter (new@Counter 0) + first-time (&.poison counter) + second-time (&.poison counter)] + (wrap (and first-time + (not second-time)))))) + + (wrap (do P.Monad + [result (do T.Monad + [#let [counter (io.run (new@Counter 0))] + output-1 (count! 1 counter) + output-2 (count! 1 counter) + output-3 (count! 1 counter)] + (wrap (and (n/= 1 output-1) + (n/= 2 output-2) + (n/= 3 output-3))))] + (assert "Can send messages to actors." + (case result + (#e.Success outcome) + outcome + + (#e.Error error) + #0)))) + )) diff --git a/stdlib/test/test/lux/control/concurrency/atom.lux b/stdlib/test/test/lux/control/concurrency/atom.lux new file mode 100644 index 000000000..720547e27 --- /dev/null +++ b/stdlib/test/test/lux/control/concurrency/atom.lux @@ -0,0 +1,34 @@ +(.module: + [lux #* + ["." io] + [control + ["M" monad (#+ do Monad)] + [concurrency + ["&" atom]]] + [math + ["r" random]]] + lux/test) + +(context: "Atoms" + (<| (times 100) + (do @ + [value r.nat + swap-value r.nat + set-value r.nat + #let [box (&.atom value)]] + ($_ seq + (test "Can obtain the value of an atom." + (n/= value (io.run (&.read box)))) + + (test "Can swap the value of an atom." + (and (io.run (&.compare-and-swap value swap-value box)) + (n/= swap-value (io.run (&.read box))))) + + (test "Can update the value of an atom." + (exec (io.run (&.update inc box)) + (n/= (inc swap-value) (io.run (&.read box))))) + + (test "Can immediately set the value of an atom." + (exec (io.run (&.write set-value box)) + (n/= set-value (io.run (&.read box))))) + )))) diff --git a/stdlib/test/test/lux/control/concurrency/frp.lux b/stdlib/test/test/lux/control/concurrency/frp.lux new file mode 100644 index 000000000..04ddd5986 --- /dev/null +++ b/stdlib/test/test/lux/control/concurrency/frp.lux @@ -0,0 +1,122 @@ +(.module: + [lux #* + ["." io (#+ IO io)] + [control + ["." monad (#+ do Monad)] + [concurrency + ["." promise ("promise/." Monad)] + ["." frp (#+ Channel)] + ["." atom (#+ Atom atom)]]] + [data + ["." number] + [collection + ["." list]]]] + lux/test) + +(def: (write! values channel) + (All [a] (-> (List a) (Channel a) (IO Any))) + (do io.Monad + [_ (monad.map @ (frp.publish channel) values)] + (wrap []))) + +(def: (read! channel) + (All [a] (-> (Channel a) (IO (Atom (List a))))) + (do io.Monad + [#let [output (atom (list))] + _ (frp.listen (function (_ value) + ## TODO: Simplify when possible. + (do @ + [_ (atom.update (|>> (#.Cons value)) output)] + (wrap []))) + channel)] + (wrap output))) + +(context: "FRP" + (let [(^open "list/.") (list.Equivalence number.Equivalence)] + ($_ seq + (wrap (do promise.Monad + [#let [values (list +0 +1 +2 +3 +4 +5)] + output (promise.future + (do io.Monad + [#let [input (: (Channel Int) (frp.channel []))] + output (read! input) + _ (write! values input)] + (wrap output))) + _ (promise.wait 100) + output (promise.future (atom.read output))] + (assert "Can pipe one channel into another." + (list/= values + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [input (: (Channel Int) (frp.channel [])) + elems (frp.filter i/even? input)] + output (read! elems) + _ (write! (list +0 +1 +2 +3 +4 +5) input)] + (wrap output))) + _ (promise.wait 100) + output (promise.future (atom.read output))] + (assert "Can filter a channel's elements." + (list/= (list +0 +2 +4) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [left (: (Channel Int) (frp.channel [])) + right (: (Channel Int) (frp.channel []))] + merged (frp.merge (list left right)) + output (read! merged) + _ (write! (list +0 +1 +2 +3 +4 +5) left) + _ (write! (list +0 -1 -2 -3 -4 -5) right)] + (wrap output))) + _ (promise.wait 100) + output (promise.future (atom.read output))] + (assert "Can merge channels." + (list/= (list +0 +1 +2 +3 +4 +5 +0 -1 -2 -3 -4 -5) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [inputs (: (Channel Int) (frp.channel [])) + mapped (:: frp.Functor map inc inputs)] + output (read! mapped) + _ (write! (list +0 +1 +2 +3 +4 +5) inputs)] + (wrap output))) + _ (promise.wait 100) + output (promise.future (atom.read output))] + (assert "Functor goes over every element in a channel." + (list/= (list +1 +2 +3 +4 +5 +6) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (do io.Monad + [#let [>f< (: (Channel (-> Int Int)) (frp.channel [])) + >a< (: (Channel Int) (frp.channel []))] + output (read! (let [(^open ".") frp.Apply] + (apply >f< >a<))) + _ (write! (list inc) >f<) + _ (write! (list +12345) >a<)] + (wrap output))) + _ (promise.wait 100) + output (promise.future (atom.read output))] + (assert "Apply works over all channel values." + (list/= (list +12346) + (list.reverse output))))) + + (wrap (do promise.Monad + [output (promise.future + (read! (do frp.Monad + [f (frp.from-promise (promise.delay 100 inc)) + a (frp.from-promise (promise.delay 200 +12345))] + (frp.from-promise (promise.delay 300 (f a)))))) + _ (promise.wait 700) + output (promise.future (atom.read output))] + (assert "Valid monad." + (list/= (list +12346) + (list.reverse output))))) + ))) diff --git a/stdlib/test/test/lux/control/concurrency/promise.lux b/stdlib/test/test/lux/control/concurrency/promise.lux new file mode 100644 index 000000000..d666f3b31 --- /dev/null +++ b/stdlib/test/test/lux/control/concurrency/promise.lux @@ -0,0 +1,72 @@ +(.module: + [lux #* + ["." io] + [control + ["M" monad (#+ do Monad)] + pipe + [concurrency + ["&" promise ("&/." Monad)]]] + [math + ["r" random]]] + lux/test) + +(context: "Promises" + ($_ seq + (wrap (do &.Monad + [running? (&.future (io.io #1))] + (assert "Can run IO actions in separate threads." + running?))) + + (wrap (do &.Monad + [_ (&.wait 500)] + (assert "Can wait for a specified amount of time." + #1))) + + (wrap (do &.Monad + [[left right] (&.and (&.future (io.io #1)) + (&.future (io.io #0)))] + (assert "Can combine promises sequentially." + (and left (not right))))) + + (wrap (do &.Monad + [?left (&.or (&.delay 100 #1) + (&.delay 200 #0)) + ?right (&.or (&.delay 200 #1) + (&.delay 100 #0))] + (assert "Can combine promises alternatively." + (case [?left ?right] + [(#.Left #1) (#.Right #0)] + #1 + + _ + #0)))) + + (wrap (do &.Monad + [?left (&.either (&.delay 100 #1) + (&.delay 200 #0)) + ?right (&.either (&.delay 200 #1) + (&.delay 100 #0))] + (assert "Can combine promises alternatively [Part 2]." + (and ?left (not ?right))))) + + (test "Can poll a promise for its value." + (and (|> (&.poll (&/wrap #1)) + (case> (#.Some #1) #1 _ #0)) + (|> (&.poll (&.delay 200 #1)) + (case> #.None #1 _ #0)))) + + (test "Cannot re-resolve a resolved promise." + (and (not (io.run (&.resolve #0 (&/wrap #1)))) + (io.run (&.resolve #1 (: (&.Promise Bit) (&.promise #.None)))))) + + (wrap (do &.Monad + [?none (&.time-out 100 (&.delay 200 #1)) + ?some (&.time-out 200 (&.delay 100 #1))] + (assert "Can establish maximum waiting times for promises to be fulfilled." + (case [?none ?some] + [#.None (#.Some #1)] + #1 + + _ + #0)))) + )) diff --git a/stdlib/test/test/lux/control/concurrency/semaphore.lux b/stdlib/test/test/lux/control/concurrency/semaphore.lux new file mode 100644 index 000000000..5c09f4bac --- /dev/null +++ b/stdlib/test/test/lux/control/concurrency/semaphore.lux @@ -0,0 +1,143 @@ +(.module: + [lux #* + [control + ["." monad (#+ do)] + [concurrency + ["/" semaphore] + ["." promise (#+ Promise)] + ["." atom (#+ Atom)]]] + [data + ["." maybe] + ["." text ("text/." Equivalence Monoid) + format] + [collection + ["." list ("list/." Functor)]]] + ["." io] + [math + ["r" random]]] + lux/test) + +## (def: (wait-many-times times semaphore) +## (-> Nat /.Semaphore (Promise Any)) +## (loop [steps times] +## (if (n/> 0 steps) +## (do promise.Monad +## [_ (/.wait semaphore)] +## (recur (dec steps))) +## (:: promise.Monad wrap [])))) + +## (context: "Semaphore." +## (<| (times 100) +## (do @ +## [open-positions (|> r.nat (:: @ map (|>> (n/% 10) (n/max 1))))] +## ($_ seq +## (let [semaphore (/.semaphore open-positions)] +## (wrap (do promise.Monad +## [_ (wait-many-times open-positions semaphore)] +## (assert "Can wait on a semaphore up to the number of open positions without blocking." +## true)))) +## (let [semaphore (/.semaphore open-positions)] +## (wrap (do promise.Monad +## [result (<| (promise.time-out 100) +## (wait-many-times (inc open-positions) semaphore))] +## (assert "Waiting on a semaphore more than the number of open positions blocks the process." +## (case result +## (#.Some _) +## false + +## #.None +## true))))) +## (let [semaphore (/.semaphore open-positions)] +## (wrap (do promise.Monad +## [_ (: (Promise Any) +## (loop [steps (n/* 2 open-positions)] +## (if (n/> 0 steps) +## (do @ +## [_ (/.wait semaphore) +## _ (/.signal semaphore)] +## (recur (dec steps))) +## (wrap []))))] +## (assert "Signaling a semaphore replenishes its open positions." +## true)))) +## (let [semaphore (/.semaphore open-positions)] +## (wrap (do promise.Monad +## [#let [resource (atom.atom "") +## blocked (do @ +## [_ (wait-many-times open-positions semaphore) +## _ (/.wait semaphore) +## #let [_ (io.run (atom.update (|>> (format "B")) +## resource))]] +## (wrap []))] +## _ (promise.wait 100) +## _ (exec (io.run (atom.update (|>> (format "A")) +## resource)) +## (/.signal semaphore)) +## _ blocked] +## (assert "A blocked process can be un-blocked by a signal somewhere else." +## (text/= "BA" +## (io.run (atom.read resource))))))) +## )))) + +## (context: "Mutex." +## (<| (times 100) +## (do @ +## [repetitions (|> r.nat (:: @ map (|>> (n/% 100) (n/max 10))))] +## ($_ seq +## (let [mutex (/.mutex [])] +## (wrap (do promise.Monad +## [#let [resource (atom.atom "") +## expected-As (text.join-with "" (list.repeat repetitions "A")) +## expected-Bs (text.join-with "" (list.repeat repetitions "B")) +## processA (<| (/.synchronize mutex) +## io.io +## promise.future +## (do io.Monad +## [_ (<| (monad.seq @) +## (list.repeat repetitions) +## (atom.update (|>> (format "A")) resource))] +## (wrap []))) +## processB (<| (/.synchronize mutex) +## io.io +## promise.future +## (do io.Monad +## [_ (<| (monad.seq @) +## (list.repeat repetitions) +## (atom.update (|>> (format "B")) resource))] +## (wrap [])))] +## _ processA +## _ processB +## #let [outcome (io.run (atom.read resource))]] +## (assert "Mutexes only allow one process to execute at a time." +## (or (text/= (format expected-As expected-Bs) +## outcome) +## (text/= (format expected-Bs expected-As) +## outcome)))))) +## )))) + +## (def: (waiter resource barrier id) +## (-> (Atom Text) /.Barrier Nat (Promise Any)) +## (do promise.Monad +## [_ (/.block barrier) +## #let [_ (io.run (atom.update (|>> (format (%n id))) resource))]] +## (wrap []))) + +## (context: "Barrier." +## (let [limit 10 +## barrier (/.barrier (maybe.assume (/.limit limit))) +## resource (atom.atom "")] +## ($_ seq +## (wrap (do promise.Monad +## [#let [ids (list.n/range 0 (dec limit)) +## waiters (list/map (function (_ id) +## (let [process (waiter resource barrier id)] +## (exec (io.run (atom.update (|>> (format "_")) resource)) +## process))) +## ids)] +## _ (monad.seq @ waiters) +## #let [outcome (io.run (atom.read resource))]] +## (assert "A barrier can stop all processes from acting, until an amount of waiting processes is reached, and then the barrier is un-blocked for all." +## (and (text.ends-with? "__________" outcome) +## (list.every? (function (_ id) +## (text.contains? (%n id) outcome)) +## ids) +## ))))))) diff --git a/stdlib/test/test/lux/control/concurrency/stm.lux b/stdlib/test/test/lux/control/concurrency/stm.lux new file mode 100644 index 000000000..c84ce44cc --- /dev/null +++ b/stdlib/test/test/lux/control/concurrency/stm.lux @@ -0,0 +1,77 @@ +(.module: + [lux #* + ["." io (#+ IO)] + [control + ["M" monad (#+ do Monad)] + [concurrency + ["." atom (#+ Atom atom)] + ["&" stm] + ["." process] + ["." promise] + ["." frp (#+ Channel)]]] + [data + ["." number] + [collection + ["." list ("list/." Functor)]]] + [math + ["r" random]]] + lux/test) + +(def: (read! channel) + (All [a] (-> (Channel a) (IO (Atom (List a))))) + (do io.Monad + [#let [output (atom (list))] + _ (frp.listen (function (_ value) + ## TODO: Simplify when possible. + (do @ + [_ (atom.update (|>> (#.Cons value)) output)] + (wrap []))) + channel)] + (wrap output))) + +(def: iterations-per-process Nat 100) + +(context: "STM" + ($_ seq + (wrap (do promise.Monad + [output (&.commit (&.read (&.var 0)))] + (assert "Can read STM vars." + (n/= 0 output)))) + (wrap (do promise.Monad + [#let [_var (&.var 0)] + output (&.commit (do &.Monad + [_ (&.write 5 _var)] + (&.read _var)))] + (assert "Can write STM vars." + (n/= 5 output)))) + (wrap (do promise.Monad + [#let [_var (&.var 5)] + output (&.commit (do &.Monad + [_ (&.update (n/* 3) _var)] + (&.read _var)))] + (assert "Can update STM vars." + (n/= 15 output)))) + (wrap (do promise.Monad + [#let [_var (&.var 0) + changes (io.run (read! (io.run (&.follow _var))))] + _ (&.commit (&.write 5 _var)) + _ (&.commit (&.update (n/* 3) _var)) + changes (promise.future (atom.read changes))] + (assert "Can follow all the changes to STM vars." + (:: (list.Equivalence number.Equivalence) = + (list 5 15) + (list.reverse changes))))) + (wrap (let [_concurrency-var (&.var 0)] + (do promise.Monad + [_ (|> process.parallelism + (list.n/range 1) + (list/map (function (_ _) + (|> iterations-per-process + (list.n/range 1) + (M.map @ (function (_ _) (&.commit (&.update inc _concurrency-var))))))) + (M.seq @)) + last-val (&.commit (&.read _concurrency-var))] + (assert "Can modify STM vars concurrently from multiple threads." + (|> process.parallelism + (n/* iterations-per-process) + (n/= last-val)))))))) -- cgit v1.2.3