diff options
Diffstat (limited to 'stdlib/source/lux/concurrency/stm.lux')
-rw-r--r-- | stdlib/source/lux/concurrency/stm.lux | 81 |
1 files changed, 34 insertions, 47 deletions
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux index 7886dda36..1fee00b7e 100644 --- a/stdlib/source/lux/concurrency/stm.lux +++ b/stdlib/source/lux/concurrency/stm.lux @@ -1,23 +1,22 @@ (;module: lux - (lux (control ["F" functor] - ["A" applicative] - ["M" monad #+ do Monad]) + (lux (control [functor #+ Functor] + [applicative #+ Applicative] + [monad #+ do Monad]) [io #- run] - (data (coll [list "L/" Functor<List> Fold<List>] - [dict #+ Dict] - ["Q" queue]) + (data (coll [list "list/" Functor<List> Fold<List>] + [dict #+ Dict]) [product] [text] maybe - [number "Nat/" Codec<Text,Nat>] + [number "nat/" Codec<Text,Nat>] text/format) [macro] (macro [code] ["s" syntax #+ syntax: Syntax]) (concurrency [atom #+ Atom atom] ["P" promise] - [frp]) + [frp "frp/" Functor<Channel>]) )) (type: (Var-State a) @@ -48,7 +47,7 @@ (def: raw-read (All [a] (-> (Var a) a)) - (|>. atom;get io;run (get@ #value))) + (|>. atom;read io;run (get@ #value))) (def: (find-var-value var tx) (All [a] (-> (Var a) Tx (Maybe a))) @@ -76,7 +75,7 @@ {#;doc "Reads var immediately, without going through a transaction."} (All [a] (-> (Var a) (IO a))) (|> var - atom;get + atom;read (:: Functor<IO> map (get@ #value)))) (def: (update-tx-value var value tx) @@ -112,7 +111,7 @@ {#;doc "Writes value to var immediately, without going through a transaction."} (All [a] (-> a (Var a) (IO Unit))) (do Monad<IO> - [old (atom;get var) + [old (atom;read var) #let [old-value (get@ #value old) new (set@ #value new-value old)] succeeded? (atom;compare-and-swap old new var)] @@ -121,7 +120,7 @@ [_ (|> old (get@ #observers) dict;values - (M;map @ (function [f] (f new-value))))] + (monad;map @ (function [f] (f new-value))))] (wrap [])) (write! new-value var)))) @@ -143,26 +142,26 @@ (write! tail' channel-var)))] (do Monad<IO> [_ (atom;update (function [[value observers]] - (let [label (Nat/encode (L/fold (function [key base] - (case (Nat/decode key) - (#;Left _) - base - - (#;Right key-num) - (n.max key-num base))) - +0 - (dict;keys observers)))] + (let [label (nat/encode (list/fold (function [key base] + (case (nat/decode key) + (#;Left _) + base + + (#;Right key-num) + (n.max key-num base))) + +0 + (dict;keys observers)))] [value (dict;put label (observer label) observers)])) target)] (wrap head)))) -(struct: #export _ (F;Functor STM) +(struct: #export _ (Functor STM) (def: (map f fa) (function [tx] (let [[tx' a] (fa tx)] [tx' (f a)])))) -(struct: #export _ (A;Applicative STM) +(struct: #export _ (Applicative STM) (def: functor Functor<STM>) (def: (wrap a) @@ -186,7 +185,7 @@ {#;doc "Will update a Var's value, and return a tuple with the old and the new values."} (All [a] (-> (-> a a) (Var a) (IO [a a]))) (io (loop [_ []] - (let [(^@ state [value observers]) (io;run (atom;get var)) + (let [(^@ state [value observers]) (io;run (atom;read var)) value' (f value)] (if (io;run (atom;compare-and-swap state [value' observers] @@ -225,31 +224,18 @@ (Atom Bool) (atom false)) -(def: (process-commit commits) - (-> (frp;Channel [(STM Unit) (P;Promise Unit)]) - (P;Promise Unit)) - (do P;Monad<Promise> - [?head+tail commits] - (case ?head+tail - (#;Cons [stm-proc output] tail) - (do @ - [#let [[finished-tx value] (stm-proc fresh-tx)]] - (exec (if (can-commit? finished-tx) - (exec (L/map commit-var finished-tx) - (io;run (P;resolve value output)) - []) - (exec (io;run (write! [stm-proc output] pending-commits)) - [])) - (process-commit tail))) - - #;Nil - (undefined) - ))) +(def: (process-commit [stm-proc output]) + (-> [(STM Unit) (P;Promise Unit)] Top) + (let [[finished-tx value] (stm-proc fresh-tx)] + (if (can-commit? finished-tx) + (exec (list/map commit-var finished-tx) + (io;run (P;resolve value output))) + (io;run (write! [stm-proc output] pending-commits))))) (def: init-processor! (IO Unit) (do Monad<IO> - [flag (atom;get commit-processor-flag)] + [flag (atom;read commit-processor-flag)] (if flag (wrap []) (do @ @@ -257,8 +243,9 @@ (if was-first? (do Monad<IO> [inputs (follow pending-commits)] - (exec (process-commit (:! (frp;Channel [(STM Unit) (P;Promise Unit)]) - inputs)) + (exec (|> inputs + (:! (frp;Channel [(STM Unit) (P;Promise Unit)])) + (frp/map process-commit)) (wrap []))) (wrap []))) ))) |