aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/concurrency/stm.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/concurrency/stm.lux')
-rw-r--r--stdlib/source/lux/concurrency/stm.lux81
1 files changed, 34 insertions, 47 deletions
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux
index 7886dda36..1fee00b7e 100644
--- a/stdlib/source/lux/concurrency/stm.lux
+++ b/stdlib/source/lux/concurrency/stm.lux
@@ -1,23 +1,22 @@
(;module:
lux
- (lux (control ["F" functor]
- ["A" applicative]
- ["M" monad #+ do Monad])
+ (lux (control [functor #+ Functor]
+ [applicative #+ Applicative]
+ [monad #+ do Monad])
[io #- run]
- (data (coll [list "L/" Functor<List> Fold<List>]
- [dict #+ Dict]
- ["Q" queue])
+ (data (coll [list "list/" Functor<List> Fold<List>]
+ [dict #+ Dict])
[product]
[text]
maybe
- [number "Nat/" Codec<Text,Nat>]
+ [number "nat/" Codec<Text,Nat>]
text/format)
[macro]
(macro [code]
["s" syntax #+ syntax: Syntax])
(concurrency [atom #+ Atom atom]
["P" promise]
- [frp])
+ [frp "frp/" Functor<Channel>])
))
(type: (Var-State a)
@@ -48,7 +47,7 @@
(def: raw-read
(All [a] (-> (Var a) a))
- (|>. atom;get io;run (get@ #value)))
+ (|>. atom;read io;run (get@ #value)))
(def: (find-var-value var tx)
(All [a] (-> (Var a) Tx (Maybe a)))
@@ -76,7 +75,7 @@
{#;doc "Reads var immediately, without going through a transaction."}
(All [a] (-> (Var a) (IO a)))
(|> var
- atom;get
+ atom;read
(:: Functor<IO> map (get@ #value))))
(def: (update-tx-value var value tx)
@@ -112,7 +111,7 @@
{#;doc "Writes value to var immediately, without going through a transaction."}
(All [a] (-> a (Var a) (IO Unit)))
(do Monad<IO>
- [old (atom;get var)
+ [old (atom;read var)
#let [old-value (get@ #value old)
new (set@ #value new-value old)]
succeeded? (atom;compare-and-swap old new var)]
@@ -121,7 +120,7 @@
[_ (|> old
(get@ #observers)
dict;values
- (M;map @ (function [f] (f new-value))))]
+ (monad;map @ (function [f] (f new-value))))]
(wrap []))
(write! new-value var))))
@@ -143,26 +142,26 @@
(write! tail' channel-var)))]
(do Monad<IO>
[_ (atom;update (function [[value observers]]
- (let [label (Nat/encode (L/fold (function [key base]
- (case (Nat/decode key)
- (#;Left _)
- base
-
- (#;Right key-num)
- (n.max key-num base)))
- +0
- (dict;keys observers)))]
+ (let [label (nat/encode (list/fold (function [key base]
+ (case (nat/decode key)
+ (#;Left _)
+ base
+
+ (#;Right key-num)
+ (n.max key-num base)))
+ +0
+ (dict;keys observers)))]
[value (dict;put label (observer label) observers)]))
target)]
(wrap head))))
-(struct: #export _ (F;Functor STM)
+(struct: #export _ (Functor STM)
(def: (map f fa)
(function [tx]
(let [[tx' a] (fa tx)]
[tx' (f a)]))))
-(struct: #export _ (A;Applicative STM)
+(struct: #export _ (Applicative STM)
(def: functor Functor<STM>)
(def: (wrap a)
@@ -186,7 +185,7 @@
{#;doc "Will update a Var's value, and return a tuple with the old and the new values."}
(All [a] (-> (-> a a) (Var a) (IO [a a])))
(io (loop [_ []]
- (let [(^@ state [value observers]) (io;run (atom;get var))
+ (let [(^@ state [value observers]) (io;run (atom;read var))
value' (f value)]
(if (io;run (atom;compare-and-swap state
[value' observers]
@@ -225,31 +224,18 @@
(Atom Bool)
(atom false))
-(def: (process-commit commits)
- (-> (frp;Channel [(STM Unit) (P;Promise Unit)])
- (P;Promise Unit))
- (do P;Monad<Promise>
- [?head+tail commits]
- (case ?head+tail
- (#;Cons [stm-proc output] tail)
- (do @
- [#let [[finished-tx value] (stm-proc fresh-tx)]]
- (exec (if (can-commit? finished-tx)
- (exec (L/map commit-var finished-tx)
- (io;run (P;resolve value output))
- [])
- (exec (io;run (write! [stm-proc output] pending-commits))
- []))
- (process-commit tail)))
-
- #;Nil
- (undefined)
- )))
+(def: (process-commit [stm-proc output])
+ (-> [(STM Unit) (P;Promise Unit)] Top)
+ (let [[finished-tx value] (stm-proc fresh-tx)]
+ (if (can-commit? finished-tx)
+ (exec (list/map commit-var finished-tx)
+ (io;run (P;resolve value output)))
+ (io;run (write! [stm-proc output] pending-commits)))))
(def: init-processor!
(IO Unit)
(do Monad<IO>
- [flag (atom;get commit-processor-flag)]
+ [flag (atom;read commit-processor-flag)]
(if flag
(wrap [])
(do @
@@ -257,8 +243,9 @@
(if was-first?
(do Monad<IO>
[inputs (follow pending-commits)]
- (exec (process-commit (:! (frp;Channel [(STM Unit) (P;Promise Unit)])
- inputs))
+ (exec (|> inputs
+ (:! (frp;Channel [(STM Unit) (P;Promise Unit)]))
+ (frp/map process-commit))
(wrap [])))
(wrap [])))
)))