aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/control/concurrency')
-rw-r--r--stdlib/source/lux/control/concurrency/frp.lux22
-rw-r--r--stdlib/source/lux/control/concurrency/stm.lux81
2 files changed, 63 insertions, 40 deletions
diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux
index a9beb4a0e..17ae28f41 100644
--- a/stdlib/source/lux/control/concurrency/frp.lux
+++ b/stdlib/source/lux/control/concurrency/frp.lux
@@ -8,7 +8,7 @@
["." monad (#+ Monad do)]]
[control
["." try (#+ Try)]
- ["ex" exception (#+ exception:)]
+ ["." exception (#+ exception:)]
["." io (#+ IO io)]]
[data
["." maybe ("#@." functor)]
@@ -45,13 +45,13 @@
stopped? (current #.None)]
(if stopped?
## I closed the sink.
- (wrap (ex.return []))
+ (wrap (exception.return []))
## Someone else interacted with the sink.
(do @
[latter (atom.read sink)]
(if (is? current latter)
## Someone else closed the sink.
- (wrap (ex.throw channel-is-already-closed []))
+ (wrap (exception.throw ..channel-is-already-closed []))
## Someone else fed the sink while I was closing it.
(recur [])))))))
@@ -70,13 +70,13 @@
## I fed the sink.
(do @
[_ (atom.compare-and-swap current resolve-next sink)]
- (wrap (ex.return [])))
+ (wrap (exception.return [])))
## Someone else interacted with the sink.
(do @
[latter (atom.read sink)]
(if (is? current latter)
## Someone else closed the sink while I was feeding it.
- (wrap (ex.throw channel-is-already-closed []))
+ (wrap (exception.throw ..channel-is-already-closed []))
## Someone else fed the sink.
(recur []))))))))))
@@ -85,14 +85,18 @@
(let [[promise resolve] (promise.promise [])]
[promise (..sink resolve)]))
-(structure: #export functor (Functor Channel)
+(structure: #export functor
+ (Functor Channel)
+
(def: (map f)
(promise@map
(maybe@map
(function (_ [head tail])
[(f head) (map f tail)])))))
-(structure: #export apply (Apply Channel)
+(structure: #export apply
+ (Apply Channel)
+
(def: &functor ..functor)
(def: (apply ff fa)
@@ -108,7 +112,9 @@
(def: empty Channel (promise.resolved #.None))
-(structure: #export monad (Monad Channel)
+(structure: #export monad
+ (Monad Channel)
+
(def: &functor ..functor)
(def: (wrap a)
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux
index 783bc2117..3065d8033 100644
--- a/stdlib/source/lux/control/concurrency/stm.lux
+++ b/stdlib/source/lux/control/concurrency/stm.lux
@@ -3,9 +3,10 @@
[abstract
[functor (#+ Functor)]
[apply (#+ Apply)]
- ["." monad (#+ do Monad)]]
+ ["." monad (#+ Monad do)]]
[control
- ["." io (#+ IO io)]]
+ ["." io (#+ IO io)]
+ ["." try]]
[data
["." product]
["." maybe]
@@ -16,53 +17,63 @@
[//
["." atom (#+ Atom atom)]
["." promise (#+ Promise Resolver)]
- ["." frp]])
+ ["." frp (#+ Channel Sink)]])
-(type: #export (Observer a)
+(type: (Observer a)
(-> a (IO Any)))
(abstract: #export (Var a)
{#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."}
- (Atom [a (List (Observer a))])
+ (Atom [a (List (Sink a))])
(def: #export (var value)
{#.doc "Creates a new STM var, with a default value."}
(All [a] (-> a (Var a)))
(:abstraction (atom.atom [value (list)])))
- (def: read!!
+ (def: read!
(All [a] (-> (Var a) a))
(|>> :representation atom.read io.run product.left))
- (def: #export (read! (^:representation var))
- {#.doc "Reads var immediately, without going through a transaction."}
- (All [a] (-> (Var a) (IO a)))
- (|> var
- atom.read
- (:: io.functor map product.left)))
+ (def: (un-follow sink var)
+ (All [a] (-> (Sink a) (Var a) (IO Any)))
+ (do io.monad
+ [_ (atom.update (function (_ [value observers])
+ [value (list.filter (|>> (is? sink) not) observers)])
+ (:representation var))]
+ (wrap [])))
- (def: (write! new-value (^:representation var))
+ (def: (write! new-value var)
(All [a] (-> a (Var a) (IO Any)))
(do io.monad
- [(^@ old [_value _observers]) (atom.read var)
- succeeded? (atom.compare-and-swap old [new-value _observers] var)]
+ [#let [var' (:representation var)]
+ (^@ old [old-value observers]) (atom.read var')
+ succeeded? (atom.compare-and-swap old [new-value observers] var')]
(if succeeded?
(do @
- [_ (monad.map @ (function (_ f) (f new-value)) _observers)]
+ [_ (monad.map @ (function (_ sink)
+ (do @
+ [result (:: sink feed new-value)]
+ (case result
+ (#try.Success _)
+ (wrap [])
+
+ (#try.Failure _)
+ (un-follow sink var))))
+ observers)]
(wrap []))
- (write! new-value (:abstraction var)))))
+ (write! new-value var))))
(def: #export (follow target)
{#.doc "Creates a channel that will receive all changes to the value of the given var."}
- (All [a] (-> (Var a) (IO (frp.Channel a))))
+ (All [a] (-> (Var a) (IO [(Channel a) (Sink a)])))
(do io.monad
- [#let [[channel source] (frp.channel [])
- target (:representation target)]
+ [#let [[channel sink] (frp.channel [])]
_ (atom.update (function (_ [value observers])
- [value (#.Cons (:: source feed) observers)])
- target)]
- (wrap channel)))
+ [value (#.Cons sink observers)])
+ (:representation target))]
+ (wrap [channel sink])))
)
(type: (Tx-Frame a)
@@ -96,7 +107,7 @@
[tx value]
#.None
- (let [value (read!! var)]
+ (let [value (..read! var)]
[(#.Cons [var value value] tx)
value]))))
@@ -129,16 +140,20 @@
[]]
#.None
- [(#.Cons [var (read!! var) value] tx)
+ [(#.Cons [var (..read! var) value] tx)
[]])))
-(structure: #export functor (Functor STM)
+(structure: #export functor
+ (Functor STM)
+
(def: (map f fa)
(function (_ tx)
(let [[tx' a] (fa tx)]
[tx' (f a)]))))
-(structure: #export apply (Apply STM)
+(structure: #export apply
+ (Apply STM)
+
(def: &functor ..functor)
(def: (apply ff fa)
@@ -147,7 +162,9 @@
[tx'' a] (fa tx')]
[tx'' (f a)]))))
-(structure: #export monad (Monad STM)
+(structure: #export monad
+ (Monad STM)
+
(def: &functor ..functor)
(def: (wrap a)
@@ -162,22 +179,22 @@
{#.doc "Will update a Var's value, and return a tuple with the old and the new values."}
(All [a] (-> (-> a a) (Var a) (STM [a a])))
(do ..monad
- [a (read var)
+ [a (..read var)
#let [a' (f a)]
- _ (write a' var)]
+ _ (..write a' var)]
(wrap [a a'])))
(def: (can-commit? tx)
(-> Tx Bit)
(list.every? (function (_ [_var _original _current])
- (is? _original (read!! _var)))
+ (is? _original (..read! _var)))
tx))
(def: (commit-var! [_var _original _current])
(-> (Ex [a] (Tx-Frame a)) (IO Any))
(if (is? _original _current)
(io [])
- (write! _current _var)))
+ (..write! _current _var)))
(def: fresh-tx Tx (list))