aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/concurrency/stm.lux
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--stdlib/source/lux/concurrency/stm.lux122
1 files changed, 61 insertions, 61 deletions
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux
index 4aaee3580..f7c7664f1 100644
--- a/stdlib/source/lux/concurrency/stm.lux
+++ b/stdlib/source/lux/concurrency/stm.lux
@@ -1,4 +1,4 @@
-(;module:
+(.module:
lux
(lux (control [functor #+ Functor]
[applicative #+ Applicative]
@@ -24,7 +24,7 @@
#observers (Dict Text (-> a (IO Unit)))})
(type: #export (Var a)
- {#;doc "A mutable cell containing a value, and observers that will be alerted of any change to it."}
+ {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."}
(Atom (Var-State a)))
(type: (Tx-Frame a)
@@ -36,23 +36,23 @@
(List (Ex [a] (Tx-Frame a))))
(type: #export (STM a)
- {#;doc "A computation which updates a transaction and produces a value."}
+ {#.doc "A computation which updates a transaction and produces a value."}
(-> Tx [Tx a]))
(def: #export (var value)
- {#;doc "Creates a new STM var, with a default value."}
+ {#.doc "Creates a new STM var, with a default value."}
(All [a] (-> a (Var a)))
- (atom;atom {#value value
- #observers (dict;new text;Hash<Text>)}))
+ (atom.atom {#value value
+ #observers (dict.new text.Hash<Text>)}))
(def: raw-read
(All [a] (-> (Var a) a))
- (|>> atom;read io;run (get@ #value)))
+ (|>> atom.read io.run (get@ #value)))
(def: (find-var-value var tx)
(All [a] (-> (Var a) Tx (Maybe a)))
(|> tx
- (list;find (function [[_var _original _current]]
+ (list.find (function [[_var _original _current]]
(is (:! (Var Unit) var)
(:! (Var Unit) _var))))
(:: Monad<Maybe> map (function [[_var _original _current]]
@@ -63,35 +63,35 @@
(All [a] (-> (Var a) (STM a)))
(function [tx]
(case (find-var-value var tx)
- (#;Some value)
+ (#.Some value)
[tx value]
- #;None
+ #.None
(let [value (raw-read var)]
- [(#;Cons [var value value] tx)
+ [(#.Cons [var value value] tx)
value]))))
(def: #export (read! var)
- {#;doc "Reads var immediately, without going through a transaction."}
+ {#.doc "Reads var immediately, without going through a transaction."}
(All [a] (-> (Var a) (IO a)))
(|> var
- atom;read
+ atom.read
(:: Functor<IO> map (get@ #value))))
(def: (update-tx-value var value tx)
(All [a] (-> (Var a) a Tx Tx))
(case tx
- #;Nil
- #;Nil
+ #.Nil
+ #.Nil
- (#;Cons [_var _original _current] tx')
+ (#.Cons [_var _original _current] tx')
(if (is (:! (Var ($ +0)) var)
(:! (Var ($ +0)) _var))
- (#;Cons [(:! (Var ($ +0)) _var)
+ (#.Cons [(:! (Var ($ +0)) _var)
(:! ($ +0) _original)
(:! ($ +0) value)]
tx')
- (#;Cons [_var _original _current]
+ (#.Cons [_var _original _current]
(update-tx-value var value tx')))
))
@@ -99,59 +99,59 @@
(All [a] (-> a (Var a) (STM Unit)))
(function [tx]
(case (find-var-value var tx)
- (#;Some _)
+ (#.Some _)
[(update-tx-value var value tx)
[]]
- #;None
- [(#;Cons [var (raw-read var) value] tx)
+ #.None
+ [(#.Cons [var (raw-read var) value] tx)
[]])))
(def: #export (write! new-value var)
- {#;doc "Writes value to var immediately, without going through a transaction."}
+ {#.doc "Writes value to var immediately, without going through a transaction."}
(All [a] (-> a (Var a) (IO Unit)))
(do Monad<IO>
- [old (atom;read var)
+ [old (atom.read var)
#let [old-value (get@ #value old)
new (set@ #value new-value old)]
- succeeded? (atom;compare-and-swap old new var)]
+ succeeded? (atom.compare-and-swap old new var)]
(if succeeded?
(do @
[_ (|> old
(get@ #observers)
- dict;values
- (monad;map @ (function [f] (f new-value))))]
+ dict.values
+ (monad.map @ (function [f] (f new-value))))]
(wrap []))
(write! new-value var))))
(def: #export (follow target)
- {#;doc "Creates a channel that will receive all changes to the value of the given var."}
- (All [a] (-> (Var a) (IO (frp;Channel a))))
- (let [head (frp;channel ($ +0))
+ {#.doc "Creates a channel that will receive all changes to the value of the given var."}
+ (All [a] (-> (Var a) (IO (frp.Channel a))))
+ (let [head (frp.channel ($ +0))
channel-var (var head)
observer (function [label value]
- (case (io;run (|> channel-var raw-read (frp;write value)))
- #;None
+ (case (io.run (|> channel-var raw-read (frp.write value)))
+ #.None
## By closing the output Channel, the
## observer becomes obsolete.
- (atom;update (function [[value observers]]
- [value (dict;remove label observers)])
+ (atom.update (function [[value observers]]
+ [value (dict.remove label observers)])
target)
- (#;Some tail')
+ (#.Some tail')
(write! tail' channel-var)))]
(do Monad<IO>
- [_ (atom;update (function [[value observers]]
+ [_ (atom.update (function [[value observers]]
(let [label (nat/encode (list/fold (function [key base]
(case (nat/decode key)
- (#;Left _)
+ (#.Left _)
base
- (#;Right key-num)
+ (#.Right key-num)
(n/max key-num base)))
+0
- (dict;keys observers)))]
- [value (dict;put label (observer label) observers)]))
+ (dict.keys observers)))]
+ [value (dict.put label (observer label) observers)]))
target)]
(wrap head))))
@@ -182,19 +182,19 @@
(ma tx')))))
(def: #export (update! f var)
- {#;doc "Will update a Var's value, and return a tuple with the old and the new values."}
+ {#.doc "Will update a Var's value, and return a tuple with the old and the new values."}
(All [a] (-> (-> a a) (Var a) (IO [a a])))
(io (loop [_ []]
- (let [(^@ state [value observers]) (io;run (atom;read var))
+ (let [(^@ state [value observers]) (io.run (atom.read var))
value' (f value)]
- (if (io;run (atom;compare-and-swap state
+ (if (io.run (atom.compare-and-swap state
[value' observers]
var))
[value value']
(recur []))))))
(def: #export (update f var)
- {#;doc "Will update a Var's value, and return a tuple with the old and the new values."}
+ {#.doc "Will update a Var's value, and return a tuple with the old and the new values."}
(All [a] (-> (-> a a) (Var a) (STM [a a])))
(do Monad<STM>
[a (read var)
@@ -204,7 +204,7 @@
(def: (can-commit? tx)
(-> Tx Bool)
- (list;every? (function [[_var _original _current]]
+ (list.every? (function [[_var _original _current]]
(is _original (raw-read _var)))
tx))
@@ -212,12 +212,12 @@
(-> (Ex [a] (Tx-Frame a)) Unit)
(if (is _original _current)
[]
- (io;run (write! _current _var))))
+ (io.run (write! _current _var))))
(def: fresh-tx Tx (list))
(def: pending-commits
- (Var (Ex [a] [(STM a) (P;Promise a)]))
+ (Var (Ex [a] [(STM a) (P.Promise a)]))
(var (:!! [])))
(def: commit-processor-flag
@@ -225,46 +225,46 @@
(atom false))
(def: (process-commit [stm-proc output])
- (-> [(STM Unit) (P;Promise Unit)] Top)
+ (-> [(STM Unit) (P.Promise Unit)] Top)
(let [[finished-tx value] (stm-proc fresh-tx)]
(if (can-commit? finished-tx)
(exec (list/map commit-var finished-tx)
- (io;run (P;resolve value output)))
- (io;run (write! [stm-proc output] pending-commits)))))
+ (io.run (P.resolve value output)))
+ (io.run (write! [stm-proc output] pending-commits)))))
(def: init-processor!
(IO Unit)
(do Monad<IO>
- [flag (atom;read commit-processor-flag)]
+ [flag (atom.read commit-processor-flag)]
(if flag
(wrap [])
(do @
- [was-first? (atom;compare-and-swap flag true commit-processor-flag)]
+ [was-first? (atom.compare-and-swap flag true commit-processor-flag)]
(if was-first?
(do Monad<IO>
[inputs (follow pending-commits)]
(exec (|> inputs
- (:! (frp;Channel [(STM Unit) (P;Promise Unit)]))
- (P;await (function recur [?inputs]
+ (:! (frp.Channel [(STM Unit) (P.Promise Unit)]))
+ (P.await (function recur [?inputs]
(io (case ?inputs
- #;Nil
+ #.Nil
[]
- (#;Cons head tail)
+ (#.Cons head tail)
(exec (process-commit head)
- (P;await recur tail)))))))
+ (P.await recur tail)))))))
(wrap [])))
(wrap [])))
)))
(def: #export (commit stm-proc)
- {#;doc "Commits a transaction and returns its result (asynchronously).
+ {#.doc "Commits a transaction and returns its result (asynchronously).
Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first.
For this reason, it's important to note that transactions must be free from side-effects, such as I/O."}
- (All [a] (-> (STM a) (P;Promise a)))
- (let [output (P;promise ($ +0))]
- (exec (io;run init-processor!)
- (io;run (write! [stm-proc output] pending-commits))
+ (All [a] (-> (STM a) (P.Promise a)))
+ (let [output (P.promise ($ +0))]
+ (exec (io.run init-processor!)
+ (io.run (write! [stm-proc output] pending-commits))
output)))