aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/concurrency/stm.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/concurrency/stm.lux')
-rw-r--r--stdlib/source/lux/concurrency/stm.lux192
1 files changed, 101 insertions, 91 deletions
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux
index 09daa5e2d..9b1cabe01 100644
--- a/stdlib/source/lux/concurrency/stm.lux
+++ b/stdlib/source/lux/concurrency/stm.lux
@@ -3,7 +3,7 @@
(lux (control [functor #+ Functor]
[applicative #+ Applicative]
[monad #+ do Monad])
- [io #- run]
+ [io #+ IO io]
(data [product]
[maybe]
[number "nat/" Codec<Text,Nat>]
@@ -11,13 +11,75 @@
(coll [list "list/" Functor<List> Fold<List>]
[dict #+ Dict]))
(concurrency [atom #+ Atom atom]
- ["P" promise]
+ [promise #+ Promise promise]
[frp "frp/" Functor<Channel>])
- ))
+ (type abstract)))
-(type: #export (Var a)
+(abstract: #export (Var a)
{#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."}
- (Atom [a (Dict Text (-> a (IO Unit)))]))
+ (Atom [a (Dict Text (-> a (IO Unit)))])
+
+ (def: #export (var value)
+ {#.doc "Creates a new STM var, with a default value."}
+ (All [a] (-> a (Var a)))
+ (@abstraction (atom.atom [value (dict.new text.Hash<Text>)])))
+
+ (def: read!!
+ (All [a] (-> (Var a) a))
+ (|>> @representation atom.read io.run product.left))
+
+ (def: #export (read! (^@representation var))
+ {#.doc "Reads var immediately, without going through a transaction."}
+ (All [a] (-> (Var a) (IO a)))
+ (|> var
+ atom.read
+ (:: io.Functor<IO> map product.left)))
+
+ (def: (write! new-value (^@representation var))
+ (All [a] (-> a (Var a) (IO Unit)))
+ (do io.Monad<IO>
+ [(^@ old [_value _observers]) (atom.read var)
+ succeeded? (atom.compare-and-swap old [new-value _observers] var)]
+ (if succeeded?
+ (do @
+ [_ (|> _observers
+ dict.values
+ (monad.map @ (function [f] (f new-value))))]
+ (wrap []))
+ (write! new-value (@abstraction var)))))
+
+ (def: #export (follow (^@representation target))
+ {#.doc "Creates a channel that will receive all changes to the value of the given var."}
+ (All [a] (-> (Var a) (IO (frp.Channel a))))
+ (let [head (: (frp.Channel ($ +0)) (frp.channel))
+ ## head (frp.channel)
+ channel-var (var head)
+ observer (function [label value]
+ (case (io.run (|> channel-var read!! (frp.write value)))
+ #.None
+ ## By closing the output Channel, the
+ ## observer becomes obsolete.
+ (atom.update (function [[value observers]]
+ [value (dict.remove label observers)])
+ target)
+
+ (#.Some tail')
+ (write! tail' channel-var)))]
+ (do io.Monad<IO>
+ [_ (atom.update (function [[value observers]]
+ (let [label (nat/encode (list/fold (function [key base]
+ (case (nat/decode key)
+ (#.Left _)
+ base
+
+ (#.Right key-num)
+ (n/max key-num base)))
+ +0
+ (dict.keys observers)))]
+ [value (dict.put label (observer label) observers)]))
+ target)]
+ (wrap head))))
+ )
(type: (Tx-Frame a)
{#var (Var a)
@@ -31,15 +93,6 @@
{#.doc "A computation which updates a transaction and produces a value."}
(-> Tx [Tx a]))
-(def: #export (var value)
- {#.doc "Creates a new STM var, with a default value."}
- (All [a] (-> a (Var a)))
- (atom.atom [value (dict.new text.Hash<Text>)]))
-
-(def: raw-read
- (All [a] (-> (Var a) a))
- (|>> atom.read io.run product.left))
-
(def: (find-var-value var tx)
(All [a] (-> (Var a) Tx (Maybe a)))
(|> tx
@@ -59,17 +112,10 @@
[tx value]
#.None
- (let [value (raw-read var)]
+ (let [value (read!! var)]
[(#.Cons [var value value] tx)
value]))))
-(def: #export (read! var)
- {#.doc "Reads var immediately, without going through a transaction."}
- (All [a] (-> (Var a) (IO a)))
- (|> var
- atom.read
- (:: Functor<IO> map product.left)))
-
(def: (update-tx-value var value tx)
(All [a] (-> (Var a) a Tx Tx))
(case tx
@@ -99,54 +145,9 @@
[]]
#.None
- [(#.Cons [var (raw-read var) value] tx)
+ [(#.Cons [var (read!! var) value] tx)
[]])))
-(def: (write! new-value var)
- (All [a] (-> a (Var a) (IO Unit)))
- (do Monad<IO>
- [(^@ old [_value _observers]) (atom.read var)
- succeeded? (atom.compare-and-swap old [new-value _observers] var)]
- (if succeeded?
- (do @
- [_ (|> _observers
- dict.values
- (monad.map @ (function [f] (f new-value))))]
- (wrap []))
- (write! new-value var))))
-
-(def: #export (follow target)
- {#.doc "Creates a channel that will receive all changes to the value of the given var."}
- (All [a] (-> (Var a) (IO (frp.Channel a))))
- (let [head (: (frp.Channel ($ +0)) (frp.channel))
- ## head (frp.channel)
- channel-var (var head)
- observer (function [label value]
- (case (io.run (|> channel-var raw-read (frp.write value)))
- #.None
- ## By closing the output Channel, the
- ## observer becomes obsolete.
- (atom.update (function [[value observers]]
- [value (dict.remove label observers)])
- target)
-
- (#.Some tail')
- (write! tail' channel-var)))]
- (do Monad<IO>
- [_ (atom.update (function [[value observers]]
- (let [label (nat/encode (list/fold (function [key base]
- (case (nat/decode key)
- (#.Left _)
- base
-
- (#.Right key-num)
- (n/max key-num base)))
- +0
- (dict.keys observers)))]
- [value (dict.put label (observer label) observers)]))
- target)]
- (wrap head))))
-
(struct: #export _ (Functor STM)
(def: (map f fa)
(function [tx]
@@ -185,7 +186,7 @@
(def: (can-commit? tx)
(-> Tx Bool)
(list.every? (function [[_var _original _current]]
- (is _original (raw-read _var)))
+ (is _original (read!! _var)))
tx))
(def: (commit-var! [_var _original _current])
@@ -196,44 +197,53 @@
(def: fresh-tx Tx (list))
+(type: Commit (Ex [a] [(STM a) (Promise a)]))
+
(def: pending-commits
- (Var (Ex [a] [(STM a) (P.Promise a)]))
- (var (:!! [])))
+ (Atom (Rec Commits (Promise [Commit Commits])))
+ (atom (promise #.None)))
(def: commit-processor-flag
(Atom Bool)
(atom false))
+(def: (issue-commit commit)
+ (-> Commit (IO Unit))
+ (let [entry [commit (promise #.None)]]
+ (loop [|commits| (io.run (atom.read pending-commits))]
+ (case (promise.poll |commits|)
+ #.None
+ (do io.Monad<IO>
+ [resolved? (promise.resolve entry |commits|)]
+ (if resolved?
+ (atom.write (product.right entry) pending-commits)
+ (recur |commits|)))
+
+ (#.Some [head tail])
+ (recur tail)))))
+
(def: (process-commit [stm-proc output])
- (-> [(STM Unit) (P.Promise Unit)] Top)
+ (-> [(STM Unit) (Promise Unit)] Top)
(let [[finished-tx value] (stm-proc fresh-tx)]
(io.run (if (can-commit? finished-tx)
(exec (list/map commit-var! finished-tx)
- (P.resolve value output))
- (write! [stm-proc output] pending-commits)))))
+ (promise.resolve value output))
+ (issue-commit [stm-proc output])))))
(def: init-processor!
(IO Unit)
- (do Monad<IO>
+ (do io.Monad<IO>
[flag (atom.read commit-processor-flag)]
(if flag
(wrap [])
(do @
[was-first? (atom.compare-and-swap flag true commit-processor-flag)]
(if was-first?
- (do Monad<IO>
- [inputs (follow pending-commits)]
- (exec (|> inputs
- (:! (frp.Channel [(STM Unit) (P.Promise Unit)]))
- (P.await (function recur [?inputs]
- (io (case ?inputs
- #.None
- []
-
- (#.Some [head tail])
- (exec (process-commit head)
- (P.await recur tail)))))))
- (wrap [])))
+ (exec (|> (io.run (atom.read pending-commits))
+ (promise.await (function recur [[head tail]]
+ (io (exec (process-commit (:! [(STM Unit) (Promise Unit)] head))
+ (promise.await recur tail))))))
+ (wrap []))
(wrap [])))
)))
@@ -243,8 +253,8 @@
Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first.
For this reason, it's important to note that transactions must be free from side-effects, such as I/O."}
- (All [a] (-> (STM a) (P.Promise a)))
- (let [output (P.promise #.None)]
+ (All [a] (-> (STM a) (Promise a)))
+ (let [output (promise #.None)]
(exec (io.run init-processor!)
- (io.run (write! [stm-proc output] pending-commits))
+ (io.run (issue-commit [stm-proc output]))
output)))