diff options
Diffstat (limited to 'stdlib/source/lux/concurrency/stm.lux')
-rw-r--r-- | stdlib/source/lux/concurrency/stm.lux | 66 |
1 files changed, 21 insertions, 45 deletions
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux index 0fe9ee9df..09daa5e2d 100644 --- a/stdlib/source/lux/concurrency/stm.lux +++ b/stdlib/source/lux/concurrency/stm.lux @@ -4,28 +4,20 @@ [applicative #+ Applicative] [monad #+ do Monad]) [io #- run] - (data (coll [list "list/" Functor<List> Fold<List>] - [dict #+ Dict]) - [product] - [text] - maybe + (data [product] + [maybe] [number "nat/" Codec<Text,Nat>] - text/format) - [macro] - (macro [code] - ["s" syntax #+ syntax: Syntax]) + [text] + (coll [list "list/" Functor<List> Fold<List>] + [dict #+ Dict])) (concurrency [atom #+ Atom atom] ["P" promise] [frp "frp/" Functor<Channel>]) )) -(type: (Var-State a) - {#value a - #observers (Dict Text (-> a (IO Unit)))}) - (type: #export (Var a) {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} - (Atom (Var-State a))) + (Atom [a (Dict Text (-> a (IO Unit)))])) (type: (Tx-Frame a) {#var (Var a) @@ -42,12 +34,11 @@ (def: #export (var value) {#.doc "Creates a new STM var, with a default value."} (All [a] (-> a (Var a))) - (atom.atom {#value value - #observers (dict.new text.Hash<Text>)})) + (atom.atom [value (dict.new text.Hash<Text>)])) (def: raw-read (All [a] (-> (Var a) a)) - (|>> atom.read io.run (get@ #value))) + (|>> atom.read io.run product.left)) (def: (find-var-value var tx) (All [a] (-> (Var a) Tx (Maybe a))) @@ -55,8 +46,8 @@ (list.find (function [[_var _original _current]] (is (:! (Var Unit) var) (:! (Var Unit) _var)))) - (:: Monad<Maybe> map (function [[_var _original _current]] - _current)) + (:: maybe.Monad<Maybe> map (function [[_var _original _current]] + _current)) (:!!) )) @@ -77,7 +68,7 @@ (All [a] (-> (Var a) (IO a))) (|> var atom.read - (:: Functor<IO> map (get@ #value)))) + (:: Functor<IO> map product.left))) (def: (update-tx-value var value tx) (All [a] (-> (Var a) a Tx Tx)) @@ -99,6 +90,7 @@ )) (def: #export (write value var) + {#.doc "Writes value to var."} (All [a] (-> a (Var a) (STM Unit))) (function [tx] (case (find-var-value var tx) @@ -110,18 +102,14 @@ [(#.Cons [var (raw-read var) value] tx) []]))) -(def: #export (write! new-value var) - {#.doc "Writes value to var immediately, without going through a transaction."} +(def: (write! new-value var) (All [a] (-> a (Var a) (IO Unit))) (do Monad<IO> - [old (atom.read var) - #let [old-value (get@ #value old) - new (set@ #value new-value old)] - succeeded? (atom.compare-and-swap old new var)] + [(^@ old [_value _observers]) (atom.read var) + succeeded? (atom.compare-and-swap old [new-value _observers] var)] (if succeeded? (do @ - [_ (|> old - (get@ #observers) + [_ (|> _observers dict.values (monad.map @ (function [f] (f new-value))))] (wrap [])) @@ -185,18 +173,6 @@ (let [[tx' ma] (mma tx)] (ma tx'))))) -(def: #export (update! f var) - {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} - (All [a] (-> (-> a a) (Var a) (IO [a a]))) - (io (loop [_ []] - (let [(^@ state [value observers]) (io.run (atom.read var)) - value' (f value)] - (if (io.run (atom.compare-and-swap state - [value' observers] - var)) - [value value'] - (recur [])))))) - (def: #export (update f var) {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} (All [a] (-> (-> a a) (Var a) (STM [a a]))) @@ -212,7 +188,7 @@ (is _original (raw-read _var))) tx)) -(def: (commit-var [_var _original _current]) +(def: (commit-var! [_var _original _current]) (-> (Ex [a] (Tx-Frame a)) Unit) (if (is _original _current) [] @@ -231,10 +207,10 @@ (def: (process-commit [stm-proc output]) (-> [(STM Unit) (P.Promise Unit)] Top) (let [[finished-tx value] (stm-proc fresh-tx)] - (if (can-commit? finished-tx) - (exec (list/map commit-var finished-tx) - (io.run (P.resolve value output))) - (io.run (write! [stm-proc output] pending-commits))))) + (io.run (if (can-commit? finished-tx) + (exec (list/map commit-var! finished-tx) + (P.resolve value output)) + (write! [stm-proc output] pending-commits))))) (def: init-processor! (IO Unit) |