aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/concurrency/stm.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/concurrency/stm.lux')
-rw-r--r--stdlib/source/lux/concurrency/stm.lux66
1 files changed, 21 insertions, 45 deletions
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux
index 0fe9ee9df..09daa5e2d 100644
--- a/stdlib/source/lux/concurrency/stm.lux
+++ b/stdlib/source/lux/concurrency/stm.lux
@@ -4,28 +4,20 @@
[applicative #+ Applicative]
[monad #+ do Monad])
[io #- run]
- (data (coll [list "list/" Functor<List> Fold<List>]
- [dict #+ Dict])
- [product]
- [text]
- maybe
+ (data [product]
+ [maybe]
[number "nat/" Codec<Text,Nat>]
- text/format)
- [macro]
- (macro [code]
- ["s" syntax #+ syntax: Syntax])
+ [text]
+ (coll [list "list/" Functor<List> Fold<List>]
+ [dict #+ Dict]))
(concurrency [atom #+ Atom atom]
["P" promise]
[frp "frp/" Functor<Channel>])
))
-(type: (Var-State a)
- {#value a
- #observers (Dict Text (-> a (IO Unit)))})
-
(type: #export (Var a)
{#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."}
- (Atom (Var-State a)))
+ (Atom [a (Dict Text (-> a (IO Unit)))]))
(type: (Tx-Frame a)
{#var (Var a)
@@ -42,12 +34,11 @@
(def: #export (var value)
{#.doc "Creates a new STM var, with a default value."}
(All [a] (-> a (Var a)))
- (atom.atom {#value value
- #observers (dict.new text.Hash<Text>)}))
+ (atom.atom [value (dict.new text.Hash<Text>)]))
(def: raw-read
(All [a] (-> (Var a) a))
- (|>> atom.read io.run (get@ #value)))
+ (|>> atom.read io.run product.left))
(def: (find-var-value var tx)
(All [a] (-> (Var a) Tx (Maybe a)))
@@ -55,8 +46,8 @@
(list.find (function [[_var _original _current]]
(is (:! (Var Unit) var)
(:! (Var Unit) _var))))
- (:: Monad<Maybe> map (function [[_var _original _current]]
- _current))
+ (:: maybe.Monad<Maybe> map (function [[_var _original _current]]
+ _current))
(:!!)
))
@@ -77,7 +68,7 @@
(All [a] (-> (Var a) (IO a)))
(|> var
atom.read
- (:: Functor<IO> map (get@ #value))))
+ (:: Functor<IO> map product.left)))
(def: (update-tx-value var value tx)
(All [a] (-> (Var a) a Tx Tx))
@@ -99,6 +90,7 @@
))
(def: #export (write value var)
+ {#.doc "Writes value to var."}
(All [a] (-> a (Var a) (STM Unit)))
(function [tx]
(case (find-var-value var tx)
@@ -110,18 +102,14 @@
[(#.Cons [var (raw-read var) value] tx)
[]])))
-(def: #export (write! new-value var)
- {#.doc "Writes value to var immediately, without going through a transaction."}
+(def: (write! new-value var)
(All [a] (-> a (Var a) (IO Unit)))
(do Monad<IO>
- [old (atom.read var)
- #let [old-value (get@ #value old)
- new (set@ #value new-value old)]
- succeeded? (atom.compare-and-swap old new var)]
+ [(^@ old [_value _observers]) (atom.read var)
+ succeeded? (atom.compare-and-swap old [new-value _observers] var)]
(if succeeded?
(do @
- [_ (|> old
- (get@ #observers)
+ [_ (|> _observers
dict.values
(monad.map @ (function [f] (f new-value))))]
(wrap []))
@@ -185,18 +173,6 @@
(let [[tx' ma] (mma tx)]
(ma tx')))))
-(def: #export (update! f var)
- {#.doc "Will update a Var's value, and return a tuple with the old and the new values."}
- (All [a] (-> (-> a a) (Var a) (IO [a a])))
- (io (loop [_ []]
- (let [(^@ state [value observers]) (io.run (atom.read var))
- value' (f value)]
- (if (io.run (atom.compare-and-swap state
- [value' observers]
- var))
- [value value']
- (recur []))))))
-
(def: #export (update f var)
{#.doc "Will update a Var's value, and return a tuple with the old and the new values."}
(All [a] (-> (-> a a) (Var a) (STM [a a])))
@@ -212,7 +188,7 @@
(is _original (raw-read _var)))
tx))
-(def: (commit-var [_var _original _current])
+(def: (commit-var! [_var _original _current])
(-> (Ex [a] (Tx-Frame a)) Unit)
(if (is _original _current)
[]
@@ -231,10 +207,10 @@
(def: (process-commit [stm-proc output])
(-> [(STM Unit) (P.Promise Unit)] Top)
(let [[finished-tx value] (stm-proc fresh-tx)]
- (if (can-commit? finished-tx)
- (exec (list/map commit-var finished-tx)
- (io.run (P.resolve value output)))
- (io.run (write! [stm-proc output] pending-commits)))))
+ (io.run (if (can-commit? finished-tx)
+ (exec (list/map commit-var! finished-tx)
+ (P.resolve value output))
+ (write! [stm-proc output] pending-commits)))))
(def: init-processor!
(IO Unit)