diff options
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/lux/concurrency/stm.lux | 122 |
1 files changed, 61 insertions, 61 deletions
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux index 4aaee3580..f7c7664f1 100644 --- a/stdlib/source/lux/concurrency/stm.lux +++ b/stdlib/source/lux/concurrency/stm.lux @@ -1,4 +1,4 @@ -(;module: +(.module: lux (lux (control [functor #+ Functor] [applicative #+ Applicative] @@ -24,7 +24,7 @@ #observers (Dict Text (-> a (IO Unit)))}) (type: #export (Var a) - {#;doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} + {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} (Atom (Var-State a))) (type: (Tx-Frame a) @@ -36,23 +36,23 @@ (List (Ex [a] (Tx-Frame a)))) (type: #export (STM a) - {#;doc "A computation which updates a transaction and produces a value."} + {#.doc "A computation which updates a transaction and produces a value."} (-> Tx [Tx a])) (def: #export (var value) - {#;doc "Creates a new STM var, with a default value."} + {#.doc "Creates a new STM var, with a default value."} (All [a] (-> a (Var a))) - (atom;atom {#value value - #observers (dict;new text;Hash<Text>)})) + (atom.atom {#value value + #observers (dict.new text.Hash<Text>)})) (def: raw-read (All [a] (-> (Var a) a)) - (|>> atom;read io;run (get@ #value))) + (|>> atom.read io.run (get@ #value))) (def: (find-var-value var tx) (All [a] (-> (Var a) Tx (Maybe a))) (|> tx - (list;find (function [[_var _original _current]] + (list.find (function [[_var _original _current]] (is (:! (Var Unit) var) (:! (Var Unit) _var)))) (:: Monad<Maybe> map (function [[_var _original _current]] @@ -63,35 +63,35 @@ (All [a] (-> (Var a) (STM a))) (function [tx] (case (find-var-value var tx) - (#;Some value) + (#.Some value) [tx value] - #;None + #.None (let [value (raw-read var)] - [(#;Cons [var value value] tx) + [(#.Cons [var value value] tx) value])))) (def: #export (read! var) - {#;doc "Reads var immediately, without going through a transaction."} + {#.doc "Reads var immediately, without going through a transaction."} (All [a] (-> (Var a) (IO a))) (|> var - atom;read + atom.read (:: Functor<IO> map (get@ #value)))) (def: (update-tx-value var value tx) (All [a] (-> (Var a) a Tx Tx)) (case tx - #;Nil - #;Nil + #.Nil + #.Nil - (#;Cons [_var _original _current] tx') + (#.Cons [_var _original _current] tx') (if (is (:! (Var ($ +0)) var) (:! (Var ($ +0)) _var)) - (#;Cons [(:! (Var ($ +0)) _var) + (#.Cons [(:! (Var ($ +0)) _var) (:! ($ +0) _original) (:! ($ +0) value)] tx') - (#;Cons [_var _original _current] + (#.Cons [_var _original _current] (update-tx-value var value tx'))) )) @@ -99,59 +99,59 @@ (All [a] (-> a (Var a) (STM Unit))) (function [tx] (case (find-var-value var tx) - (#;Some _) + (#.Some _) [(update-tx-value var value tx) []] - #;None - [(#;Cons [var (raw-read var) value] tx) + #.None + [(#.Cons [var (raw-read var) value] tx) []]))) (def: #export (write! new-value var) - {#;doc "Writes value to var immediately, without going through a transaction."} + {#.doc "Writes value to var immediately, without going through a transaction."} (All [a] (-> a (Var a) (IO Unit))) (do Monad<IO> - [old (atom;read var) + [old (atom.read var) #let [old-value (get@ #value old) new (set@ #value new-value old)] - succeeded? (atom;compare-and-swap old new var)] + succeeded? (atom.compare-and-swap old new var)] (if succeeded? (do @ [_ (|> old (get@ #observers) - dict;values - (monad;map @ (function [f] (f new-value))))] + dict.values + (monad.map @ (function [f] (f new-value))))] (wrap [])) (write! new-value var)))) (def: #export (follow target) - {#;doc "Creates a channel that will receive all changes to the value of the given var."} - (All [a] (-> (Var a) (IO (frp;Channel a)))) - (let [head (frp;channel ($ +0)) + {#.doc "Creates a channel that will receive all changes to the value of the given var."} + (All [a] (-> (Var a) (IO (frp.Channel a)))) + (let [head (frp.channel ($ +0)) channel-var (var head) observer (function [label value] - (case (io;run (|> channel-var raw-read (frp;write value))) - #;None + (case (io.run (|> channel-var raw-read (frp.write value))) + #.None ## By closing the output Channel, the ## observer becomes obsolete. - (atom;update (function [[value observers]] - [value (dict;remove label observers)]) + (atom.update (function [[value observers]] + [value (dict.remove label observers)]) target) - (#;Some tail') + (#.Some tail') (write! tail' channel-var)))] (do Monad<IO> - [_ (atom;update (function [[value observers]] + [_ (atom.update (function [[value observers]] (let [label (nat/encode (list/fold (function [key base] (case (nat/decode key) - (#;Left _) + (#.Left _) base - (#;Right key-num) + (#.Right key-num) (n/max key-num base))) +0 - (dict;keys observers)))] - [value (dict;put label (observer label) observers)])) + (dict.keys observers)))] + [value (dict.put label (observer label) observers)])) target)] (wrap head)))) @@ -182,19 +182,19 @@ (ma tx'))))) (def: #export (update! f var) - {#;doc "Will update a Var's value, and return a tuple with the old and the new values."} + {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} (All [a] (-> (-> a a) (Var a) (IO [a a]))) (io (loop [_ []] - (let [(^@ state [value observers]) (io;run (atom;read var)) + (let [(^@ state [value observers]) (io.run (atom.read var)) value' (f value)] - (if (io;run (atom;compare-and-swap state + (if (io.run (atom.compare-and-swap state [value' observers] var)) [value value'] (recur [])))))) (def: #export (update f var) - {#;doc "Will update a Var's value, and return a tuple with the old and the new values."} + {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} (All [a] (-> (-> a a) (Var a) (STM [a a]))) (do Monad<STM> [a (read var) @@ -204,7 +204,7 @@ (def: (can-commit? tx) (-> Tx Bool) - (list;every? (function [[_var _original _current]] + (list.every? (function [[_var _original _current]] (is _original (raw-read _var))) tx)) @@ -212,12 +212,12 @@ (-> (Ex [a] (Tx-Frame a)) Unit) (if (is _original _current) [] - (io;run (write! _current _var)))) + (io.run (write! _current _var)))) (def: fresh-tx Tx (list)) (def: pending-commits - (Var (Ex [a] [(STM a) (P;Promise a)])) + (Var (Ex [a] [(STM a) (P.Promise a)])) (var (:!! []))) (def: commit-processor-flag @@ -225,46 +225,46 @@ (atom false)) (def: (process-commit [stm-proc output]) - (-> [(STM Unit) (P;Promise Unit)] Top) + (-> [(STM Unit) (P.Promise Unit)] Top) (let [[finished-tx value] (stm-proc fresh-tx)] (if (can-commit? finished-tx) (exec (list/map commit-var finished-tx) - (io;run (P;resolve value output))) - (io;run (write! [stm-proc output] pending-commits))))) + (io.run (P.resolve value output))) + (io.run (write! [stm-proc output] pending-commits))))) (def: init-processor! (IO Unit) (do Monad<IO> - [flag (atom;read commit-processor-flag)] + [flag (atom.read commit-processor-flag)] (if flag (wrap []) (do @ - [was-first? (atom;compare-and-swap flag true commit-processor-flag)] + [was-first? (atom.compare-and-swap flag true commit-processor-flag)] (if was-first? (do Monad<IO> [inputs (follow pending-commits)] (exec (|> inputs - (:! (frp;Channel [(STM Unit) (P;Promise Unit)])) - (P;await (function recur [?inputs] + (:! (frp.Channel [(STM Unit) (P.Promise Unit)])) + (P.await (function recur [?inputs] (io (case ?inputs - #;Nil + #.Nil [] - (#;Cons head tail) + (#.Cons head tail) (exec (process-commit head) - (P;await recur tail))))))) + (P.await recur tail))))))) (wrap []))) (wrap []))) ))) (def: #export (commit stm-proc) - {#;doc "Commits a transaction and returns its result (asynchronously). + {#.doc "Commits a transaction and returns its result (asynchronously). Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first. For this reason, it's important to note that transactions must be free from side-effects, such as I/O."} - (All [a] (-> (STM a) (P;Promise a))) - (let [output (P;promise ($ +0))] - (exec (io;run init-processor!) - (io;run (write! [stm-proc output] pending-commits)) + (All [a] (-> (STM a) (P.Promise a))) + (let [output (P.promise ($ +0))] + (exec (io.run init-processor!) + (io.run (write! [stm-proc output] pending-commits)) output))) |