aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/library/lux/control/concurrency/stm.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/library/lux/control/concurrency/stm.lux')
-rw-r--r--stdlib/source/library/lux/control/concurrency/stm.lux92
1 files changed, 45 insertions, 47 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/stm.lux b/stdlib/source/library/lux/control/concurrency/stm.lux
index c62540890..6b89926ff 100644
--- a/stdlib/source/library/lux/control/concurrency/stm.lux
+++ b/stdlib/source/library/lux/control/concurrency/stm.lux
@@ -24,55 +24,53 @@
(-> a (IO Any)))
(abstract: .public (Var a)
- {}
-
(Atom [a (List (Sink a))])
- (def: .public (var value)
- (All (_ a) (-> a (Var a)))
- (:abstraction (atom.atom [value (list)])))
-
- (def: read!
- (All (_ a) (-> (Var a) a))
- (|>> :representation atom.read! io.run! product.left))
-
- (def: (un_follow! sink var)
- (All (_ a) (-> (Sink a) (Var a) (IO Any)))
- (do io.monad
- [_ (atom.update! (function (_ [value observers])
- [value (list.only (|>> (same? sink) not) observers)])
- (:representation var))]
- (in [])))
-
- (def: (write! new_value var)
- (All (_ a) (-> a (Var a) (IO Any)))
- (do [! io.monad]
- [.let [var' (:representation var)]
- (^@ old [old_value observers]) (atom.read! var')
- succeeded? (atom.compare_and_swap! old [new_value observers] var')]
- (if succeeded?
- (do !
- [_ (monad.each ! (function (_ sink)
- (do !
- [result (\ sink feed new_value)]
- (case result
- (#try.Success _)
- (in [])
-
- (#try.Failure _)
- (un_follow! sink var))))
- observers)]
- (in []))
- (write! new_value var))))
-
- (def: .public (follow! target)
- (All (_ a) (-> (Var a) (IO [(Channel a) (Sink a)])))
- (do io.monad
- [.let [[channel sink] (frp.channel [])]
- _ (atom.update! (function (_ [value observers])
- [value (#.Item sink observers)])
- (:representation target))]
- (in [channel sink])))
+ [(def: .public (var value)
+ (All (_ a) (-> a (Var a)))
+ (:abstraction (atom.atom [value (list)])))
+
+ (def: read!
+ (All (_ a) (-> (Var a) a))
+ (|>> :representation atom.read! io.run! product.left))
+
+ (def: (un_follow! sink var)
+ (All (_ a) (-> (Sink a) (Var a) (IO Any)))
+ (do io.monad
+ [_ (atom.update! (function (_ [value observers])
+ [value (list.only (|>> (same? sink) not) observers)])
+ (:representation var))]
+ (in [])))
+
+ (def: (write! new_value var)
+ (All (_ a) (-> a (Var a) (IO Any)))
+ (do [! io.monad]
+ [.let [var' (:representation var)]
+ (^@ old [old_value observers]) (atom.read! var')
+ succeeded? (atom.compare_and_swap! old [new_value observers] var')]
+ (if succeeded?
+ (do !
+ [_ (monad.each ! (function (_ sink)
+ (do !
+ [result (\ sink feed new_value)]
+ (case result
+ (#try.Success _)
+ (in [])
+
+ (#try.Failure _)
+ (un_follow! sink var))))
+ observers)]
+ (in []))
+ (write! new_value var))))
+
+ (def: .public (follow! target)
+ (All (_ a) (-> (Var a) (IO [(Channel a) (Sink a)])))
+ (do io.monad
+ [.let [[channel sink] (frp.channel [])]
+ _ (atom.update! (function (_ [value observers])
+ [value (#.Item sink observers)])
+ (:representation target))]
+ (in [channel sink])))]
)
(type: (Tx_Frame a)