aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEduardo Julian2016-12-28 19:21:51 -0400
committerEduardo Julian2016-12-28 19:21:51 -0400
commit4785f68c8b26775a07831ec8c92897c7235d654a (patch)
treef2343cacc6e1a56ff1923dad3f0665e00cb4de8b
parent9cdc1e8a9c2b0864bfe08f648ede75e81f419c90 (diff)
- Fixed the bug that caused some commits to be lost.
Diffstat (limited to '')
-rw-r--r--stdlib/source/lux/concurrency/stm.lux81
1 files changed, 59 insertions, 22 deletions
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux
index 914920773..b9f337024 100644
--- a/stdlib/source/lux/concurrency/stm.lux
+++ b/stdlib/source/lux/concurrency/stm.lux
@@ -10,7 +10,8 @@
monad)
(codata [io #- run])
(data (struct [list #* "List/" Functor<List>]
- [dict #+ Dict])
+ [dict #+ Dict]
+ ["Q" queue])
[product]
[text]
maybe
@@ -20,7 +21,7 @@
(macro [ast]
["s" syntax #+ syntax: Syntax])
(concurrency [atom #+ Atom atom]
- [promise #+ Promise "Promise/" Monad<Promise>]
+ ["P" promise #+ Promise "Promise/" Monad<Promise>]
[frp])
))
@@ -181,14 +182,14 @@
(def: #export (update! f var)
(All [a] (-> (-> a a) (Var a) (Promise [a a])))
- (promise;future (io (loop [_ []]
- (let [(^@ state [value observers]) (io;run (atom;get var))
- value' (f value)]
- (if (io;run (atom;compare-and-swap state
- [value' observers]
- var))
- [value value']
- (recur [])))))))
+ (P;future (io (loop [_ []]
+ (let [(^@ state [value observers]) (io;run (atom;get var))
+ value' (f value)]
+ (if (io;run (atom;compare-and-swap state
+ [value' observers]
+ var))
+ [value value']
+ (recur [])))))))
(def: #export (update f var)
(All [a] (-> (-> a a) (Var a) (STM [a a])))
@@ -212,16 +213,51 @@
(def: fresh-tx Tx (list))
-(def: (commit' output stm-proc)
- (All [a] (-> (Promise a) (STM a) (Promise Unit)))
- (promise;future (io (let [[finished-tx value] (stm-proc fresh-tx)]
- (if (can-commit? finished-tx)
- (exec (List/map commit-var finished-tx)
- (io;run (promise;resolve value output))
- [])
- (exec (commit' output stm-proc)
- []))
- ))))
+(def: pending-commits
+ (Var (Ex [a] [(STM a) (Promise a)]))
+ (var (:!! [])))
+
+(def: commit-processor-flag
+ (Atom Bool)
+ (atom false))
+
+(def: (process-commit commits)
+ (-> (frp;Chan [(STM Unit) (Promise Unit)])
+ (Promise Unit))
+ (do P;Monad<Promise>
+ [?head+tail commits]
+ (case ?head+tail
+ (#;Cons [stm-proc output] tail)
+ (do @
+ [#let [[finished-tx value] (stm-proc fresh-tx)]]
+ (exec (if (can-commit? finished-tx)
+ (exec (List/map commit-var finished-tx)
+ (io;run (P;resolve value output))
+ [])
+ (exec (io;run (write! [stm-proc output] pending-commits))
+ []))
+ (process-commit tail)))
+
+ #;Nil
+ (undefined)
+ )))
+
+(def: init-processor!
+ (IO Unit)
+ (do Monad<IO>
+ [flag (atom;get commit-processor-flag)]
+ (if flag
+ (wrap [])
+ (do @
+ [was-first? (atom;compare-and-swap flag true commit-processor-flag)]
+ (if was-first?
+ (do Monad<IO>
+ [inputs (follow "commit-processor" pending-commits)]
+ (exec (process-commit (:! (frp;Chan [(STM Unit) (Promise Unit)])
+ inputs))
+ (wrap [])))
+ (wrap [])))
+ )))
(def: #export (commit stm-proc)
{#;doc "Commits a transaction and returns its result (asynchronously).
@@ -230,6 +266,7 @@
For this reason, it's important to note that transactions must be free from side-effects, such as I/O."}
(All [a] (-> (STM a) (Promise a)))
- (let [output (promise;promise)]
- (exec (commit' output stm-proc)
+ (let [output (P;promise ($ +0))]
+ (exec (io;run init-processor!)
+ (io;run (write! [stm-proc output] pending-commits))
output)))