diff options
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/lux/concurrency/actor.lux | 2 | ||||
-rw-r--r-- | stdlib/source/lux/concurrency/stm.lux | 38 | ||||
-rw-r--r-- | stdlib/test/test/lux/concurrency/stm.lux | 2 |
3 files changed, 22 insertions, 20 deletions
diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux index e2842752b..97ce7dec3 100644 --- a/stdlib/source/lux/concurrency/actor.lux +++ b/stdlib/source/lux/concurrency/actor.lux @@ -40,7 +40,7 @@ self {#mailbox mailbox #kill-signal kill-signal #obituary obituary} - mailbox-chan (io;run (stm;follow "\tmailbox\t" mailbox)) + mailbox-chan (io;run (stm;follow mailbox)) step (step self) |mailbox| (stm;var mailbox-chan) _ (:: Monad<Promise> map diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux index 9d1459a2b..36eb6854e 100644 --- a/stdlib/source/lux/concurrency/stm.lux +++ b/stdlib/source/lux/concurrency/stm.lux @@ -4,12 +4,13 @@ applicative monad) [io #- run] - (data (coll [list #* "List/" Functor<List>] + (data (coll [list #* "List/" Functor<List> Fold<List>] [dict #+ Dict] ["Q" queue]) [product] [text] maybe + [number "Nat/" Codec<Text,Nat>] text/format) host [compiler] @@ -125,33 +126,34 @@ (wrap [])) (write! new-value var)))) -(def: #export (unfollow label target) - {#;doc "Stop tracking the changes to a Var. - - Caveat emptor: It won't close any Chan that used to track the changes."} - (All [a] (-> Text (Var a) (IO Unit))) - (do Monad<IO> - [[value observers] (atom;get target)] - (atom;set [value (dict;remove label observers)] - target))) - -(def: #export (follow label target) - {#;doc "Creates a channel (identified by a label) that will receive all changes to the value of the given var."} - (All [a] (-> Text (Var a) (IO (frp;Chan a)))) +(def: #export (follow target) + {#;doc "Creates a channel that will receive all changes to the value of the given var."} + (All [a] (-> (Var a) (IO (frp;Chan a)))) (let [head (frp;chan ($ +0)) chan-var (var head) - observer (lambda [value] + observer (lambda [label value] (case (io;run (|> chan-var raw-read (frp;write value))) #;None ## By closing the output Chan, the ## observer becomes obsolete. - (unfollow label chan-var) + (atom;update (lambda [[value observers]] + [value (dict;remove label observers)]) + target) (#;Some tail') (write! tail' chan-var)))] (do Monad<IO> [_ (atom;update (lambda [[value observers]] - [value (dict;put label observer observers)]) + (let [label (Nat/encode (List/fold (lambda [key base] + (case (Nat/decode key) + (#;Left _) + base + + (#;Right key-num) + (n.max key-num base))) + +0 + (dict;keys observers)))] + [value (dict;put label (observer label) observers)])) target)] (wrap head)))) @@ -255,7 +257,7 @@ [was-first? (atom;compare-and-swap flag true commit-processor-flag)] (if was-first? (do Monad<IO> - [inputs (follow "commit-processor" pending-commits)] + [inputs (follow pending-commits)] (exec (process-commit (:! (frp;Chan [(STM Unit) (Promise Unit)]) inputs)) (wrap []))) diff --git a/stdlib/test/test/lux/concurrency/stm.lux b/stdlib/test/test/lux/concurrency/stm.lux index ef100d744..10c6a5242 100644 --- a/stdlib/test/test/lux/concurrency/stm.lux +++ b/stdlib/test/test/lux/concurrency/stm.lux @@ -17,7 +17,7 @@ (test: "STM" (do promise;Monad<Promise> [#let [_var (&;var 0) - changes (io;run (&;follow "test" _var))] + changes (io;run (&;follow _var))] output1 (&;commit (&;read _var)) output2 (&;commit (do &;Monad<STM> [_ (&;write 5 _var)] |