aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency/stm.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/control/concurrency/stm.lux')
-rw-r--r--stdlib/source/lux/control/concurrency/stm.lux86
1 files changed, 49 insertions, 37 deletions
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux
index 3203b2d52..f54e16baf 100644
--- a/stdlib/source/lux/control/concurrency/stm.lux
+++ b/stdlib/source/lux/control/concurrency/stm.lux
@@ -9,12 +9,12 @@
["." product]
["." maybe]
[collection
- ["." list ("list/." Functor<List> Fold<List>)]]]
+ ["." list]]]
[type
abstract]]
[//
["." atom (#+ Atom atom)]
- ["." promise (#+ Promise promise)]
+ ["." promise (#+ Promise Resolver)]
["." frp ("frp/." Functor<Channel>)]])
(type: #export (Observer a)
@@ -177,45 +177,54 @@
tx))
(def: (commit-var! [_var _original _current])
- (-> (Ex [a] (Tx-Frame a)) Any)
+ (-> (Ex [a] (Tx-Frame a)) (IO Any))
(if (is? _original _current)
- []
- (io.run (write! _current _var))))
+ (io [])
+ (write! _current _var)))
(def: fresh-tx Tx (list))
-(type: Commit (Ex [a] [(STM a) (Promise a)]))
+(type: (Commit a)
+ [(STM a)
+ (Promise a)
+ (Resolver a)])
(def: pending-commits
- (Atom (Rec Commits (Promise [Commit Commits])))
- (atom (promise #.None)))
+ (Atom (Rec Commits
+ [(Promise [(Ex [a] (Commit a)) Commits])
+ (Resolver [(Ex [a] (Commit a)) Commits])]))
+ (atom (promise.promise [])))
(def: commit-processor-flag
(Atom Bit)
(atom #0))
(def: (issue-commit commit)
- (-> Commit (IO Any))
- (let [entry [commit (promise #.None)]]
- (loop [|commits| (io.run (atom.read pending-commits))]
- (case (promise.poll |commits|)
- #.None
- (do io.Monad<IO>
- [resolved? (promise.resolve entry |commits|)]
- (if resolved?
- (atom.write (product.right entry) pending-commits)
- (recur |commits|)))
-
- (#.Some [head tail])
- (recur tail)))))
-
-(def: (process-commit [stm-proc output])
- (-> [(STM Any) (Promise Any)] Any)
- (let [[finished-tx value] (stm-proc fresh-tx)]
- (io.run (if (can-commit? finished-tx)
- (exec (list/map commit-var! finished-tx)
- (promise.resolve value output))
- (issue-commit [stm-proc output])))))
+ (All [a] (-> (Commit a) (IO Any)))
+ (let [entry [commit (promise.promise [])]]
+ (do io.Monad<IO>
+ [|commits|&resolve (atom.read pending-commits)]
+ (loop [[|commits| resolve] |commits|&resolve]
+ (case (promise.poll |commits|)
+ #.None
+ (do io.Monad<IO>
+ [resolved? (resolve entry)]
+ (if resolved?
+ (atom.write (product.right entry) pending-commits)
+ (recur |commits|&resolve)))
+
+ (#.Some [head tail])
+ (recur tail))))))
+
+(def: (process-commit commit)
+ (All [a] (-> (Commit a) (IO Any)))
+ (let [[stm-proc output resolve] commit
+ [finished-tx value] (stm-proc fresh-tx)]
+ (if (can-commit? finished-tx)
+ (do io.Monad<IO>
+ [_ (monad.map @ commit-var! finished-tx)]
+ (resolve value))
+ (issue-commit commit))))
(def: init-processor!
(IO Any)
@@ -226,11 +235,13 @@
(do @
[was-first? (atom.compare-and-swap flag #1 commit-processor-flag)]
(if was-first?
- (exec (|> (io.run (atom.read pending-commits))
- (promise.await (function (recur [head tail])
- (io (exec (process-commit (:coerce [(STM Any) (Promise Any)] head))
- (promise.await recur tail))))))
- (wrap []))
+ (do @
+ [[promise resolve] (atom.read pending-commits)]
+ (promise.await (function (recur [head [tail _resolve]])
+ (do @
+ [_ (process-commit head)]
+ (promise.await recur tail)))
+ promise))
(wrap [])))
)))
@@ -239,7 +250,8 @@
"Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first."
"For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")}
(All [a] (-> (STM a) (Promise a)))
- (let [output (promise #.None)]
- (exec (io.run init-processor!)
- (io.run (issue-commit [stm-proc output]))
+ (let [[output resolver] (promise.promise [])]
+ (exec (io.run (do io.Monad<IO>
+ [_ init-processor!]
+ (issue-commit [stm-proc output resolver])))
output)))