diff options
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/library/lux/control/concurrency/stm.lux | 119 |
1 files changed, 69 insertions, 50 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/stm.lux b/stdlib/source/library/lux/control/concurrency/stm.lux index be8203dde..918283f70 100644 --- a/stdlib/source/library/lux/control/concurrency/stm.lux +++ b/stdlib/source/library/lux/control/concurrency/stm.lux @@ -15,8 +15,9 @@ ["[0]" list]]] [macro ["^" pattern]] - [type - [abstract {"-" pattern}]]]] + [type {"+" sharing} + [abstract {"-" pattern}] + ["[0]" variance {"+" Mutable}]]]] [// ["[0]" atom {"+" Atom atom}] ["[0]" async {"+" Async Resolver}] @@ -25,70 +26,82 @@ (type: (Observer a) (-> a (IO Any))) -(abstract: .public (Var a) +(abstract: .public (Var'' a) (Atom [a (List (Sink a))]) + (type: .public (Var' r w) + (Var'' (Mutable r w))) + + (type: .public (Var a) + (Var'' (Mutable a a))) + (def: .public (var value) (All (_ a) (-> a (Var a))) - (abstraction (atom.atom [value (list)]))) + (abstraction (atom.atom [(variance.write value) (list)]))) (def: read! - (All (_ a) (-> (Var a) a)) - (|>> representation atom.read! io.run! product.left)) - - (def: (un_follow! sink var) - (All (_ a) (-> (Sink a) (Var a) (IO Any))) - (do io.monad - [_ (atom.update! (function (_ [value observers]) - [value (list.only (|>> (same? sink) not) observers)]) - (representation var))] - (in []))) + (All (_ r w) (-> (Var' r w) r)) + (|>> representation atom.read! io.run! product.left variance.read)) (def: (write! new_value var) - (All (_ a) (-> a (Var a) (IO Any))) + (All (_ r w) (-> w (Var' r w) (IO Any))) (do [! io.monad] [.let [var' (representation var)] - (^.let old [old_value observers]) (atom.read! var') - succeeded? (atom.compare_and_swap! old [new_value observers] var')] + (^.let old [_ observers]) (atom.read! var') + succeeded? (atom.compare_and_swap! old [(variance.write new_value) observers] var')] (if succeeded? (do ! - [_ (monad.each ! (function (_ sink) - (do ! - [result (# sink feed new_value)] - (case result - {try.#Success _} - (in []) - - {try.#Failure _} - (un_follow! sink var)))) - observers)] + [banned (monad.only ! (function (_ sink) + (do ! + [result (# sink feed (variance.write new_value))] + (in (case result + {try.#Success _} + false + + {try.#Failure _} + true)))) + observers) + _ (atom.update! (function (_ [value audience]) + (|> audience + (list.only (function (_ it) + (not (list.any? (same? it) banned)))) + [value])) + var')] (in [])) (write! new_value var)))) (def: .public (changes target) (All (_ a) (-> (Var a) (IO [(Channel a) (Sink a)]))) (do io.monad - [.let [[channel sink] (frp.channel [])] + [.let [[channel sink] (sharing [a] + (Var a) + target + + [(Channel a) (Sink a)] + (frp.channel []))] _ (atom.update! (function (_ [value observers]) - [value {.#Item sink observers}]) + [value {.#Item (implementation + (def: close (# sink close)) + (def: feed (|>> variance.read (# sink feed)))) + observers}]) (representation target))] (in [channel sink]))) ) -(type: (Tx_Frame a) +(type: (Tx_Frame r w) (Record - [#var (Var a) - #original a - #current a])) + [#var (Var' r w) + #original r + #current w])) (type: Tx - (List (Ex (_ a) (Tx_Frame a)))) + (List (Ex (_ r w) (Tx_Frame r w)))) (type: .public (STM a) (-> Tx [Tx a])) (def: (var_value var tx) - (All (_ a) (-> (Var a) Tx (Maybe a))) + (All (_ r w) (-> (Var' r w) Tx (Maybe r))) (|> tx (list.example (function (_ [_var _original _current]) (same? (as (Var Any) var) @@ -98,7 +111,7 @@ as_expected)) (def: .public (read var) - (All (_ a) (-> (Var a) (STM a))) + (All (_ r w) (-> (Var' r w) (STM r))) (function (_ tx) (case (var_value var tx) {.#Some value} @@ -106,11 +119,14 @@ {.#None} (let [value (..read! var)] - [{.#Item [var value value] tx} + [{.#Item [#var var + #original value + #current (as_expected value)] + tx} value])))) (def: (with_updated_var var value tx) - (All (_ a) (-> (Var a) a Tx Tx)) + (All (_ r w) (-> (Var' r w) w Tx Tx)) (case tx {.#End} {.#End} @@ -118,9 +134,9 @@ {.#Item [_var _original _current] tx'} (if (same? (as (Var Any) var) (as (Var Any) _var)) - {.#Item [#var (as (Var Any) _var) - #original (as Any _original) - #current (as Any value)] + {.#Item [#var _var + #original _original + #current (as_expected value)] tx'} {.#Item [#var _var #original _original @@ -128,7 +144,7 @@ (with_updated_var var value tx')}))) (def: .public (write value var) - (All (_ a) (-> a (Var a) (STM Any))) + (All (_ r w) (-> w (Var' r w) (STM Any))) (function (_ tx) (case (var_value var tx) {.#Some _} @@ -136,7 +152,10 @@ []] {.#None} - [{.#Item [var (..read! var) value] tx} + [{.#Item [#var var + #original (..read! var) + #current value] + tx} []]))) (implementation: .public functor @@ -173,12 +192,12 @@ (ma tx'))))) (def: .public (update f var) - (All (_ a) (-> (-> a a) (Var a) (STM [a a]))) + (All (_ r w) (-> (-> r w) (Var' r w) (STM [r w]))) (do ..monad - [a (..read var) - .let [a' (f a)] - _ (..write a' var)] - (in [a a']))) + [before (..read var) + .let [after (f before)] + _ (..write after var)] + (in [before after]))) (def: (can_commit? tx) (-> Tx Bit) @@ -187,8 +206,8 @@ tx)) (def: (commit_var! [_var _original _current]) - (-> (Ex (_ a) (Tx_Frame a)) (IO Any)) - (if (same? _original _current) + (-> (Ex (_ r w) (Tx_Frame r w)) (IO Any)) + (if (same? (as Any _original) (as Any _current)) (io []) (..write! _current _var))) |