diff options
Diffstat (limited to 'stdlib/source/library/lux/control/concurrency/stm.lux')
-rw-r--r-- | stdlib/source/library/lux/control/concurrency/stm.lux | 92 |
1 files changed, 45 insertions, 47 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/stm.lux b/stdlib/source/library/lux/control/concurrency/stm.lux index c62540890..6b89926ff 100644 --- a/stdlib/source/library/lux/control/concurrency/stm.lux +++ b/stdlib/source/library/lux/control/concurrency/stm.lux @@ -24,55 +24,53 @@ (-> a (IO Any))) (abstract: .public (Var a) - {} - (Atom [a (List (Sink a))]) - (def: .public (var value) - (All (_ a) (-> a (Var a))) - (:abstraction (atom.atom [value (list)]))) - - (def: read! - (All (_ a) (-> (Var a) a)) - (|>> :representation atom.read! io.run! product.left)) - - (def: (un_follow! sink var) - (All (_ a) (-> (Sink a) (Var a) (IO Any))) - (do io.monad - [_ (atom.update! (function (_ [value observers]) - [value (list.only (|>> (same? sink) not) observers)]) - (:representation var))] - (in []))) - - (def: (write! new_value var) - (All (_ a) (-> a (Var a) (IO Any))) - (do [! io.monad] - [.let [var' (:representation var)] - (^@ old [old_value observers]) (atom.read! var') - succeeded? (atom.compare_and_swap! old [new_value observers] var')] - (if succeeded? - (do ! - [_ (monad.each ! (function (_ sink) - (do ! - [result (\ sink feed new_value)] - (case result - (#try.Success _) - (in []) - - (#try.Failure _) - (un_follow! sink var)))) - observers)] - (in [])) - (write! new_value var)))) - - (def: .public (follow! target) - (All (_ a) (-> (Var a) (IO [(Channel a) (Sink a)]))) - (do io.monad - [.let [[channel sink] (frp.channel [])] - _ (atom.update! (function (_ [value observers]) - [value (#.Item sink observers)]) - (:representation target))] - (in [channel sink]))) + [(def: .public (var value) + (All (_ a) (-> a (Var a))) + (:abstraction (atom.atom [value (list)]))) + + (def: read! + (All (_ a) (-> (Var a) a)) + (|>> :representation atom.read! io.run! product.left)) + + (def: (un_follow! sink var) + (All (_ a) (-> (Sink a) (Var a) (IO Any))) + (do io.monad + [_ (atom.update! (function (_ [value observers]) + [value (list.only (|>> (same? sink) not) observers)]) + (:representation var))] + (in []))) + + (def: (write! new_value var) + (All (_ a) (-> a (Var a) (IO Any))) + (do [! io.monad] + [.let [var' (:representation var)] + (^@ old [old_value observers]) (atom.read! var') + succeeded? (atom.compare_and_swap! old [new_value observers] var')] + (if succeeded? + (do ! + [_ (monad.each ! (function (_ sink) + (do ! + [result (\ sink feed new_value)] + (case result + (#try.Success _) + (in []) + + (#try.Failure _) + (un_follow! sink var)))) + observers)] + (in [])) + (write! new_value var)))) + + (def: .public (follow! target) + (All (_ a) (-> (Var a) (IO [(Channel a) (Sink a)]))) + (do io.monad + [.let [[channel sink] (frp.channel [])] + _ (atom.update! (function (_ [value observers]) + [value (#.Item sink observers)]) + (:representation target))] + (in [channel sink])))] ) (type: (Tx_Frame a) |