aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/library/lux/control/concurrency/stm.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/library/lux/control/concurrency/stm.lux')
-rw-r--r--stdlib/source/library/lux/control/concurrency/stm.lux119
1 files changed, 69 insertions, 50 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/stm.lux b/stdlib/source/library/lux/control/concurrency/stm.lux
index be8203dde..918283f70 100644
--- a/stdlib/source/library/lux/control/concurrency/stm.lux
+++ b/stdlib/source/library/lux/control/concurrency/stm.lux
@@ -15,8 +15,9 @@
["[0]" list]]]
[macro
["^" pattern]]
- [type
- [abstract {"-" pattern}]]]]
+ [type {"+" sharing}
+ [abstract {"-" pattern}]
+ ["[0]" variance {"+" Mutable}]]]]
[//
["[0]" atom {"+" Atom atom}]
["[0]" async {"+" Async Resolver}]
@@ -25,70 +26,82 @@
(type: (Observer a)
(-> a (IO Any)))
-(abstract: .public (Var a)
+(abstract: .public (Var'' a)
(Atom [a (List (Sink a))])
+ (type: .public (Var' r w)
+ (Var'' (Mutable r w)))
+
+ (type: .public (Var a)
+ (Var'' (Mutable a a)))
+
(def: .public (var value)
(All (_ a) (-> a (Var a)))
- (abstraction (atom.atom [value (list)])))
+ (abstraction (atom.atom [(variance.write value) (list)])))
(def: read!
- (All (_ a) (-> (Var a) a))
- (|>> representation atom.read! io.run! product.left))
-
- (def: (un_follow! sink var)
- (All (_ a) (-> (Sink a) (Var a) (IO Any)))
- (do io.monad
- [_ (atom.update! (function (_ [value observers])
- [value (list.only (|>> (same? sink) not) observers)])
- (representation var))]
- (in [])))
+ (All (_ r w) (-> (Var' r w) r))
+ (|>> representation atom.read! io.run! product.left variance.read))
(def: (write! new_value var)
- (All (_ a) (-> a (Var a) (IO Any)))
+ (All (_ r w) (-> w (Var' r w) (IO Any)))
(do [! io.monad]
[.let [var' (representation var)]
- (^.let old [old_value observers]) (atom.read! var')
- succeeded? (atom.compare_and_swap! old [new_value observers] var')]
+ (^.let old [_ observers]) (atom.read! var')
+ succeeded? (atom.compare_and_swap! old [(variance.write new_value) observers] var')]
(if succeeded?
(do !
- [_ (monad.each ! (function (_ sink)
- (do !
- [result (# sink feed new_value)]
- (case result
- {try.#Success _}
- (in [])
-
- {try.#Failure _}
- (un_follow! sink var))))
- observers)]
+ [banned (monad.only ! (function (_ sink)
+ (do !
+ [result (# sink feed (variance.write new_value))]
+ (in (case result
+ {try.#Success _}
+ false
+
+ {try.#Failure _}
+ true))))
+ observers)
+ _ (atom.update! (function (_ [value audience])
+ (|> audience
+ (list.only (function (_ it)
+ (not (list.any? (same? it) banned))))
+ [value]))
+ var')]
(in []))
(write! new_value var))))
(def: .public (changes target)
(All (_ a) (-> (Var a) (IO [(Channel a) (Sink a)])))
(do io.monad
- [.let [[channel sink] (frp.channel [])]
+ [.let [[channel sink] (sharing [a]
+ (Var a)
+ target
+
+ [(Channel a) (Sink a)]
+ (frp.channel []))]
_ (atom.update! (function (_ [value observers])
- [value {.#Item sink observers}])
+ [value {.#Item (implementation
+ (def: close (# sink close))
+ (def: feed (|>> variance.read (# sink feed))))
+ observers}])
(representation target))]
(in [channel sink])))
)
-(type: (Tx_Frame a)
+(type: (Tx_Frame r w)
(Record
- [#var (Var a)
- #original a
- #current a]))
+ [#var (Var' r w)
+ #original r
+ #current w]))
(type: Tx
- (List (Ex (_ a) (Tx_Frame a))))
+ (List (Ex (_ r w) (Tx_Frame r w))))
(type: .public (STM a)
(-> Tx [Tx a]))
(def: (var_value var tx)
- (All (_ a) (-> (Var a) Tx (Maybe a)))
+ (All (_ r w) (-> (Var' r w) Tx (Maybe r)))
(|> tx
(list.example (function (_ [_var _original _current])
(same? (as (Var Any) var)
@@ -98,7 +111,7 @@
as_expected))
(def: .public (read var)
- (All (_ a) (-> (Var a) (STM a)))
+ (All (_ r w) (-> (Var' r w) (STM r)))
(function (_ tx)
(case (var_value var tx)
{.#Some value}
@@ -106,11 +119,14 @@
{.#None}
(let [value (..read! var)]
- [{.#Item [var value value] tx}
+ [{.#Item [#var var
+ #original value
+ #current (as_expected value)]
+ tx}
value]))))
(def: (with_updated_var var value tx)
- (All (_ a) (-> (Var a) a Tx Tx))
+ (All (_ r w) (-> (Var' r w) w Tx Tx))
(case tx
{.#End}
{.#End}
@@ -118,9 +134,9 @@
{.#Item [_var _original _current] tx'}
(if (same? (as (Var Any) var)
(as (Var Any) _var))
- {.#Item [#var (as (Var Any) _var)
- #original (as Any _original)
- #current (as Any value)]
+ {.#Item [#var _var
+ #original _original
+ #current (as_expected value)]
tx'}
{.#Item [#var _var
#original _original
@@ -128,7 +144,7 @@
(with_updated_var var value tx')})))
(def: .public (write value var)
- (All (_ a) (-> a (Var a) (STM Any)))
+ (All (_ r w) (-> w (Var' r w) (STM Any)))
(function (_ tx)
(case (var_value var tx)
{.#Some _}
@@ -136,7 +152,10 @@
[]]
{.#None}
- [{.#Item [var (..read! var) value] tx}
+ [{.#Item [#var var
+ #original (..read! var)
+ #current value]
+ tx}
[]])))
(implementation: .public functor
@@ -173,12 +192,12 @@
(ma tx')))))
(def: .public (update f var)
- (All (_ a) (-> (-> a a) (Var a) (STM [a a])))
+ (All (_ r w) (-> (-> r w) (Var' r w) (STM [r w])))
(do ..monad
- [a (..read var)
- .let [a' (f a)]
- _ (..write a' var)]
- (in [a a'])))
+ [before (..read var)
+ .let [after (f before)]
+ _ (..write after var)]
+ (in [before after])))
(def: (can_commit? tx)
(-> Tx Bit)
@@ -187,8 +206,8 @@
tx))
(def: (commit_var! [_var _original _current])
- (-> (Ex (_ a) (Tx_Frame a)) (IO Any))
- (if (same? _original _current)
+ (-> (Ex (_ r w) (Tx_Frame r w)) (IO Any))
+ (if (same? (as Any _original) (as Any _current))
(io [])
(..write! _current _var)))