diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency')
-rw-r--r-- | stdlib/source/lux/control/concurrency/actor.lux | 377 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/atom.lux | 59 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/frp.lux | 132 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/process.lux | 110 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/promise.lux | 174 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/semaphore.lux | 149 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/stm.lux | 245 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/task.lux | 82 |
8 files changed, 1328 insertions, 0 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux new file mode 100644 index 000000000..0af0d09f9 --- /dev/null +++ b/stdlib/source/lux/control/concurrency/actor.lux @@ -0,0 +1,377 @@ +(.module: {#.doc "The actor model of concurrency."} + [lux #* + [control monad + ["p" parser] + ["ex" exception (#+ exception:)]] + ["." io (#- run) ("io/." Monad<IO>)] + [data + ["." product] + ["e" error] + [text + format] + [collection + ["." list ("list/." Monoid<List> Monad<List> Fold<List>)]]] + ["." macro (#+ with-gensyms Monad<Meta>) + ["." code] + ["s" syntax (#+ syntax: Syntax)] + [syntax + ["cs" common] + [common + ["csr" reader] + ["csw" writer]]]] + [type + abstract]] + [// + ["." atom (#+ Atom atom)] + ["." promise (#+ Promise promise) ("promise/." Monad<Promise>)] + ["." task (#+ Task)]]) + +(exception: #export poisoned) + +(exception: #export (dead {actor-name Text} + {message-name Text}) + (ex.report ["Actor" actor-name] + ["Message" message-name])) + +## [Types] +(with-expansions + [<Message> (as-is (-> s (Actor s) (Task s))) + <Obituary> (as-is [Text s (List <Message>)]) + <Mailbox> (as-is (Rec Mailbox (Promise [<Message> Mailbox])))] + + (def: (obituary mailbox) + (All [a] (-> (Rec Mailbox (Promise [a Mailbox])) (List a))) + (case (promise.poll mailbox) + (#.Some [head tail]) + (#.Cons head (obituary tail)) + + #.None + #.Nil)) + + (abstract: #export (Actor s) + {#.doc "An actor, defined as all the necessities it requires."} + {#mailbox (Atom <Mailbox>) + #obituary (Promise <Obituary>)} + + ## TODO: Delete after new-luxc becomes the new standard compiler. + (def: (actor mailbox obituary) + (All [s] (-> (Atom <Mailbox>) (Promise <Obituary>) (Actor s))) + (:abstraction {#mailbox mailbox + #obituary obituary})) + + (type: #export (Message s) + <Message>) + + (type: #export (Obituary s) + <Obituary>) + + (type: #export (Behavior s) + {#.doc "An actor's behavior when messages are received."} + {#handle (-> (Message s) s (Actor s) (Task s)) + #end (-> Text s (Promise Any))}) + + (def: #export (spawn behavior init) + {#.doc "Given a behavior and initial state, spawns an actor and returns it."} + (All [s] (-> (Behavior s) s (IO (Actor s)))) + (io (let [[handle end] behavior + self (actor (atom (promise #.None)) + (promise #.None)) + process (loop [state init + |mailbox| (io.run (atom.read (get@ #mailbox (:representation self))))] + (do promise.Monad<Promise> + [[head tail] |mailbox| + ?state' (handle head state self)] + (case ?state' + (#e.Error error) + (do @ + [_ (end error state)] + (exec (io.run (promise.resolve [error state (#.Cons head (obituary tail))] + (get@ #obituary (:representation self)))) + (wrap []))) + + (#e.Success state') + (recur state' tail))))] + self))) + + (def: #export (alive? actor) + (All [s] (-> (Actor s) Bit)) + (case (promise.poll (get@ #obituary (:representation actor))) + #.None + #1 + + _ + #0)) + + (def: #export (send message actor) + {#.doc "Communicate with an actor through message passing."} + (All [s] (-> (Message s) (Actor s) (IO Bit))) + (if (alive? actor) + (let [entry [message (promise #.None)]] + (do Monad<IO> + [|mailbox| (atom.read (get@ #mailbox (:representation actor)))] + (loop [|mailbox| |mailbox|] + (case (promise.poll |mailbox|) + #.None + (do @ + [resolved? (promise.resolve entry |mailbox|)] + (if resolved? + (do @ + [_ (atom.write (product.right entry) (get@ #mailbox (:representation actor)))] + (wrap #1)) + (recur |mailbox|))) + + (#.Some [_ |mailbox|']) + (recur |mailbox|'))))) + (io/wrap #0))) + )) + +## [Values] +(def: (default-handle message state self) + (All [s] (-> (Message s) s (Actor s) (Task s))) + (message state self)) + +(def: (default-end cause state) + (All [s] (-> Text s (Promise Any))) + (promise/wrap [])) + +(def: #export default-behavior + (All [s] (Behavior s)) + {#handle default-handle + #end default-end}) + +(def: #export (poison actor) + {#.doc (doc "Kills the actor by sending a message that will kill it upon processing," + "but allows the actor to handle previous messages.")} + (All [s] (-> (Actor s) (IO Bit))) + (send (function (_ state self) + (task.throw poisoned [])) + actor)) + +## [Syntax] +(do-template [<with> <resolve> <tag> <desc>] + [(def: #export (<with> name) + (-> Name cs.Annotations cs.Annotations) + (|>> (#.Cons [(name-of <tag>) + (code.tag name)]))) + + (def: #export (<resolve> name) + (-> Name (Meta Name)) + (do Monad<Meta> + [[_ annotations _] (macro.find-def name)] + (case (macro.get-tag-ann (name-of <tag>) annotations) + (#.Some actor-name) + (wrap actor-name) + + _ + (macro.fail (format "Definition is not " <desc> ".")))))] + + [with-actor resolve-actor #..actor "an actor"] + [with-message resolve-message #..message "a message"] + ) + +(def: actor-decl^ + (Syntax [Text (List Text)]) + (p.either (s.form (p.and s.local-identifier (p.some s.local-identifier))) + (p.and s.local-identifier (:: p.Monad<Parser> wrap (list))))) + +(do-template [<name> <desc>] + [(def: #export <name> + (-> Text Text) + (|>> (format <desc> "@")))] + + [state-name "State"] + [behavior-name "Behavior"] + [new-name "new"] + ) + +(type: HandleC + [[Text Text Text] Code]) + +(type: StopC + [[Text Text] Code]) + +(type: BehaviorC + [(Maybe HandleC) (Maybe StopC)]) + +(def: behavior^ + (s.Syntax BehaviorC) + (let [handle-args ($_ p.and s.local-identifier s.local-identifier s.local-identifier) + stop-args ($_ p.and s.local-identifier s.local-identifier)] + (p.and (p.maybe (s.form (p.and (s.form (p.after (s.this (' handle)) handle-args)) + s.any))) + (p.maybe (s.form (p.and (s.form (p.after (s.this (' stop)) stop-args)) + s.any)))))) + +(syntax: #export (actor: + {export csr.export} + {[_name _vars] actor-decl^} + {annotations (p.default cs.empty-annotations csr.annotations)} + state-type + {[?handle ?stop] behavior^}) + {#.doc (doc "Defines an actor, with its behavior and internal state." + (actor: #export Counter + Nat + + ((stop cause state) + (:: promise.Monad<Promise> wrap + (log! (if (ex.match? ..poisoned cause) + (format "Counter was poisoned: " (%n state)) + cause))))) + + (actor: #export (Stack a) + (List a) + + ((handle message state self) + (do task.Monad<Task> + [#let [_ (log! "BEFORE")] + output (message state self) + #let [_ (log! "AFTER")]] + (wrap output)))))} + (with-gensyms [g!_ g!init] + (do @ + [module macro.current-module-name + #let [g!type (code.local-identifier (state-name _name)) + g!behavior (code.local-identifier (behavior-name _name)) + g!actor (code.local-identifier _name) + g!new (code.local-identifier (new-name _name)) + g!vars (list/map code.local-identifier _vars)]] + (wrap (list (` (type: (~+ (csw.export export)) ((~ g!type) (~+ g!vars)) + (~ state-type))) + (` (type: (~+ (csw.export export)) ((~ g!actor) (~+ g!vars)) + (~ (|> annotations + (with-actor [module _name]) + csw.annotations)) + (..Actor ((~ g!type) (~+ g!vars))))) + (` (def: (~+ (csw.export export)) (~ g!behavior) + (All [(~+ g!vars)] + (..Behavior ((~ g!type) (~+ g!vars)))) + {#..handle (~ (case ?handle + #.None + (` (~! ..default-handle)) + + (#.Some [[messageN stateN selfN] bodyC]) + (` (function ((~ g!_) + (~ (code.local-identifier messageN)) + (~ (code.local-identifier stateN)) + (~ (code.local-identifier selfN))) + (do task.Monad<Task> + [] + (~ bodyC)))))) + #..end (~ (case ?stop + #.None + (` (~! ..default-end)) + + (#.Some [[causeN stateN] bodyC]) + (` (function ((~ g!_) + (~ (code.local-identifier causeN)) + (~ (code.local-identifier stateN))) + (do promise.Monad<Promise> + [] + (~ bodyC))))))})) + (` (def: (~+ (csw.export export)) ((~ g!new) (~ g!init)) + (All [(~+ g!vars)] + (-> ((~ g!type) (~+ g!vars)) (io.IO ((~ g!actor) (~+ g!vars))))) + (..spawn (~ g!behavior) (~ g!init)))))) + ))) + +(type: Signature + {#vars (List Text) + #name Text + #inputs (List cs.Typed-Input) + #state Text + #self Text + #output Code}) + +(def: signature^ + (s.Syntax Signature) + (s.form ($_ p.and + (p.default (list) (s.tuple (p.some s.local-identifier))) + s.local-identifier + (p.some csr.typed-input) + s.local-identifier + s.local-identifier + s.any))) + +(def: reference^ + (s.Syntax [Name (List Text)]) + (p.either (s.form (p.and s.identifier (p.some s.local-identifier))) + (p.and s.identifier (:: p.Monad<Parser> wrap (list))))) + +(syntax: #export (message: + {export csr.export} + {[actor-name actor-vars] reference^} + {signature signature^} + {annotations (p.default cs.empty-annotations csr.annotations)} + body) + {#.doc (doc "A message can access the actor's state through the state parameter." + "A message can also access the actor itself through the self parameter." + "A message's output must be a task containing a 2-tuple with the updated state and a return value." + "A message may succeed or fail (in case of failure, the actor dies)." + + (message: #export Counter + (count! [increment Nat] state self Nat) + (let [state' (n/+ increment state)] + (task.return [state' state']))) + + (message: #export (Stack a) + (push [value a] state self (List a)) + (let [state' (#.Cons value state)] + (task.return [state' state']))))} + (with-gensyms [g!_ g!return g!error g!task g!sent?] + (do @ + [current-module macro.current-module-name + actor-name (resolve-actor actor-name) + #let [message-name [current-module (get@ #name signature)] + g!type (code.identifier (product.both id state-name actor-name)) + g!message (code.local-identifier (get@ #name signature)) + g!actor-vars (list/map code.local-identifier actor-vars) + actorC (` ((~ (code.identifier actor-name)) (~+ g!actor-vars))) + g!all-vars (|> (get@ #vars signature) (list/map code.local-identifier) (list/compose g!actor-vars)) + g!inputsC (|> (get@ #inputs signature) (list/map product.left)) + g!inputsT (|> (get@ #inputs signature) (list/map product.right)) + g!state (|> signature (get@ #state) code.local-identifier) + g!self (|> signature (get@ #self) code.local-identifier) + g!actor-refs (: (List Code) + (if (list.empty? actor-vars) + (list) + (|> actor-vars list.size list.indices (list/map (|>> code.nat (~) ($) (`)))))) + ref-replacements (|> (if (list.empty? actor-vars) + (list) + (|> actor-vars list.size list.indices (list/map (|>> code.nat (~) ($) (`))))) + (: (List Code)) + (list.zip2 g!all-vars) + (: (List [Code Code]))) + g!outputT (list/fold (function (_ [g!var g!ref] outputT) + (code.replace g!var g!ref outputT)) + (get@ #output signature) + ref-replacements)]] + (wrap (list (` (def: (~+ (csw.export export)) ((~ g!message) (~+ g!inputsC) (~ g!self)) + (~ (|> annotations + (with-message actor-name) + csw.annotations)) + (All [(~+ g!all-vars)] (-> (~+ g!inputsT) (~ actorC) (Task (~ (get@ #output signature))))) + (let [(~ g!task) (task.task (~ g!outputT))] + (io.run (do io.Monad<IO> + [(~ g!sent?) (..send (function ((~ g!_) (~ g!state) (~ g!self)) + (do promise.Monad<Promise> + [(~ g!return) (: (Task [((~ g!type) (~+ g!actor-refs)) + (~ g!outputT)]) + (do task.Monad<Task> + [] + (~ body)))] + (case (~ g!return) + (#.Right [(~ g!state) (~ g!return)]) + (exec (io.run (promise.resolve (#.Right (~ g!return)) (~ g!task))) + (task.return (~ g!state))) + + (#.Left (~ g!error)) + (exec (io.run (promise.resolve (#.Left (~ g!error)) (~ g!task))) + (task.fail (~ g!error)))) + )) + (~ g!self))] + (if (~ g!sent?) + ((~' wrap) (~ g!task)) + ((~' wrap) (task.throw ..dead [(~ (code.text (%name actor-name))) + (~ (code.text (%name message-name)))])))))))) + )) + ))) diff --git a/stdlib/source/lux/control/concurrency/atom.lux b/stdlib/source/lux/control/concurrency/atom.lux new file mode 100644 index 000000000..b1692b6e3 --- /dev/null +++ b/stdlib/source/lux/control/concurrency/atom.lux @@ -0,0 +1,59 @@ +(.module: + [lux #* + [control + [monad (#+ do)]] + ["." function] + ["." io (#- run)] + [type + abstract] + [platform + [compiler + ["." host]]] + [host (#+ import:)]]) + +(`` (for {(~~ (static host.jvm)) + (import: (java/util/concurrent/atomic/AtomicReference a) + (new [a]) + (get [] a) + (compareAndSet [a a] boolean))})) + +(`` (abstract: #export (Atom a) + {#.doc "Atomic references that are safe to mutate concurrently."} + + (for {(~~ (static host.jvm)) + (AtomicReference a)}) + + (def: #export (atom value) + (All [a] (-> a (Atom a))) + (:abstraction (for {(~~ (static host.jvm)) + (AtomicReference::new value)}))) + + (def: #export (read atom) + (All [a] (-> (Atom a) (IO a))) + (io (for {(~~ (static host.jvm)) + (AtomicReference::get (:representation atom))}))) + + (def: #export (compare-and-swap current new atom) + {#.doc (doc "Only mutates an atom if you can present it's current value." + "That guarantees that atom was not updated since you last read from it.")} + (All [a] (-> a a (Atom a) (IO Bit))) + (io (AtomicReference::compareAndSet current new (:representation atom)))) + )) + +(def: #export (update f atom) + {#.doc (doc "Updates an atom by applying a function to its current value." + "If it fails to update it (because some other process wrote to it first), it will retry until it succeeds." + "The retries will be done with the new values of the atom, as they show up.")} + (All [a] (-> (-> a a) (Atom a) (IO a))) + (loop [_ []] + (do io.Monad<IO> + [old (read atom) + #let [new (f old)] + swapped? (compare-and-swap old new atom)] + (if swapped? + (wrap new) + (recur []))))) + +(def: #export (write value atom) + (All [a] (-> a (Atom a) (IO Any))) + (update (function.constant value) atom)) diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux new file mode 100644 index 000000000..8db54f28f --- /dev/null +++ b/stdlib/source/lux/control/concurrency/frp.lux @@ -0,0 +1,132 @@ +(.module: + [lux #* + [control + [functor (#+ Functor)] + [apply (#+ Apply)] + ["." monad (#+ do Monad)]] + ["." io (#+ IO io)] + [data + [collection + [list ("list/." Monoid<List>)]]] + [type (#+ :share) + abstract]] + [// + ["." atom (#+ Atom atom)] + ["." promise (#+ Promise)]]) + +(abstract: #export (Channel a) + {#.doc "An asynchronous channel to distribute values."} + (Atom (List (-> a (IO Any)))) + + (def: #export (channel _) + (All [a] (-> Any (Channel a))) + (:abstraction (atom (list)))) + + (def: #export (listen listener channel) + (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any))) + ## TODO: Simplify when possible. + (do io.Monad<IO> + [_ (atom.update (|>> (#.Cons listener)) + (:representation channel))] + (wrap []))) + + (def: #export (publish channel value) + {#.doc "Publish to a channel."} + (All [a] (-> (Channel a) a (IO Any))) + (do io.Monad<IO> + [listeners (atom.read (:representation channel))] + (monad.map @ (function (_ listener) (listener value)) listeners))) + ) + +(def: #export (filter predicate input) + (All [a] (-> (-> a Bit) (Channel a) (Channel a))) + (let [output (channel [])] + (exec (io.run (listen (function (_ value) + (if (predicate value) + (publish output value) + (io []))) + input)) + output))) + +(def: #export (pipe output input) + {#.doc "Copy/pipe the contents of a channel on to another."} + (All [a] (-> (Channel a) (Channel a) (IO Any))) + (listen (publish output) + input)) + +(def: #export (merge inputs) + {#.doc "Fuse all the elements in a list of channels by piping them onto a new output channel."} + (All [a] (-> (List (Channel a)) (IO (Channel a)))) + (let [output (channel [])] + (do io.Monad<IO> + [_ (monad.map @ (pipe output) inputs)] + (wrap output)))) + +(def: #export (from-promise promise) + (All [a] (-> (Promise a) (Channel a))) + (let [output (channel [])] + (exec (promise.await (publish output) promise) + output))) + +(def: #export (poll time action) + (All [a] (-> Nat (IO a) (Channel a))) + (let [output (channel [])] + (exec (io.run + (loop [_ []] + (do io.Monad<IO> + [value action + _ (publish output value)] + (wrap (promise.await recur (promise.wait time)))))) + output))) + +(def: #export (periodic time) + (-> Nat (Channel Any)) + (let [output (channel [])] + (exec (io.run + (loop [_ []] + (do io.Monad<IO> + [_ (publish output [])] + (wrap (promise.await recur (promise.wait time)))))) + output))) + +(def: #export (iterate f init) + (All [a] (-> (-> a (Promise a)) a (Channel a))) + (let [output (channel [])] + (exec (io.run + (loop [zero init] + (do io.Monad<IO> + [_ (publish output zero)] + (wrap (promise.await recur (f zero)))))) + output))) + +(structure: #export _ (Functor Channel) + (def: (map f input) + (let [output (channel [])] + (exec (io.run (listen (|>> f (publish output)) + input)) + output)))) + +(structure: #export _ (Apply Channel) + (def: functor Functor<Channel>) + + (def: (apply ff fa) + (let [output (channel [])] + (exec (io.run (listen (function (_ f) + (listen (|>> f (publish output)) + fa)) + ff)) + output)))) + +(structure: #export _ (Monad Channel) + (def: functor Functor<Channel>) + + (def: (wrap a) + (let [output (channel [])] + (exec (io.run (publish output a)) + output))) + + (def: (join mma) + (let [output (channel [])] + (exec (io.run (listen (listen (publish output)) + mma)) + output)))) diff --git a/stdlib/source/lux/control/concurrency/process.lux b/stdlib/source/lux/control/concurrency/process.lux new file mode 100644 index 000000000..a67734747 --- /dev/null +++ b/stdlib/source/lux/control/concurrency/process.lux @@ -0,0 +1,110 @@ +(.module: + [lux #* + [control + ["." monad (#+ do)] + ["ex" exception (#+ exception:)]] + [data + [collection + ["." list]]] + [platform + [compiler + ["." host]]] + ["." io (#+ IO io)] + [host (#+ import: object)]] + [// + ["." atom (#+ Atom)]]) + +(`` (for {(~~ (static host.jvm)) + (as-is (import: java/lang/Object) + + (import: java/lang/Runtime + (#static getRuntime [] Runtime) + (availableProcessors [] int)) + + (import: java/lang/Runnable) + + (import: java/util/concurrent/TimeUnit + (#enum MILLISECONDS)) + + (import: java/util/concurrent/Executor + (execute [Runnable] #io void)) + + (import: (java/util/concurrent/ScheduledFuture a)) + + (import: java/util/concurrent/ScheduledThreadPoolExecutor + (new [int]) + (schedule [Runnable long TimeUnit] #io (ScheduledFuture Object))))} + + ## Default + (type: Process + {#creation Nat + #delay Nat + #action (IO Any)}) + )) + +(def: #export parallelism + Nat + (`` (for {(~~ (static host.jvm)) + (|> (Runtime::getRuntime) + (Runtime::availableProcessors) + .nat)} + + ## Default + 1))) + +(def: runner + (`` (for {(~~ (static host.jvm)) + (ScheduledThreadPoolExecutor::new (.int ..parallelism))} + + ## Default + (: (Atom (List Process)) + (atom.atom (list)))))) + +(def: #export (schedule milli-seconds action) + (-> Nat (IO Any) (IO Any)) + (`` (for {(~~ (static host.jvm)) + (let [runnable (object [] [Runnable] + [] + (Runnable [] (run) void + (io.run action)))] + (case milli-seconds + 0 (Executor::execute runnable runner) + _ (ScheduledThreadPoolExecutor::schedule runnable (.int milli-seconds) TimeUnit::MILLISECONDS + runner)))} + + ## Default + (atom.update (|>> (#.Cons {#creation ("lux io current-time") + #delay milli-seconds + #action action})) + runner)))) + +(`` (for {(~~ (static host.jvm)) + (as-is)} + + ## Default + (as-is (exception: #export (cannot-continue-running-processes) "") + + (def: #export run! + (IO Any) + (loop [_ []] + (do io.Monad<IO> + [processes (atom.read runner)] + (case processes + ## And... we're done! + #.Nil + (wrap []) + + _ + (do @ + [#let [now ("lux io current-time") + [ready pending] (list.partition (function (_ process) + (|> (get@ #creation process) + (n/+ (get@ #delay process)) + (n/<= now))) + processes)] + swapped? (atom.compare-and-swap! processes pending runner)] + (if swapped? + (monad.seq @ ready) + (error! (ex.construct cannot-continue-running-processes [])))) + )))) + ))) diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux new file mode 100644 index 000000000..1a471022f --- /dev/null +++ b/stdlib/source/lux/control/concurrency/promise.lux @@ -0,0 +1,174 @@ +(.module: + [lux (#- and or) + [control + [functor (#+ Functor)] + [apply (#+ Apply)] + ["." monad (#+ do Monad)]] + [data + ["." product]] + ["." function] + [type + abstract] + ["." io (#+ IO io)]] + [// + ["." process] + ["." atom (#+ Atom atom)]]) + +(abstract: #export (Promise a) + {#.doc "Represents values produced by asynchronous computations (unlike IO, which is synchronous)."} + (Atom [(Maybe a) (List (-> a (IO Any)))]) + + (def: #export (promise ?value) + (All [a] (-> (Maybe a) (Promise a))) + (:abstraction (atom [?value (list)]))) + + (def: #export (poll (^:representation promise)) + {#.doc "Polls a promise's value."} + (All [a] (-> (Promise a) (Maybe a))) + (|> (atom.read promise) + io.run + product.left)) + + (def: #export (resolve value (^:representation promise)) + {#.doc "Sets an promise's value if it has not been done yet."} + (All [a] (-> a (Promise a) (IO Bit))) + (do io.Monad<IO> + [(^@ old [_value _observers]) (atom.read promise)] + (case _value + (#.Some _) + (wrap #0) + + #.None + (do @ + [#let [new [(#.Some value) #.None]] + succeeded? (atom.compare-and-swap old new promise)] + (if succeeded? + (do @ + [_ (monad.map @ (function (_ f) (f value)) + _observers)] + (wrap #1)) + (resolve value (:abstraction promise))))))) + + (def: #export (await f (^:representation promise)) + (All [a] (-> (-> a (IO Any)) (Promise a) Any)) + (let [(^@ old [_value _observers]) (io.run (atom.read promise))] + (case _value + (#.Some value) + (io.run (f value)) + + #.None + (let [new [_value (#.Cons f _observers)]] + (if (io.run (atom.compare-and-swap old new promise)) + [] + (await f (:abstraction promise))))))) + ) + +(def: #export (resolved? promise) + {#.doc "Checks whether a promise's value has already been resolved."} + (All [a] (-> (Promise a) Bit)) + (case (poll promise) + #.None + #0 + + (#.Some _) + #1)) + +(structure: #export _ (Functor Promise) + (def: (map f fa) + (let [fb (promise #.None)] + (exec (await (function (_ a) (resolve (f a) fb)) + fa) + fb)))) + +(structure: #export _ (Apply Promise) + (def: functor Functor<Promise>) + + (def: (apply ff fa) + (let [fb (promise #.None)] + (exec (await (function (_ f) + (io (await (function (_ a) (resolve (f a) fb)) + fa))) + ff) + fb)))) + +(structure: #export _ (Monad Promise) + (def: functor Functor<Promise>) + + (def: (wrap a) + (promise (#.Some a))) + + (def: (join mma) + (let [ma (promise #.None)] + (exec (await (function (_ ma') + (io (await (function (_ a') (resolve a' ma)) + ma'))) + mma) + ma)))) + +(def: #export (and left right) + {#.doc "Sequencing combinator."} + (All [a b] (-> (Promise a) (Promise b) (Promise [a b]))) + (do Monad<Promise> + [a left + b right] + (wrap [a b]))) + +(def: #export (or left right) + {#.doc "Heterogeneous alternative combinator."} + (All [a b] (-> (Promise a) (Promise b) (Promise (| a b)))) + (let [a|b (promise #.None)] + (with-expansions + [<sides> (do-template [<promise> <tag>] + [(await (function (_ value) (resolve (<tag> value) a|b)) + <promise>)] + + [left #.Left] + [right #.Right] + )] + (exec <sides> + a|b)))) + +(def: #export (either left right) + {#.doc "Homogeneous alternative combinator."} + (All [a] (-> (Promise a) (Promise a) (Promise a))) + (let [left||right (promise #.None)] + (`` (exec (~~ (do-template [<promise>] + [(await (function (_ value) (resolve value left||right)) + <promise>)] + + [left] + [right])) + left||right)))) + +(def: #export (schedule millis-delay computation) + {#.doc (doc "Runs an I/O computation on its own process (after a specified delay)." + "Returns a Promise that will eventually host its result.")} + (All [a] (-> Nat (IO a) (Promise a))) + (let [!out (promise #.None)] + (exec (|> (do io.Monad<IO> + [value computation] + (resolve value !out)) + (process.schedule millis-delay) + io.run) + !out))) + +(def: #export future + {#.doc (doc "Runs an I/O computation on its own process." + "Returns a Promise that will eventually host its result.")} + (All [a] (-> (IO a) (Promise a))) + (schedule 0)) + +(def: #export (delay time-millis value) + {#.doc "Delivers a value after a certain period has passed."} + (All [a] (-> Nat a (Promise a))) + (schedule time-millis (io value))) + +(def: #export (wait time-millis) + {#.doc "Returns a promise that will be resolved after the specified amount of milliseconds."} + (-> Nat (Promise Any)) + (delay time-millis [])) + +(def: #export (time-out time-millis promise) + {#.doc "Wait for a promise to be resolved within the specified amount of milliseconds."} + (All [a] (-> Nat (Promise a) (Promise (Maybe a)))) + (..or (wait time-millis) promise)) diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux new file mode 100644 index 000000000..46762ecf3 --- /dev/null +++ b/stdlib/source/lux/control/concurrency/semaphore.lux @@ -0,0 +1,149 @@ +(.module: + [lux #* + [control [monad (#+ do)]] + ["." io (#+ IO)] + [type + abstract + ["." refinement]]] + [// + ["." atom (#+ Atom)] + ["." promise (#+ Promise)]]) + +(type: State + {#open-positions Nat + #waiting-list (List (Promise Any))}) + +(abstract: #export Semaphore + {#.doc "A tool for controlling access to resources by multiple concurrent processes."} + + (Atom State) + + (def: #export (semaphore init-open-positions) + (-> Nat Semaphore) + (:abstraction (atom.atom {#open-positions init-open-positions + #waiting-list (list)}))) + + (def: #export (wait semaphore) + (Ex [k] (-> Semaphore (Promise Any))) + (let [semaphore (:representation semaphore)] + (io.run + (loop [signal (: (Promise Any) + (promise.promise #.None))] + (do io.Monad<IO> + [state (atom.read semaphore) + #let [[ready? state'] (: [Bit State] + (case (get@ #open-positions state) + 0 [#0 (update@ #waiting-list (|>> (#.Cons signal)) + state)] + _ [#1 (update@ #open-positions dec + state)]))] + success? (atom.compare-and-swap state state' semaphore) + _ (if ready? + (promise.resolve [] signal) + (wrap #0))] + (if success? + (wrap signal) + (recur signal))))))) + + (def: #export (signal semaphore) + (Ex [k] (-> Semaphore (Promise Any))) + (let [semaphore (:representation semaphore)] + (promise.future + (loop [_ []] + (do io.Monad<IO> + [state (atom.read semaphore) + #let [[?signal state'] (: [(Maybe (Promise Any)) State] + (case (get@ #waiting-list state) + #.Nil + [#.None (update@ #open-positions inc state)] + + (#.Cons head tail) + [(#.Some head) (set@ #waiting-list tail state)]))] + success? (atom.compare-and-swap state state' semaphore)] + (if success? + (do @ + [_ (case ?signal + #.None + (wrap #1) + + (#.Some signal) + (promise.resolve [] signal))] + (wrap [])) + (recur []))))))) + ) + +(abstract: #export Mutex + {#.doc "A mutual-exclusion lock that can only be acquired by one process at a time."} + + Semaphore + + (def: #export (mutex _) + (-> Any Mutex) + (:abstraction (semaphore 1))) + + (def: acquire + (-> Mutex (Promise Any)) + (|>> :representation wait)) + + (def: release + (-> Mutex (Promise Any)) + (|>> :representation signal)) + + (def: #export (synchronize mutex procedure) + (All [a] (-> Mutex (IO (Promise a)) (Promise a))) + (do promise.Monad<Promise> + [_ (acquire mutex) + output (io.run procedure) + _ (release mutex)] + (wrap output))) + ) + +(def: #export limit (refinement.refinement (n/> 0))) +(`` (type: #export Limit (~~ (refinement.type limit)))) + +(abstract: #export Barrier + {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."} + + {#limit Limit + #count (Atom Nat) + #start-turnstile Semaphore + #end-turnstile Semaphore} + + (def: #export (barrier limit) + (-> Limit Barrier) + (:abstraction {#limit limit + #count (atom.atom 0) + #start-turnstile (semaphore 0) + #end-turnstile (semaphore 0)})) + + (def: (un-block times turnstile) + (-> Nat Semaphore (Promise Any)) + (loop [step 0] + (if (n/< times step) + (do promise.Monad<Promise> + [_ (signal turnstile)] + (recur (inc step))) + (:: promise.Monad<Promise> wrap [])))) + + (do-template [<phase> <update> <goal> <turnstile>] + [(def: (<phase> (^:representation barrier)) + (-> Barrier (Promise Any)) + (do promise.Monad<Promise> + [#let [limit (refinement.un-refine (get@ #limit barrier)) + goal <goal> + count (io.run (atom.update <update> (get@ #count barrier)))] + _ (if (n/= goal count) + (un-block limit (get@ <turnstile> barrier)) + (wrap []))] + (wait (get@ <turnstile> barrier))))] + + [start inc limit #start-turnstile] + [end dec 0 #end-turnstile] + ) + + (def: #export (block barrier) + (-> Barrier (Promise Any)) + (do promise.Monad<Promise> + [_ (start barrier)] + (end barrier))) + ) diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux new file mode 100644 index 000000000..3203b2d52 --- /dev/null +++ b/stdlib/source/lux/control/concurrency/stm.lux @@ -0,0 +1,245 @@ +(.module: + [lux #* + [control + [functor (#+ Functor)] + [apply (#+ Apply)] + ["." monad (#+ do Monad)]] + ["." io (#+ IO io)] + [data + ["." product] + ["." maybe] + [collection + ["." list ("list/." Functor<List> Fold<List>)]]] + [type + abstract]] + [// + ["." atom (#+ Atom atom)] + ["." promise (#+ Promise promise)] + ["." frp ("frp/." Functor<Channel>)]]) + +(type: #export (Observer a) + (-> a (IO Any))) + +(abstract: #export (Var a) + {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} + + (Atom [a (List (Observer a))]) + + (def: #export (var value) + {#.doc "Creates a new STM var, with a default value."} + (All [a] (-> a (Var a))) + (:abstraction (atom.atom [value (list)]))) + + (def: read!! + (All [a] (-> (Var a) a)) + (|>> :representation atom.read io.run product.left)) + + (def: #export (read! (^:representation var)) + {#.doc "Reads var immediately, without going through a transaction."} + (All [a] (-> (Var a) (IO a))) + (|> var + atom.read + (:: io.Functor<IO> map product.left))) + + (def: (write! new-value (^:representation var)) + (All [a] (-> a (Var a) (IO Any))) + (do io.Monad<IO> + [(^@ old [_value _observers]) (atom.read var) + succeeded? (atom.compare-and-swap old [new-value _observers] var)] + (if succeeded? + (do @ + [_ (monad.map @ (function (_ f) (f new-value)) _observers)] + (wrap [])) + (write! new-value (:abstraction var))))) + + ## TODO: Remove when possible + (def: (helper|follow var) + (All [a] (-> (Var a) (frp.Channel a))) + (frp.channel [])) + (def: #export (follow target) + {#.doc "Creates a channel that will receive all changes to the value of the given var."} + (All [a] (-> (Var a) (IO (frp.Channel a)))) + (do io.Monad<IO> + [#let [channel (helper|follow target) + target (:representation target)] + _ (atom.update (function (_ [value observers]) + [value (#.Cons (frp.publish channel) observers)]) + target)] + (wrap channel))) + ) + +(type: (Tx-Frame a) + {#var (Var a) + #original a + #current a}) + +(type: Tx + (List (Ex [a] (Tx-Frame a)))) + +(type: #export (STM a) + {#.doc "A computation which updates a transaction and produces a value."} + (-> Tx [Tx a])) + +(def: (find-var-value var tx) + (All [a] (-> (Var a) Tx (Maybe a))) + (|> tx + (list.find (function (_ [_var _original _current]) + (is? (:coerce (Var Any) var) + (:coerce (Var Any) _var)))) + (:: maybe.Monad<Maybe> map (function (_ [_var _original _current]) + _current)) + (:assume) + )) + +(def: #export (read var) + (All [a] (-> (Var a) (STM a))) + (function (_ tx) + (case (find-var-value var tx) + (#.Some value) + [tx value] + + #.None + (let [value (read!! var)] + [(#.Cons [var value value] tx) + value])))) + +(def: (update-tx-value var value tx) + (All [a] (-> (Var a) a Tx Tx)) + (case tx + #.Nil + #.Nil + + (#.Cons [_var _original _current] tx') + (if (is? (:coerce (Var Any) var) + (:coerce (Var Any) _var)) + (#.Cons {#var (:coerce (Var Any) _var) + #original (:coerce Any _original) + #current (:coerce Any value)} + tx') + (#.Cons {#var _var + #original _original + #current _current} + (update-tx-value var value tx'))) + )) + +(def: #export (write value var) + {#.doc "Writes value to var."} + (All [a] (-> a (Var a) (STM Any))) + (function (_ tx) + (case (find-var-value var tx) + (#.Some _) + [(update-tx-value var value tx) + []] + + #.None + [(#.Cons [var (read!! var) value] tx) + []]))) + +(structure: #export _ (Functor STM) + (def: (map f fa) + (function (_ tx) + (let [[tx' a] (fa tx)] + [tx' (f a)])))) + +(structure: #export _ (Apply STM) + (def: functor Functor<STM>) + + (def: (apply ff fa) + (function (_ tx) + (let [[tx' f] (ff tx) + [tx'' a] (fa tx')] + [tx'' (f a)])))) + +(structure: #export _ (Monad STM) + (def: functor Functor<STM>) + + (def: (wrap a) + (function (_ tx) [tx a])) + + (def: (join mma) + (function (_ tx) + (let [[tx' ma] (mma tx)] + (ma tx'))))) + +(def: #export (update f var) + {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} + (All [a] (-> (-> a a) (Var a) (STM [a a]))) + (do Monad<STM> + [a (read var) + #let [a' (f a)] + _ (write a' var)] + (wrap [a a']))) + +(def: (can-commit? tx) + (-> Tx Bit) + (list.every? (function (_ [_var _original _current]) + (is? _original (read!! _var))) + tx)) + +(def: (commit-var! [_var _original _current]) + (-> (Ex [a] (Tx-Frame a)) Any) + (if (is? _original _current) + [] + (io.run (write! _current _var)))) + +(def: fresh-tx Tx (list)) + +(type: Commit (Ex [a] [(STM a) (Promise a)])) + +(def: pending-commits + (Atom (Rec Commits (Promise [Commit Commits]))) + (atom (promise #.None))) + +(def: commit-processor-flag + (Atom Bit) + (atom #0)) + +(def: (issue-commit commit) + (-> Commit (IO Any)) + (let [entry [commit (promise #.None)]] + (loop [|commits| (io.run (atom.read pending-commits))] + (case (promise.poll |commits|) + #.None + (do io.Monad<IO> + [resolved? (promise.resolve entry |commits|)] + (if resolved? + (atom.write (product.right entry) pending-commits) + (recur |commits|))) + + (#.Some [head tail]) + (recur tail))))) + +(def: (process-commit [stm-proc output]) + (-> [(STM Any) (Promise Any)] Any) + (let [[finished-tx value] (stm-proc fresh-tx)] + (io.run (if (can-commit? finished-tx) + (exec (list/map commit-var! finished-tx) + (promise.resolve value output)) + (issue-commit [stm-proc output]))))) + +(def: init-processor! + (IO Any) + (do io.Monad<IO> + [flag (atom.read commit-processor-flag)] + (if flag + (wrap []) + (do @ + [was-first? (atom.compare-and-swap flag #1 commit-processor-flag)] + (if was-first? + (exec (|> (io.run (atom.read pending-commits)) + (promise.await (function (recur [head tail]) + (io (exec (process-commit (:coerce [(STM Any) (Promise Any)] head)) + (promise.await recur tail)))))) + (wrap [])) + (wrap []))) + ))) + +(def: #export (commit stm-proc) + {#.doc (doc "Commits a transaction and returns its result (asynchronously)." + "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first." + "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")} + (All [a] (-> (STM a) (Promise a))) + (let [output (promise #.None)] + (exec (io.run init-processor!) + (io.run (issue-commit [stm-proc output])) + output))) diff --git a/stdlib/source/lux/control/concurrency/task.lux b/stdlib/source/lux/control/concurrency/task.lux new file mode 100644 index 000000000..c03ab7647 --- /dev/null +++ b/stdlib/source/lux/control/concurrency/task.lux @@ -0,0 +1,82 @@ +(.module: + [lux #* + [control + [functor (#+ Functor)] + [apply (#+ Apply)] + [monad (#+ Monad do)] + ["ex" exception (#+ Exception)]] + [data + ["." error (#+ Error)]] + ["." macro + ["s" syntax (#+ syntax: Syntax)]]] + [// + ["." promise (#+ Promise)]]) + +(type: #export (Task a) + (Promise (Error a))) + +(def: #export (fail error) + (All [a] (-> Text (Task a))) + (:: promise.Monad<Promise> wrap (#error.Error error))) + +(def: #export (throw exception message) + (All [e a] (-> (Exception e) e (Task a))) + (:: promise.Monad<Promise> wrap + (ex.throw exception message))) + +(def: #export (return value) + (All [a] (-> a (Task a))) + (:: promise.Monad<Promise> wrap (#error.Success value))) + +(def: #export (try computation) + (All [a] (-> (Task a) (Task (Error a)))) + (:: promise.Functor<Promise> map (|>> #error.Success) computation)) + +(structure: #export _ (Functor Task) + (def: (map f fa) + (:: promise.Functor<Promise> map + (function (_ fa') + (case fa' + (#error.Error error) + (#error.Error error) + + (#error.Success a) + (#error.Success (f a)))) + fa))) + +(structure: #export _ (Apply Task) + (def: functor Functor<Task>) + + (def: (apply ff fa) + (do promise.Monad<Promise> + [ff' ff + fa' fa] + (wrap (do error.Monad<Error> + [f ff' + a fa'] + (wrap (f a))))))) + +(structure: #export _ (Monad Task) + (def: functor Functor<Task>) + + (def: wrap return) + + (def: (join mma) + (do promise.Monad<Promise> + [mma' mma] + (case mma' + (#error.Error error) + (wrap (#error.Error error)) + + (#error.Success ma) + ma)))) + +(syntax: #export (task {type s.any}) + {#.doc (doc "Makes an uninitialized Task (in this example, of Any)." + (task Any))} + (wrap (list (` (: (..Task (~ type)) + (promise.promise #.None)))))) + +(def: #export (from-promise promise) + (All [a] (-> (Promise a) (Task a))) + (:: promise.Functor<Promise> map (|>> #error.Success) promise)) |