diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency')
-rw-r--r-- | stdlib/source/lux/control/concurrency/actor.lux | 389 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/atom.lux | 102 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/frp.lux | 295 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/promise.lux | 199 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/semaphore.lux | 173 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/stm.lux | 273 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/thread.lux | 169 |
7 files changed, 0 insertions, 1600 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux deleted file mode 100644 index 9e17193b2..000000000 --- a/stdlib/source/lux/control/concurrency/actor.lux +++ /dev/null @@ -1,389 +0,0 @@ -(.module: {#.doc "The actor model of concurrency."} - [lux #* - [abstract - monad] - [control - [pipe (#+ case>)] - ["." function] - ["." try (#+ Try)] - ["." exception (#+ exception:)] - ["." io (#+ IO io)] - ["<>" parser - ["<.>" code (#+ Parser)]]] - [data - ["." product] - [text - ["%" format (#+ format)]] - [collection - ["." list ("#\." monoid monad fold)]]] - ["." macro (#+ with_gensyms) - ["." code] - [syntax (#+ syntax:) - ["|.|" input] - ["|.|" export] - ["|.|" annotations]]] - [math - [number - ["n" nat]]] - ["." meta (#+ monad) - ["." annotation]] - [type (#+ :share) - ["." abstract (#+ abstract: :representation :abstraction)]]] - [// - ["." atom (#+ Atom atom)] - ["." promise (#+ Promise Resolver) ("#\." monad)] - ["." frp (#+ Channel)]]) - -(exception: #export poisoned) -(exception: #export dead) - -(with_expansions - [<Mail> (as_is (-> s (Actor s) (Promise (Try s)))) - <Obituary> (as_is [Text s (List <Mail>)]) - <Mailbox> (as_is (Rec Mailbox - [(Promise [<Mail> Mailbox]) - (Resolver [<Mail> Mailbox])]))] - - (def: (pending [read write]) - (All [a] - (-> (Rec Mailbox - [(Promise [a Mailbox]) - (Resolver [a Mailbox])]) - (IO (List a)))) - (do {! io.monad} - [current (promise.poll read)] - (case current - (#.Some [head tail]) - (\ ! map (|>> (#.Cons head)) - (pending tail)) - - #.None - (wrap #.Nil)))) - - (abstract: #export (Actor s) - {#obituary [(Promise <Obituary>) - (Resolver <Obituary>)] - #mailbox (Atom <Mailbox>)} - - (type: #export (Mail s) - <Mail>) - - (type: #export (Obituary s) - <Obituary>) - - (type: #export (Behavior o s) - {#.doc "An actor's behavior when mail is received and when a fatal error occurs."} - {#on_init (-> o s) - #on_mail (-> (Mail s) s (Actor s) (Promise (Try s)))}) - - (def: #export (spawn! behavior init) - {#.doc "Given a behavior and initial state, spawns an actor and returns it."} - (All [o s] (-> (Behavior o s) o (IO (Actor s)))) - (io (let [[on_init on_mail] behavior - self (:share [o s] - (Behavior o s) - behavior - - (Actor s) - (:abstraction {#obituary (promise.promise []) - #mailbox (atom (promise.promise []))})) - process (loop [state (on_init init) - [|mailbox| _] (io.run (atom.read (get@ #mailbox (:representation self))))] - (do {! promise.monad} - [[head tail] |mailbox| - ?state' (on_mail head state self)] - (case ?state' - (#try.Failure error) - (let [[_ resolve] (get@ #obituary (:representation self))] - (exec (io.run - (do io.monad - [pending (..pending tail)] - (resolve [error state (#.Cons head pending)]))) - (wrap []))) - - (#try.Success state') - (recur state' tail))))] - self))) - - (def: #export (alive? actor) - (All [s] (-> (Actor s) (IO Bit))) - (let [[obituary _] (get@ #obituary (:representation actor))] - (|> obituary - promise.poll - (\ io.functor map - (|>> (case> #.None - yes - - _ - no)))))) - - (def: #export (obituary actor) - (All [s] (-> (Actor s) (IO (Maybe (Obituary s))))) - (let [[obituary _] (get@ #obituary (:representation actor))] - (promise.poll obituary))) - - (def: #export await - (All [s] (-> (Actor s) (Promise (Obituary s)))) - (|>> :representation - (get@ #obituary) - product.left)) - - (def: #export (mail! mail actor) - {#.doc "Send mail to an actor.."} - (All [s] (-> (Mail s) (Actor s) (IO (Try Any)))) - (do {! io.monad} - [alive? (..alive? actor)] - (if alive? - (let [entry [mail (promise.promise [])]] - (do ! - [|mailbox|&resolve (atom.read (get@ #mailbox (:representation actor)))] - (loop [[|mailbox| resolve] |mailbox|&resolve] - (do ! - [|mailbox| (promise.poll |mailbox|)] - (case |mailbox| - #.None - (do ! - [resolved? (resolve entry)] - (if resolved? - (do ! - [_ (atom.write (product.right entry) (get@ #mailbox (:representation actor)))] - (wrap (exception.return []))) - (recur |mailbox|&resolve))) - - (#.Some [_ |mailbox|']) - (recur |mailbox|')))))) - (wrap (exception.throw ..dead []))))) - - (type: #export (Message s o) - (-> s (Actor s) (Promise (Try [s o])))) - - (def: (mail message) - (All [s o] (-> (Message s o) [(Promise (Try o)) (Mail s)])) - (let [[promise resolve] (:share [s o] - (Message s o) - message - - [(Promise (Try o)) - (Resolver (Try o))] - (promise.promise []))] - [promise - (function (_ state self) - (do {! promise.monad} - [outcome (message state self)] - (case outcome - (#try.Success [state' return]) - (exec (io.run (resolve (#try.Success return))) - (promise.resolved (#try.Success state'))) - - (#try.Failure error) - (exec (io.run (resolve (#try.Failure error))) - (promise.resolved (#try.Failure error))))))])) - - (def: #export (tell! message actor) - {#.doc "Communicate with an actor through message passing."} - (All [s o] (-> (Message s o) (Actor s) (Promise (Try o)))) - (let [[promise mail] (..mail message)] - (do promise.monad - [outcome (promise.future (..mail! mail actor))] - (case outcome - (#try.Success) - promise - - (#try.Failure error) - (wrap (#try.Failure error)))))) - ) - ) - -(def: (default_on_mail mail state self) - (All [s] (-> (Mail s) s (Actor s) (Promise (Try s)))) - (mail state self)) - -(def: #export default - (All [s] (Behavior s s)) - {#on_init function.identity - #on_mail ..default_on_mail}) - -(def: #export (poison! actor) - {#.doc (doc "Kills the actor by sending mail that will kill it upon processing," - "but allows the actor to handle previous mail.")} - (All [s] (-> (Actor s) (IO (Try Any)))) - (..mail! (function (_ state self) - (promise.resolved (exception.throw ..poisoned []))) - actor)) - -(def: actor_decl^ - (Parser [Text (List Text)]) - (<>.either (<code>.form (<>.and <code>.local_identifier (<>.some <code>.local_identifier))) - (<>.and <code>.local_identifier (\ <>.monad wrap (list))))) - -(type: On_MailC - [[Text Text Text] Code]) - -(type: BehaviorC - [(Maybe On_MailC) (List Code)]) - -(def: argument - (Parser Text) - <code>.local_identifier) - -(def: behavior^ - (Parser BehaviorC) - (let [on_mail_args ($_ <>.and ..argument ..argument ..argument)] - ($_ <>.and - (<>.maybe (<code>.form (<>.and (<code>.form (<>.after (<code>.this! (' on_mail)) on_mail_args)) - <code>.any))) - (<>.some <code>.any)))) - -(def: (on_mail g!_ ?on_mail) - (-> Code (Maybe On_MailC) Code) - (case ?on_mail - #.None - (` (~! ..default_on_mail)) - - (#.Some [[mailN stateN selfN] bodyC]) - (` (function ((~ g!_) - (~ (code.local_identifier mailN)) - (~ (code.local_identifier stateN)) - (~ (code.local_identifier selfN))) - (~ bodyC))))) - -(with_expansions [<examples> (as_is (actor: #export (Stack a) - (List a) - - ((on_mail mail state self) - (do (try.with promise.monad) - [#let [_ (log! "BEFORE")] - output (mail state self) - #let [_ (log! "AFTER")]] - (wrap output))) - - (message: #export (push {value a} state self (List a)) - (let [state' (#.Cons value state)] - (promise.resolved (#try.Success [state' state']))))) - - (actor: #export Counter - Nat - - (message: #export (count! {increment Nat} state self Any) - (let [state' (n.+ increment state)] - (promise.resolved (#try.Success [state' state'])))) - - (message: #export (read! state self Nat) - (promise.resolved (#try.Success [state state])))))] - (syntax: #export (actor: - {export |export|.parser} - {[name vars] actor_decl^} - {annotations (<>.default |annotations|.empty |annotations|.parser)} - state_type - {[?on_mail messages] behavior^}) - {#.doc (doc "Defines an actor, with its behavior and internal state." - "Messages for the actor must be defined after the on_mail handler." - <examples>)} - (with_gensyms [g!_] - (do meta.monad - [g!type (macro.gensym (format name "_abstract_type")) - #let [g!actor (code.local_identifier name) - g!vars (list\map code.local_identifier vars)]] - (wrap (list (` ((~! abstract:) (~+ (|export|.format export)) ((~ g!type) (~+ g!vars)) - (~ state_type) - - (def: (~+ (|export|.format export)) (~ g!actor) - (All [(~+ g!vars)] - (..Behavior (~ state_type) ((~ g!type) (~+ g!vars)))) - {#..on_init (|>> ((~! abstract.:abstraction) (~ g!type))) - #..on_mail (~ (..on_mail g!_ ?on_mail))}) - - (~+ messages)))))))) - - (syntax: #export (actor {[state_type init] (<code>.record (<>.and <code>.any <code>.any))} - {[?on_mail messages] behavior^}) - (with_gensyms [g!_] - (wrap (list (` (: ((~! io.IO) (..Actor (~ state_type))) - (..spawn! (: (..Behavior (~ state_type) (~ state_type)) - {#..on_init (|>>) - #..on_mail (~ (..on_mail g!_ ?on_mail))}) - (: (~ state_type) - (~ init))))))))) - - (type: Signature - {#vars (List Text) - #name Text - #inputs (List |input|.Input) - #state Text - #self Text - #output Code}) - - (def: signature^ - (Parser Signature) - (<code>.form ($_ <>.and - (<>.default (list) (<code>.tuple (<>.some <code>.local_identifier))) - <code>.local_identifier - (<>.some |input|.parser) - <code>.local_identifier - <code>.local_identifier - <code>.any))) - - (def: reference^ - (Parser [Name (List Text)]) - (<>.either (<code>.form (<>.and <code>.identifier (<>.some <code>.local_identifier))) - (<>.and <code>.identifier (\ <>.monad wrap (list))))) - - (syntax: #export (message: - {export |export|.parser} - {signature signature^} - {annotations (<>.default |annotations|.empty |annotations|.parser)} - body) - {#.doc (doc "A message can access the actor's state through the state parameter." - "A message can also access the actor itself through the self parameter." - "A message's output must be a promise containing a 2-tuple with the updated state and a return value." - "A message may succeed or fail (in case of failure, the actor dies)." - - <examples>)} - (with_gensyms [g!_ g!return] - (do meta.monad - [actor_scope abstract.current - #let [g!type (code.local_identifier (get@ #abstract.name actor_scope)) - g!message (code.local_identifier (get@ #name signature)) - g!actor_vars (get@ #abstract.type_vars actor_scope) - g!all_vars (|> signature (get@ #vars) (list\map code.local_identifier) (list\compose g!actor_vars)) - g!inputsC (|> signature (get@ #inputs) (list\map product.left)) - g!inputsT (|> signature (get@ #inputs) (list\map product.right)) - g!state (|> signature (get@ #state) code.local_identifier) - g!self (|> signature (get@ #self) code.local_identifier)]] - (wrap (list (` (def: (~+ (|export|.format export)) ((~ g!message) (~+ g!inputsC)) - (~ (|annotations|.format annotations)) - (All [(~+ g!all_vars)] - (-> (~+ g!inputsT) - (..Message (~ (get@ #abstract.abstraction actor_scope)) - (~ (get@ #output signature))))) - (function ((~ g!_) (~ g!state) (~ g!self)) - (let [(~ g!state) (:as (~ (get@ #abstract.representation actor_scope)) - (~ g!state))] - (|> (~ body) - (: ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.representation actor_scope)) - (~ (get@ #output signature))]))) - (:as ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.abstraction actor_scope)) - (~ (get@ #output signature))])))))))) - )))))) - -(type: #export Stop - (IO Any)) - -(def: continue! true) -(def: stop! false) - -(def: #export (observe action channel actor) - (All [e s] (-> (-> e Stop (Mail s)) (Channel e) (Actor s) (IO Any))) - (let [signal (: (Atom Bit) - (atom.atom ..continue!)) - stop (: Stop - (atom.write ..stop! signal))] - (frp.subscribe (function (_ event) - (do {! io.monad} - [continue? (atom.read signal)] - (if continue? - (do ! - [outcome (..mail! (action event stop) actor)] - (wrap (try.to_maybe outcome))) - (wrap #.None)))) - channel))) diff --git a/stdlib/source/lux/control/concurrency/atom.lux b/stdlib/source/lux/control/concurrency/atom.lux deleted file mode 100644 index e3b711785..000000000 --- a/stdlib/source/lux/control/concurrency/atom.lux +++ /dev/null @@ -1,102 +0,0 @@ -(.module: - [lux #* - ["." ffi] - ["@" target] - [abstract - [monad (#+ do)]] - [control - ["." function] - ["." io (#- run) ("#\." functor)]] - [data - ["." product] - [collection - ["." array]]] - [type - abstract]]) - -(with_expansions [<jvm> (as_is (ffi.import: (java/util/concurrent/atomic/AtomicReference a) - ["#::." - (new [a]) - (get [] a) - (compareAndSet [a a] boolean)]))] - (for {@.old <jvm> - @.jvm <jvm>} - (as_is))) - -(with_expansions [<new> (for {@.js "js array new" - @.python "python array new" - @.lua "lua array new" - @.ruby "ruby array new" - @.php "php array new" - @.scheme "scheme array new"} - (as_is)) - <write> (for {@.js "js array write" - @.python "python array write" - @.lua "lua array write" - @.ruby "ruby array write" - @.php "php array write" - @.scheme "scheme array write"} - (as_is)) - - <read> (for {@.js "js array read" - @.python "python array read" - @.lua "lua array read" - @.ruby "ruby array read" - @.php "php array read" - @.scheme "scheme array read"} - (as_is))] - (abstract: #export (Atom a) - (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference a)] - (for {@.old <jvm> - @.jvm <jvm>} - (array.Array a))) - - {#.doc "Atomic references that are safe to mutate concurrently."} - - (def: #export (atom value) - (All [a] (-> a (Atom a))) - (:abstraction (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::new value)] - (for {@.old <jvm> - @.jvm <jvm>} - (<write> 0 value (<new> 1)))))) - - (def: #export (read atom) - (All [a] (-> (Atom a) (IO a))) - (io (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::get (:representation atom))] - (for {@.old <jvm> - @.jvm <jvm>} - (<read> 0 (:representation atom)))))) - - (def: #export (compare_and_swap current new atom) - {#.doc (doc "Only mutates an atom if you can present its current value." - "That guarantees that atom was not updated since you last read from it.")} - (All [a] (-> a a (Atom a) (IO Bit))) - (io (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::compareAndSet current new (:representation atom))] - (for {@.old <jvm> - @.jvm <jvm>} - (let [old (<read> 0 (:representation atom))] - (if (is? old current) - (exec (<write> 0 new (:representation atom)) - true) - false)))))) - )) - -(def: #export (update f atom) - {#.doc (doc "Updates an atom by applying a function to its current value." - "If it fails to update it (because some other process wrote to it first), it will retry until it succeeds." - "The retries will be done with the new values of the atom, as they show up.")} - (All [a] (-> (-> a a) (Atom a) (IO [a a]))) - (loop [_ []] - (do io.monad - [old (read atom) - #let [new (f old)] - swapped? (..compare_and_swap old new atom)] - (if swapped? - (wrap [old new]) - (recur []))))) - -(def: #export (write value atom) - (All [a] (-> a (Atom a) (IO a))) - (|> atom - (..update (function.constant value)) - (io\map product.left))) diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux deleted file mode 100644 index 452c153f1..000000000 --- a/stdlib/source/lux/control/concurrency/frp.lux +++ /dev/null @@ -1,295 +0,0 @@ -(.module: - [lux #* - [abstract - [predicate (#+ Predicate)] - [equivalence (#+ Equivalence)] - [functor (#+ Functor)] - [apply (#+ Apply)] - ["." monad (#+ Monad do)]] - [control - ["." try (#+ Try)] - ["." exception (#+ exception:)] - ["." io (#+ IO io)]] - [data - ["." maybe ("#\." functor)]] - [type (#+ :share) - abstract]] - [// - ["." atom (#+ Atom)] - ["." promise (#+ Promise) ("#\." functor)]]) - -(type: #export (Channel a) - {#.doc "An asynchronous channel to distribute values."} - (Promise (Maybe [a (Channel a)]))) - -(exception: #export channel_is_already_closed) - -(interface: #export (Sink a) - (: (IO (Try Any)) - close) - (: (-> a (IO (Try Any))) - feed)) - -(def: (sink resolve) - (All [a] - (-> (promise.Resolver (Maybe [a (Channel a)])) - (Sink a))) - (let [sink (atom.atom resolve)] - (implementation - (def: close - (loop [_ []] - (do {! io.monad} - [current (atom.read sink) - stopped? (current #.None)] - (if stopped? - ## I closed the sink. - (wrap (exception.return [])) - ## Someone else interacted with the sink. - (do ! - [latter (atom.read sink)] - (if (is? current latter) - ## Someone else closed the sink. - (wrap (exception.throw ..channel_is_already_closed [])) - ## Someone else fed the sink while I was closing it. - (recur []))))))) - - (def: (feed value) - (loop [_ []] - (do {! io.monad} - [current (atom.read sink) - #let [[next resolve_next] (:share [a] - (promise.Resolver (Maybe [a (Channel a)])) - current - - [(Promise (Maybe [a (Channel a)])) - (promise.Resolver (Maybe [a (Channel a)]))] - (promise.promise []))] - fed? (current (#.Some [value next]))] - (if fed? - ## I fed the sink. - (do ! - [_ (atom.compare_and_swap current resolve_next sink)] - (wrap (exception.return []))) - ## Someone else interacted with the sink. - (do ! - [latter (atom.read sink)] - (if (is? current latter) - ## Someone else closed the sink while I was feeding it. - (wrap (exception.throw ..channel_is_already_closed [])) - ## Someone else fed the sink. - (recur [])))))))))) - -(def: #export (channel _) - (All [a] (-> Any [(Channel a) (Sink a)])) - (let [[promise resolve] (promise.promise [])] - [promise (..sink resolve)])) - -(implementation: #export functor - (Functor Channel) - - (def: (map f) - (promise\map - (maybe\map - (function (_ [head tail]) - [(f head) (map f tail)]))))) - -(implementation: #export apply - (Apply Channel) - - (def: &functor ..functor) - - (def: (apply ff fa) - (do promise.monad - [cons_f ff - cons_a fa] - (case [cons_f cons_a] - [(#.Some [head_f tail_f]) (#.Some [head_a tail_a])] - (wrap (#.Some [(head_f head_a) (apply tail_f tail_a)])) - - _ - (wrap #.None))))) - -(def: empty - Channel - (promise.resolved #.None)) - -(implementation: #export monad - (Monad Channel) - - (def: &functor ..functor) - - (def: (wrap a) - (promise.resolved (#.Some [a ..empty]))) - - (def: (join mma) - (let [[output sink] (channel [])] - (exec (: (Promise Any) - (loop [mma mma] - (do {! promise.monad} - [?mma mma] - (case ?mma - (#.Some [ma mma']) - (do ! - [_ (loop [ma ma] - (do ! - [?ma ma] - (case ?ma - (#.Some [a ma']) - (exec (io.run (\ sink feed a)) - (recur ma')) - - #.None - (wrap []))))] - (recur mma')) - - #.None - (wrap (: Any (io.run (\ sink close)))))))) - output)))) - -(type: #export (Subscriber a) - (-> a (IO (Maybe Any)))) - -(def: #export (subscribe subscriber channel) - (All [a] (-> (Subscriber a) (Channel a) (IO Any))) - (io (exec (: (Promise Any) - (loop [channel channel] - (do promise.monad - [cons channel] - (case cons - (#.Some [head tail]) - (case (io.run (subscriber head)) - (#.Some _) - (recur tail) - - #.None - (wrap [])) - - #.None - (wrap []))))) - []))) - -(def: #export (filter pass? channel) - (All [a] (-> (Predicate a) (Channel a) (Channel a))) - (do promise.monad - [cons channel] - (case cons - (#.Some [head tail]) - (let [tail' (filter pass? tail)] - (if (pass? head) - (wrap (#.Some [head tail'])) - tail')) - - #.None - (wrap #.None)))) - -(def: #export (from_promise promise) - (All [a] (-> (Promise a) (Channel a))) - (promise\map (function (_ value) - (#.Some [value ..empty])) - promise)) - -(def: #export (fold f init channel) - {#.doc "Asynchronous fold over channels."} - (All [a b] - (-> (-> b a (Promise a)) a (Channel b) - (Promise a))) - (do {! promise.monad} - [cons channel] - (case cons - #.None - (wrap init) - - (#.Some [head tail]) - (do ! - [init' (f head init)] - (fold f init' tail))))) - -(def: #export (folds f init channel) - {#.doc "A channel of folds."} - (All [a b] - (-> (-> b a (Promise a)) a (Channel b) - (Channel a))) - (do {! promise.monad} - [cons channel] - (case cons - #.None - (wrap (#.Some [init (wrap #.None)])) - - (#.Some [head tail]) - (do ! - [init' (f head init)] - (wrap (#.Some [init (folds f init' tail)])))))) - -(def: #export (poll milli_seconds action) - (All [a] - (-> Nat (IO a) [(Channel a) (Sink a)])) - (let [[output sink] (channel [])] - (exec (io.run (loop [_ []] - (do io.monad - [value action - _ (\ sink feed value)] - (promise.await recur (promise.wait milli_seconds))))) - [output sink]))) - -(def: #export (periodic milli_seconds) - (-> Nat [(Channel Any) (Sink Any)]) - (..poll milli_seconds (io []))) - -(def: #export (iterate f init) - (All [s o] (-> (-> s (Promise (Maybe [s o]))) s (Channel o))) - (do promise.monad - [?next (f init)] - (case ?next - (#.Some [state output]) - (wrap (#.Some [output (iterate f state)])) - - #.None - (wrap #.None)))) - -(def: (distinct' equivalence previous channel) - (All [a] (-> (Equivalence a) a (Channel a) (Channel a))) - (do promise.monad - [cons channel] - (case cons - (#.Some [head tail]) - (if (\ equivalence = previous head) - (distinct' equivalence previous tail) - (wrap (#.Some [head (distinct' equivalence head tail)]))) - - #.None - (wrap #.None)))) - -(def: #export (distinct equivalence channel) - (All [a] (-> (Equivalence a) (Channel a) (Channel a))) - (do promise.monad - [cons channel] - (case cons - (#.Some [head tail]) - (wrap (#.Some [head (distinct' equivalence head tail)])) - - #.None - (wrap #.None)))) - -(def: #export (consume channel) - {#.doc "Reads the entirety of a channel's content and returns it as a list."} - (All [a] (-> (Channel a) (Promise (List a)))) - (do {! promise.monad} - [cons channel] - (case cons - (#.Some [head tail]) - (\ ! map (|>> (#.Cons head)) - (consume tail)) - - #.None - (wrap #.Nil)))) - -(def: #export (sequential milli_seconds values) - (All [a] (-> Nat (List a) (Channel a))) - (case values - #.Nil - ..empty - - (#.Cons head tail) - (promise.resolved (#.Some [head (do promise.monad - [_ (promise.wait milli_seconds)] - (sequential milli_seconds tail))])))) diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux deleted file mode 100644 index 8e0acf8b9..000000000 --- a/stdlib/source/lux/control/concurrency/promise.lux +++ /dev/null @@ -1,199 +0,0 @@ -(.module: - [lux (#- and or) - [abstract - [functor (#+ Functor)] - [apply (#+ Apply)] - ["." monad (#+ Monad do)]] - [control - [pipe (#+ case>)] - ["." function] - ["." io (#+ IO io)]] - [data - ["." product]] - [type (#+ :share) - abstract]] - [// - ["." thread] - ["." atom (#+ Atom atom)]]) - -(abstract: #export (Promise a) - (Atom [(Maybe a) (List (-> a (IO Any)))]) - - {#.doc "Represents values produced by asynchronous computations (unlike IO, which is synchronous)."} - - (type: #export (Resolver a) - (-> a (IO Bit))) - - (def: (resolver promise) - {#.doc "Sets an promise's value if it has not been done yet."} - (All [a] (-> (Promise a) (Resolver a))) - (function (resolve value) - (let [promise (:representation promise)] - (do {! io.monad} - [(^@ old [_value _observers]) (atom.read promise)] - (case _value - (#.Some _) - (wrap #0) - - #.None - (do ! - [#let [new [(#.Some value) #.None]] - succeeded? (atom.compare_and_swap old new promise)] - (if succeeded? - (do ! - [_ (monad.map ! (function (_ f) (f value)) - _observers)] - (wrap #1)) - (resolve value)))))))) - - (def: #export (resolved value) - (All [a] (-> a (Promise a))) - (:abstraction (atom [(#.Some value) (list)]))) - - (def: #export (promise _) - (All [a] (-> Any [(Promise a) (Resolver a)])) - (let [promise (:abstraction (atom [#.None (list)]))] - [promise (..resolver promise)])) - - (def: #export poll - {#.doc "Polls a promise's value."} - (All [a] (-> (Promise a) (IO (Maybe a)))) - (|>> :representation - atom.read - (\ io.functor map product.left))) - - (def: #export (await f promise) - (All [a] (-> (-> a (IO Any)) (Promise a) (IO Any))) - (do {! io.monad} - [#let [promise (:representation promise)] - (^@ old [_value _observers]) (atom.read promise)] - (case _value - (#.Some value) - (f value) - - #.None - (let [new [_value (#.Cons f _observers)]] - (do ! - [swapped? (atom.compare_and_swap old new promise)] - (if swapped? - (wrap []) - (await f (:abstraction promise)))))))) - ) - -(def: #export resolved? - {#.doc "Checks whether a promise's value has already been resolved."} - (All [a] (-> (Promise a) (IO Bit))) - (|>> ..poll - (\ io.functor map - (|>> (case> #.None - #0 - - (#.Some _) - #1))))) - -(implementation: #export functor - (Functor Promise) - - (def: (map f fa) - (let [[fb resolve] (..promise [])] - (exec (io.run (..await (|>> f resolve) fa)) - fb)))) - -(implementation: #export apply - (Apply Promise) - - (def: &functor ..functor) - - (def: (apply ff fa) - (let [[fb resolve] (..promise [])] - (exec (io.run (..await (function (_ f) - (..await (|>> f resolve) fa)) - ff)) - fb)))) - -(implementation: #export monad - (Monad Promise) - - (def: &functor ..functor) - - (def: wrap ..resolved) - - (def: (join mma) - (let [[ma resolve] (promise [])] - (exec (io.run (..await (..await resolve) mma)) - ma)))) - -(def: #export (and left right) - {#.doc "Sequencing combinator."} - (All [a b] (-> (Promise a) (Promise b) (Promise [a b]))) - (let [[read! write!] (:share [a b] - [(Promise a) (Promise b)] - [left right] - - [(Promise [a b]) - (Resolver [a b])] - (..promise [])) - _ (io.run (..await (function (_ left) - (..await (function (_ right) - (write! [left right])) - right)) - left))] - read!)) - -(def: #export (or left right) - {#.doc "Heterogeneous alternative combinator."} - (All [a b] (-> (Promise a) (Promise b) (Promise (| a b)))) - (let [[a|b resolve] (..promise [])] - (with_expansions - [<sides> (template [<promise> <tag>] - [(io.run (await (|>> <tag> resolve) <promise>))] - - [left #.Left] - [right #.Right] - )] - (exec <sides> - a|b)))) - -(def: #export (either left right) - {#.doc "Homogeneous alternative combinator."} - (All [a] (-> (Promise a) (Promise a) (Promise a))) - (let [[left||right resolve] (..promise [])] - (`` (exec (~~ (template [<promise>] - [(io.run (await resolve <promise>))] - - [left] - [right])) - left||right)))) - -(def: #export (schedule millis_delay computation) - {#.doc (doc "Runs an I/O computation on its own thread (after a specified delay)." - "Returns a Promise that will eventually host its result.")} - (All [a] (-> Nat (IO a) (Promise a))) - (let [[!out resolve] (..promise [])] - (exec (|> (do io.monad - [value computation] - (resolve value)) - (thread.schedule millis_delay) - io.run) - !out))) - -(def: #export future - {#.doc (doc "Runs an I/O computation on its own thread." - "Returns a Promise that will eventually host its result.")} - (All [a] (-> (IO a) (Promise a))) - (..schedule 0)) - -(def: #export (delay time_millis value) - {#.doc "Delivers a value after a certain period has passed."} - (All [a] (-> Nat a (Promise a))) - (..schedule time_millis (io value))) - -(def: #export (wait time_millis) - {#.doc "Returns a promise that will be resolved after the specified amount of milliseconds."} - (-> Nat (Promise Any)) - (..delay time_millis [])) - -(def: #export (time_out time_millis promise) - {#.doc "Wait for a promise to be resolved within the specified amount of milliseconds."} - (All [a] (-> Nat (Promise a) (Promise (Maybe a)))) - (..or (wait time_millis) promise)) diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux deleted file mode 100644 index 0e8fa2b94..000000000 --- a/stdlib/source/lux/control/concurrency/semaphore.lux +++ /dev/null @@ -1,173 +0,0 @@ -(.module: - [lux #* - [abstract - [monad (#+ do)]] - [control - [pipe (#+ if>)] - ["." io (#+ IO)] - ["." try (#+ Try)] - ["." exception (#+ exception:)]] - [data - [text - ["%" format (#+ format)]] - [collection - ["." queue (#+ Queue)]]] - [math - [number - ["n" nat] - ["i" int]]] - [type - abstract - ["." refinement]]] - [// - ["." atom (#+ Atom)] - ["." promise (#+ Promise Resolver)]]) - -(type: State - {#max_positions Nat - #open_positions Int - #waiting_list (Queue (Resolver Any))}) - -(abstract: #export Semaphore - (Atom State) - - {#.doc "A tool for controlling access to resources by multiple concurrent processes."} - - (def: most_positions_possible - (.nat (\ i.interval top))) - - (def: #export (semaphore initial_open_positions) - (-> Nat Semaphore) - (let [max_positions (n.min initial_open_positions - ..most_positions_possible)] - (:abstraction (atom.atom {#max_positions max_positions - #open_positions (.int max_positions) - #waiting_list queue.empty})))) - - (def: #export (wait semaphore) - (Ex [k] (-> Semaphore (Promise Any))) - (let [semaphore (:representation semaphore) - [signal sink] (: [(Promise Any) (Resolver Any)] - (promise.promise []))] - (exec (io.run - (with_expansions [<had_open_position?> (as_is (get@ #open_positions) (i.> -1))] - (do io.monad - [[_ state'] (atom.update (|>> (update@ #open_positions dec) - (if> [<had_open_position?>] - [] - [(update@ #waiting_list (queue.push sink))])) - semaphore)] - (with_expansions [<go_ahead> (sink []) - <get_in_line> (wrap false)] - (if (|> state' <had_open_position?>) - <go_ahead> - <get_in_line>))))) - signal))) - - (exception: #export (semaphore_is_maxed_out {max_positions Nat}) - (exception.report - ["Max Positions" (%.nat max_positions)])) - - (def: #export (signal semaphore) - (Ex [k] (-> Semaphore (Promise (Try Int)))) - (let [semaphore (:representation semaphore)] - (promise.future - (do {! io.monad} - [[pre post] (atom.update (function (_ state) - (if (i.= (.int (get@ #max_positions state)) - (get@ #open_positions state)) - state - (|> state - (update@ #open_positions inc) - (update@ #waiting_list queue.pop)))) - semaphore)] - (if (is? pre post) - (wrap (exception.throw ..semaphore_is_maxed_out [(get@ #max_positions pre)])) - (do ! - [_ (case (queue.peek (get@ #waiting_list pre)) - #.None - (wrap true) - - (#.Some sink) - (sink []))] - (wrap (#try.Success (get@ #open_positions post))))))))) - ) - -(abstract: #export Mutex - Semaphore - - {#.doc "A mutual-exclusion lock that can only be acquired by one process at a time."} - - (def: #export (mutex _) - (-> Any Mutex) - (:abstraction (semaphore 1))) - - (def: acquire - (-> Mutex (Promise Any)) - (|>> :representation ..wait)) - - (def: release - (-> Mutex (Promise Any)) - (|>> :representation ..signal)) - - (def: #export (synchronize mutex procedure) - (All [a] (-> Mutex (IO (Promise a)) (Promise a))) - (do promise.monad - [_ (..acquire mutex) - output (io.run procedure) - _ (..release mutex)] - (wrap output))) - ) - -(def: #export limit - (refinement.refinement (n.> 0))) - -(type: #export Limit - (:~ (refinement.type limit))) - -(abstract: #export Barrier - {#limit Limit - #count (Atom Nat) - #start_turnstile Semaphore - #end_turnstile Semaphore} - - {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."} - - (def: #export (barrier limit) - (-> Limit Barrier) - (:abstraction {#limit limit - #count (atom.atom 0) - #start_turnstile (..semaphore 0) - #end_turnstile (..semaphore 0)})) - - (def: (un_block times turnstile) - (-> Nat Semaphore (Promise Any)) - (loop [step 0] - (if (n.< times step) - (do promise.monad - [outcome (..signal turnstile)] - (recur (inc step))) - (\ promise.monad wrap [])))) - - (template [<phase> <update> <goal> <turnstile>] - [(def: (<phase> (^:representation barrier)) - (-> Barrier (Promise Any)) - (do promise.monad - [#let [limit (refinement.un_refine (get@ #limit barrier)) - goal <goal> - [_ count] (io.run (atom.update <update> (get@ #count barrier))) - reached? (n.= goal count)]] - (if reached? - (..un_block (dec limit) (get@ <turnstile> barrier)) - (..wait (get@ <turnstile> barrier)))))] - - [start inc limit #start_turnstile] - [end dec 0 #end_turnstile] - ) - - (def: #export (block barrier) - (-> Barrier (Promise Any)) - (do promise.monad - [_ (..start barrier)] - (..end barrier))) - ) diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux deleted file mode 100644 index d375059a4..000000000 --- a/stdlib/source/lux/control/concurrency/stm.lux +++ /dev/null @@ -1,273 +0,0 @@ -(.module: - [lux #* - [abstract - [functor (#+ Functor)] - [apply (#+ Apply)] - ["." monad (#+ Monad do)]] - [control - ["." io (#+ IO io)] - ["." try]] - [data - ["." product] - ["." maybe] - [collection - ["." list]]] - [type - abstract]] - [// - ["." atom (#+ Atom atom)] - ["." promise (#+ Promise Resolver)] - ["." frp (#+ Channel Sink)]]) - -(type: (Observer a) - (-> a (IO Any))) - -(abstract: #export (Var a) - (Atom [a (List (Sink a))]) - - {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} - - (def: #export (var value) - {#.doc "Creates a new STM var, with a default value."} - (All [a] (-> a (Var a))) - (:abstraction (atom.atom [value (list)]))) - - (def: read! - (All [a] (-> (Var a) a)) - (|>> :representation atom.read io.run product.left)) - - (def: (un_follow sink var) - (All [a] (-> (Sink a) (Var a) (IO Any))) - (do io.monad - [_ (atom.update (function (_ [value observers]) - [value (list.filter (|>> (is? sink) not) observers)]) - (:representation var))] - (wrap []))) - - (def: (write! new_value var) - (All [a] (-> a (Var a) (IO Any))) - (do {! io.monad} - [#let [var' (:representation var)] - (^@ old [old_value observers]) (atom.read var') - succeeded? (atom.compare_and_swap old [new_value observers] var')] - (if succeeded? - (do ! - [_ (monad.map ! (function (_ sink) - (do ! - [result (\ sink feed new_value)] - (case result - (#try.Success _) - (wrap []) - - (#try.Failure _) - (un_follow sink var)))) - observers)] - (wrap [])) - (write! new_value var)))) - - (def: #export (follow target) - {#.doc "Creates a channel that will receive all changes to the value of the given var."} - (All [a] (-> (Var a) (IO [(Channel a) (Sink a)]))) - (do io.monad - [#let [[channel sink] (frp.channel [])] - _ (atom.update (function (_ [value observers]) - [value (#.Cons sink observers)]) - (:representation target))] - (wrap [channel sink]))) - ) - -(type: (Tx_Frame a) - {#var (Var a) - #original a - #current a}) - -(type: Tx - (List (Ex [a] (Tx_Frame a)))) - -(type: #export (STM a) - {#.doc "A computation which updates a transaction and produces a value."} - (-> Tx [Tx a])) - -(def: (find_var_value var tx) - (All [a] (-> (Var a) Tx (Maybe a))) - (|> tx - (list.find (function (_ [_var _original _current]) - (is? (:as (Var Any) var) - (:as (Var Any) _var)))) - (\ maybe.monad map (function (_ [_var _original _current]) - _current)) - (:assume) - )) - -(def: #export (read var) - (All [a] (-> (Var a) (STM a))) - (function (_ tx) - (case (find_var_value var tx) - (#.Some value) - [tx value] - - #.None - (let [value (..read! var)] - [(#.Cons [var value value] tx) - value])))) - -(def: (update_tx_value var value tx) - (All [a] (-> (Var a) a Tx Tx)) - (case tx - #.Nil - #.Nil - - (#.Cons [_var _original _current] tx') - (if (is? (:as (Var Any) var) - (:as (Var Any) _var)) - (#.Cons {#var (:as (Var Any) _var) - #original (:as Any _original) - #current (:as Any value)} - tx') - (#.Cons {#var _var - #original _original - #current _current} - (update_tx_value var value tx'))))) - -(def: #export (write value var) - {#.doc "Writes value to var."} - (All [a] (-> a (Var a) (STM Any))) - (function (_ tx) - (case (find_var_value var tx) - (#.Some _) - [(update_tx_value var value tx) - []] - - #.None - [(#.Cons [var (..read! var) value] tx) - []]))) - -(implementation: #export functor - (Functor STM) - - (def: (map f fa) - (function (_ tx) - (let [[tx' a] (fa tx)] - [tx' (f a)])))) - -(implementation: #export apply - (Apply STM) - - (def: &functor ..functor) - - (def: (apply ff fa) - (function (_ tx) - (let [[tx' f] (ff tx) - [tx'' a] (fa tx')] - [tx'' (f a)])))) - -(implementation: #export monad - (Monad STM) - - (def: &functor ..functor) - - (def: (wrap a) - (function (_ tx) - [tx a])) - - (def: (join mma) - (function (_ tx) - (let [[tx' ma] (mma tx)] - (ma tx'))))) - -(def: #export (update f var) - {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} - (All [a] (-> (-> a a) (Var a) (STM [a a]))) - (do ..monad - [a (..read var) - #let [a' (f a)] - _ (..write a' var)] - (wrap [a a']))) - -(def: (can_commit? tx) - (-> Tx Bit) - (list.every? (function (_ [_var _original _current]) - (is? _original (..read! _var))) - tx)) - -(def: (commit_var! [_var _original _current]) - (-> (Ex [a] (Tx_Frame a)) (IO Any)) - (if (is? _original _current) - (io []) - (..write! _current _var))) - -(def: fresh_tx Tx (list)) - -(type: (Commit a) - [(STM a) - (Promise a) - (Resolver a)]) - -(def: pending_commits - (Atom (Rec Commits - [(Promise [(Ex [a] (Commit a)) Commits]) - (Resolver [(Ex [a] (Commit a)) Commits])])) - (atom (promise.promise []))) - -(def: commit_processor_flag - (Atom Bit) - (atom #0)) - -(def: (issue_commit commit) - (All [a] (-> (Commit a) (IO Any))) - (let [entry [commit (promise.promise [])]] - (do {! io.monad} - [|commits|&resolve (atom.read pending_commits)] - (loop [[|commits| resolve] |commits|&resolve] - (do ! - [|commits| (promise.poll |commits|)] - (case |commits| - #.None - (do io.monad - [resolved? (resolve entry)] - (if resolved? - (atom.write (product.right entry) pending_commits) - (recur |commits|&resolve))) - - (#.Some [head tail]) - (recur tail))))))) - -(def: (process_commit commit) - (All [a] (-> (Commit a) (IO Any))) - (let [[stm_proc output resolve] commit - [finished_tx value] (stm_proc fresh_tx)] - (if (can_commit? finished_tx) - (do {! io.monad} - [_ (monad.map ! commit_var! finished_tx)] - (resolve value)) - (issue_commit commit)))) - -(def: init_processor! - (IO Any) - (do {! io.monad} - [flag (atom.read commit_processor_flag)] - (if flag - (wrap []) - (do ! - [was_first? (atom.compare_and_swap flag #1 commit_processor_flag)] - (if was_first? - (do ! - [[promise resolve] (atom.read pending_commits)] - (promise.await (function (recur [head [tail _resolve]]) - (do ! - [_ (process_commit head)] - (promise.await recur tail))) - promise)) - (wrap []))) - ))) - -(def: #export (commit stm_proc) - {#.doc (doc "Commits a transaction and returns its result (asynchronously)." - "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first." - "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")} - (All [a] (-> (STM a) (Promise a))) - (let [[output resolver] (promise.promise [])] - (exec (io.run (do io.monad - [_ init_processor!] - (issue_commit [stm_proc output resolver]))) - output))) diff --git a/stdlib/source/lux/control/concurrency/thread.lux b/stdlib/source/lux/control/concurrency/thread.lux deleted file mode 100644 index d6dc71c37..000000000 --- a/stdlib/source/lux/control/concurrency/thread.lux +++ /dev/null @@ -1,169 +0,0 @@ -(.module: - [lux #* - ["@" target] - ["." ffi] - [abstract - ["." monad (#+ do)]] - [control - ["." try] - ["." exception (#+ exception:)] - ["." io (#+ IO io)]] - [data - ["." text] - [collection - ["." list]]] - [math - [number - ["n" nat] - ["f" frac]]] - [time - ["." instant]]] - [// - ["." atom (#+ Atom)]]) - -(with_expansions [<jvm> (as_is (ffi.import: java/lang/Object) - - (ffi.import: java/lang/Runtime - ["#::." - (#static getRuntime [] java/lang/Runtime) - (availableProcessors [] int)]) - - (ffi.import: java/lang/Runnable) - - (ffi.import: java/util/concurrent/TimeUnit - ["#::." - (#enum MILLISECONDS)]) - - (ffi.import: java/util/concurrent/Executor - ["#::." - (execute [java/lang/Runnable] #io void)]) - - (ffi.import: (java/util/concurrent/ScheduledFuture a)) - - (ffi.import: java/util/concurrent/ScheduledThreadPoolExecutor - ["#::." - (new [int]) - (schedule [java/lang/Runnable long java/util/concurrent/TimeUnit] #io (java/util/concurrent/ScheduledFuture java/lang/Object))]))] - (for {@.old (as_is <jvm>) - @.jvm (as_is <jvm>) - - @.js - (as_is (ffi.import: (setTimeout [ffi.Function ffi.Number] #io Any))) - - @.python - (ffi.import: threading/Timer - ["#::." - (new [ffi.Float ffi.Function]) - (start [] #io #? Any)])} - - ## Default - (type: Thread - {#creation Nat - #delay Nat - #action (IO Any)}) - )) - -(def: #export parallelism - Nat - (with_expansions [<jvm> (|> (java/lang/Runtime::getRuntime) - (java/lang/Runtime::availableProcessors) - .nat)] - (for {@.old <jvm> - @.jvm <jvm>} - ## Default - 1))) - -(with_expansions [<jvm> (as_is (def: runner - java/util/concurrent/ScheduledThreadPoolExecutor - (java/util/concurrent/ScheduledThreadPoolExecutor::new (.int ..parallelism))))] - (for {@.old <jvm> - @.jvm <jvm> - @.js (as_is) - @.python (as_is)} - - ## Default - (def: runner - (Atom (List Thread)) - (atom.atom (list))))) - -(def: (execute! action) - (-> (IO Any) Any) - (case (try (io.run action)) - (#try.Failure error) - (exec - ("lux io log" ($_ "lux text concat" - "ERROR DURING THREAD EXECUTION:" text.new_line - error)) - []) - - (#try.Success _) - [])) - -(def: #export (schedule milli_seconds action) - (-> Nat (IO Any) (IO Any)) - (with_expansions [<jvm> (as_is (let [runnable (ffi.object [] [java/lang/Runnable] - [] - (java/lang/Runnable [] (run self) void - (..execute! action)))] - (case milli_seconds - 0 (java/util/concurrent/Executor::execute runnable runner) - _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli_seconds) java/util/concurrent/TimeUnit::MILLISECONDS - runner))))] - (for {@.old <jvm> - @.jvm <jvm> - - @.js - (..setTimeout [(ffi.closure [] (..execute! action)) - (n.frac milli_seconds)]) - - @.python - (do io.monad - [_ (|> (ffi.lambda [] (..execute! action)) - [(|> milli_seconds n.frac (f./ +1,000.0))] - threading/Timer::new - (threading/Timer::start []))] - (wrap []))} - - ## Default - (do {! io.monad} - [now (\ ! map (|>> instant.to_millis .nat) instant.now) - _ (atom.update (|>> (#.Cons {#creation now - #delay milli_seconds - #action action})) - ..runner)] - (wrap []))))) - -(for {@.old (as_is) - @.jvm (as_is) - @.js (as_is) - @.python (as_is)} - - ## Default - (as_is (exception: #export cannot_continue_running_threads) - - (def: #export run! - (IO Any) - (loop [_ []] - (do {! io.monad} - [threads (atom.read ..runner)] - (case threads - ## And... we're done! - #.Nil - (wrap []) - - _ - (do ! - [now (\ ! map (|>> instant.to_millis .nat) instant.now) - #let [[ready pending] (list.partition (function (_ thread) - (|> (get@ #creation thread) - (n.+ (get@ #delay thread)) - (n.<= now))) - threads)] - swapped? (atom.compare_and_swap threads pending ..runner)] - (if swapped? - (do ! - [_ (monad.map ! (|>> (get@ #action) ..execute! io.io) ready)] - (recur [])) - (error! (exception.construct ..cannot_continue_running_threads [])))) - )))) - )) |