diff options
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/lux/concurrency/stm.lux | 192 |
1 files changed, 101 insertions, 91 deletions
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux index 09daa5e2d..9b1cabe01 100644 --- a/stdlib/source/lux/concurrency/stm.lux +++ b/stdlib/source/lux/concurrency/stm.lux @@ -3,7 +3,7 @@ (lux (control [functor #+ Functor] [applicative #+ Applicative] [monad #+ do Monad]) - [io #- run] + [io #+ IO io] (data [product] [maybe] [number "nat/" Codec<Text,Nat>] @@ -11,13 +11,75 @@ (coll [list "list/" Functor<List> Fold<List>] [dict #+ Dict])) (concurrency [atom #+ Atom atom] - ["P" promise] + [promise #+ Promise promise] [frp "frp/" Functor<Channel>]) - )) + (type abstract))) -(type: #export (Var a) +(abstract: #export (Var a) {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} - (Atom [a (Dict Text (-> a (IO Unit)))])) + (Atom [a (Dict Text (-> a (IO Unit)))]) + + (def: #export (var value) + {#.doc "Creates a new STM var, with a default value."} + (All [a] (-> a (Var a))) + (@abstraction (atom.atom [value (dict.new text.Hash<Text>)]))) + + (def: read!! + (All [a] (-> (Var a) a)) + (|>> @representation atom.read io.run product.left)) + + (def: #export (read! (^@representation var)) + {#.doc "Reads var immediately, without going through a transaction."} + (All [a] (-> (Var a) (IO a))) + (|> var + atom.read + (:: io.Functor<IO> map product.left))) + + (def: (write! new-value (^@representation var)) + (All [a] (-> a (Var a) (IO Unit))) + (do io.Monad<IO> + [(^@ old [_value _observers]) (atom.read var) + succeeded? (atom.compare-and-swap old [new-value _observers] var)] + (if succeeded? + (do @ + [_ (|> _observers + dict.values + (monad.map @ (function [f] (f new-value))))] + (wrap [])) + (write! new-value (@abstraction var))))) + + (def: #export (follow (^@representation target)) + {#.doc "Creates a channel that will receive all changes to the value of the given var."} + (All [a] (-> (Var a) (IO (frp.Channel a)))) + (let [head (: (frp.Channel ($ +0)) (frp.channel)) + ## head (frp.channel) + channel-var (var head) + observer (function [label value] + (case (io.run (|> channel-var read!! (frp.write value))) + #.None + ## By closing the output Channel, the + ## observer becomes obsolete. + (atom.update (function [[value observers]] + [value (dict.remove label observers)]) + target) + + (#.Some tail') + (write! tail' channel-var)))] + (do io.Monad<IO> + [_ (atom.update (function [[value observers]] + (let [label (nat/encode (list/fold (function [key base] + (case (nat/decode key) + (#.Left _) + base + + (#.Right key-num) + (n/max key-num base))) + +0 + (dict.keys observers)))] + [value (dict.put label (observer label) observers)])) + target)] + (wrap head)))) + ) (type: (Tx-Frame a) {#var (Var a) @@ -31,15 +93,6 @@ {#.doc "A computation which updates a transaction and produces a value."} (-> Tx [Tx a])) -(def: #export (var value) - {#.doc "Creates a new STM var, with a default value."} - (All [a] (-> a (Var a))) - (atom.atom [value (dict.new text.Hash<Text>)])) - -(def: raw-read - (All [a] (-> (Var a) a)) - (|>> atom.read io.run product.left)) - (def: (find-var-value var tx) (All [a] (-> (Var a) Tx (Maybe a))) (|> tx @@ -59,17 +112,10 @@ [tx value] #.None - (let [value (raw-read var)] + (let [value (read!! var)] [(#.Cons [var value value] tx) value])))) -(def: #export (read! var) - {#.doc "Reads var immediately, without going through a transaction."} - (All [a] (-> (Var a) (IO a))) - (|> var - atom.read - (:: Functor<IO> map product.left))) - (def: (update-tx-value var value tx) (All [a] (-> (Var a) a Tx Tx)) (case tx @@ -99,54 +145,9 @@ []] #.None - [(#.Cons [var (raw-read var) value] tx) + [(#.Cons [var (read!! var) value] tx) []]))) -(def: (write! new-value var) - (All [a] (-> a (Var a) (IO Unit))) - (do Monad<IO> - [(^@ old [_value _observers]) (atom.read var) - succeeded? (atom.compare-and-swap old [new-value _observers] var)] - (if succeeded? - (do @ - [_ (|> _observers - dict.values - (monad.map @ (function [f] (f new-value))))] - (wrap [])) - (write! new-value var)))) - -(def: #export (follow target) - {#.doc "Creates a channel that will receive all changes to the value of the given var."} - (All [a] (-> (Var a) (IO (frp.Channel a)))) - (let [head (: (frp.Channel ($ +0)) (frp.channel)) - ## head (frp.channel) - channel-var (var head) - observer (function [label value] - (case (io.run (|> channel-var raw-read (frp.write value))) - #.None - ## By closing the output Channel, the - ## observer becomes obsolete. - (atom.update (function [[value observers]] - [value (dict.remove label observers)]) - target) - - (#.Some tail') - (write! tail' channel-var)))] - (do Monad<IO> - [_ (atom.update (function [[value observers]] - (let [label (nat/encode (list/fold (function [key base] - (case (nat/decode key) - (#.Left _) - base - - (#.Right key-num) - (n/max key-num base))) - +0 - (dict.keys observers)))] - [value (dict.put label (observer label) observers)])) - target)] - (wrap head)))) - (struct: #export _ (Functor STM) (def: (map f fa) (function [tx] @@ -185,7 +186,7 @@ (def: (can-commit? tx) (-> Tx Bool) (list.every? (function [[_var _original _current]] - (is _original (raw-read _var))) + (is _original (read!! _var))) tx)) (def: (commit-var! [_var _original _current]) @@ -196,44 +197,53 @@ (def: fresh-tx Tx (list)) +(type: Commit (Ex [a] [(STM a) (Promise a)])) + (def: pending-commits - (Var (Ex [a] [(STM a) (P.Promise a)])) - (var (:!! []))) + (Atom (Rec Commits (Promise [Commit Commits]))) + (atom (promise #.None))) (def: commit-processor-flag (Atom Bool) (atom false)) +(def: (issue-commit commit) + (-> Commit (IO Unit)) + (let [entry [commit (promise #.None)]] + (loop [|commits| (io.run (atom.read pending-commits))] + (case (promise.poll |commits|) + #.None + (do io.Monad<IO> + [resolved? (promise.resolve entry |commits|)] + (if resolved? + (atom.write (product.right entry) pending-commits) + (recur |commits|))) + + (#.Some [head tail]) + (recur tail))))) + (def: (process-commit [stm-proc output]) - (-> [(STM Unit) (P.Promise Unit)] Top) + (-> [(STM Unit) (Promise Unit)] Top) (let [[finished-tx value] (stm-proc fresh-tx)] (io.run (if (can-commit? finished-tx) (exec (list/map commit-var! finished-tx) - (P.resolve value output)) - (write! [stm-proc output] pending-commits))))) + (promise.resolve value output)) + (issue-commit [stm-proc output]))))) (def: init-processor! (IO Unit) - (do Monad<IO> + (do io.Monad<IO> [flag (atom.read commit-processor-flag)] (if flag (wrap []) (do @ [was-first? (atom.compare-and-swap flag true commit-processor-flag)] (if was-first? - (do Monad<IO> - [inputs (follow pending-commits)] - (exec (|> inputs - (:! (frp.Channel [(STM Unit) (P.Promise Unit)])) - (P.await (function recur [?inputs] - (io (case ?inputs - #.None - [] - - (#.Some [head tail]) - (exec (process-commit head) - (P.await recur tail))))))) - (wrap []))) + (exec (|> (io.run (atom.read pending-commits)) + (promise.await (function recur [[head tail]] + (io (exec (process-commit (:! [(STM Unit) (Promise Unit)] head)) + (promise.await recur tail)))))) + (wrap [])) (wrap []))) ))) @@ -243,8 +253,8 @@ Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first. For this reason, it's important to note that transactions must be free from side-effects, such as I/O."} - (All [a] (-> (STM a) (P.Promise a))) - (let [output (P.promise #.None)] + (All [a] (-> (STM a) (Promise a))) + (let [output (promise #.None)] (exec (io.run init-processor!) - (io.run (write! [stm-proc output] pending-commits)) + (io.run (issue-commit [stm-proc output])) output))) |