diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency/stm.lux')
-rw-r--r-- | stdlib/source/lux/control/concurrency/stm.lux | 78 |
1 files changed, 39 insertions, 39 deletions
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux index 523aa5567..7fd916fdb 100644 --- a/stdlib/source/lux/control/concurrency/stm.lux +++ b/stdlib/source/lux/control/concurrency/stm.lux @@ -36,7 +36,7 @@ (All [a] (-> (Var a) a)) (|>> :representation atom.read io.run product.left)) - (def: (un-follow sink var) + (def: (un_follow sink var) (All [a] (-> (Sink a) (Var a) (IO Any))) (do io.monad [_ (atom.update (function (_ [value observers]) @@ -44,26 +44,26 @@ (:representation var))] (wrap []))) - (def: (write! new-value var) + (def: (write! new_value var) (All [a] (-> a (Var a) (IO Any))) (do {! io.monad} [#let [var' (:representation var)] - (^@ old [old-value observers]) (atom.read var') - succeeded? (atom.compare-and-swap old [new-value observers] var')] + (^@ old [old_value observers]) (atom.read var') + succeeded? (atom.compare_and_swap old [new_value observers] var')] (if succeeded? (do ! [_ (monad.map ! (function (_ sink) (do ! - [result (\ sink feed new-value)] + [result (\ sink feed new_value)] (case result (#try.Success _) (wrap []) (#try.Failure _) - (un-follow sink var)))) + (un_follow sink var)))) observers)] (wrap [])) - (write! new-value var)))) + (write! new_value var)))) (def: #export (follow target) {#.doc "Creates a channel that will receive all changes to the value of the given var."} @@ -76,19 +76,19 @@ (wrap [channel sink]))) ) -(type: (Tx-Frame a) +(type: (Tx_Frame a) {#var (Var a) #original a #current a}) (type: Tx - (List (Ex [a] (Tx-Frame a)))) + (List (Ex [a] (Tx_Frame a)))) (type: #export (STM a) {#.doc "A computation which updates a transaction and produces a value."} (-> Tx [Tx a])) -(def: (find-var-value var tx) +(def: (find_var_value var tx) (All [a] (-> (Var a) Tx (Maybe a))) (|> tx (list.find (function (_ [_var _original _current]) @@ -102,7 +102,7 @@ (def: #export (read var) (All [a] (-> (Var a) (STM a))) (function (_ tx) - (case (find-var-value var tx) + (case (find_var_value var tx) (#.Some value) [tx value] @@ -111,7 +111,7 @@ [(#.Cons [var value value] tx) value])))) -(def: (update-tx-value var value tx) +(def: (update_tx_value var value tx) (All [a] (-> (Var a) a Tx Tx)) (case tx #.Nil @@ -127,15 +127,15 @@ (#.Cons {#var _var #original _original #current _current} - (update-tx-value var value tx'))))) + (update_tx_value var value tx'))))) (def: #export (write value var) {#.doc "Writes value to var."} (All [a] (-> a (Var a) (STM Any))) (function (_ tx) - (case (find-var-value var tx) + (case (find_var_value var tx) (#.Some _) - [(update-tx-value var value tx) + [(update_tx_value var value tx) []] #.None @@ -184,40 +184,40 @@ _ (..write a' var)] (wrap [a a']))) -(def: (can-commit? tx) +(def: (can_commit? tx) (-> Tx Bit) (list.every? (function (_ [_var _original _current]) (is? _original (..read! _var))) tx)) -(def: (commit-var! [_var _original _current]) - (-> (Ex [a] (Tx-Frame a)) (IO Any)) +(def: (commit_var! [_var _original _current]) + (-> (Ex [a] (Tx_Frame a)) (IO Any)) (if (is? _original _current) (io []) (..write! _current _var))) -(def: fresh-tx Tx (list)) +(def: fresh_tx Tx (list)) (type: (Commit a) [(STM a) (Promise a) (Resolver a)]) -(def: pending-commits +(def: pending_commits (Atom (Rec Commits [(Promise [(Ex [a] (Commit a)) Commits]) (Resolver [(Ex [a] (Commit a)) Commits])])) (atom (promise.promise []))) -(def: commit-processor-flag +(def: commit_processor_flag (Atom Bit) (atom #0)) -(def: (issue-commit commit) +(def: (issue_commit commit) (All [a] (-> (Commit a) (IO Any))) (let [entry [commit (promise.promise [])]] (do {! io.monad} - [|commits|&resolve (atom.read pending-commits)] + [|commits|&resolve (atom.read pending_commits)] (loop [[|commits| resolve] |commits|&resolve] (do ! [|commits| (promise.poll |commits|)] @@ -226,48 +226,48 @@ (do io.monad [resolved? (resolve entry)] (if resolved? - (atom.write (product.right entry) pending-commits) + (atom.write (product.right entry) pending_commits) (recur |commits|&resolve))) (#.Some [head tail]) (recur tail))))))) -(def: (process-commit commit) +(def: (process_commit commit) (All [a] (-> (Commit a) (IO Any))) - (let [[stm-proc output resolve] commit - [finished-tx value] (stm-proc fresh-tx)] - (if (can-commit? finished-tx) + (let [[stm_proc output resolve] commit + [finished_tx value] (stm_proc fresh_tx)] + (if (can_commit? finished_tx) (do {! io.monad} - [_ (monad.map ! commit-var! finished-tx)] + [_ (monad.map ! commit_var! finished_tx)] (resolve value)) - (issue-commit commit)))) + (issue_commit commit)))) -(def: init-processor! +(def: init_processor! (IO Any) (do {! io.monad} - [flag (atom.read commit-processor-flag)] + [flag (atom.read commit_processor_flag)] (if flag (wrap []) (do ! - [was-first? (atom.compare-and-swap flag #1 commit-processor-flag)] - (if was-first? + [was_first? (atom.compare_and_swap flag #1 commit_processor_flag)] + (if was_first? (do ! - [[promise resolve] (atom.read pending-commits)] + [[promise resolve] (atom.read pending_commits)] (promise.await (function (recur [head [tail _resolve]]) (do ! - [_ (process-commit head)] + [_ (process_commit head)] (promise.await recur tail))) promise)) (wrap []))) ))) -(def: #export (commit stm-proc) +(def: #export (commit stm_proc) {#.doc (doc "Commits a transaction and returns its result (asynchronously)." "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first." "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")} (All [a] (-> (STM a) (Promise a))) (let [[output resolver] (promise.promise [])] (exec (io.run (do io.monad - [_ init-processor!] - (issue-commit [stm-proc output resolver]))) + [_ init_processor!] + (issue_commit [stm_proc output resolver]))) output))) |