aboutsummaryrefslogtreecommitdiff
path: root/stdlib
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib')
-rw-r--r--stdlib/source/lux/concurrency/actor.lux2
-rw-r--r--stdlib/source/lux/concurrency/stm.lux38
-rw-r--r--stdlib/test/test/lux/concurrency/stm.lux2
3 files changed, 22 insertions, 20 deletions
diff --git a/stdlib/source/lux/concurrency/actor.lux b/stdlib/source/lux/concurrency/actor.lux
index e2842752b..97ce7dec3 100644
--- a/stdlib/source/lux/concurrency/actor.lux
+++ b/stdlib/source/lux/concurrency/actor.lux
@@ -40,7 +40,7 @@
self {#mailbox mailbox
#kill-signal kill-signal
#obituary obituary}
- mailbox-chan (io;run (stm;follow "\tmailbox\t" mailbox))
+ mailbox-chan (io;run (stm;follow mailbox))
step (step self)
|mailbox| (stm;var mailbox-chan)
_ (:: Monad<Promise> map
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux
index 9d1459a2b..36eb6854e 100644
--- a/stdlib/source/lux/concurrency/stm.lux
+++ b/stdlib/source/lux/concurrency/stm.lux
@@ -4,12 +4,13 @@
applicative
monad)
[io #- run]
- (data (coll [list #* "List/" Functor<List>]
+ (data (coll [list #* "List/" Functor<List> Fold<List>]
[dict #+ Dict]
["Q" queue])
[product]
[text]
maybe
+ [number "Nat/" Codec<Text,Nat>]
text/format)
host
[compiler]
@@ -125,33 +126,34 @@
(wrap []))
(write! new-value var))))
-(def: #export (unfollow label target)
- {#;doc "Stop tracking the changes to a Var.
-
- Caveat emptor: It won't close any Chan that used to track the changes."}
- (All [a] (-> Text (Var a) (IO Unit)))
- (do Monad<IO>
- [[value observers] (atom;get target)]
- (atom;set [value (dict;remove label observers)]
- target)))
-
-(def: #export (follow label target)
- {#;doc "Creates a channel (identified by a label) that will receive all changes to the value of the given var."}
- (All [a] (-> Text (Var a) (IO (frp;Chan a))))
+(def: #export (follow target)
+ {#;doc "Creates a channel that will receive all changes to the value of the given var."}
+ (All [a] (-> (Var a) (IO (frp;Chan a))))
(let [head (frp;chan ($ +0))
chan-var (var head)
- observer (lambda [value]
+ observer (lambda [label value]
(case (io;run (|> chan-var raw-read (frp;write value)))
#;None
## By closing the output Chan, the
## observer becomes obsolete.
- (unfollow label chan-var)
+ (atom;update (lambda [[value observers]]
+ [value (dict;remove label observers)])
+ target)
(#;Some tail')
(write! tail' chan-var)))]
(do Monad<IO>
[_ (atom;update (lambda [[value observers]]
- [value (dict;put label observer observers)])
+ (let [label (Nat/encode (List/fold (lambda [key base]
+ (case (Nat/decode key)
+ (#;Left _)
+ base
+
+ (#;Right key-num)
+ (n.max key-num base)))
+ +0
+ (dict;keys observers)))]
+ [value (dict;put label (observer label) observers)]))
target)]
(wrap head))))
@@ -255,7 +257,7 @@
[was-first? (atom;compare-and-swap flag true commit-processor-flag)]
(if was-first?
(do Monad<IO>
- [inputs (follow "commit-processor" pending-commits)]
+ [inputs (follow pending-commits)]
(exec (process-commit (:! (frp;Chan [(STM Unit) (Promise Unit)])
inputs))
(wrap [])))
diff --git a/stdlib/test/test/lux/concurrency/stm.lux b/stdlib/test/test/lux/concurrency/stm.lux
index ef100d744..10c6a5242 100644
--- a/stdlib/test/test/lux/concurrency/stm.lux
+++ b/stdlib/test/test/lux/concurrency/stm.lux
@@ -17,7 +17,7 @@
(test: "STM"
(do promise;Monad<Promise>
[#let [_var (&;var 0)
- changes (io;run (&;follow "test" _var))]
+ changes (io;run (&;follow _var))]
output1 (&;commit (&;read _var))
output2 (&;commit (do &;Monad<STM>
[_ (&;write 5 _var)]