aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency/stm.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/control/concurrency/stm.lux')
-rw-r--r--stdlib/source/lux/control/concurrency/stm.lux78
1 files changed, 39 insertions, 39 deletions
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux
index 523aa5567..7fd916fdb 100644
--- a/stdlib/source/lux/control/concurrency/stm.lux
+++ b/stdlib/source/lux/control/concurrency/stm.lux
@@ -36,7 +36,7 @@
(All [a] (-> (Var a) a))
(|>> :representation atom.read io.run product.left))
- (def: (un-follow sink var)
+ (def: (un_follow sink var)
(All [a] (-> (Sink a) (Var a) (IO Any)))
(do io.monad
[_ (atom.update (function (_ [value observers])
@@ -44,26 +44,26 @@
(:representation var))]
(wrap [])))
- (def: (write! new-value var)
+ (def: (write! new_value var)
(All [a] (-> a (Var a) (IO Any)))
(do {! io.monad}
[#let [var' (:representation var)]
- (^@ old [old-value observers]) (atom.read var')
- succeeded? (atom.compare-and-swap old [new-value observers] var')]
+ (^@ old [old_value observers]) (atom.read var')
+ succeeded? (atom.compare_and_swap old [new_value observers] var')]
(if succeeded?
(do !
[_ (monad.map ! (function (_ sink)
(do !
- [result (\ sink feed new-value)]
+ [result (\ sink feed new_value)]
(case result
(#try.Success _)
(wrap [])
(#try.Failure _)
- (un-follow sink var))))
+ (un_follow sink var))))
observers)]
(wrap []))
- (write! new-value var))))
+ (write! new_value var))))
(def: #export (follow target)
{#.doc "Creates a channel that will receive all changes to the value of the given var."}
@@ -76,19 +76,19 @@
(wrap [channel sink])))
)
-(type: (Tx-Frame a)
+(type: (Tx_Frame a)
{#var (Var a)
#original a
#current a})
(type: Tx
- (List (Ex [a] (Tx-Frame a))))
+ (List (Ex [a] (Tx_Frame a))))
(type: #export (STM a)
{#.doc "A computation which updates a transaction and produces a value."}
(-> Tx [Tx a]))
-(def: (find-var-value var tx)
+(def: (find_var_value var tx)
(All [a] (-> (Var a) Tx (Maybe a)))
(|> tx
(list.find (function (_ [_var _original _current])
@@ -102,7 +102,7 @@
(def: #export (read var)
(All [a] (-> (Var a) (STM a)))
(function (_ tx)
- (case (find-var-value var tx)
+ (case (find_var_value var tx)
(#.Some value)
[tx value]
@@ -111,7 +111,7 @@
[(#.Cons [var value value] tx)
value]))))
-(def: (update-tx-value var value tx)
+(def: (update_tx_value var value tx)
(All [a] (-> (Var a) a Tx Tx))
(case tx
#.Nil
@@ -127,15 +127,15 @@
(#.Cons {#var _var
#original _original
#current _current}
- (update-tx-value var value tx')))))
+ (update_tx_value var value tx')))))
(def: #export (write value var)
{#.doc "Writes value to var."}
(All [a] (-> a (Var a) (STM Any)))
(function (_ tx)
- (case (find-var-value var tx)
+ (case (find_var_value var tx)
(#.Some _)
- [(update-tx-value var value tx)
+ [(update_tx_value var value tx)
[]]
#.None
@@ -184,40 +184,40 @@
_ (..write a' var)]
(wrap [a a'])))
-(def: (can-commit? tx)
+(def: (can_commit? tx)
(-> Tx Bit)
(list.every? (function (_ [_var _original _current])
(is? _original (..read! _var)))
tx))
-(def: (commit-var! [_var _original _current])
- (-> (Ex [a] (Tx-Frame a)) (IO Any))
+(def: (commit_var! [_var _original _current])
+ (-> (Ex [a] (Tx_Frame a)) (IO Any))
(if (is? _original _current)
(io [])
(..write! _current _var)))
-(def: fresh-tx Tx (list))
+(def: fresh_tx Tx (list))
(type: (Commit a)
[(STM a)
(Promise a)
(Resolver a)])
-(def: pending-commits
+(def: pending_commits
(Atom (Rec Commits
[(Promise [(Ex [a] (Commit a)) Commits])
(Resolver [(Ex [a] (Commit a)) Commits])]))
(atom (promise.promise [])))
-(def: commit-processor-flag
+(def: commit_processor_flag
(Atom Bit)
(atom #0))
-(def: (issue-commit commit)
+(def: (issue_commit commit)
(All [a] (-> (Commit a) (IO Any)))
(let [entry [commit (promise.promise [])]]
(do {! io.monad}
- [|commits|&resolve (atom.read pending-commits)]
+ [|commits|&resolve (atom.read pending_commits)]
(loop [[|commits| resolve] |commits|&resolve]
(do !
[|commits| (promise.poll |commits|)]
@@ -226,48 +226,48 @@
(do io.monad
[resolved? (resolve entry)]
(if resolved?
- (atom.write (product.right entry) pending-commits)
+ (atom.write (product.right entry) pending_commits)
(recur |commits|&resolve)))
(#.Some [head tail])
(recur tail)))))))
-(def: (process-commit commit)
+(def: (process_commit commit)
(All [a] (-> (Commit a) (IO Any)))
- (let [[stm-proc output resolve] commit
- [finished-tx value] (stm-proc fresh-tx)]
- (if (can-commit? finished-tx)
+ (let [[stm_proc output resolve] commit
+ [finished_tx value] (stm_proc fresh_tx)]
+ (if (can_commit? finished_tx)
(do {! io.monad}
- [_ (monad.map ! commit-var! finished-tx)]
+ [_ (monad.map ! commit_var! finished_tx)]
(resolve value))
- (issue-commit commit))))
+ (issue_commit commit))))
-(def: init-processor!
+(def: init_processor!
(IO Any)
(do {! io.monad}
- [flag (atom.read commit-processor-flag)]
+ [flag (atom.read commit_processor_flag)]
(if flag
(wrap [])
(do !
- [was-first? (atom.compare-and-swap flag #1 commit-processor-flag)]
- (if was-first?
+ [was_first? (atom.compare_and_swap flag #1 commit_processor_flag)]
+ (if was_first?
(do !
- [[promise resolve] (atom.read pending-commits)]
+ [[promise resolve] (atom.read pending_commits)]
(promise.await (function (recur [head [tail _resolve]])
(do !
- [_ (process-commit head)]
+ [_ (process_commit head)]
(promise.await recur tail)))
promise))
(wrap [])))
)))
-(def: #export (commit stm-proc)
+(def: #export (commit stm_proc)
{#.doc (doc "Commits a transaction and returns its result (asynchronously)."
"Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first."
"For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")}
(All [a] (-> (STM a) (Promise a)))
(let [[output resolver] (promise.promise [])]
(exec (io.run (do io.monad
- [_ init-processor!]
- (issue-commit [stm-proc output resolver])))
+ [_ init_processor!]
+ (issue_commit [stm_proc output resolver])))
output)))