diff options
Diffstat (limited to 'stdlib')
-rw-r--r-- | stdlib/source/lux/concurrency/stm.lux | 81 |
1 files changed, 59 insertions, 22 deletions
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux index 914920773..b9f337024 100644 --- a/stdlib/source/lux/concurrency/stm.lux +++ b/stdlib/source/lux/concurrency/stm.lux @@ -10,7 +10,8 @@ monad) (codata [io #- run]) (data (struct [list #* "List/" Functor<List>] - [dict #+ Dict]) + [dict #+ Dict] + ["Q" queue]) [product] [text] maybe @@ -20,7 +21,7 @@ (macro [ast] ["s" syntax #+ syntax: Syntax]) (concurrency [atom #+ Atom atom] - [promise #+ Promise "Promise/" Monad<Promise>] + ["P" promise #+ Promise "Promise/" Monad<Promise>] [frp]) )) @@ -181,14 +182,14 @@ (def: #export (update! f var) (All [a] (-> (-> a a) (Var a) (Promise [a a]))) - (promise;future (io (loop [_ []] - (let [(^@ state [value observers]) (io;run (atom;get var)) - value' (f value)] - (if (io;run (atom;compare-and-swap state - [value' observers] - var)) - [value value'] - (recur []))))))) + (P;future (io (loop [_ []] + (let [(^@ state [value observers]) (io;run (atom;get var)) + value' (f value)] + (if (io;run (atom;compare-and-swap state + [value' observers] + var)) + [value value'] + (recur []))))))) (def: #export (update f var) (All [a] (-> (-> a a) (Var a) (STM [a a]))) @@ -212,16 +213,51 @@ (def: fresh-tx Tx (list)) -(def: (commit' output stm-proc) - (All [a] (-> (Promise a) (STM a) (Promise Unit))) - (promise;future (io (let [[finished-tx value] (stm-proc fresh-tx)] - (if (can-commit? finished-tx) - (exec (List/map commit-var finished-tx) - (io;run (promise;resolve value output)) - []) - (exec (commit' output stm-proc) - [])) - )))) +(def: pending-commits + (Var (Ex [a] [(STM a) (Promise a)])) + (var (:!! []))) + +(def: commit-processor-flag + (Atom Bool) + (atom false)) + +(def: (process-commit commits) + (-> (frp;Chan [(STM Unit) (Promise Unit)]) + (Promise Unit)) + (do P;Monad<Promise> + [?head+tail commits] + (case ?head+tail + (#;Cons [stm-proc output] tail) + (do @ + [#let [[finished-tx value] (stm-proc fresh-tx)]] + (exec (if (can-commit? finished-tx) + (exec (List/map commit-var finished-tx) + (io;run (P;resolve value output)) + []) + (exec (io;run (write! [stm-proc output] pending-commits)) + [])) + (process-commit tail))) + + #;Nil + (undefined) + ))) + +(def: init-processor! + (IO Unit) + (do Monad<IO> + [flag (atom;get commit-processor-flag)] + (if flag + (wrap []) + (do @ + [was-first? (atom;compare-and-swap flag true commit-processor-flag)] + (if was-first? + (do Monad<IO> + [inputs (follow "commit-processor" pending-commits)] + (exec (process-commit (:! (frp;Chan [(STM Unit) (Promise Unit)]) + inputs)) + (wrap []))) + (wrap []))) + ))) (def: #export (commit stm-proc) {#;doc "Commits a transaction and returns its result (asynchronously). @@ -230,6 +266,7 @@ For this reason, it's important to note that transactions must be free from side-effects, such as I/O."} (All [a] (-> (STM a) (Promise a))) - (let [output (promise;promise)] - (exec (commit' output stm-proc) + (let [output (P;promise ($ +0))] + (exec (io;run init-processor!) + (io;run (write! [stm-proc output] pending-commits)) output))) |