diff options
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/lux/concurrency/stm.lux | 38 |
1 files changed, 20 insertions, 18 deletions
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux index 9d1459a2b..36eb6854e 100644 --- a/stdlib/source/lux/concurrency/stm.lux +++ b/stdlib/source/lux/concurrency/stm.lux @@ -4,12 +4,13 @@ applicative monad) [io #- run] - (data (coll [list #* "List/" Functor<List>] + (data (coll [list #* "List/" Functor<List> Fold<List>] [dict #+ Dict] ["Q" queue]) [product] [text] maybe + [number "Nat/" Codec<Text,Nat>] text/format) host [compiler] @@ -125,33 +126,34 @@ (wrap [])) (write! new-value var)))) -(def: #export (unfollow label target) - {#;doc "Stop tracking the changes to a Var. - - Caveat emptor: It won't close any Chan that used to track the changes."} - (All [a] (-> Text (Var a) (IO Unit))) - (do Monad<IO> - [[value observers] (atom;get target)] - (atom;set [value (dict;remove label observers)] - target))) - -(def: #export (follow label target) - {#;doc "Creates a channel (identified by a label) that will receive all changes to the value of the given var."} - (All [a] (-> Text (Var a) (IO (frp;Chan a)))) +(def: #export (follow target) + {#;doc "Creates a channel that will receive all changes to the value of the given var."} + (All [a] (-> (Var a) (IO (frp;Chan a)))) (let [head (frp;chan ($ +0)) chan-var (var head) - observer (lambda [value] + observer (lambda [label value] (case (io;run (|> chan-var raw-read (frp;write value))) #;None ## By closing the output Chan, the ## observer becomes obsolete. - (unfollow label chan-var) + (atom;update (lambda [[value observers]] + [value (dict;remove label observers)]) + target) (#;Some tail') (write! tail' chan-var)))] (do Monad<IO> [_ (atom;update (lambda [[value observers]] - [value (dict;put label observer observers)]) + (let [label (Nat/encode (List/fold (lambda [key base] + (case (Nat/decode key) + (#;Left _) + base + + (#;Right key-num) + (n.max key-num base))) + +0 + (dict;keys observers)))] + [value (dict;put label (observer label) observers)])) target)] (wrap head)))) @@ -255,7 +257,7 @@ [was-first? (atom;compare-and-swap flag true commit-processor-flag)] (if was-first? (do Monad<IO> - [inputs (follow "commit-processor" pending-commits)] + [inputs (follow pending-commits)] (exec (process-commit (:! (frp;Chan [(STM Unit) (Promise Unit)]) inputs)) (wrap []))) |