diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency/stm.lux')
-rw-r--r-- | stdlib/source/lux/control/concurrency/stm.lux | 86 |
1 files changed, 49 insertions, 37 deletions
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux index 3203b2d52..f54e16baf 100644 --- a/stdlib/source/lux/control/concurrency/stm.lux +++ b/stdlib/source/lux/control/concurrency/stm.lux @@ -9,12 +9,12 @@ ["." product] ["." maybe] [collection - ["." list ("list/." Functor<List> Fold<List>)]]] + ["." list]]] [type abstract]] [// ["." atom (#+ Atom atom)] - ["." promise (#+ Promise promise)] + ["." promise (#+ Promise Resolver)] ["." frp ("frp/." Functor<Channel>)]]) (type: #export (Observer a) @@ -177,45 +177,54 @@ tx)) (def: (commit-var! [_var _original _current]) - (-> (Ex [a] (Tx-Frame a)) Any) + (-> (Ex [a] (Tx-Frame a)) (IO Any)) (if (is? _original _current) - [] - (io.run (write! _current _var)))) + (io []) + (write! _current _var))) (def: fresh-tx Tx (list)) -(type: Commit (Ex [a] [(STM a) (Promise a)])) +(type: (Commit a) + [(STM a) + (Promise a) + (Resolver a)]) (def: pending-commits - (Atom (Rec Commits (Promise [Commit Commits]))) - (atom (promise #.None))) + (Atom (Rec Commits + [(Promise [(Ex [a] (Commit a)) Commits]) + (Resolver [(Ex [a] (Commit a)) Commits])])) + (atom (promise.promise []))) (def: commit-processor-flag (Atom Bit) (atom #0)) (def: (issue-commit commit) - (-> Commit (IO Any)) - (let [entry [commit (promise #.None)]] - (loop [|commits| (io.run (atom.read pending-commits))] - (case (promise.poll |commits|) - #.None - (do io.Monad<IO> - [resolved? (promise.resolve entry |commits|)] - (if resolved? - (atom.write (product.right entry) pending-commits) - (recur |commits|))) - - (#.Some [head tail]) - (recur tail))))) - -(def: (process-commit [stm-proc output]) - (-> [(STM Any) (Promise Any)] Any) - (let [[finished-tx value] (stm-proc fresh-tx)] - (io.run (if (can-commit? finished-tx) - (exec (list/map commit-var! finished-tx) - (promise.resolve value output)) - (issue-commit [stm-proc output]))))) + (All [a] (-> (Commit a) (IO Any))) + (let [entry [commit (promise.promise [])]] + (do io.Monad<IO> + [|commits|&resolve (atom.read pending-commits)] + (loop [[|commits| resolve] |commits|&resolve] + (case (promise.poll |commits|) + #.None + (do io.Monad<IO> + [resolved? (resolve entry)] + (if resolved? + (atom.write (product.right entry) pending-commits) + (recur |commits|&resolve))) + + (#.Some [head tail]) + (recur tail)))))) + +(def: (process-commit commit) + (All [a] (-> (Commit a) (IO Any))) + (let [[stm-proc output resolve] commit + [finished-tx value] (stm-proc fresh-tx)] + (if (can-commit? finished-tx) + (do io.Monad<IO> + [_ (monad.map @ commit-var! finished-tx)] + (resolve value)) + (issue-commit commit)))) (def: init-processor! (IO Any) @@ -226,11 +235,13 @@ (do @ [was-first? (atom.compare-and-swap flag #1 commit-processor-flag)] (if was-first? - (exec (|> (io.run (atom.read pending-commits)) - (promise.await (function (recur [head tail]) - (io (exec (process-commit (:coerce [(STM Any) (Promise Any)] head)) - (promise.await recur tail)))))) - (wrap [])) + (do @ + [[promise resolve] (atom.read pending-commits)] + (promise.await (function (recur [head [tail _resolve]]) + (do @ + [_ (process-commit head)] + (promise.await recur tail))) + promise)) (wrap []))) ))) @@ -239,7 +250,8 @@ "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first." "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")} (All [a] (-> (STM a) (Promise a))) - (let [output (promise #.None)] - (exec (io.run init-processor!) - (io.run (issue-commit [stm-proc output])) + (let [[output resolver] (promise.promise [])] + (exec (io.run (do io.Monad<IO> + [_ init-processor!] + (issue-commit [stm-proc output resolver]))) output))) |