aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency/stm.lux
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--stdlib/source/lux/control/concurrency/stm.lux81
1 files changed, 49 insertions, 32 deletions
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux
index 783bc2117..3065d8033 100644
--- a/stdlib/source/lux/control/concurrency/stm.lux
+++ b/stdlib/source/lux/control/concurrency/stm.lux
@@ -3,9 +3,10 @@
[abstract
[functor (#+ Functor)]
[apply (#+ Apply)]
- ["." monad (#+ do Monad)]]
+ ["." monad (#+ Monad do)]]
[control
- ["." io (#+ IO io)]]
+ ["." io (#+ IO io)]
+ ["." try]]
[data
["." product]
["." maybe]
@@ -16,53 +17,63 @@
[//
["." atom (#+ Atom atom)]
["." promise (#+ Promise Resolver)]
- ["." frp]])
+ ["." frp (#+ Channel Sink)]])
-(type: #export (Observer a)
+(type: (Observer a)
(-> a (IO Any)))
(abstract: #export (Var a)
{#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."}
- (Atom [a (List (Observer a))])
+ (Atom [a (List (Sink a))])
(def: #export (var value)
{#.doc "Creates a new STM var, with a default value."}
(All [a] (-> a (Var a)))
(:abstraction (atom.atom [value (list)])))
- (def: read!!
+ (def: read!
(All [a] (-> (Var a) a))
(|>> :representation atom.read io.run product.left))
- (def: #export (read! (^:representation var))
- {#.doc "Reads var immediately, without going through a transaction."}
- (All [a] (-> (Var a) (IO a)))
- (|> var
- atom.read
- (:: io.functor map product.left)))
+ (def: (un-follow sink var)
+ (All [a] (-> (Sink a) (Var a) (IO Any)))
+ (do io.monad
+ [_ (atom.update (function (_ [value observers])
+ [value (list.filter (|>> (is? sink) not) observers)])
+ (:representation var))]
+ (wrap [])))
- (def: (write! new-value (^:representation var))
+ (def: (write! new-value var)
(All [a] (-> a (Var a) (IO Any)))
(do io.monad
- [(^@ old [_value _observers]) (atom.read var)
- succeeded? (atom.compare-and-swap old [new-value _observers] var)]
+ [#let [var' (:representation var)]
+ (^@ old [old-value observers]) (atom.read var')
+ succeeded? (atom.compare-and-swap old [new-value observers] var')]
(if succeeded?
(do @
- [_ (monad.map @ (function (_ f) (f new-value)) _observers)]
+ [_ (monad.map @ (function (_ sink)
+ (do @
+ [result (:: sink feed new-value)]
+ (case result
+ (#try.Success _)
+ (wrap [])
+
+ (#try.Failure _)
+ (un-follow sink var))))
+ observers)]
(wrap []))
- (write! new-value (:abstraction var)))))
+ (write! new-value var))))
(def: #export (follow target)
{#.doc "Creates a channel that will receive all changes to the value of the given var."}
- (All [a] (-> (Var a) (IO (frp.Channel a))))
+ (All [a] (-> (Var a) (IO [(Channel a) (Sink a)])))
(do io.monad
- [#let [[channel source] (frp.channel [])
- target (:representation target)]
+ [#let [[channel sink] (frp.channel [])]
_ (atom.update (function (_ [value observers])
- [value (#.Cons (:: source feed) observers)])
- target)]
- (wrap channel)))
+ [value (#.Cons sink observers)])
+ (:representation target))]
+ (wrap [channel sink])))
)
(type: (Tx-Frame a)
@@ -96,7 +107,7 @@
[tx value]
#.None
- (let [value (read!! var)]
+ (let [value (..read! var)]
[(#.Cons [var value value] tx)
value]))))
@@ -129,16 +140,20 @@
[]]
#.None
- [(#.Cons [var (read!! var) value] tx)
+ [(#.Cons [var (..read! var) value] tx)
[]])))
-(structure: #export functor (Functor STM)
+(structure: #export functor
+ (Functor STM)
+
(def: (map f fa)
(function (_ tx)
(let [[tx' a] (fa tx)]
[tx' (f a)]))))
-(structure: #export apply (Apply STM)
+(structure: #export apply
+ (Apply STM)
+
(def: &functor ..functor)
(def: (apply ff fa)
@@ -147,7 +162,9 @@
[tx'' a] (fa tx')]
[tx'' (f a)]))))
-(structure: #export monad (Monad STM)
+(structure: #export monad
+ (Monad STM)
+
(def: &functor ..functor)
(def: (wrap a)
@@ -162,22 +179,22 @@
{#.doc "Will update a Var's value, and return a tuple with the old and the new values."}
(All [a] (-> (-> a a) (Var a) (STM [a a])))
(do ..monad
- [a (read var)
+ [a (..read var)
#let [a' (f a)]
- _ (write a' var)]
+ _ (..write a' var)]
(wrap [a a'])))
(def: (can-commit? tx)
(-> Tx Bit)
(list.every? (function (_ [_var _original _current])
- (is? _original (read!! _var)))
+ (is? _original (..read! _var)))
tx))
(def: (commit-var! [_var _original _current])
(-> (Ex [a] (Tx-Frame a)) (IO Any))
(if (is? _original _current)
(io [])
- (write! _current _var)))
+ (..write! _current _var)))
(def: fresh-tx Tx (list))