diff options
author | Eduardo Julian | 2017-01-27 06:10:37 -0400 |
---|---|---|
committer | Eduardo Julian | 2017-01-27 06:10:37 -0400 |
commit | 3f76b52f46b58ab0d1dd8846edb43db95c391219 (patch) | |
tree | e6e5e2118c31d177b53becabb697f3c41931c8cb /stdlib/source | |
parent | f81fd718643bfbf81420d3904bfc35d44fa25ed4 (diff) |
- Now, the only way to unfollow a Var is by closing the Chan.
- Following a var no longer requires a label.
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/lux/concurrency/actor.lux | 2 | ||||
-rw-r--r-- | stdlib/source/lux/concurrency/stm.lux | 38 |
2 files changed, 21 insertions, 19 deletions
diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux index e2842752b..97ce7dec3 100644 --- a/stdlib/source/lux/concurrency/actor.lux +++ b/stdlib/source/lux/concurrency/actor.lux @@ -40,7 +40,7 @@ self {#mailbox mailbox #kill-signal kill-signal #obituary obituary} - mailbox-chan (io;run (stm;follow "\tmailbox\t" mailbox)) + mailbox-chan (io;run (stm;follow mailbox)) step (step self) |mailbox| (stm;var mailbox-chan) _ (:: Monad<Promise> map diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux index 9d1459a2b..36eb6854e 100644 --- a/stdlib/source/lux/concurrency/stm.lux +++ b/stdlib/source/lux/concurrency/stm.lux @@ -4,12 +4,13 @@ applicative monad) [io #- run] - (data (coll [list #* "List/" Functor<List>] + (data (coll [list #* "List/" Functor<List> Fold<List>] [dict #+ Dict] ["Q" queue]) [product] [text] maybe + [number "Nat/" Codec<Text,Nat>] text/format) host [compiler] @@ -125,33 +126,34 @@ (wrap [])) (write! new-value var)))) -(def: #export (unfollow label target) - {#;doc "Stop tracking the changes to a Var. - - Caveat emptor: It won't close any Chan that used to track the changes."} - (All [a] (-> Text (Var a) (IO Unit))) - (do Monad<IO> - [[value observers] (atom;get target)] - (atom;set [value (dict;remove label observers)] - target))) - -(def: #export (follow label target) - {#;doc "Creates a channel (identified by a label) that will receive all changes to the value of the given var."} - (All [a] (-> Text (Var a) (IO (frp;Chan a)))) +(def: #export (follow target) + {#;doc "Creates a channel that will receive all changes to the value of the given var."} + (All [a] (-> (Var a) (IO (frp;Chan a)))) (let [head (frp;chan ($ +0)) chan-var (var head) - observer (lambda [value] + observer (lambda [label value] (case (io;run (|> chan-var raw-read (frp;write value))) #;None ## By closing the output Chan, the ## observer becomes obsolete. - (unfollow label chan-var) + (atom;update (lambda [[value observers]] + [value (dict;remove label observers)]) + target) (#;Some tail') (write! tail' chan-var)))] (do Monad<IO> [_ (atom;update (lambda [[value observers]] - [value (dict;put label observer observers)]) + (let [label (Nat/encode (List/fold (lambda [key base] + (case (Nat/decode key) + (#;Left _) + base + + (#;Right key-num) + (n.max key-num base))) + +0 + (dict;keys observers)))] + [value (dict;put label (observer label) observers)])) target)] (wrap head)))) @@ -255,7 +257,7 @@ [was-first? (atom;compare-and-swap flag true commit-processor-flag)] (if was-first? (do Monad<IO> - [inputs (follow "commit-processor" pending-commits)] + [inputs (follow pending-commits)] (exec (process-commit (:! (frp;Chan [(STM Unit) (Promise Unit)]) inputs)) (wrap []))) |