diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency')
-rw-r--r-- | stdlib/source/lux/control/concurrency/actor.lux | 178 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/atom.lux | 14 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/frp.lux | 36 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/promise.lux | 22 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/semaphore.lux | 76 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/stm.lux | 78 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/thread.lux | 40 |
7 files changed, 222 insertions, 222 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux index 66ea24cd8..6355a43b7 100644 --- a/stdlib/source/lux/control/concurrency/actor.lux +++ b/stdlib/source/lux/control/concurrency/actor.lux @@ -25,7 +25,7 @@ ["csr" reader] ["csw" writer] ["|.|" export]]]] - ["." meta (#+ with-gensyms monad) + ["." meta (#+ with_gensyms monad) ["." annotation]] [type (#+ :share) ["." abstract (#+ abstract: :representation :abstraction)]]] @@ -37,10 +37,10 @@ (exception: #export poisoned) (exception: #export dead) -(with-expansions - [<Mail> (as-is (-> s (Actor s) (Promise (Try s)))) - <Obituary> (as-is [Text s (List <Mail>)]) - <Mailbox> (as-is (Rec Mailbox +(with_expansions + [<Mail> (as_is (-> s (Actor s) (Promise (Try s)))) + <Obituary> (as_is [Text s (List <Mail>)]) + <Mailbox> (as_is (Rec Mailbox [(Promise [<Mail> Mailbox]) (Resolver [<Mail> Mailbox])]))] @@ -73,29 +73,29 @@ (type: #export (Behavior o s) {#.doc "An actor's behavior when mail is received and when a fatal error occurs."} - {#on-init (-> o s) - #on-mail (-> (Mail s) s (Actor s) (Promise (Try s))) - #on-stop (-> Text s (Promise Any))}) + {#on_init (-> o s) + #on_mail (-> (Mail s) s (Actor s) (Promise (Try s))) + #on_stop (-> Text s (Promise Any))}) (def: #export (spawn! behavior init) {#.doc "Given a behavior and initial state, spawns an actor and returns it."} (All [o s] (-> (Behavior o s) o (IO (Actor s)))) - (io (let [[on-init on-mail on-stop] behavior + (io (let [[on_init on_mail on_stop] behavior self (:share [o s] {(Behavior o s) behavior} {(Actor s) (:abstraction {#obituary (promise.promise []) #mailbox (atom (promise.promise []))})}) - process (loop [state (on-init init) + process (loop [state (on_init init) [|mailbox| _] (io.run (atom.read (get@ #mailbox (:representation self))))] (do {! promise.monad} [[head tail] |mailbox| - ?state' (on-mail head state self)] + ?state' (on_mail head state self)] (case ?state' (#try.Failure error) (do ! - [_ (on-stop error state)] + [_ (on_stop error state)] (let [[_ resolve] (get@ #obituary (:representation self))] (exec (io.run (do io.monad @@ -195,19 +195,19 @@ ) ) -(def: (default-on-mail mail state self) +(def: (default_on_mail mail state self) (All [s] (-> (Mail s) s (Actor s) (Promise (Try s)))) (mail state self)) -(def: (default-on-stop cause state) +(def: (default_on_stop cause state) (All [s] (-> Text s (Promise Any))) (promise\wrap [])) (def: #export default (All [s] (Behavior s s)) - {#on-init function.identity - #on-mail ..default-on-mail - #on-stop ..default-on-stop}) + {#on_init function.identity + #on_mail ..default_on_mail + #on_stop ..default_on_stop}) (def: #export (poison! actor) {#.doc (doc "Kills the actor by sending mail that will kill it upon processing," @@ -217,64 +217,64 @@ (promise.resolved (exception.throw ..poisoned []))) actor)) -(def: actor-decl^ +(def: actor_decl^ (Parser [Text (List Text)]) - (<>.either (<c>.form (<>.and <c>.local-identifier (<>.some <c>.local-identifier))) - (<>.and <c>.local-identifier (\ <>.monad wrap (list))))) + (<>.either (<c>.form (<>.and <c>.local_identifier (<>.some <c>.local_identifier))) + (<>.and <c>.local_identifier (\ <>.monad wrap (list))))) -(type: On-MailC +(type: On_MailC [[Text Text Text] Code]) -(type: On-StopC +(type: On_StopC [[Text Text] Code]) (type: BehaviorC - [(Maybe On-MailC) (Maybe On-StopC) (List Code)]) + [(Maybe On_MailC) (Maybe On_StopC) (List Code)]) (def: argument (Parser Text) - <c>.local-identifier) + <c>.local_identifier) (def: behavior^ (Parser BehaviorC) - (let [on-mail-args ($_ <>.and ..argument ..argument ..argument) - on-stop-args ($_ <>.and ..argument ..argument)] + (let [on_mail_args ($_ <>.and ..argument ..argument ..argument) + on_stop_args ($_ <>.and ..argument ..argument)] ($_ <>.and - (<>.maybe (<c>.form (<>.and (<c>.form (<>.after (<c>.this! (' on-mail)) on-mail-args)) + (<>.maybe (<c>.form (<>.and (<c>.form (<>.after (<c>.this! (' on_mail)) on_mail_args)) <c>.any))) - (<>.maybe (<c>.form (<>.and (<c>.form (<>.after (<c>.this! (' on-stop)) on-stop-args)) + (<>.maybe (<c>.form (<>.and (<c>.form (<>.after (<c>.this! (' on_stop)) on_stop_args)) <c>.any))) (<>.some <c>.any)))) -(def: (on-mail g!_ ?on-mail) - (-> Code (Maybe On-MailC) Code) - (case ?on-mail +(def: (on_mail g!_ ?on_mail) + (-> Code (Maybe On_MailC) Code) + (case ?on_mail #.None - (` (~! ..default-on-mail)) + (` (~! ..default_on_mail)) (#.Some [[mailN stateN selfN] bodyC]) (` (function ((~ g!_) - (~ (code.local-identifier mailN)) - (~ (code.local-identifier stateN)) - (~ (code.local-identifier selfN))) + (~ (code.local_identifier mailN)) + (~ (code.local_identifier stateN)) + (~ (code.local_identifier selfN))) (~ bodyC))))) -(def: (on-stop g!_ ?on-stop) - (-> Code (Maybe On-StopC) Code) - (case ?on-stop +(def: (on_stop g!_ ?on_stop) + (-> Code (Maybe On_StopC) Code) + (case ?on_stop #.None - (` (~! ..default-on-stop)) + (` (~! ..default_on_stop)) (#.Some [[causeN stateN] bodyC]) (` (function ((~ g!_) - (~ (code.local-identifier causeN)) - (~ (code.local-identifier stateN))) + (~ (code.local_identifier causeN)) + (~ (code.local_identifier stateN))) (~ bodyC))))) -(with-expansions [<examples> (as-is (actor: #export (Stack a) +(with_expansions [<examples> (as_is (actor: #export (Stack a) (List a) - ((on-mail mail state self) + ((on_mail mail state self) (do (try.with promise.monad) [#let [_ (log! "BEFORE")] output (mail state self) @@ -288,7 +288,7 @@ (actor: #export Counter Nat - ((on-stop cause state) + ((on_stop cause state) (\ promise.monad wrap (log! (if (exception.match? ..poisoned cause) (format "Counter was poisoned: " (%.nat state)) @@ -302,45 +302,45 @@ (promise.resolved (#try.Success [state state])))))] (syntax: #export (actor: {export |export|.parser} - {[name vars] actor-decl^} - {annotations (<>.default cs.empty-annotations csr.annotations)} - state-type - {[?on-mail ?on-stop messages] behavior^}) + {[name vars] actor_decl^} + {annotations (<>.default cs.empty_annotations csr.annotations)} + state_type + {[?on_mail ?on_stop messages] behavior^}) {#.doc (doc "Defines an actor, with its behavior and internal state." - "Messages for the actor must be defined after the on-mail and on-stop handlers." + "Messages for the actor must be defined after the on_mail and on_stop handlers." <examples>)} - (with-gensyms [g!_] + (with_gensyms [g!_] (do meta.monad - [g!type (meta.gensym (format name "-abstract-type")) - #let [g!actor (code.local-identifier name) - g!vars (list\map code.local-identifier vars)]] + [g!type (meta.gensym (format name "_abstract_type")) + #let [g!actor (code.local_identifier name) + g!vars (list\map code.local_identifier vars)]] (wrap (list (` ((~! abstract:) (~+ (|export|.write export)) ((~ g!type) (~+ g!vars)) - (~ state-type) + (~ state_type) (def: (~+ (|export|.write export)) (~ g!actor) (All [(~+ g!vars)] - (..Behavior (~ state-type) ((~ g!type) (~+ g!vars)))) - {#..on-init (|>> ((~! abstract.:abstraction) (~ g!type))) - #..on-mail (~ (..on-mail g!_ ?on-mail)) - #..on-stop (~ (..on-stop g!_ ?on-stop))}) + (..Behavior (~ state_type) ((~ g!type) (~+ g!vars)))) + {#..on_init (|>> ((~! abstract.:abstraction) (~ g!type))) + #..on_mail (~ (..on_mail g!_ ?on_mail)) + #..on_stop (~ (..on_stop g!_ ?on_stop))}) (~+ messages)))))))) - (syntax: #export (actor {[state-type init] (<c>.record (<>.and <c>.any <c>.any))} - {[?on-mail ?on-stop messages] behavior^}) - (with-gensyms [g!_] - (wrap (list (` (: ((~! io.IO) (..Actor (~ state-type))) - (..spawn! (: (..Behavior (~ state-type) (~ state-type)) - {#..on-init (|>>) - #..on-mail (~ (..on-mail g!_ ?on-mail)) - #..on-stop (~ (..on-stop g!_ ?on-stop))}) - (: (~ state-type) + (syntax: #export (actor {[state_type init] (<c>.record (<>.and <c>.any <c>.any))} + {[?on_mail ?on_stop messages] behavior^}) + (with_gensyms [g!_] + (wrap (list (` (: ((~! io.IO) (..Actor (~ state_type))) + (..spawn! (: (..Behavior (~ state_type) (~ state_type)) + {#..on_init (|>>) + #..on_mail (~ (..on_mail g!_ ?on_mail)) + #..on_stop (~ (..on_stop g!_ ?on_stop))}) + (: (~ state_type) (~ init))))))))) (type: Signature {#vars (List Text) #name Text - #inputs (List cs.Typed-Input) + #inputs (List cs.Typed_Input) #state Text #self Text #output Code}) @@ -348,22 +348,22 @@ (def: signature^ (Parser Signature) (<c>.form ($_ <>.and - (<>.default (list) (<c>.tuple (<>.some <c>.local-identifier))) - <c>.local-identifier - (<>.some csr.typed-input) - <c>.local-identifier - <c>.local-identifier + (<>.default (list) (<c>.tuple (<>.some <c>.local_identifier))) + <c>.local_identifier + (<>.some csr.typed_input) + <c>.local_identifier + <c>.local_identifier <c>.any))) (def: reference^ (Parser [Name (List Text)]) - (<>.either (<c>.form (<>.and <c>.identifier (<>.some <c>.local-identifier))) + (<>.either (<c>.form (<>.and <c>.identifier (<>.some <c>.local_identifier))) (<>.and <c>.identifier (\ <>.monad wrap (list))))) (syntax: #export (message: {export |export|.parser} {signature signature^} - {annotations (<>.default cs.empty-annotations csr.annotations)} + {annotations (<>.default cs.empty_annotations csr.annotations)} body) {#.doc (doc "A message can access the actor's state through the state parameter." "A message can also access the actor itself through the self parameter." @@ -371,30 +371,30 @@ "A message may succeed or fail (in case of failure, the actor dies)." <examples>)} - (with-gensyms [g!_ g!return] + (with_gensyms [g!_ g!return] (do meta.monad - [actor-scope abstract.current - #let [g!type (code.local-identifier (get@ #abstract.name actor-scope)) - g!message (code.local-identifier (get@ #name signature)) - g!actor-vars (get@ #abstract.type-vars actor-scope) - g!all-vars (|> (get@ #vars signature) (list\map code.local-identifier) (list\compose g!actor-vars)) + [actor_scope abstract.current + #let [g!type (code.local_identifier (get@ #abstract.name actor_scope)) + g!message (code.local_identifier (get@ #name signature)) + g!actor_vars (get@ #abstract.type_vars actor_scope) + g!all_vars (|> (get@ #vars signature) (list\map code.local_identifier) (list\compose g!actor_vars)) g!inputsC (|> (get@ #inputs signature) (list\map product.left)) g!inputsT (|> (get@ #inputs signature) (list\map product.right)) - g!state (|> signature (get@ #state) code.local-identifier) - g!self (|> signature (get@ #self) code.local-identifier)]] + g!state (|> signature (get@ #state) code.local_identifier) + g!self (|> signature (get@ #self) code.local_identifier)]] (wrap (list (` (def: (~+ (|export|.write export)) ((~ g!message) (~+ g!inputsC)) (~ (csw.annotations annotations)) - (All [(~+ g!all-vars)] + (All [(~+ g!all_vars)] (-> (~+ g!inputsT) - (..Message (~ (get@ #abstract.abstraction actor-scope)) + (..Message (~ (get@ #abstract.abstraction actor_scope)) (~ (get@ #output signature))))) (function ((~ g!_) (~ g!state) (~ g!self)) - (let [(~ g!state) (:coerce (~ (get@ #abstract.representation actor-scope)) + (let [(~ g!state) (:coerce (~ (get@ #abstract.representation actor_scope)) (~ g!state))] (|> (~ body) - (: ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.representation actor-scope)) + (: ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.representation actor_scope)) (~ (get@ #output signature))]))) - (:coerce ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.abstraction actor-scope)) + (:coerce ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.abstraction actor_scope)) (~ (get@ #output signature))])))))))) )))))) @@ -416,6 +416,6 @@ (if continue? (do ! [outcome (..mail! (action event stop) actor)] - (wrap (try.to-maybe outcome))) + (wrap (try.to_maybe outcome))) (wrap #.None)))) channel))) diff --git a/stdlib/source/lux/control/concurrency/atom.lux b/stdlib/source/lux/control/concurrency/atom.lux index 04517cc3e..3920c0214 100644 --- a/stdlib/source/lux/control/concurrency/atom.lux +++ b/stdlib/source/lux/control/concurrency/atom.lux @@ -13,15 +13,15 @@ [type abstract]]) -(with-expansions [<jvm> (as-is (host.import: (java/util/concurrent/atomic/AtomicReference a) +(with_expansions [<jvm> (as_is (host.import: (java/util/concurrent/atomic/AtomicReference a) ["#::." (new [a]) (get [] a) (compareAndSet [a a] boolean)]))] - (for {@.old <jvm> - @.jvm <jvm>} - - (as-is))) + (for {@.old <jvm> + @.jvm <jvm>} + + (as_is))) (abstract: #export (Atom a) (for {@.old @@ -60,7 +60,7 @@ ("js array read" 0 (:representation atom)) }))) - (def: #export (compare-and-swap current new atom) + (def: #export (compare_and_swap current new atom) {#.doc (doc "Only mutates an atom if you can present its current value." "That guarantees that atom was not updated since you last read from it.")} (All [a] (-> a a (Atom a) (IO Bit))) @@ -87,7 +87,7 @@ (do io.monad [old (read atom) #let [new (f old)] - swapped? (compare-and-swap old new atom)] + swapped? (compare_and_swap old new atom)] (if swapped? (wrap new) (recur []))))) diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux index aea0b082a..0c5303f46 100644 --- a/stdlib/source/lux/control/concurrency/frp.lux +++ b/stdlib/source/lux/control/concurrency/frp.lux @@ -22,7 +22,7 @@ {#.doc "An asynchronous channel to distribute values."} (Promise (Maybe [a (Channel a)]))) -(exception: #export channel-is-already-closed) +(exception: #export channel_is_already_closed) (signature: #export (Sink a) (: (IO (Try Any)) @@ -49,7 +49,7 @@ [latter (atom.read sink)] (if (is? current latter) ## Someone else closed the sink. - (wrap (exception.throw ..channel-is-already-closed [])) + (wrap (exception.throw ..channel_is_already_closed [])) ## Someone else fed the sink while I was closing it. (recur []))))))) @@ -57,7 +57,7 @@ (loop [_ []] (do {! io.monad} [current (atom.read sink) - #let [[next resolve-next] (:share [a] + #let [[next resolve_next] (:share [a] {(promise.Resolver (Maybe [a (Channel a)])) current} {[(Promise (Maybe [a (Channel a)])) @@ -67,14 +67,14 @@ (if fed? ## I fed the sink. (do ! - [_ (atom.compare-and-swap current resolve-next sink)] + [_ (atom.compare_and_swap current resolve_next sink)] (wrap (exception.return []))) ## Someone else interacted with the sink. (do ! [latter (atom.read sink)] (if (is? current latter) ## Someone else closed the sink while I was feeding it. - (wrap (exception.throw ..channel-is-already-closed [])) + (wrap (exception.throw ..channel_is_already_closed [])) ## Someone else fed the sink. (recur [])))))))))) @@ -99,11 +99,11 @@ (def: (apply ff fa) (do promise.monad - [cons-f ff - cons-a fa] - (case [cons-f cons-a] - [(#.Some [head-f tail-f]) (#.Some [head-a tail-a])] - (wrap (#.Some [(head-f head-a) (apply tail-f tail-a)])) + [cons_f ff + cons_a fa] + (case [cons_f cons_a] + [(#.Some [head_f tail_f]) (#.Some [head_a tail_a])] + (wrap (#.Some [(head_f head_a) (apply tail_f tail_a)])) _ (wrap #.None))))) @@ -181,7 +181,7 @@ #.None (wrap #.None)))) -(def: #export (from-promise promise) +(def: #export (from_promise promise) (All [a] (-> (Promise a) (Channel a))) (promise\map (function (_ value) (#.Some [value ..empty])) @@ -219,7 +219,7 @@ [init' (f head init)] (wrap (#.Some [init (folds f init' tail)])))))) -(def: #export (poll milli-seconds action) +(def: #export (poll milli_seconds action) (All [a] (-> Nat (IO a) [(Channel a) (Sink a)])) (let [[output sink] (channel [])] @@ -227,12 +227,12 @@ (do io.monad [value action _ (\ sink feed value)] - (promise.await recur (promise.wait milli-seconds))))) + (promise.await recur (promise.wait milli_seconds))))) [output sink]))) -(def: #export (periodic milli-seconds) +(def: #export (periodic milli_seconds) (-> Nat [(Channel Any) (Sink Any)]) - (..poll milli-seconds (io []))) + (..poll milli_seconds (io []))) (def: #export (iterate f init) (All [s o] (-> (-> s (Promise (Maybe [s o]))) s (Channel o))) @@ -282,7 +282,7 @@ #.None (wrap #.Nil)))) -(def: #export (sequential milli-seconds values) +(def: #export (sequential milli_seconds values) (All [a] (-> Nat (List a) (Channel a))) (case values #.Nil @@ -290,5 +290,5 @@ (#.Cons head tail) (promise.resolved (#.Some [head (do promise.monad - [_ (promise.wait milli-seconds)] - (sequential milli-seconds tail))])))) + [_ (promise.wait milli_seconds)] + (sequential milli_seconds tail))])))) diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux index e4835b8d8..96822700d 100644 --- a/stdlib/source/lux/control/concurrency/promise.lux +++ b/stdlib/source/lux/control/concurrency/promise.lux @@ -38,7 +38,7 @@ #.None (do ! [#let [new [(#.Some value) #.None]] - succeeded? (atom.compare-and-swap old new promise)] + succeeded? (atom.compare_and_swap old new promise)] (if succeeded? (do ! [_ (monad.map ! (function (_ f) (f value)) @@ -72,7 +72,7 @@ #.None (let [new [_value (#.Cons f _observers)]] - (if (io.run (atom.compare-and-swap old new promise)) + (if (io.run (atom.compare_and_swap old new promise)) (io.io []) (await f (:abstraction promise))))))) ) @@ -134,7 +134,7 @@ {#.doc "Heterogeneous alternative combinator."} (All [a b] (-> (Promise a) (Promise b) (Promise (| a b)))) (let [[a|b resolve] (..promise [])] - (with-expansions + (with_expansions [<sides> (template [<promise> <tag>] [(io.run (await (|>> <tag> resolve) <promise>))] @@ -155,7 +155,7 @@ [right])) left||right)))) -(def: #export (schedule millis-delay computation) +(def: #export (schedule millis_delay computation) {#.doc (doc "Runs an I/O computation on its own thread (after a specified delay)." "Returns a Promise that will eventually host its result.")} (All [a] (-> Nat (IO a) (Promise a))) @@ -163,7 +163,7 @@ (exec (|> (do io.monad [value computation] (resolve value)) - (thread.schedule millis-delay) + (thread.schedule millis_delay) io.run) !out))) @@ -173,17 +173,17 @@ (All [a] (-> (IO a) (Promise a))) (schedule 0)) -(def: #export (delay time-millis value) +(def: #export (delay time_millis value) {#.doc "Delivers a value after a certain period has passed."} (All [a] (-> Nat a (Promise a))) - (schedule time-millis (io value))) + (schedule time_millis (io value))) -(def: #export (wait time-millis) +(def: #export (wait time_millis) {#.doc "Returns a promise that will be resolved after the specified amount of milliseconds."} (-> Nat (Promise Any)) - (delay time-millis [])) + (delay time_millis [])) -(def: #export (time-out time-millis promise) +(def: #export (time_out time_millis promise) {#.doc "Wait for a promise to be resolved within the specified amount of milliseconds."} (All [a] (-> Nat (Promise a) (Promise (Maybe a)))) - (..or (wait time-millis) promise)) + (..or (wait time_millis) promise)) diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux index a405b7b3e..9e6ff9b29 100644 --- a/stdlib/source/lux/control/concurrency/semaphore.lux +++ b/stdlib/source/lux/control/concurrency/semaphore.lux @@ -22,25 +22,25 @@ ["." promise (#+ Promise Resolver)]]) (type: State - {#max-positions Nat - #open-positions Int - #waiting-list (Queue (Resolver Any))}) + {#max_positions Nat + #open_positions Int + #waiting_list (Queue (Resolver Any))}) (abstract: #export Semaphore (Atom State) {#.doc "A tool for controlling access to resources by multiple concurrent processes."} - (def: most-positions-possible + (def: most_positions_possible (.nat (\ i.interval top))) - (def: #export (semaphore initial-open-positions) + (def: #export (semaphore initial_open_positions) (-> Nat Semaphore) - (let [max-positions (n.min initial-open-positions - ..most-positions-possible)] - (:abstraction (atom.atom {#max-positions max-positions - #open-positions (.int max-positions) - #waiting-list queue.empty})))) + (let [max_positions (n.min initial_open_positions + ..most_positions_possible)] + (:abstraction (atom.atom {#max_positions max_positions + #open_positions (.int max_positions) + #waiting_list queue.empty})))) (def: #export (wait semaphore) (Ex [k] (-> Semaphore (Promise Any))) @@ -52,13 +52,13 @@ (do io.monad [state (atom.read semaphore) #let [[ready? state'] (: [Bit State] - (if (i.> +0 (get@ #open-positions state)) + (if (i.> +0 (get@ #open_positions state)) [true (|> state - (update@ #open-positions dec))] + (update@ #open_positions dec))] [false (|> state - (update@ #open-positions dec) - (update@ #waiting-list (queue.push sink)))]))] - success? (atom.compare-and-swap state state' semaphore)] + (update@ #open_positions dec) + (update@ #waiting_list (queue.push sink)))]))] + success? (atom.compare_and_swap state state' semaphore)] (if success? (if ready? (sink []) @@ -66,9 +66,9 @@ (recur []))))) signal))) - (exception: #export (semaphore-is-maxed-out {max-positions Nat}) + (exception: #export (semaphore_is_maxed_out {max_positions Nat}) (exception.report - ["Max Positions" (%.nat max-positions)])) + ["Max Positions" (%.nat max_positions)])) (def: #export (signal semaphore) (Ex [k] (-> Semaphore (Promise (Try Int)))) @@ -77,29 +77,29 @@ (loop [_ []] (do {! io.monad} [state (atom.read semaphore) - #let [[?sink state' maxed-out?] (: [(Maybe (Resolver Any)) State Bit] - (case (queue.peek (get@ #waiting-list state)) + #let [[?sink state' maxed_out?] (: [(Maybe (Resolver Any)) State Bit] + (case (queue.peek (get@ #waiting_list state)) #.None - (if (n.= (get@ #max-positions state) - (.nat (get@ #open-positions state))) + (if (n.= (get@ #max_positions state) + (.nat (get@ #open_positions state))) [#.None state true] [#.None - (update@ #open-positions inc state) + (update@ #open_positions inc state) false]) (#.Some head) [(#.Some head) (|> state - (update@ #open-positions inc) - (update@ #waiting-list queue.pop)) + (update@ #open_positions inc) + (update@ #waiting_list queue.pop)) false]))]] - (if maxed-out? - (wrap (exception.throw ..semaphore-is-maxed-out [(get@ #max-positions state)])) + (if maxed_out? + (wrap (exception.throw ..semaphore_is_maxed_out [(get@ #max_positions state)])) (do ! - [#let [open-positions (get@ #open-positions state')] - success? (atom.compare-and-swap state state' semaphore)] + [#let [open_positions (get@ #open_positions state')] + success? (atom.compare_and_swap state state' semaphore)] (if success? (do ! [_ (case ?sink @@ -108,7 +108,7 @@ (#.Some sink) (sink []))] - (wrap (#try.Success open-positions))) + (wrap (#try.Success open_positions))) (recur []))))))))) ) @@ -144,8 +144,8 @@ (abstract: #export Barrier {#limit Limit #count (Atom Nat) - #start-turnstile Semaphore - #end-turnstile Semaphore} + #start_turnstile Semaphore + #end_turnstile Semaphore} {#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."} @@ -153,10 +153,10 @@ (-> Limit Barrier) (:abstraction {#limit limit #count (atom.atom 0) - #start-turnstile (semaphore 0) - #end-turnstile (semaphore 0)})) + #start_turnstile (semaphore 0) + #end_turnstile (semaphore 0)})) - (def: (un-block times turnstile) + (def: (un_block times turnstile) (-> Nat Semaphore (Promise Any)) (loop [step 0] (if (n.< times step) @@ -169,16 +169,16 @@ [(def: (<phase> (^:representation barrier)) (-> Barrier (Promise Any)) (do promise.monad - [#let [limit (refinement.un-refine (get@ #limit barrier)) + [#let [limit (refinement.un_refine (get@ #limit barrier)) goal <goal> count (io.run (atom.update <update> (get@ #count barrier))) reached? (n.= goal count)]] (if reached? - (un-block limit (get@ <turnstile> barrier)) + (un_block limit (get@ <turnstile> barrier)) (wait (get@ <turnstile> barrier)))))] - [start inc limit #start-turnstile] - [end dec 0 #end-turnstile] + [start inc limit #start_turnstile] + [end dec 0 #end_turnstile] ) (def: #export (block barrier) diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux index 523aa5567..7fd916fdb 100644 --- a/stdlib/source/lux/control/concurrency/stm.lux +++ b/stdlib/source/lux/control/concurrency/stm.lux @@ -36,7 +36,7 @@ (All [a] (-> (Var a) a)) (|>> :representation atom.read io.run product.left)) - (def: (un-follow sink var) + (def: (un_follow sink var) (All [a] (-> (Sink a) (Var a) (IO Any))) (do io.monad [_ (atom.update (function (_ [value observers]) @@ -44,26 +44,26 @@ (:representation var))] (wrap []))) - (def: (write! new-value var) + (def: (write! new_value var) (All [a] (-> a (Var a) (IO Any))) (do {! io.monad} [#let [var' (:representation var)] - (^@ old [old-value observers]) (atom.read var') - succeeded? (atom.compare-and-swap old [new-value observers] var')] + (^@ old [old_value observers]) (atom.read var') + succeeded? (atom.compare_and_swap old [new_value observers] var')] (if succeeded? (do ! [_ (monad.map ! (function (_ sink) (do ! - [result (\ sink feed new-value)] + [result (\ sink feed new_value)] (case result (#try.Success _) (wrap []) (#try.Failure _) - (un-follow sink var)))) + (un_follow sink var)))) observers)] (wrap [])) - (write! new-value var)))) + (write! new_value var)))) (def: #export (follow target) {#.doc "Creates a channel that will receive all changes to the value of the given var."} @@ -76,19 +76,19 @@ (wrap [channel sink]))) ) -(type: (Tx-Frame a) +(type: (Tx_Frame a) {#var (Var a) #original a #current a}) (type: Tx - (List (Ex [a] (Tx-Frame a)))) + (List (Ex [a] (Tx_Frame a)))) (type: #export (STM a) {#.doc "A computation which updates a transaction and produces a value."} (-> Tx [Tx a])) -(def: (find-var-value var tx) +(def: (find_var_value var tx) (All [a] (-> (Var a) Tx (Maybe a))) (|> tx (list.find (function (_ [_var _original _current]) @@ -102,7 +102,7 @@ (def: #export (read var) (All [a] (-> (Var a) (STM a))) (function (_ tx) - (case (find-var-value var tx) + (case (find_var_value var tx) (#.Some value) [tx value] @@ -111,7 +111,7 @@ [(#.Cons [var value value] tx) value])))) -(def: (update-tx-value var value tx) +(def: (update_tx_value var value tx) (All [a] (-> (Var a) a Tx Tx)) (case tx #.Nil @@ -127,15 +127,15 @@ (#.Cons {#var _var #original _original #current _current} - (update-tx-value var value tx'))))) + (update_tx_value var value tx'))))) (def: #export (write value var) {#.doc "Writes value to var."} (All [a] (-> a (Var a) (STM Any))) (function (_ tx) - (case (find-var-value var tx) + (case (find_var_value var tx) (#.Some _) - [(update-tx-value var value tx) + [(update_tx_value var value tx) []] #.None @@ -184,40 +184,40 @@ _ (..write a' var)] (wrap [a a']))) -(def: (can-commit? tx) +(def: (can_commit? tx) (-> Tx Bit) (list.every? (function (_ [_var _original _current]) (is? _original (..read! _var))) tx)) -(def: (commit-var! [_var _original _current]) - (-> (Ex [a] (Tx-Frame a)) (IO Any)) +(def: (commit_var! [_var _original _current]) + (-> (Ex [a] (Tx_Frame a)) (IO Any)) (if (is? _original _current) (io []) (..write! _current _var))) -(def: fresh-tx Tx (list)) +(def: fresh_tx Tx (list)) (type: (Commit a) [(STM a) (Promise a) (Resolver a)]) -(def: pending-commits +(def: pending_commits (Atom (Rec Commits [(Promise [(Ex [a] (Commit a)) Commits]) (Resolver [(Ex [a] (Commit a)) Commits])])) (atom (promise.promise []))) -(def: commit-processor-flag +(def: commit_processor_flag (Atom Bit) (atom #0)) -(def: (issue-commit commit) +(def: (issue_commit commit) (All [a] (-> (Commit a) (IO Any))) (let [entry [commit (promise.promise [])]] (do {! io.monad} - [|commits|&resolve (atom.read pending-commits)] + [|commits|&resolve (atom.read pending_commits)] (loop [[|commits| resolve] |commits|&resolve] (do ! [|commits| (promise.poll |commits|)] @@ -226,48 +226,48 @@ (do io.monad [resolved? (resolve entry)] (if resolved? - (atom.write (product.right entry) pending-commits) + (atom.write (product.right entry) pending_commits) (recur |commits|&resolve))) (#.Some [head tail]) (recur tail))))))) -(def: (process-commit commit) +(def: (process_commit commit) (All [a] (-> (Commit a) (IO Any))) - (let [[stm-proc output resolve] commit - [finished-tx value] (stm-proc fresh-tx)] - (if (can-commit? finished-tx) + (let [[stm_proc output resolve] commit + [finished_tx value] (stm_proc fresh_tx)] + (if (can_commit? finished_tx) (do {! io.monad} - [_ (monad.map ! commit-var! finished-tx)] + [_ (monad.map ! commit_var! finished_tx)] (resolve value)) - (issue-commit commit)))) + (issue_commit commit)))) -(def: init-processor! +(def: init_processor! (IO Any) (do {! io.monad} - [flag (atom.read commit-processor-flag)] + [flag (atom.read commit_processor_flag)] (if flag (wrap []) (do ! - [was-first? (atom.compare-and-swap flag #1 commit-processor-flag)] - (if was-first? + [was_first? (atom.compare_and_swap flag #1 commit_processor_flag)] + (if was_first? (do ! - [[promise resolve] (atom.read pending-commits)] + [[promise resolve] (atom.read pending_commits)] (promise.await (function (recur [head [tail _resolve]]) (do ! - [_ (process-commit head)] + [_ (process_commit head)] (promise.await recur tail))) promise)) (wrap []))) ))) -(def: #export (commit stm-proc) +(def: #export (commit stm_proc) {#.doc (doc "Commits a transaction and returns its result (asynchronously)." "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first." "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")} (All [a] (-> (STM a) (Promise a))) (let [[output resolver] (promise.promise [])] (exec (io.run (do io.monad - [_ init-processor!] - (issue-commit [stm-proc output resolver]))) + [_ init_processor!] + (issue_commit [stm_proc output resolver]))) output))) diff --git a/stdlib/source/lux/control/concurrency/thread.lux b/stdlib/source/lux/control/concurrency/thread.lux index 10ec17815..8bdd2b9c9 100644 --- a/stdlib/source/lux/control/concurrency/thread.lux +++ b/stdlib/source/lux/control/concurrency/thread.lux @@ -15,7 +15,7 @@ [// ["." atom (#+ Atom)]]) -(with-expansions [<jvm> (as-is (host.import: java/lang/Object) +(with_expansions [<jvm> (as_is (host.import: java/lang/Object) (host.import: java/lang/Runtime ["#::." @@ -38,11 +38,11 @@ ["#::." (new [int]) (schedule [java/lang/Runnable long java/util/concurrent/TimeUnit] #io (java/util/concurrent/ScheduledFuture java/lang/Object))]))] - (for {@.old (as-is <jvm>) - @.jvm (as-is <jvm>) + (for {@.old (as_is <jvm>) + @.jvm (as_is <jvm>) @.js - (as-is (host.import: (setTimeout [host.Function host.Number] #io Any)))} + (as_is (host.import: (setTimeout [host.Function host.Number] #io Any)))} ## Default (type: Thread @@ -53,7 +53,7 @@ (def: #export parallelism Nat - (with-expansions [<jvm> (|> (java/lang/Runtime::getRuntime) + (with_expansions [<jvm> (|> (java/lang/Runtime::getRuntime) (java/lang/Runtime::availableProcessors) .nat)] (for {@.old <jvm> @@ -62,30 +62,30 @@ ## Default 1))) -(with-expansions [<jvm> (as-is (def: runner +(with_expansions [<jvm> (as_is (def: runner java/util/concurrent/ScheduledThreadPoolExecutor (java/util/concurrent/ScheduledThreadPoolExecutor::new (.int ..parallelism))))] (for {@.old <jvm> @.jvm <jvm> @.js - (as-is)} + (as_is)} ## Default (def: runner (Atom (List Thread)) (atom.atom (list))))) -(def: #export (schedule milli-seconds action) +(def: #export (schedule milli_seconds action) (-> Nat (IO Any) (IO Any)) (for {@.old (let [runnable (host.object [] [java/lang/Runnable] [] (java/lang/Runnable [] (run self) void (io.run action)))] - (case milli-seconds + (case milli_seconds 0 (java/util/concurrent/Executor::execute runnable runner) - _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli-seconds) java/util/concurrent/TimeUnit::MILLISECONDS + _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli_seconds) java/util/concurrent/TimeUnit::MILLISECONDS runner))) @.jvm @@ -93,34 +93,34 @@ [] (java/lang/Runnable [] (run self) void (io.run action)))] - (case milli-seconds + (case milli_seconds 0 (java/util/concurrent/Executor::execute runnable runner) - _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli-seconds) java/util/concurrent/TimeUnit::MILLISECONDS + _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli_seconds) java/util/concurrent/TimeUnit::MILLISECONDS runner))) @.js (..setTimeout [(host.closure [] (io.run action)) - (n.frac milli-seconds)])} + (n.frac milli_seconds)])} ## Default (do io.monad [_ (atom.update (|>> (#.Cons {#creation (.nat ("lux io current-time")) - #delay milli-seconds + #delay milli_seconds #action action})) ..runner)] (wrap [])))) (for {@.old - (as-is) + (as_is) @.jvm - (as-is) + (as_is) @.js - (as-is)} + (as_is)} ## Default - (as-is (exception: #export cannot-continue-running-threads) + (as_is (exception: #export cannot_continue_running_threads) (def: #export (run! _) (-> Any (IO Any)) @@ -139,11 +139,11 @@ (n.+ (get@ #delay thread)) (n.<= now))) threads)] - swapped? (atom.compare-and-swap threads pending ..runner)] + swapped? (atom.compare_and_swap threads pending ..runner)] (if swapped? (do ! [_ (monad.map ! (get@ #action) ready)] (run! [])) - (error! (ex.construct ..cannot-continue-running-threads [])))) + (error! (ex.construct ..cannot_continue_running_threads [])))) ))) )) |