aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/control/concurrency')
-rw-r--r--stdlib/source/lux/control/concurrency/actor.lux377
-rw-r--r--stdlib/source/lux/control/concurrency/atom.lux59
-rw-r--r--stdlib/source/lux/control/concurrency/frp.lux132
-rw-r--r--stdlib/source/lux/control/concurrency/process.lux110
-rw-r--r--stdlib/source/lux/control/concurrency/promise.lux174
-rw-r--r--stdlib/source/lux/control/concurrency/semaphore.lux149
-rw-r--r--stdlib/source/lux/control/concurrency/stm.lux245
-rw-r--r--stdlib/source/lux/control/concurrency/task.lux82
8 files changed, 1328 insertions, 0 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux
new file mode 100644
index 000000000..0af0d09f9
--- /dev/null
+++ b/stdlib/source/lux/control/concurrency/actor.lux
@@ -0,0 +1,377 @@
+(.module: {#.doc "The actor model of concurrency."}
+ [lux #*
+ [control monad
+ ["p" parser]
+ ["ex" exception (#+ exception:)]]
+ ["." io (#- run) ("io/." Monad<IO>)]
+ [data
+ ["." product]
+ ["e" error]
+ [text
+ format]
+ [collection
+ ["." list ("list/." Monoid<List> Monad<List> Fold<List>)]]]
+ ["." macro (#+ with-gensyms Monad<Meta>)
+ ["." code]
+ ["s" syntax (#+ syntax: Syntax)]
+ [syntax
+ ["cs" common]
+ [common
+ ["csr" reader]
+ ["csw" writer]]]]
+ [type
+ abstract]]
+ [//
+ ["." atom (#+ Atom atom)]
+ ["." promise (#+ Promise promise) ("promise/." Monad<Promise>)]
+ ["." task (#+ Task)]])
+
+(exception: #export poisoned)
+
+(exception: #export (dead {actor-name Text}
+ {message-name Text})
+ (ex.report ["Actor" actor-name]
+ ["Message" message-name]))
+
+## [Types]
+(with-expansions
+ [<Message> (as-is (-> s (Actor s) (Task s)))
+ <Obituary> (as-is [Text s (List <Message>)])
+ <Mailbox> (as-is (Rec Mailbox (Promise [<Message> Mailbox])))]
+
+ (def: (obituary mailbox)
+ (All [a] (-> (Rec Mailbox (Promise [a Mailbox])) (List a)))
+ (case (promise.poll mailbox)
+ (#.Some [head tail])
+ (#.Cons head (obituary tail))
+
+ #.None
+ #.Nil))
+
+ (abstract: #export (Actor s)
+ {#.doc "An actor, defined as all the necessities it requires."}
+ {#mailbox (Atom <Mailbox>)
+ #obituary (Promise <Obituary>)}
+
+ ## TODO: Delete after new-luxc becomes the new standard compiler.
+ (def: (actor mailbox obituary)
+ (All [s] (-> (Atom <Mailbox>) (Promise <Obituary>) (Actor s)))
+ (:abstraction {#mailbox mailbox
+ #obituary obituary}))
+
+ (type: #export (Message s)
+ <Message>)
+
+ (type: #export (Obituary s)
+ <Obituary>)
+
+ (type: #export (Behavior s)
+ {#.doc "An actor's behavior when messages are received."}
+ {#handle (-> (Message s) s (Actor s) (Task s))
+ #end (-> Text s (Promise Any))})
+
+ (def: #export (spawn behavior init)
+ {#.doc "Given a behavior and initial state, spawns an actor and returns it."}
+ (All [s] (-> (Behavior s) s (IO (Actor s))))
+ (io (let [[handle end] behavior
+ self (actor (atom (promise #.None))
+ (promise #.None))
+ process (loop [state init
+ |mailbox| (io.run (atom.read (get@ #mailbox (:representation self))))]
+ (do promise.Monad<Promise>
+ [[head tail] |mailbox|
+ ?state' (handle head state self)]
+ (case ?state'
+ (#e.Error error)
+ (do @
+ [_ (end error state)]
+ (exec (io.run (promise.resolve [error state (#.Cons head (obituary tail))]
+ (get@ #obituary (:representation self))))
+ (wrap [])))
+
+ (#e.Success state')
+ (recur state' tail))))]
+ self)))
+
+ (def: #export (alive? actor)
+ (All [s] (-> (Actor s) Bit))
+ (case (promise.poll (get@ #obituary (:representation actor)))
+ #.None
+ #1
+
+ _
+ #0))
+
+ (def: #export (send message actor)
+ {#.doc "Communicate with an actor through message passing."}
+ (All [s] (-> (Message s) (Actor s) (IO Bit)))
+ (if (alive? actor)
+ (let [entry [message (promise #.None)]]
+ (do Monad<IO>
+ [|mailbox| (atom.read (get@ #mailbox (:representation actor)))]
+ (loop [|mailbox| |mailbox|]
+ (case (promise.poll |mailbox|)
+ #.None
+ (do @
+ [resolved? (promise.resolve entry |mailbox|)]
+ (if resolved?
+ (do @
+ [_ (atom.write (product.right entry) (get@ #mailbox (:representation actor)))]
+ (wrap #1))
+ (recur |mailbox|)))
+
+ (#.Some [_ |mailbox|'])
+ (recur |mailbox|')))))
+ (io/wrap #0)))
+ ))
+
+## [Values]
+(def: (default-handle message state self)
+ (All [s] (-> (Message s) s (Actor s) (Task s)))
+ (message state self))
+
+(def: (default-end cause state)
+ (All [s] (-> Text s (Promise Any)))
+ (promise/wrap []))
+
+(def: #export default-behavior
+ (All [s] (Behavior s))
+ {#handle default-handle
+ #end default-end})
+
+(def: #export (poison actor)
+ {#.doc (doc "Kills the actor by sending a message that will kill it upon processing,"
+ "but allows the actor to handle previous messages.")}
+ (All [s] (-> (Actor s) (IO Bit)))
+ (send (function (_ state self)
+ (task.throw poisoned []))
+ actor))
+
+## [Syntax]
+(do-template [<with> <resolve> <tag> <desc>]
+ [(def: #export (<with> name)
+ (-> Name cs.Annotations cs.Annotations)
+ (|>> (#.Cons [(name-of <tag>)
+ (code.tag name)])))
+
+ (def: #export (<resolve> name)
+ (-> Name (Meta Name))
+ (do Monad<Meta>
+ [[_ annotations _] (macro.find-def name)]
+ (case (macro.get-tag-ann (name-of <tag>) annotations)
+ (#.Some actor-name)
+ (wrap actor-name)
+
+ _
+ (macro.fail (format "Definition is not " <desc> ".")))))]
+
+ [with-actor resolve-actor #..actor "an actor"]
+ [with-message resolve-message #..message "a message"]
+ )
+
+(def: actor-decl^
+ (Syntax [Text (List Text)])
+ (p.either (s.form (p.and s.local-identifier (p.some s.local-identifier)))
+ (p.and s.local-identifier (:: p.Monad<Parser> wrap (list)))))
+
+(do-template [<name> <desc>]
+ [(def: #export <name>
+ (-> Text Text)
+ (|>> (format <desc> "@")))]
+
+ [state-name "State"]
+ [behavior-name "Behavior"]
+ [new-name "new"]
+ )
+
+(type: HandleC
+ [[Text Text Text] Code])
+
+(type: StopC
+ [[Text Text] Code])
+
+(type: BehaviorC
+ [(Maybe HandleC) (Maybe StopC)])
+
+(def: behavior^
+ (s.Syntax BehaviorC)
+ (let [handle-args ($_ p.and s.local-identifier s.local-identifier s.local-identifier)
+ stop-args ($_ p.and s.local-identifier s.local-identifier)]
+ (p.and (p.maybe (s.form (p.and (s.form (p.after (s.this (' handle)) handle-args))
+ s.any)))
+ (p.maybe (s.form (p.and (s.form (p.after (s.this (' stop)) stop-args))
+ s.any))))))
+
+(syntax: #export (actor:
+ {export csr.export}
+ {[_name _vars] actor-decl^}
+ {annotations (p.default cs.empty-annotations csr.annotations)}
+ state-type
+ {[?handle ?stop] behavior^})
+ {#.doc (doc "Defines an actor, with its behavior and internal state."
+ (actor: #export Counter
+ Nat
+
+ ((stop cause state)
+ (:: promise.Monad<Promise> wrap
+ (log! (if (ex.match? ..poisoned cause)
+ (format "Counter was poisoned: " (%n state))
+ cause)))))
+
+ (actor: #export (Stack a)
+ (List a)
+
+ ((handle message state self)
+ (do task.Monad<Task>
+ [#let [_ (log! "BEFORE")]
+ output (message state self)
+ #let [_ (log! "AFTER")]]
+ (wrap output)))))}
+ (with-gensyms [g!_ g!init]
+ (do @
+ [module macro.current-module-name
+ #let [g!type (code.local-identifier (state-name _name))
+ g!behavior (code.local-identifier (behavior-name _name))
+ g!actor (code.local-identifier _name)
+ g!new (code.local-identifier (new-name _name))
+ g!vars (list/map code.local-identifier _vars)]]
+ (wrap (list (` (type: (~+ (csw.export export)) ((~ g!type) (~+ g!vars))
+ (~ state-type)))
+ (` (type: (~+ (csw.export export)) ((~ g!actor) (~+ g!vars))
+ (~ (|> annotations
+ (with-actor [module _name])
+ csw.annotations))
+ (..Actor ((~ g!type) (~+ g!vars)))))
+ (` (def: (~+ (csw.export export)) (~ g!behavior)
+ (All [(~+ g!vars)]
+ (..Behavior ((~ g!type) (~+ g!vars))))
+ {#..handle (~ (case ?handle
+ #.None
+ (` (~! ..default-handle))
+
+ (#.Some [[messageN stateN selfN] bodyC])
+ (` (function ((~ g!_)
+ (~ (code.local-identifier messageN))
+ (~ (code.local-identifier stateN))
+ (~ (code.local-identifier selfN)))
+ (do task.Monad<Task>
+ []
+ (~ bodyC))))))
+ #..end (~ (case ?stop
+ #.None
+ (` (~! ..default-end))
+
+ (#.Some [[causeN stateN] bodyC])
+ (` (function ((~ g!_)
+ (~ (code.local-identifier causeN))
+ (~ (code.local-identifier stateN)))
+ (do promise.Monad<Promise>
+ []
+ (~ bodyC))))))}))
+ (` (def: (~+ (csw.export export)) ((~ g!new) (~ g!init))
+ (All [(~+ g!vars)]
+ (-> ((~ g!type) (~+ g!vars)) (io.IO ((~ g!actor) (~+ g!vars)))))
+ (..spawn (~ g!behavior) (~ g!init))))))
+ )))
+
+(type: Signature
+ {#vars (List Text)
+ #name Text
+ #inputs (List cs.Typed-Input)
+ #state Text
+ #self Text
+ #output Code})
+
+(def: signature^
+ (s.Syntax Signature)
+ (s.form ($_ p.and
+ (p.default (list) (s.tuple (p.some s.local-identifier)))
+ s.local-identifier
+ (p.some csr.typed-input)
+ s.local-identifier
+ s.local-identifier
+ s.any)))
+
+(def: reference^
+ (s.Syntax [Name (List Text)])
+ (p.either (s.form (p.and s.identifier (p.some s.local-identifier)))
+ (p.and s.identifier (:: p.Monad<Parser> wrap (list)))))
+
+(syntax: #export (message:
+ {export csr.export}
+ {[actor-name actor-vars] reference^}
+ {signature signature^}
+ {annotations (p.default cs.empty-annotations csr.annotations)}
+ body)
+ {#.doc (doc "A message can access the actor's state through the state parameter."
+ "A message can also access the actor itself through the self parameter."
+ "A message's output must be a task containing a 2-tuple with the updated state and a return value."
+ "A message may succeed or fail (in case of failure, the actor dies)."
+
+ (message: #export Counter
+ (count! [increment Nat] state self Nat)
+ (let [state' (n/+ increment state)]
+ (task.return [state' state'])))
+
+ (message: #export (Stack a)
+ (push [value a] state self (List a))
+ (let [state' (#.Cons value state)]
+ (task.return [state' state']))))}
+ (with-gensyms [g!_ g!return g!error g!task g!sent?]
+ (do @
+ [current-module macro.current-module-name
+ actor-name (resolve-actor actor-name)
+ #let [message-name [current-module (get@ #name signature)]
+ g!type (code.identifier (product.both id state-name actor-name))
+ g!message (code.local-identifier (get@ #name signature))
+ g!actor-vars (list/map code.local-identifier actor-vars)
+ actorC (` ((~ (code.identifier actor-name)) (~+ g!actor-vars)))
+ g!all-vars (|> (get@ #vars signature) (list/map code.local-identifier) (list/compose g!actor-vars))
+ g!inputsC (|> (get@ #inputs signature) (list/map product.left))
+ g!inputsT (|> (get@ #inputs signature) (list/map product.right))
+ g!state (|> signature (get@ #state) code.local-identifier)
+ g!self (|> signature (get@ #self) code.local-identifier)
+ g!actor-refs (: (List Code)
+ (if (list.empty? actor-vars)
+ (list)
+ (|> actor-vars list.size list.indices (list/map (|>> code.nat (~) ($) (`))))))
+ ref-replacements (|> (if (list.empty? actor-vars)
+ (list)
+ (|> actor-vars list.size list.indices (list/map (|>> code.nat (~) ($) (`)))))
+ (: (List Code))
+ (list.zip2 g!all-vars)
+ (: (List [Code Code])))
+ g!outputT (list/fold (function (_ [g!var g!ref] outputT)
+ (code.replace g!var g!ref outputT))
+ (get@ #output signature)
+ ref-replacements)]]
+ (wrap (list (` (def: (~+ (csw.export export)) ((~ g!message) (~+ g!inputsC) (~ g!self))
+ (~ (|> annotations
+ (with-message actor-name)
+ csw.annotations))
+ (All [(~+ g!all-vars)] (-> (~+ g!inputsT) (~ actorC) (Task (~ (get@ #output signature)))))
+ (let [(~ g!task) (task.task (~ g!outputT))]
+ (io.run (do io.Monad<IO>
+ [(~ g!sent?) (..send (function ((~ g!_) (~ g!state) (~ g!self))
+ (do promise.Monad<Promise>
+ [(~ g!return) (: (Task [((~ g!type) (~+ g!actor-refs))
+ (~ g!outputT)])
+ (do task.Monad<Task>
+ []
+ (~ body)))]
+ (case (~ g!return)
+ (#.Right [(~ g!state) (~ g!return)])
+ (exec (io.run (promise.resolve (#.Right (~ g!return)) (~ g!task)))
+ (task.return (~ g!state)))
+
+ (#.Left (~ g!error))
+ (exec (io.run (promise.resolve (#.Left (~ g!error)) (~ g!task)))
+ (task.fail (~ g!error))))
+ ))
+ (~ g!self))]
+ (if (~ g!sent?)
+ ((~' wrap) (~ g!task))
+ ((~' wrap) (task.throw ..dead [(~ (code.text (%name actor-name)))
+ (~ (code.text (%name message-name)))]))))))))
+ ))
+ )))
diff --git a/stdlib/source/lux/control/concurrency/atom.lux b/stdlib/source/lux/control/concurrency/atom.lux
new file mode 100644
index 000000000..b1692b6e3
--- /dev/null
+++ b/stdlib/source/lux/control/concurrency/atom.lux
@@ -0,0 +1,59 @@
+(.module:
+ [lux #*
+ [control
+ [monad (#+ do)]]
+ ["." function]
+ ["." io (#- run)]
+ [type
+ abstract]
+ [platform
+ [compiler
+ ["." host]]]
+ [host (#+ import:)]])
+
+(`` (for {(~~ (static host.jvm))
+ (import: (java/util/concurrent/atomic/AtomicReference a)
+ (new [a])
+ (get [] a)
+ (compareAndSet [a a] boolean))}))
+
+(`` (abstract: #export (Atom a)
+ {#.doc "Atomic references that are safe to mutate concurrently."}
+
+ (for {(~~ (static host.jvm))
+ (AtomicReference a)})
+
+ (def: #export (atom value)
+ (All [a] (-> a (Atom a)))
+ (:abstraction (for {(~~ (static host.jvm))
+ (AtomicReference::new value)})))
+
+ (def: #export (read atom)
+ (All [a] (-> (Atom a) (IO a)))
+ (io (for {(~~ (static host.jvm))
+ (AtomicReference::get (:representation atom))})))
+
+ (def: #export (compare-and-swap current new atom)
+ {#.doc (doc "Only mutates an atom if you can present it's current value."
+ "That guarantees that atom was not updated since you last read from it.")}
+ (All [a] (-> a a (Atom a) (IO Bit)))
+ (io (AtomicReference::compareAndSet current new (:representation atom))))
+ ))
+
+(def: #export (update f atom)
+ {#.doc (doc "Updates an atom by applying a function to its current value."
+ "If it fails to update it (because some other process wrote to it first), it will retry until it succeeds."
+ "The retries will be done with the new values of the atom, as they show up.")}
+ (All [a] (-> (-> a a) (Atom a) (IO a)))
+ (loop [_ []]
+ (do io.Monad<IO>
+ [old (read atom)
+ #let [new (f old)]
+ swapped? (compare-and-swap old new atom)]
+ (if swapped?
+ (wrap new)
+ (recur [])))))
+
+(def: #export (write value atom)
+ (All [a] (-> a (Atom a) (IO Any)))
+ (update (function.constant value) atom))
diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux
new file mode 100644
index 000000000..8db54f28f
--- /dev/null
+++ b/stdlib/source/lux/control/concurrency/frp.lux
@@ -0,0 +1,132 @@
+(.module:
+ [lux #*
+ [control
+ [functor (#+ Functor)]
+ [apply (#+ Apply)]
+ ["." monad (#+ do Monad)]]
+ ["." io (#+ IO io)]
+ [data
+ [collection
+ [list ("list/." Monoid<List>)]]]
+ [type (#+ :share)
+ abstract]]
+ [//
+ ["." atom (#+ Atom atom)]
+ ["." promise (#+ Promise)]])
+
+(abstract: #export (Channel a)
+ {#.doc "An asynchronous channel to distribute values."}
+ (Atom (List (-> a (IO Any))))
+
+ (def: #export (channel _)
+ (All [a] (-> Any (Channel a)))
+ (:abstraction (atom (list))))
+
+ (def: #export (listen listener channel)
+ (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any)))
+ ## TODO: Simplify when possible.
+ (do io.Monad<IO>
+ [_ (atom.update (|>> (#.Cons listener))
+ (:representation channel))]
+ (wrap [])))
+
+ (def: #export (publish channel value)
+ {#.doc "Publish to a channel."}
+ (All [a] (-> (Channel a) a (IO Any)))
+ (do io.Monad<IO>
+ [listeners (atom.read (:representation channel))]
+ (monad.map @ (function (_ listener) (listener value)) listeners)))
+ )
+
+(def: #export (filter predicate input)
+ (All [a] (-> (-> a Bit) (Channel a) (Channel a)))
+ (let [output (channel [])]
+ (exec (io.run (listen (function (_ value)
+ (if (predicate value)
+ (publish output value)
+ (io [])))
+ input))
+ output)))
+
+(def: #export (pipe output input)
+ {#.doc "Copy/pipe the contents of a channel on to another."}
+ (All [a] (-> (Channel a) (Channel a) (IO Any)))
+ (listen (publish output)
+ input))
+
+(def: #export (merge inputs)
+ {#.doc "Fuse all the elements in a list of channels by piping them onto a new output channel."}
+ (All [a] (-> (List (Channel a)) (IO (Channel a))))
+ (let [output (channel [])]
+ (do io.Monad<IO>
+ [_ (monad.map @ (pipe output) inputs)]
+ (wrap output))))
+
+(def: #export (from-promise promise)
+ (All [a] (-> (Promise a) (Channel a)))
+ (let [output (channel [])]
+ (exec (promise.await (publish output) promise)
+ output)))
+
+(def: #export (poll time action)
+ (All [a] (-> Nat (IO a) (Channel a)))
+ (let [output (channel [])]
+ (exec (io.run
+ (loop [_ []]
+ (do io.Monad<IO>
+ [value action
+ _ (publish output value)]
+ (wrap (promise.await recur (promise.wait time))))))
+ output)))
+
+(def: #export (periodic time)
+ (-> Nat (Channel Any))
+ (let [output (channel [])]
+ (exec (io.run
+ (loop [_ []]
+ (do io.Monad<IO>
+ [_ (publish output [])]
+ (wrap (promise.await recur (promise.wait time))))))
+ output)))
+
+(def: #export (iterate f init)
+ (All [a] (-> (-> a (Promise a)) a (Channel a)))
+ (let [output (channel [])]
+ (exec (io.run
+ (loop [zero init]
+ (do io.Monad<IO>
+ [_ (publish output zero)]
+ (wrap (promise.await recur (f zero))))))
+ output)))
+
+(structure: #export _ (Functor Channel)
+ (def: (map f input)
+ (let [output (channel [])]
+ (exec (io.run (listen (|>> f (publish output))
+ input))
+ output))))
+
+(structure: #export _ (Apply Channel)
+ (def: functor Functor<Channel>)
+
+ (def: (apply ff fa)
+ (let [output (channel [])]
+ (exec (io.run (listen (function (_ f)
+ (listen (|>> f (publish output))
+ fa))
+ ff))
+ output))))
+
+(structure: #export _ (Monad Channel)
+ (def: functor Functor<Channel>)
+
+ (def: (wrap a)
+ (let [output (channel [])]
+ (exec (io.run (publish output a))
+ output)))
+
+ (def: (join mma)
+ (let [output (channel [])]
+ (exec (io.run (listen (listen (publish output))
+ mma))
+ output))))
diff --git a/stdlib/source/lux/control/concurrency/process.lux b/stdlib/source/lux/control/concurrency/process.lux
new file mode 100644
index 000000000..a67734747
--- /dev/null
+++ b/stdlib/source/lux/control/concurrency/process.lux
@@ -0,0 +1,110 @@
+(.module:
+ [lux #*
+ [control
+ ["." monad (#+ do)]
+ ["ex" exception (#+ exception:)]]
+ [data
+ [collection
+ ["." list]]]
+ [platform
+ [compiler
+ ["." host]]]
+ ["." io (#+ IO io)]
+ [host (#+ import: object)]]
+ [//
+ ["." atom (#+ Atom)]])
+
+(`` (for {(~~ (static host.jvm))
+ (as-is (import: java/lang/Object)
+
+ (import: java/lang/Runtime
+ (#static getRuntime [] Runtime)
+ (availableProcessors [] int))
+
+ (import: java/lang/Runnable)
+
+ (import: java/util/concurrent/TimeUnit
+ (#enum MILLISECONDS))
+
+ (import: java/util/concurrent/Executor
+ (execute [Runnable] #io void))
+
+ (import: (java/util/concurrent/ScheduledFuture a))
+
+ (import: java/util/concurrent/ScheduledThreadPoolExecutor
+ (new [int])
+ (schedule [Runnable long TimeUnit] #io (ScheduledFuture Object))))}
+
+ ## Default
+ (type: Process
+ {#creation Nat
+ #delay Nat
+ #action (IO Any)})
+ ))
+
+(def: #export parallelism
+ Nat
+ (`` (for {(~~ (static host.jvm))
+ (|> (Runtime::getRuntime)
+ (Runtime::availableProcessors)
+ .nat)}
+
+ ## Default
+ 1)))
+
+(def: runner
+ (`` (for {(~~ (static host.jvm))
+ (ScheduledThreadPoolExecutor::new (.int ..parallelism))}
+
+ ## Default
+ (: (Atom (List Process))
+ (atom.atom (list))))))
+
+(def: #export (schedule milli-seconds action)
+ (-> Nat (IO Any) (IO Any))
+ (`` (for {(~~ (static host.jvm))
+ (let [runnable (object [] [Runnable]
+ []
+ (Runnable [] (run) void
+ (io.run action)))]
+ (case milli-seconds
+ 0 (Executor::execute runnable runner)
+ _ (ScheduledThreadPoolExecutor::schedule runnable (.int milli-seconds) TimeUnit::MILLISECONDS
+ runner)))}
+
+ ## Default
+ (atom.update (|>> (#.Cons {#creation ("lux io current-time")
+ #delay milli-seconds
+ #action action}))
+ runner))))
+
+(`` (for {(~~ (static host.jvm))
+ (as-is)}
+
+ ## Default
+ (as-is (exception: #export (cannot-continue-running-processes) "")
+
+ (def: #export run!
+ (IO Any)
+ (loop [_ []]
+ (do io.Monad<IO>
+ [processes (atom.read runner)]
+ (case processes
+ ## And... we're done!
+ #.Nil
+ (wrap [])
+
+ _
+ (do @
+ [#let [now ("lux io current-time")
+ [ready pending] (list.partition (function (_ process)
+ (|> (get@ #creation process)
+ (n/+ (get@ #delay process))
+ (n/<= now)))
+ processes)]
+ swapped? (atom.compare-and-swap! processes pending runner)]
+ (if swapped?
+ (monad.seq @ ready)
+ (error! (ex.construct cannot-continue-running-processes []))))
+ ))))
+ )))
diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux
new file mode 100644
index 000000000..1a471022f
--- /dev/null
+++ b/stdlib/source/lux/control/concurrency/promise.lux
@@ -0,0 +1,174 @@
+(.module:
+ [lux (#- and or)
+ [control
+ [functor (#+ Functor)]
+ [apply (#+ Apply)]
+ ["." monad (#+ do Monad)]]
+ [data
+ ["." product]]
+ ["." function]
+ [type
+ abstract]
+ ["." io (#+ IO io)]]
+ [//
+ ["." process]
+ ["." atom (#+ Atom atom)]])
+
+(abstract: #export (Promise a)
+ {#.doc "Represents values produced by asynchronous computations (unlike IO, which is synchronous)."}
+ (Atom [(Maybe a) (List (-> a (IO Any)))])
+
+ (def: #export (promise ?value)
+ (All [a] (-> (Maybe a) (Promise a)))
+ (:abstraction (atom [?value (list)])))
+
+ (def: #export (poll (^:representation promise))
+ {#.doc "Polls a promise's value."}
+ (All [a] (-> (Promise a) (Maybe a)))
+ (|> (atom.read promise)
+ io.run
+ product.left))
+
+ (def: #export (resolve value (^:representation promise))
+ {#.doc "Sets an promise's value if it has not been done yet."}
+ (All [a] (-> a (Promise a) (IO Bit)))
+ (do io.Monad<IO>
+ [(^@ old [_value _observers]) (atom.read promise)]
+ (case _value
+ (#.Some _)
+ (wrap #0)
+
+ #.None
+ (do @
+ [#let [new [(#.Some value) #.None]]
+ succeeded? (atom.compare-and-swap old new promise)]
+ (if succeeded?
+ (do @
+ [_ (monad.map @ (function (_ f) (f value))
+ _observers)]
+ (wrap #1))
+ (resolve value (:abstraction promise)))))))
+
+ (def: #export (await f (^:representation promise))
+ (All [a] (-> (-> a (IO Any)) (Promise a) Any))
+ (let [(^@ old [_value _observers]) (io.run (atom.read promise))]
+ (case _value
+ (#.Some value)
+ (io.run (f value))
+
+ #.None
+ (let [new [_value (#.Cons f _observers)]]
+ (if (io.run (atom.compare-and-swap old new promise))
+ []
+ (await f (:abstraction promise)))))))
+ )
+
+(def: #export (resolved? promise)
+ {#.doc "Checks whether a promise's value has already been resolved."}
+ (All [a] (-> (Promise a) Bit))
+ (case (poll promise)
+ #.None
+ #0
+
+ (#.Some _)
+ #1))
+
+(structure: #export _ (Functor Promise)
+ (def: (map f fa)
+ (let [fb (promise #.None)]
+ (exec (await (function (_ a) (resolve (f a) fb))
+ fa)
+ fb))))
+
+(structure: #export _ (Apply Promise)
+ (def: functor Functor<Promise>)
+
+ (def: (apply ff fa)
+ (let [fb (promise #.None)]
+ (exec (await (function (_ f)
+ (io (await (function (_ a) (resolve (f a) fb))
+ fa)))
+ ff)
+ fb))))
+
+(structure: #export _ (Monad Promise)
+ (def: functor Functor<Promise>)
+
+ (def: (wrap a)
+ (promise (#.Some a)))
+
+ (def: (join mma)
+ (let [ma (promise #.None)]
+ (exec (await (function (_ ma')
+ (io (await (function (_ a') (resolve a' ma))
+ ma')))
+ mma)
+ ma))))
+
+(def: #export (and left right)
+ {#.doc "Sequencing combinator."}
+ (All [a b] (-> (Promise a) (Promise b) (Promise [a b])))
+ (do Monad<Promise>
+ [a left
+ b right]
+ (wrap [a b])))
+
+(def: #export (or left right)
+ {#.doc "Heterogeneous alternative combinator."}
+ (All [a b] (-> (Promise a) (Promise b) (Promise (| a b))))
+ (let [a|b (promise #.None)]
+ (with-expansions
+ [<sides> (do-template [<promise> <tag>]
+ [(await (function (_ value) (resolve (<tag> value) a|b))
+ <promise>)]
+
+ [left #.Left]
+ [right #.Right]
+ )]
+ (exec <sides>
+ a|b))))
+
+(def: #export (either left right)
+ {#.doc "Homogeneous alternative combinator."}
+ (All [a] (-> (Promise a) (Promise a) (Promise a)))
+ (let [left||right (promise #.None)]
+ (`` (exec (~~ (do-template [<promise>]
+ [(await (function (_ value) (resolve value left||right))
+ <promise>)]
+
+ [left]
+ [right]))
+ left||right))))
+
+(def: #export (schedule millis-delay computation)
+ {#.doc (doc "Runs an I/O computation on its own process (after a specified delay)."
+ "Returns a Promise that will eventually host its result.")}
+ (All [a] (-> Nat (IO a) (Promise a)))
+ (let [!out (promise #.None)]
+ (exec (|> (do io.Monad<IO>
+ [value computation]
+ (resolve value !out))
+ (process.schedule millis-delay)
+ io.run)
+ !out)))
+
+(def: #export future
+ {#.doc (doc "Runs an I/O computation on its own process."
+ "Returns a Promise that will eventually host its result.")}
+ (All [a] (-> (IO a) (Promise a)))
+ (schedule 0))
+
+(def: #export (delay time-millis value)
+ {#.doc "Delivers a value after a certain period has passed."}
+ (All [a] (-> Nat a (Promise a)))
+ (schedule time-millis (io value)))
+
+(def: #export (wait time-millis)
+ {#.doc "Returns a promise that will be resolved after the specified amount of milliseconds."}
+ (-> Nat (Promise Any))
+ (delay time-millis []))
+
+(def: #export (time-out time-millis promise)
+ {#.doc "Wait for a promise to be resolved within the specified amount of milliseconds."}
+ (All [a] (-> Nat (Promise a) (Promise (Maybe a))))
+ (..or (wait time-millis) promise))
diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux
new file mode 100644
index 000000000..46762ecf3
--- /dev/null
+++ b/stdlib/source/lux/control/concurrency/semaphore.lux
@@ -0,0 +1,149 @@
+(.module:
+ [lux #*
+ [control [monad (#+ do)]]
+ ["." io (#+ IO)]
+ [type
+ abstract
+ ["." refinement]]]
+ [//
+ ["." atom (#+ Atom)]
+ ["." promise (#+ Promise)]])
+
+(type: State
+ {#open-positions Nat
+ #waiting-list (List (Promise Any))})
+
+(abstract: #export Semaphore
+ {#.doc "A tool for controlling access to resources by multiple concurrent processes."}
+
+ (Atom State)
+
+ (def: #export (semaphore init-open-positions)
+ (-> Nat Semaphore)
+ (:abstraction (atom.atom {#open-positions init-open-positions
+ #waiting-list (list)})))
+
+ (def: #export (wait semaphore)
+ (Ex [k] (-> Semaphore (Promise Any)))
+ (let [semaphore (:representation semaphore)]
+ (io.run
+ (loop [signal (: (Promise Any)
+ (promise.promise #.None))]
+ (do io.Monad<IO>
+ [state (atom.read semaphore)
+ #let [[ready? state'] (: [Bit State]
+ (case (get@ #open-positions state)
+ 0 [#0 (update@ #waiting-list (|>> (#.Cons signal))
+ state)]
+ _ [#1 (update@ #open-positions dec
+ state)]))]
+ success? (atom.compare-and-swap state state' semaphore)
+ _ (if ready?
+ (promise.resolve [] signal)
+ (wrap #0))]
+ (if success?
+ (wrap signal)
+ (recur signal)))))))
+
+ (def: #export (signal semaphore)
+ (Ex [k] (-> Semaphore (Promise Any)))
+ (let [semaphore (:representation semaphore)]
+ (promise.future
+ (loop [_ []]
+ (do io.Monad<IO>
+ [state (atom.read semaphore)
+ #let [[?signal state'] (: [(Maybe (Promise Any)) State]
+ (case (get@ #waiting-list state)
+ #.Nil
+ [#.None (update@ #open-positions inc state)]
+
+ (#.Cons head tail)
+ [(#.Some head) (set@ #waiting-list tail state)]))]
+ success? (atom.compare-and-swap state state' semaphore)]
+ (if success?
+ (do @
+ [_ (case ?signal
+ #.None
+ (wrap #1)
+
+ (#.Some signal)
+ (promise.resolve [] signal))]
+ (wrap []))
+ (recur [])))))))
+ )
+
+(abstract: #export Mutex
+ {#.doc "A mutual-exclusion lock that can only be acquired by one process at a time."}
+
+ Semaphore
+
+ (def: #export (mutex _)
+ (-> Any Mutex)
+ (:abstraction (semaphore 1)))
+
+ (def: acquire
+ (-> Mutex (Promise Any))
+ (|>> :representation wait))
+
+ (def: release
+ (-> Mutex (Promise Any))
+ (|>> :representation signal))
+
+ (def: #export (synchronize mutex procedure)
+ (All [a] (-> Mutex (IO (Promise a)) (Promise a)))
+ (do promise.Monad<Promise>
+ [_ (acquire mutex)
+ output (io.run procedure)
+ _ (release mutex)]
+ (wrap output)))
+ )
+
+(def: #export limit (refinement.refinement (n/> 0)))
+(`` (type: #export Limit (~~ (refinement.type limit))))
+
+(abstract: #export Barrier
+ {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."}
+
+ {#limit Limit
+ #count (Atom Nat)
+ #start-turnstile Semaphore
+ #end-turnstile Semaphore}
+
+ (def: #export (barrier limit)
+ (-> Limit Barrier)
+ (:abstraction {#limit limit
+ #count (atom.atom 0)
+ #start-turnstile (semaphore 0)
+ #end-turnstile (semaphore 0)}))
+
+ (def: (un-block times turnstile)
+ (-> Nat Semaphore (Promise Any))
+ (loop [step 0]
+ (if (n/< times step)
+ (do promise.Monad<Promise>
+ [_ (signal turnstile)]
+ (recur (inc step)))
+ (:: promise.Monad<Promise> wrap []))))
+
+ (do-template [<phase> <update> <goal> <turnstile>]
+ [(def: (<phase> (^:representation barrier))
+ (-> Barrier (Promise Any))
+ (do promise.Monad<Promise>
+ [#let [limit (refinement.un-refine (get@ #limit barrier))
+ goal <goal>
+ count (io.run (atom.update <update> (get@ #count barrier)))]
+ _ (if (n/= goal count)
+ (un-block limit (get@ <turnstile> barrier))
+ (wrap []))]
+ (wait (get@ <turnstile> barrier))))]
+
+ [start inc limit #start-turnstile]
+ [end dec 0 #end-turnstile]
+ )
+
+ (def: #export (block barrier)
+ (-> Barrier (Promise Any))
+ (do promise.Monad<Promise>
+ [_ (start barrier)]
+ (end barrier)))
+ )
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux
new file mode 100644
index 000000000..3203b2d52
--- /dev/null
+++ b/stdlib/source/lux/control/concurrency/stm.lux
@@ -0,0 +1,245 @@
+(.module:
+ [lux #*
+ [control
+ [functor (#+ Functor)]
+ [apply (#+ Apply)]
+ ["." monad (#+ do Monad)]]
+ ["." io (#+ IO io)]
+ [data
+ ["." product]
+ ["." maybe]
+ [collection
+ ["." list ("list/." Functor<List> Fold<List>)]]]
+ [type
+ abstract]]
+ [//
+ ["." atom (#+ Atom atom)]
+ ["." promise (#+ Promise promise)]
+ ["." frp ("frp/." Functor<Channel>)]])
+
+(type: #export (Observer a)
+ (-> a (IO Any)))
+
+(abstract: #export (Var a)
+ {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."}
+
+ (Atom [a (List (Observer a))])
+
+ (def: #export (var value)
+ {#.doc "Creates a new STM var, with a default value."}
+ (All [a] (-> a (Var a)))
+ (:abstraction (atom.atom [value (list)])))
+
+ (def: read!!
+ (All [a] (-> (Var a) a))
+ (|>> :representation atom.read io.run product.left))
+
+ (def: #export (read! (^:representation var))
+ {#.doc "Reads var immediately, without going through a transaction."}
+ (All [a] (-> (Var a) (IO a)))
+ (|> var
+ atom.read
+ (:: io.Functor<IO> map product.left)))
+
+ (def: (write! new-value (^:representation var))
+ (All [a] (-> a (Var a) (IO Any)))
+ (do io.Monad<IO>
+ [(^@ old [_value _observers]) (atom.read var)
+ succeeded? (atom.compare-and-swap old [new-value _observers] var)]
+ (if succeeded?
+ (do @
+ [_ (monad.map @ (function (_ f) (f new-value)) _observers)]
+ (wrap []))
+ (write! new-value (:abstraction var)))))
+
+ ## TODO: Remove when possible
+ (def: (helper|follow var)
+ (All [a] (-> (Var a) (frp.Channel a)))
+ (frp.channel []))
+ (def: #export (follow target)
+ {#.doc "Creates a channel that will receive all changes to the value of the given var."}
+ (All [a] (-> (Var a) (IO (frp.Channel a))))
+ (do io.Monad<IO>
+ [#let [channel (helper|follow target)
+ target (:representation target)]
+ _ (atom.update (function (_ [value observers])
+ [value (#.Cons (frp.publish channel) observers)])
+ target)]
+ (wrap channel)))
+ )
+
+(type: (Tx-Frame a)
+ {#var (Var a)
+ #original a
+ #current a})
+
+(type: Tx
+ (List (Ex [a] (Tx-Frame a))))
+
+(type: #export (STM a)
+ {#.doc "A computation which updates a transaction and produces a value."}
+ (-> Tx [Tx a]))
+
+(def: (find-var-value var tx)
+ (All [a] (-> (Var a) Tx (Maybe a)))
+ (|> tx
+ (list.find (function (_ [_var _original _current])
+ (is? (:coerce (Var Any) var)
+ (:coerce (Var Any) _var))))
+ (:: maybe.Monad<Maybe> map (function (_ [_var _original _current])
+ _current))
+ (:assume)
+ ))
+
+(def: #export (read var)
+ (All [a] (-> (Var a) (STM a)))
+ (function (_ tx)
+ (case (find-var-value var tx)
+ (#.Some value)
+ [tx value]
+
+ #.None
+ (let [value (read!! var)]
+ [(#.Cons [var value value] tx)
+ value]))))
+
+(def: (update-tx-value var value tx)
+ (All [a] (-> (Var a) a Tx Tx))
+ (case tx
+ #.Nil
+ #.Nil
+
+ (#.Cons [_var _original _current] tx')
+ (if (is? (:coerce (Var Any) var)
+ (:coerce (Var Any) _var))
+ (#.Cons {#var (:coerce (Var Any) _var)
+ #original (:coerce Any _original)
+ #current (:coerce Any value)}
+ tx')
+ (#.Cons {#var _var
+ #original _original
+ #current _current}
+ (update-tx-value var value tx')))
+ ))
+
+(def: #export (write value var)
+ {#.doc "Writes value to var."}
+ (All [a] (-> a (Var a) (STM Any)))
+ (function (_ tx)
+ (case (find-var-value var tx)
+ (#.Some _)
+ [(update-tx-value var value tx)
+ []]
+
+ #.None
+ [(#.Cons [var (read!! var) value] tx)
+ []])))
+
+(structure: #export _ (Functor STM)
+ (def: (map f fa)
+ (function (_ tx)
+ (let [[tx' a] (fa tx)]
+ [tx' (f a)]))))
+
+(structure: #export _ (Apply STM)
+ (def: functor Functor<STM>)
+
+ (def: (apply ff fa)
+ (function (_ tx)
+ (let [[tx' f] (ff tx)
+ [tx'' a] (fa tx')]
+ [tx'' (f a)]))))
+
+(structure: #export _ (Monad STM)
+ (def: functor Functor<STM>)
+
+ (def: (wrap a)
+ (function (_ tx) [tx a]))
+
+ (def: (join mma)
+ (function (_ tx)
+ (let [[tx' ma] (mma tx)]
+ (ma tx')))))
+
+(def: #export (update f var)
+ {#.doc "Will update a Var's value, and return a tuple with the old and the new values."}
+ (All [a] (-> (-> a a) (Var a) (STM [a a])))
+ (do Monad<STM>
+ [a (read var)
+ #let [a' (f a)]
+ _ (write a' var)]
+ (wrap [a a'])))
+
+(def: (can-commit? tx)
+ (-> Tx Bit)
+ (list.every? (function (_ [_var _original _current])
+ (is? _original (read!! _var)))
+ tx))
+
+(def: (commit-var! [_var _original _current])
+ (-> (Ex [a] (Tx-Frame a)) Any)
+ (if (is? _original _current)
+ []
+ (io.run (write! _current _var))))
+
+(def: fresh-tx Tx (list))
+
+(type: Commit (Ex [a] [(STM a) (Promise a)]))
+
+(def: pending-commits
+ (Atom (Rec Commits (Promise [Commit Commits])))
+ (atom (promise #.None)))
+
+(def: commit-processor-flag
+ (Atom Bit)
+ (atom #0))
+
+(def: (issue-commit commit)
+ (-> Commit (IO Any))
+ (let [entry [commit (promise #.None)]]
+ (loop [|commits| (io.run (atom.read pending-commits))]
+ (case (promise.poll |commits|)
+ #.None
+ (do io.Monad<IO>
+ [resolved? (promise.resolve entry |commits|)]
+ (if resolved?
+ (atom.write (product.right entry) pending-commits)
+ (recur |commits|)))
+
+ (#.Some [head tail])
+ (recur tail)))))
+
+(def: (process-commit [stm-proc output])
+ (-> [(STM Any) (Promise Any)] Any)
+ (let [[finished-tx value] (stm-proc fresh-tx)]
+ (io.run (if (can-commit? finished-tx)
+ (exec (list/map commit-var! finished-tx)
+ (promise.resolve value output))
+ (issue-commit [stm-proc output])))))
+
+(def: init-processor!
+ (IO Any)
+ (do io.Monad<IO>
+ [flag (atom.read commit-processor-flag)]
+ (if flag
+ (wrap [])
+ (do @
+ [was-first? (atom.compare-and-swap flag #1 commit-processor-flag)]
+ (if was-first?
+ (exec (|> (io.run (atom.read pending-commits))
+ (promise.await (function (recur [head tail])
+ (io (exec (process-commit (:coerce [(STM Any) (Promise Any)] head))
+ (promise.await recur tail))))))
+ (wrap []))
+ (wrap [])))
+ )))
+
+(def: #export (commit stm-proc)
+ {#.doc (doc "Commits a transaction and returns its result (asynchronously)."
+ "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first."
+ "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")}
+ (All [a] (-> (STM a) (Promise a)))
+ (let [output (promise #.None)]
+ (exec (io.run init-processor!)
+ (io.run (issue-commit [stm-proc output]))
+ output)))
diff --git a/stdlib/source/lux/control/concurrency/task.lux b/stdlib/source/lux/control/concurrency/task.lux
new file mode 100644
index 000000000..c03ab7647
--- /dev/null
+++ b/stdlib/source/lux/control/concurrency/task.lux
@@ -0,0 +1,82 @@
+(.module:
+ [lux #*
+ [control
+ [functor (#+ Functor)]
+ [apply (#+ Apply)]
+ [monad (#+ Monad do)]
+ ["ex" exception (#+ Exception)]]
+ [data
+ ["." error (#+ Error)]]
+ ["." macro
+ ["s" syntax (#+ syntax: Syntax)]]]
+ [//
+ ["." promise (#+ Promise)]])
+
+(type: #export (Task a)
+ (Promise (Error a)))
+
+(def: #export (fail error)
+ (All [a] (-> Text (Task a)))
+ (:: promise.Monad<Promise> wrap (#error.Error error)))
+
+(def: #export (throw exception message)
+ (All [e a] (-> (Exception e) e (Task a)))
+ (:: promise.Monad<Promise> wrap
+ (ex.throw exception message)))
+
+(def: #export (return value)
+ (All [a] (-> a (Task a)))
+ (:: promise.Monad<Promise> wrap (#error.Success value)))
+
+(def: #export (try computation)
+ (All [a] (-> (Task a) (Task (Error a))))
+ (:: promise.Functor<Promise> map (|>> #error.Success) computation))
+
+(structure: #export _ (Functor Task)
+ (def: (map f fa)
+ (:: promise.Functor<Promise> map
+ (function (_ fa')
+ (case fa'
+ (#error.Error error)
+ (#error.Error error)
+
+ (#error.Success a)
+ (#error.Success (f a))))
+ fa)))
+
+(structure: #export _ (Apply Task)
+ (def: functor Functor<Task>)
+
+ (def: (apply ff fa)
+ (do promise.Monad<Promise>
+ [ff' ff
+ fa' fa]
+ (wrap (do error.Monad<Error>
+ [f ff'
+ a fa']
+ (wrap (f a)))))))
+
+(structure: #export _ (Monad Task)
+ (def: functor Functor<Task>)
+
+ (def: wrap return)
+
+ (def: (join mma)
+ (do promise.Monad<Promise>
+ [mma' mma]
+ (case mma'
+ (#error.Error error)
+ (wrap (#error.Error error))
+
+ (#error.Success ma)
+ ma))))
+
+(syntax: #export (task {type s.any})
+ {#.doc (doc "Makes an uninitialized Task (in this example, of Any)."
+ (task Any))}
+ (wrap (list (` (: (..Task (~ type))
+ (promise.promise #.None))))))
+
+(def: #export (from-promise promise)
+ (All [a] (-> (Promise a) (Task a)))
+ (:: promise.Functor<Promise> map (|>> #error.Success) promise))