aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/control/concurrency')
-rw-r--r--stdlib/source/lux/control/concurrency/actor.lux178
-rw-r--r--stdlib/source/lux/control/concurrency/atom.lux14
-rw-r--r--stdlib/source/lux/control/concurrency/frp.lux36
-rw-r--r--stdlib/source/lux/control/concurrency/promise.lux22
-rw-r--r--stdlib/source/lux/control/concurrency/semaphore.lux76
-rw-r--r--stdlib/source/lux/control/concurrency/stm.lux78
-rw-r--r--stdlib/source/lux/control/concurrency/thread.lux40
7 files changed, 222 insertions, 222 deletions
diff --git a/stdlib/source/lux/control/concurrency/actor.lux b/stdlib/source/lux/control/concurrency/actor.lux
index 66ea24cd8..6355a43b7 100644
--- a/stdlib/source/lux/control/concurrency/actor.lux
+++ b/stdlib/source/lux/control/concurrency/actor.lux
@@ -25,7 +25,7 @@
["csr" reader]
["csw" writer]
["|.|" export]]]]
- ["." meta (#+ with-gensyms monad)
+ ["." meta (#+ with_gensyms monad)
["." annotation]]
[type (#+ :share)
["." abstract (#+ abstract: :representation :abstraction)]]]
@@ -37,10 +37,10 @@
(exception: #export poisoned)
(exception: #export dead)
-(with-expansions
- [<Mail> (as-is (-> s (Actor s) (Promise (Try s))))
- <Obituary> (as-is [Text s (List <Mail>)])
- <Mailbox> (as-is (Rec Mailbox
+(with_expansions
+ [<Mail> (as_is (-> s (Actor s) (Promise (Try s))))
+ <Obituary> (as_is [Text s (List <Mail>)])
+ <Mailbox> (as_is (Rec Mailbox
[(Promise [<Mail> Mailbox])
(Resolver [<Mail> Mailbox])]))]
@@ -73,29 +73,29 @@
(type: #export (Behavior o s)
{#.doc "An actor's behavior when mail is received and when a fatal error occurs."}
- {#on-init (-> o s)
- #on-mail (-> (Mail s) s (Actor s) (Promise (Try s)))
- #on-stop (-> Text s (Promise Any))})
+ {#on_init (-> o s)
+ #on_mail (-> (Mail s) s (Actor s) (Promise (Try s)))
+ #on_stop (-> Text s (Promise Any))})
(def: #export (spawn! behavior init)
{#.doc "Given a behavior and initial state, spawns an actor and returns it."}
(All [o s] (-> (Behavior o s) o (IO (Actor s))))
- (io (let [[on-init on-mail on-stop] behavior
+ (io (let [[on_init on_mail on_stop] behavior
self (:share [o s]
{(Behavior o s)
behavior}
{(Actor s)
(:abstraction {#obituary (promise.promise [])
#mailbox (atom (promise.promise []))})})
- process (loop [state (on-init init)
+ process (loop [state (on_init init)
[|mailbox| _] (io.run (atom.read (get@ #mailbox (:representation self))))]
(do {! promise.monad}
[[head tail] |mailbox|
- ?state' (on-mail head state self)]
+ ?state' (on_mail head state self)]
(case ?state'
(#try.Failure error)
(do !
- [_ (on-stop error state)]
+ [_ (on_stop error state)]
(let [[_ resolve] (get@ #obituary (:representation self))]
(exec (io.run
(do io.monad
@@ -195,19 +195,19 @@
)
)
-(def: (default-on-mail mail state self)
+(def: (default_on_mail mail state self)
(All [s] (-> (Mail s) s (Actor s) (Promise (Try s))))
(mail state self))
-(def: (default-on-stop cause state)
+(def: (default_on_stop cause state)
(All [s] (-> Text s (Promise Any)))
(promise\wrap []))
(def: #export default
(All [s] (Behavior s s))
- {#on-init function.identity
- #on-mail ..default-on-mail
- #on-stop ..default-on-stop})
+ {#on_init function.identity
+ #on_mail ..default_on_mail
+ #on_stop ..default_on_stop})
(def: #export (poison! actor)
{#.doc (doc "Kills the actor by sending mail that will kill it upon processing,"
@@ -217,64 +217,64 @@
(promise.resolved (exception.throw ..poisoned [])))
actor))
-(def: actor-decl^
+(def: actor_decl^
(Parser [Text (List Text)])
- (<>.either (<c>.form (<>.and <c>.local-identifier (<>.some <c>.local-identifier)))
- (<>.and <c>.local-identifier (\ <>.monad wrap (list)))))
+ (<>.either (<c>.form (<>.and <c>.local_identifier (<>.some <c>.local_identifier)))
+ (<>.and <c>.local_identifier (\ <>.monad wrap (list)))))
-(type: On-MailC
+(type: On_MailC
[[Text Text Text] Code])
-(type: On-StopC
+(type: On_StopC
[[Text Text] Code])
(type: BehaviorC
- [(Maybe On-MailC) (Maybe On-StopC) (List Code)])
+ [(Maybe On_MailC) (Maybe On_StopC) (List Code)])
(def: argument
(Parser Text)
- <c>.local-identifier)
+ <c>.local_identifier)
(def: behavior^
(Parser BehaviorC)
- (let [on-mail-args ($_ <>.and ..argument ..argument ..argument)
- on-stop-args ($_ <>.and ..argument ..argument)]
+ (let [on_mail_args ($_ <>.and ..argument ..argument ..argument)
+ on_stop_args ($_ <>.and ..argument ..argument)]
($_ <>.and
- (<>.maybe (<c>.form (<>.and (<c>.form (<>.after (<c>.this! (' on-mail)) on-mail-args))
+ (<>.maybe (<c>.form (<>.and (<c>.form (<>.after (<c>.this! (' on_mail)) on_mail_args))
<c>.any)))
- (<>.maybe (<c>.form (<>.and (<c>.form (<>.after (<c>.this! (' on-stop)) on-stop-args))
+ (<>.maybe (<c>.form (<>.and (<c>.form (<>.after (<c>.this! (' on_stop)) on_stop_args))
<c>.any)))
(<>.some <c>.any))))
-(def: (on-mail g!_ ?on-mail)
- (-> Code (Maybe On-MailC) Code)
- (case ?on-mail
+(def: (on_mail g!_ ?on_mail)
+ (-> Code (Maybe On_MailC) Code)
+ (case ?on_mail
#.None
- (` (~! ..default-on-mail))
+ (` (~! ..default_on_mail))
(#.Some [[mailN stateN selfN] bodyC])
(` (function ((~ g!_)
- (~ (code.local-identifier mailN))
- (~ (code.local-identifier stateN))
- (~ (code.local-identifier selfN)))
+ (~ (code.local_identifier mailN))
+ (~ (code.local_identifier stateN))
+ (~ (code.local_identifier selfN)))
(~ bodyC)))))
-(def: (on-stop g!_ ?on-stop)
- (-> Code (Maybe On-StopC) Code)
- (case ?on-stop
+(def: (on_stop g!_ ?on_stop)
+ (-> Code (Maybe On_StopC) Code)
+ (case ?on_stop
#.None
- (` (~! ..default-on-stop))
+ (` (~! ..default_on_stop))
(#.Some [[causeN stateN] bodyC])
(` (function ((~ g!_)
- (~ (code.local-identifier causeN))
- (~ (code.local-identifier stateN)))
+ (~ (code.local_identifier causeN))
+ (~ (code.local_identifier stateN)))
(~ bodyC)))))
-(with-expansions [<examples> (as-is (actor: #export (Stack a)
+(with_expansions [<examples> (as_is (actor: #export (Stack a)
(List a)
- ((on-mail mail state self)
+ ((on_mail mail state self)
(do (try.with promise.monad)
[#let [_ (log! "BEFORE")]
output (mail state self)
@@ -288,7 +288,7 @@
(actor: #export Counter
Nat
- ((on-stop cause state)
+ ((on_stop cause state)
(\ promise.monad wrap
(log! (if (exception.match? ..poisoned cause)
(format "Counter was poisoned: " (%.nat state))
@@ -302,45 +302,45 @@
(promise.resolved (#try.Success [state state])))))]
(syntax: #export (actor:
{export |export|.parser}
- {[name vars] actor-decl^}
- {annotations (<>.default cs.empty-annotations csr.annotations)}
- state-type
- {[?on-mail ?on-stop messages] behavior^})
+ {[name vars] actor_decl^}
+ {annotations (<>.default cs.empty_annotations csr.annotations)}
+ state_type
+ {[?on_mail ?on_stop messages] behavior^})
{#.doc (doc "Defines an actor, with its behavior and internal state."
- "Messages for the actor must be defined after the on-mail and on-stop handlers."
+ "Messages for the actor must be defined after the on_mail and on_stop handlers."
<examples>)}
- (with-gensyms [g!_]
+ (with_gensyms [g!_]
(do meta.monad
- [g!type (meta.gensym (format name "-abstract-type"))
- #let [g!actor (code.local-identifier name)
- g!vars (list\map code.local-identifier vars)]]
+ [g!type (meta.gensym (format name "_abstract_type"))
+ #let [g!actor (code.local_identifier name)
+ g!vars (list\map code.local_identifier vars)]]
(wrap (list (` ((~! abstract:) (~+ (|export|.write export)) ((~ g!type) (~+ g!vars))
- (~ state-type)
+ (~ state_type)
(def: (~+ (|export|.write export)) (~ g!actor)
(All [(~+ g!vars)]
- (..Behavior (~ state-type) ((~ g!type) (~+ g!vars))))
- {#..on-init (|>> ((~! abstract.:abstraction) (~ g!type)))
- #..on-mail (~ (..on-mail g!_ ?on-mail))
- #..on-stop (~ (..on-stop g!_ ?on-stop))})
+ (..Behavior (~ state_type) ((~ g!type) (~+ g!vars))))
+ {#..on_init (|>> ((~! abstract.:abstraction) (~ g!type)))
+ #..on_mail (~ (..on_mail g!_ ?on_mail))
+ #..on_stop (~ (..on_stop g!_ ?on_stop))})
(~+ messages))))))))
- (syntax: #export (actor {[state-type init] (<c>.record (<>.and <c>.any <c>.any))}
- {[?on-mail ?on-stop messages] behavior^})
- (with-gensyms [g!_]
- (wrap (list (` (: ((~! io.IO) (..Actor (~ state-type)))
- (..spawn! (: (..Behavior (~ state-type) (~ state-type))
- {#..on-init (|>>)
- #..on-mail (~ (..on-mail g!_ ?on-mail))
- #..on-stop (~ (..on-stop g!_ ?on-stop))})
- (: (~ state-type)
+ (syntax: #export (actor {[state_type init] (<c>.record (<>.and <c>.any <c>.any))}
+ {[?on_mail ?on_stop messages] behavior^})
+ (with_gensyms [g!_]
+ (wrap (list (` (: ((~! io.IO) (..Actor (~ state_type)))
+ (..spawn! (: (..Behavior (~ state_type) (~ state_type))
+ {#..on_init (|>>)
+ #..on_mail (~ (..on_mail g!_ ?on_mail))
+ #..on_stop (~ (..on_stop g!_ ?on_stop))})
+ (: (~ state_type)
(~ init)))))))))
(type: Signature
{#vars (List Text)
#name Text
- #inputs (List cs.Typed-Input)
+ #inputs (List cs.Typed_Input)
#state Text
#self Text
#output Code})
@@ -348,22 +348,22 @@
(def: signature^
(Parser Signature)
(<c>.form ($_ <>.and
- (<>.default (list) (<c>.tuple (<>.some <c>.local-identifier)))
- <c>.local-identifier
- (<>.some csr.typed-input)
- <c>.local-identifier
- <c>.local-identifier
+ (<>.default (list) (<c>.tuple (<>.some <c>.local_identifier)))
+ <c>.local_identifier
+ (<>.some csr.typed_input)
+ <c>.local_identifier
+ <c>.local_identifier
<c>.any)))
(def: reference^
(Parser [Name (List Text)])
- (<>.either (<c>.form (<>.and <c>.identifier (<>.some <c>.local-identifier)))
+ (<>.either (<c>.form (<>.and <c>.identifier (<>.some <c>.local_identifier)))
(<>.and <c>.identifier (\ <>.monad wrap (list)))))
(syntax: #export (message:
{export |export|.parser}
{signature signature^}
- {annotations (<>.default cs.empty-annotations csr.annotations)}
+ {annotations (<>.default cs.empty_annotations csr.annotations)}
body)
{#.doc (doc "A message can access the actor's state through the state parameter."
"A message can also access the actor itself through the self parameter."
@@ -371,30 +371,30 @@
"A message may succeed or fail (in case of failure, the actor dies)."
<examples>)}
- (with-gensyms [g!_ g!return]
+ (with_gensyms [g!_ g!return]
(do meta.monad
- [actor-scope abstract.current
- #let [g!type (code.local-identifier (get@ #abstract.name actor-scope))
- g!message (code.local-identifier (get@ #name signature))
- g!actor-vars (get@ #abstract.type-vars actor-scope)
- g!all-vars (|> (get@ #vars signature) (list\map code.local-identifier) (list\compose g!actor-vars))
+ [actor_scope abstract.current
+ #let [g!type (code.local_identifier (get@ #abstract.name actor_scope))
+ g!message (code.local_identifier (get@ #name signature))
+ g!actor_vars (get@ #abstract.type_vars actor_scope)
+ g!all_vars (|> (get@ #vars signature) (list\map code.local_identifier) (list\compose g!actor_vars))
g!inputsC (|> (get@ #inputs signature) (list\map product.left))
g!inputsT (|> (get@ #inputs signature) (list\map product.right))
- g!state (|> signature (get@ #state) code.local-identifier)
- g!self (|> signature (get@ #self) code.local-identifier)]]
+ g!state (|> signature (get@ #state) code.local_identifier)
+ g!self (|> signature (get@ #self) code.local_identifier)]]
(wrap (list (` (def: (~+ (|export|.write export)) ((~ g!message) (~+ g!inputsC))
(~ (csw.annotations annotations))
- (All [(~+ g!all-vars)]
+ (All [(~+ g!all_vars)]
(-> (~+ g!inputsT)
- (..Message (~ (get@ #abstract.abstraction actor-scope))
+ (..Message (~ (get@ #abstract.abstraction actor_scope))
(~ (get@ #output signature)))))
(function ((~ g!_) (~ g!state) (~ g!self))
- (let [(~ g!state) (:coerce (~ (get@ #abstract.representation actor-scope))
+ (let [(~ g!state) (:coerce (~ (get@ #abstract.representation actor_scope))
(~ g!state))]
(|> (~ body)
- (: ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.representation actor-scope))
+ (: ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.representation actor_scope))
(~ (get@ #output signature))])))
- (:coerce ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.abstraction actor-scope))
+ (:coerce ((~! promise.Promise) ((~! try.Try) [(~ (get@ #abstract.abstraction actor_scope))
(~ (get@ #output signature))]))))))))
))))))
@@ -416,6 +416,6 @@
(if continue?
(do !
[outcome (..mail! (action event stop) actor)]
- (wrap (try.to-maybe outcome)))
+ (wrap (try.to_maybe outcome)))
(wrap #.None))))
channel)))
diff --git a/stdlib/source/lux/control/concurrency/atom.lux b/stdlib/source/lux/control/concurrency/atom.lux
index 04517cc3e..3920c0214 100644
--- a/stdlib/source/lux/control/concurrency/atom.lux
+++ b/stdlib/source/lux/control/concurrency/atom.lux
@@ -13,15 +13,15 @@
[type
abstract]])
-(with-expansions [<jvm> (as-is (host.import: (java/util/concurrent/atomic/AtomicReference a)
+(with_expansions [<jvm> (as_is (host.import: (java/util/concurrent/atomic/AtomicReference a)
["#::."
(new [a])
(get [] a)
(compareAndSet [a a] boolean)]))]
- (for {@.old <jvm>
- @.jvm <jvm>}
-
- (as-is)))
+ (for {@.old <jvm>
+ @.jvm <jvm>}
+
+ (as_is)))
(abstract: #export (Atom a)
(for {@.old
@@ -60,7 +60,7 @@
("js array read" 0 (:representation atom))
})))
- (def: #export (compare-and-swap current new atom)
+ (def: #export (compare_and_swap current new atom)
{#.doc (doc "Only mutates an atom if you can present its current value."
"That guarantees that atom was not updated since you last read from it.")}
(All [a] (-> a a (Atom a) (IO Bit)))
@@ -87,7 +87,7 @@
(do io.monad
[old (read atom)
#let [new (f old)]
- swapped? (compare-and-swap old new atom)]
+ swapped? (compare_and_swap old new atom)]
(if swapped?
(wrap new)
(recur [])))))
diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux
index aea0b082a..0c5303f46 100644
--- a/stdlib/source/lux/control/concurrency/frp.lux
+++ b/stdlib/source/lux/control/concurrency/frp.lux
@@ -22,7 +22,7 @@
{#.doc "An asynchronous channel to distribute values."}
(Promise (Maybe [a (Channel a)])))
-(exception: #export channel-is-already-closed)
+(exception: #export channel_is_already_closed)
(signature: #export (Sink a)
(: (IO (Try Any))
@@ -49,7 +49,7 @@
[latter (atom.read sink)]
(if (is? current latter)
## Someone else closed the sink.
- (wrap (exception.throw ..channel-is-already-closed []))
+ (wrap (exception.throw ..channel_is_already_closed []))
## Someone else fed the sink while I was closing it.
(recur [])))))))
@@ -57,7 +57,7 @@
(loop [_ []]
(do {! io.monad}
[current (atom.read sink)
- #let [[next resolve-next] (:share [a]
+ #let [[next resolve_next] (:share [a]
{(promise.Resolver (Maybe [a (Channel a)]))
current}
{[(Promise (Maybe [a (Channel a)]))
@@ -67,14 +67,14 @@
(if fed?
## I fed the sink.
(do !
- [_ (atom.compare-and-swap current resolve-next sink)]
+ [_ (atom.compare_and_swap current resolve_next sink)]
(wrap (exception.return [])))
## Someone else interacted with the sink.
(do !
[latter (atom.read sink)]
(if (is? current latter)
## Someone else closed the sink while I was feeding it.
- (wrap (exception.throw ..channel-is-already-closed []))
+ (wrap (exception.throw ..channel_is_already_closed []))
## Someone else fed the sink.
(recur []))))))))))
@@ -99,11 +99,11 @@
(def: (apply ff fa)
(do promise.monad
- [cons-f ff
- cons-a fa]
- (case [cons-f cons-a]
- [(#.Some [head-f tail-f]) (#.Some [head-a tail-a])]
- (wrap (#.Some [(head-f head-a) (apply tail-f tail-a)]))
+ [cons_f ff
+ cons_a fa]
+ (case [cons_f cons_a]
+ [(#.Some [head_f tail_f]) (#.Some [head_a tail_a])]
+ (wrap (#.Some [(head_f head_a) (apply tail_f tail_a)]))
_
(wrap #.None)))))
@@ -181,7 +181,7 @@
#.None
(wrap #.None))))
-(def: #export (from-promise promise)
+(def: #export (from_promise promise)
(All [a] (-> (Promise a) (Channel a)))
(promise\map (function (_ value)
(#.Some [value ..empty]))
@@ -219,7 +219,7 @@
[init' (f head init)]
(wrap (#.Some [init (folds f init' tail)]))))))
-(def: #export (poll milli-seconds action)
+(def: #export (poll milli_seconds action)
(All [a]
(-> Nat (IO a) [(Channel a) (Sink a)]))
(let [[output sink] (channel [])]
@@ -227,12 +227,12 @@
(do io.monad
[value action
_ (\ sink feed value)]
- (promise.await recur (promise.wait milli-seconds)))))
+ (promise.await recur (promise.wait milli_seconds)))))
[output sink])))
-(def: #export (periodic milli-seconds)
+(def: #export (periodic milli_seconds)
(-> Nat [(Channel Any) (Sink Any)])
- (..poll milli-seconds (io [])))
+ (..poll milli_seconds (io [])))
(def: #export (iterate f init)
(All [s o] (-> (-> s (Promise (Maybe [s o]))) s (Channel o)))
@@ -282,7 +282,7 @@
#.None
(wrap #.Nil))))
-(def: #export (sequential milli-seconds values)
+(def: #export (sequential milli_seconds values)
(All [a] (-> Nat (List a) (Channel a)))
(case values
#.Nil
@@ -290,5 +290,5 @@
(#.Cons head tail)
(promise.resolved (#.Some [head (do promise.monad
- [_ (promise.wait milli-seconds)]
- (sequential milli-seconds tail))]))))
+ [_ (promise.wait milli_seconds)]
+ (sequential milli_seconds tail))]))))
diff --git a/stdlib/source/lux/control/concurrency/promise.lux b/stdlib/source/lux/control/concurrency/promise.lux
index e4835b8d8..96822700d 100644
--- a/stdlib/source/lux/control/concurrency/promise.lux
+++ b/stdlib/source/lux/control/concurrency/promise.lux
@@ -38,7 +38,7 @@
#.None
(do !
[#let [new [(#.Some value) #.None]]
- succeeded? (atom.compare-and-swap old new promise)]
+ succeeded? (atom.compare_and_swap old new promise)]
(if succeeded?
(do !
[_ (monad.map ! (function (_ f) (f value))
@@ -72,7 +72,7 @@
#.None
(let [new [_value (#.Cons f _observers)]]
- (if (io.run (atom.compare-and-swap old new promise))
+ (if (io.run (atom.compare_and_swap old new promise))
(io.io [])
(await f (:abstraction promise)))))))
)
@@ -134,7 +134,7 @@
{#.doc "Heterogeneous alternative combinator."}
(All [a b] (-> (Promise a) (Promise b) (Promise (| a b))))
(let [[a|b resolve] (..promise [])]
- (with-expansions
+ (with_expansions
[<sides> (template [<promise> <tag>]
[(io.run (await (|>> <tag> resolve) <promise>))]
@@ -155,7 +155,7 @@
[right]))
left||right))))
-(def: #export (schedule millis-delay computation)
+(def: #export (schedule millis_delay computation)
{#.doc (doc "Runs an I/O computation on its own thread (after a specified delay)."
"Returns a Promise that will eventually host its result.")}
(All [a] (-> Nat (IO a) (Promise a)))
@@ -163,7 +163,7 @@
(exec (|> (do io.monad
[value computation]
(resolve value))
- (thread.schedule millis-delay)
+ (thread.schedule millis_delay)
io.run)
!out)))
@@ -173,17 +173,17 @@
(All [a] (-> (IO a) (Promise a)))
(schedule 0))
-(def: #export (delay time-millis value)
+(def: #export (delay time_millis value)
{#.doc "Delivers a value after a certain period has passed."}
(All [a] (-> Nat a (Promise a)))
- (schedule time-millis (io value)))
+ (schedule time_millis (io value)))
-(def: #export (wait time-millis)
+(def: #export (wait time_millis)
{#.doc "Returns a promise that will be resolved after the specified amount of milliseconds."}
(-> Nat (Promise Any))
- (delay time-millis []))
+ (delay time_millis []))
-(def: #export (time-out time-millis promise)
+(def: #export (time_out time_millis promise)
{#.doc "Wait for a promise to be resolved within the specified amount of milliseconds."}
(All [a] (-> Nat (Promise a) (Promise (Maybe a))))
- (..or (wait time-millis) promise))
+ (..or (wait time_millis) promise))
diff --git a/stdlib/source/lux/control/concurrency/semaphore.lux b/stdlib/source/lux/control/concurrency/semaphore.lux
index a405b7b3e..9e6ff9b29 100644
--- a/stdlib/source/lux/control/concurrency/semaphore.lux
+++ b/stdlib/source/lux/control/concurrency/semaphore.lux
@@ -22,25 +22,25 @@
["." promise (#+ Promise Resolver)]])
(type: State
- {#max-positions Nat
- #open-positions Int
- #waiting-list (Queue (Resolver Any))})
+ {#max_positions Nat
+ #open_positions Int
+ #waiting_list (Queue (Resolver Any))})
(abstract: #export Semaphore
(Atom State)
{#.doc "A tool for controlling access to resources by multiple concurrent processes."}
- (def: most-positions-possible
+ (def: most_positions_possible
(.nat (\ i.interval top)))
- (def: #export (semaphore initial-open-positions)
+ (def: #export (semaphore initial_open_positions)
(-> Nat Semaphore)
- (let [max-positions (n.min initial-open-positions
- ..most-positions-possible)]
- (:abstraction (atom.atom {#max-positions max-positions
- #open-positions (.int max-positions)
- #waiting-list queue.empty}))))
+ (let [max_positions (n.min initial_open_positions
+ ..most_positions_possible)]
+ (:abstraction (atom.atom {#max_positions max_positions
+ #open_positions (.int max_positions)
+ #waiting_list queue.empty}))))
(def: #export (wait semaphore)
(Ex [k] (-> Semaphore (Promise Any)))
@@ -52,13 +52,13 @@
(do io.monad
[state (atom.read semaphore)
#let [[ready? state'] (: [Bit State]
- (if (i.> +0 (get@ #open-positions state))
+ (if (i.> +0 (get@ #open_positions state))
[true (|> state
- (update@ #open-positions dec))]
+ (update@ #open_positions dec))]
[false (|> state
- (update@ #open-positions dec)
- (update@ #waiting-list (queue.push sink)))]))]
- success? (atom.compare-and-swap state state' semaphore)]
+ (update@ #open_positions dec)
+ (update@ #waiting_list (queue.push sink)))]))]
+ success? (atom.compare_and_swap state state' semaphore)]
(if success?
(if ready?
(sink [])
@@ -66,9 +66,9 @@
(recur [])))))
signal)))
- (exception: #export (semaphore-is-maxed-out {max-positions Nat})
+ (exception: #export (semaphore_is_maxed_out {max_positions Nat})
(exception.report
- ["Max Positions" (%.nat max-positions)]))
+ ["Max Positions" (%.nat max_positions)]))
(def: #export (signal semaphore)
(Ex [k] (-> Semaphore (Promise (Try Int))))
@@ -77,29 +77,29 @@
(loop [_ []]
(do {! io.monad}
[state (atom.read semaphore)
- #let [[?sink state' maxed-out?] (: [(Maybe (Resolver Any)) State Bit]
- (case (queue.peek (get@ #waiting-list state))
+ #let [[?sink state' maxed_out?] (: [(Maybe (Resolver Any)) State Bit]
+ (case (queue.peek (get@ #waiting_list state))
#.None
- (if (n.= (get@ #max-positions state)
- (.nat (get@ #open-positions state)))
+ (if (n.= (get@ #max_positions state)
+ (.nat (get@ #open_positions state)))
[#.None
state
true]
[#.None
- (update@ #open-positions inc state)
+ (update@ #open_positions inc state)
false])
(#.Some head)
[(#.Some head)
(|> state
- (update@ #open-positions inc)
- (update@ #waiting-list queue.pop))
+ (update@ #open_positions inc)
+ (update@ #waiting_list queue.pop))
false]))]]
- (if maxed-out?
- (wrap (exception.throw ..semaphore-is-maxed-out [(get@ #max-positions state)]))
+ (if maxed_out?
+ (wrap (exception.throw ..semaphore_is_maxed_out [(get@ #max_positions state)]))
(do !
- [#let [open-positions (get@ #open-positions state')]
- success? (atom.compare-and-swap state state' semaphore)]
+ [#let [open_positions (get@ #open_positions state')]
+ success? (atom.compare_and_swap state state' semaphore)]
(if success?
(do !
[_ (case ?sink
@@ -108,7 +108,7 @@
(#.Some sink)
(sink []))]
- (wrap (#try.Success open-positions)))
+ (wrap (#try.Success open_positions)))
(recur [])))))))))
)
@@ -144,8 +144,8 @@
(abstract: #export Barrier
{#limit Limit
#count (Atom Nat)
- #start-turnstile Semaphore
- #end-turnstile Semaphore}
+ #start_turnstile Semaphore
+ #end_turnstile Semaphore}
{#.doc "A barrier that blocks all processes from proceeding until a given number of processes are parked at the barrier."}
@@ -153,10 +153,10 @@
(-> Limit Barrier)
(:abstraction {#limit limit
#count (atom.atom 0)
- #start-turnstile (semaphore 0)
- #end-turnstile (semaphore 0)}))
+ #start_turnstile (semaphore 0)
+ #end_turnstile (semaphore 0)}))
- (def: (un-block times turnstile)
+ (def: (un_block times turnstile)
(-> Nat Semaphore (Promise Any))
(loop [step 0]
(if (n.< times step)
@@ -169,16 +169,16 @@
[(def: (<phase> (^:representation barrier))
(-> Barrier (Promise Any))
(do promise.monad
- [#let [limit (refinement.un-refine (get@ #limit barrier))
+ [#let [limit (refinement.un_refine (get@ #limit barrier))
goal <goal>
count (io.run (atom.update <update> (get@ #count barrier)))
reached? (n.= goal count)]]
(if reached?
- (un-block limit (get@ <turnstile> barrier))
+ (un_block limit (get@ <turnstile> barrier))
(wait (get@ <turnstile> barrier)))))]
- [start inc limit #start-turnstile]
- [end dec 0 #end-turnstile]
+ [start inc limit #start_turnstile]
+ [end dec 0 #end_turnstile]
)
(def: #export (block barrier)
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux
index 523aa5567..7fd916fdb 100644
--- a/stdlib/source/lux/control/concurrency/stm.lux
+++ b/stdlib/source/lux/control/concurrency/stm.lux
@@ -36,7 +36,7 @@
(All [a] (-> (Var a) a))
(|>> :representation atom.read io.run product.left))
- (def: (un-follow sink var)
+ (def: (un_follow sink var)
(All [a] (-> (Sink a) (Var a) (IO Any)))
(do io.monad
[_ (atom.update (function (_ [value observers])
@@ -44,26 +44,26 @@
(:representation var))]
(wrap [])))
- (def: (write! new-value var)
+ (def: (write! new_value var)
(All [a] (-> a (Var a) (IO Any)))
(do {! io.monad}
[#let [var' (:representation var)]
- (^@ old [old-value observers]) (atom.read var')
- succeeded? (atom.compare-and-swap old [new-value observers] var')]
+ (^@ old [old_value observers]) (atom.read var')
+ succeeded? (atom.compare_and_swap old [new_value observers] var')]
(if succeeded?
(do !
[_ (monad.map ! (function (_ sink)
(do !
- [result (\ sink feed new-value)]
+ [result (\ sink feed new_value)]
(case result
(#try.Success _)
(wrap [])
(#try.Failure _)
- (un-follow sink var))))
+ (un_follow sink var))))
observers)]
(wrap []))
- (write! new-value var))))
+ (write! new_value var))))
(def: #export (follow target)
{#.doc "Creates a channel that will receive all changes to the value of the given var."}
@@ -76,19 +76,19 @@
(wrap [channel sink])))
)
-(type: (Tx-Frame a)
+(type: (Tx_Frame a)
{#var (Var a)
#original a
#current a})
(type: Tx
- (List (Ex [a] (Tx-Frame a))))
+ (List (Ex [a] (Tx_Frame a))))
(type: #export (STM a)
{#.doc "A computation which updates a transaction and produces a value."}
(-> Tx [Tx a]))
-(def: (find-var-value var tx)
+(def: (find_var_value var tx)
(All [a] (-> (Var a) Tx (Maybe a)))
(|> tx
(list.find (function (_ [_var _original _current])
@@ -102,7 +102,7 @@
(def: #export (read var)
(All [a] (-> (Var a) (STM a)))
(function (_ tx)
- (case (find-var-value var tx)
+ (case (find_var_value var tx)
(#.Some value)
[tx value]
@@ -111,7 +111,7 @@
[(#.Cons [var value value] tx)
value]))))
-(def: (update-tx-value var value tx)
+(def: (update_tx_value var value tx)
(All [a] (-> (Var a) a Tx Tx))
(case tx
#.Nil
@@ -127,15 +127,15 @@
(#.Cons {#var _var
#original _original
#current _current}
- (update-tx-value var value tx')))))
+ (update_tx_value var value tx')))))
(def: #export (write value var)
{#.doc "Writes value to var."}
(All [a] (-> a (Var a) (STM Any)))
(function (_ tx)
- (case (find-var-value var tx)
+ (case (find_var_value var tx)
(#.Some _)
- [(update-tx-value var value tx)
+ [(update_tx_value var value tx)
[]]
#.None
@@ -184,40 +184,40 @@
_ (..write a' var)]
(wrap [a a'])))
-(def: (can-commit? tx)
+(def: (can_commit? tx)
(-> Tx Bit)
(list.every? (function (_ [_var _original _current])
(is? _original (..read! _var)))
tx))
-(def: (commit-var! [_var _original _current])
- (-> (Ex [a] (Tx-Frame a)) (IO Any))
+(def: (commit_var! [_var _original _current])
+ (-> (Ex [a] (Tx_Frame a)) (IO Any))
(if (is? _original _current)
(io [])
(..write! _current _var)))
-(def: fresh-tx Tx (list))
+(def: fresh_tx Tx (list))
(type: (Commit a)
[(STM a)
(Promise a)
(Resolver a)])
-(def: pending-commits
+(def: pending_commits
(Atom (Rec Commits
[(Promise [(Ex [a] (Commit a)) Commits])
(Resolver [(Ex [a] (Commit a)) Commits])]))
(atom (promise.promise [])))
-(def: commit-processor-flag
+(def: commit_processor_flag
(Atom Bit)
(atom #0))
-(def: (issue-commit commit)
+(def: (issue_commit commit)
(All [a] (-> (Commit a) (IO Any)))
(let [entry [commit (promise.promise [])]]
(do {! io.monad}
- [|commits|&resolve (atom.read pending-commits)]
+ [|commits|&resolve (atom.read pending_commits)]
(loop [[|commits| resolve] |commits|&resolve]
(do !
[|commits| (promise.poll |commits|)]
@@ -226,48 +226,48 @@
(do io.monad
[resolved? (resolve entry)]
(if resolved?
- (atom.write (product.right entry) pending-commits)
+ (atom.write (product.right entry) pending_commits)
(recur |commits|&resolve)))
(#.Some [head tail])
(recur tail)))))))
-(def: (process-commit commit)
+(def: (process_commit commit)
(All [a] (-> (Commit a) (IO Any)))
- (let [[stm-proc output resolve] commit
- [finished-tx value] (stm-proc fresh-tx)]
- (if (can-commit? finished-tx)
+ (let [[stm_proc output resolve] commit
+ [finished_tx value] (stm_proc fresh_tx)]
+ (if (can_commit? finished_tx)
(do {! io.monad}
- [_ (monad.map ! commit-var! finished-tx)]
+ [_ (monad.map ! commit_var! finished_tx)]
(resolve value))
- (issue-commit commit))))
+ (issue_commit commit))))
-(def: init-processor!
+(def: init_processor!
(IO Any)
(do {! io.monad}
- [flag (atom.read commit-processor-flag)]
+ [flag (atom.read commit_processor_flag)]
(if flag
(wrap [])
(do !
- [was-first? (atom.compare-and-swap flag #1 commit-processor-flag)]
- (if was-first?
+ [was_first? (atom.compare_and_swap flag #1 commit_processor_flag)]
+ (if was_first?
(do !
- [[promise resolve] (atom.read pending-commits)]
+ [[promise resolve] (atom.read pending_commits)]
(promise.await (function (recur [head [tail _resolve]])
(do !
- [_ (process-commit head)]
+ [_ (process_commit head)]
(promise.await recur tail)))
promise))
(wrap [])))
)))
-(def: #export (commit stm-proc)
+(def: #export (commit stm_proc)
{#.doc (doc "Commits a transaction and returns its result (asynchronously)."
"Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first."
"For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")}
(All [a] (-> (STM a) (Promise a)))
(let [[output resolver] (promise.promise [])]
(exec (io.run (do io.monad
- [_ init-processor!]
- (issue-commit [stm-proc output resolver])))
+ [_ init_processor!]
+ (issue_commit [stm_proc output resolver])))
output)))
diff --git a/stdlib/source/lux/control/concurrency/thread.lux b/stdlib/source/lux/control/concurrency/thread.lux
index 10ec17815..8bdd2b9c9 100644
--- a/stdlib/source/lux/control/concurrency/thread.lux
+++ b/stdlib/source/lux/control/concurrency/thread.lux
@@ -15,7 +15,7 @@
[//
["." atom (#+ Atom)]])
-(with-expansions [<jvm> (as-is (host.import: java/lang/Object)
+(with_expansions [<jvm> (as_is (host.import: java/lang/Object)
(host.import: java/lang/Runtime
["#::."
@@ -38,11 +38,11 @@
["#::."
(new [int])
(schedule [java/lang/Runnable long java/util/concurrent/TimeUnit] #io (java/util/concurrent/ScheduledFuture java/lang/Object))]))]
- (for {@.old (as-is <jvm>)
- @.jvm (as-is <jvm>)
+ (for {@.old (as_is <jvm>)
+ @.jvm (as_is <jvm>)
@.js
- (as-is (host.import: (setTimeout [host.Function host.Number] #io Any)))}
+ (as_is (host.import: (setTimeout [host.Function host.Number] #io Any)))}
## Default
(type: Thread
@@ -53,7 +53,7 @@
(def: #export parallelism
Nat
- (with-expansions [<jvm> (|> (java/lang/Runtime::getRuntime)
+ (with_expansions [<jvm> (|> (java/lang/Runtime::getRuntime)
(java/lang/Runtime::availableProcessors)
.nat)]
(for {@.old <jvm>
@@ -62,30 +62,30 @@
## Default
1)))
-(with-expansions [<jvm> (as-is (def: runner
+(with_expansions [<jvm> (as_is (def: runner
java/util/concurrent/ScheduledThreadPoolExecutor
(java/util/concurrent/ScheduledThreadPoolExecutor::new (.int ..parallelism))))]
(for {@.old <jvm>
@.jvm <jvm>
@.js
- (as-is)}
+ (as_is)}
## Default
(def: runner
(Atom (List Thread))
(atom.atom (list)))))
-(def: #export (schedule milli-seconds action)
+(def: #export (schedule milli_seconds action)
(-> Nat (IO Any) (IO Any))
(for {@.old
(let [runnable (host.object [] [java/lang/Runnable]
[]
(java/lang/Runnable [] (run self) void
(io.run action)))]
- (case milli-seconds
+ (case milli_seconds
0 (java/util/concurrent/Executor::execute runnable runner)
- _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli-seconds) java/util/concurrent/TimeUnit::MILLISECONDS
+ _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli_seconds) java/util/concurrent/TimeUnit::MILLISECONDS
runner)))
@.jvm
@@ -93,34 +93,34 @@
[]
(java/lang/Runnable [] (run self) void
(io.run action)))]
- (case milli-seconds
+ (case milli_seconds
0 (java/util/concurrent/Executor::execute runnable runner)
- _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli-seconds) java/util/concurrent/TimeUnit::MILLISECONDS
+ _ (java/util/concurrent/ScheduledThreadPoolExecutor::schedule runnable (.int milli_seconds) java/util/concurrent/TimeUnit::MILLISECONDS
runner)))
@.js
(..setTimeout [(host.closure [] (io.run action))
- (n.frac milli-seconds)])}
+ (n.frac milli_seconds)])}
## Default
(do io.monad
[_ (atom.update (|>> (#.Cons {#creation (.nat ("lux io current-time"))
- #delay milli-seconds
+ #delay milli_seconds
#action action}))
..runner)]
(wrap []))))
(for {@.old
- (as-is)
+ (as_is)
@.jvm
- (as-is)
+ (as_is)
@.js
- (as-is)}
+ (as_is)}
## Default
- (as-is (exception: #export cannot-continue-running-threads)
+ (as_is (exception: #export cannot_continue_running_threads)
(def: #export (run! _)
(-> Any (IO Any))
@@ -139,11 +139,11 @@
(n.+ (get@ #delay thread))
(n.<= now)))
threads)]
- swapped? (atom.compare-and-swap threads pending ..runner)]
+ swapped? (atom.compare_and_swap threads pending ..runner)]
(if swapped?
(do !
[_ (monad.map ! (get@ #action) ready)]
(run! []))
- (error! (ex.construct ..cannot-continue-running-threads []))))
+ (error! (ex.construct ..cannot_continue_running_threads []))))
)))
))