diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency')
-rw-r--r-- | stdlib/source/lux/control/concurrency/frp.lux | 22 | ||||
-rw-r--r-- | stdlib/source/lux/control/concurrency/stm.lux | 81 |
2 files changed, 63 insertions, 40 deletions
diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux index a9beb4a0e..17ae28f41 100644 --- a/stdlib/source/lux/control/concurrency/frp.lux +++ b/stdlib/source/lux/control/concurrency/frp.lux @@ -8,7 +8,7 @@ ["." monad (#+ Monad do)]] [control ["." try (#+ Try)] - ["ex" exception (#+ exception:)] + ["." exception (#+ exception:)] ["." io (#+ IO io)]] [data ["." maybe ("#@." functor)] @@ -45,13 +45,13 @@ stopped? (current #.None)] (if stopped? ## I closed the sink. - (wrap (ex.return [])) + (wrap (exception.return [])) ## Someone else interacted with the sink. (do @ [latter (atom.read sink)] (if (is? current latter) ## Someone else closed the sink. - (wrap (ex.throw channel-is-already-closed [])) + (wrap (exception.throw ..channel-is-already-closed [])) ## Someone else fed the sink while I was closing it. (recur []))))))) @@ -70,13 +70,13 @@ ## I fed the sink. (do @ [_ (atom.compare-and-swap current resolve-next sink)] - (wrap (ex.return []))) + (wrap (exception.return []))) ## Someone else interacted with the sink. (do @ [latter (atom.read sink)] (if (is? current latter) ## Someone else closed the sink while I was feeding it. - (wrap (ex.throw channel-is-already-closed [])) + (wrap (exception.throw ..channel-is-already-closed [])) ## Someone else fed the sink. (recur [])))))))))) @@ -85,14 +85,18 @@ (let [[promise resolve] (promise.promise [])] [promise (..sink resolve)])) -(structure: #export functor (Functor Channel) +(structure: #export functor + (Functor Channel) + (def: (map f) (promise@map (maybe@map (function (_ [head tail]) [(f head) (map f tail)]))))) -(structure: #export apply (Apply Channel) +(structure: #export apply + (Apply Channel) + (def: &functor ..functor) (def: (apply ff fa) @@ -108,7 +112,9 @@ (def: empty Channel (promise.resolved #.None)) -(structure: #export monad (Monad Channel) +(structure: #export monad + (Monad Channel) + (def: &functor ..functor) (def: (wrap a) diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux index 783bc2117..3065d8033 100644 --- a/stdlib/source/lux/control/concurrency/stm.lux +++ b/stdlib/source/lux/control/concurrency/stm.lux @@ -3,9 +3,10 @@ [abstract [functor (#+ Functor)] [apply (#+ Apply)] - ["." monad (#+ do Monad)]] + ["." monad (#+ Monad do)]] [control - ["." io (#+ IO io)]] + ["." io (#+ IO io)] + ["." try]] [data ["." product] ["." maybe] @@ -16,53 +17,63 @@ [// ["." atom (#+ Atom atom)] ["." promise (#+ Promise Resolver)] - ["." frp]]) + ["." frp (#+ Channel Sink)]]) -(type: #export (Observer a) +(type: (Observer a) (-> a (IO Any))) (abstract: #export (Var a) {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} - (Atom [a (List (Observer a))]) + (Atom [a (List (Sink a))]) (def: #export (var value) {#.doc "Creates a new STM var, with a default value."} (All [a] (-> a (Var a))) (:abstraction (atom.atom [value (list)]))) - (def: read!! + (def: read! (All [a] (-> (Var a) a)) (|>> :representation atom.read io.run product.left)) - (def: #export (read! (^:representation var)) - {#.doc "Reads var immediately, without going through a transaction."} - (All [a] (-> (Var a) (IO a))) - (|> var - atom.read - (:: io.functor map product.left))) + (def: (un-follow sink var) + (All [a] (-> (Sink a) (Var a) (IO Any))) + (do io.monad + [_ (atom.update (function (_ [value observers]) + [value (list.filter (|>> (is? sink) not) observers)]) + (:representation var))] + (wrap []))) - (def: (write! new-value (^:representation var)) + (def: (write! new-value var) (All [a] (-> a (Var a) (IO Any))) (do io.monad - [(^@ old [_value _observers]) (atom.read var) - succeeded? (atom.compare-and-swap old [new-value _observers] var)] + [#let [var' (:representation var)] + (^@ old [old-value observers]) (atom.read var') + succeeded? (atom.compare-and-swap old [new-value observers] var')] (if succeeded? (do @ - [_ (monad.map @ (function (_ f) (f new-value)) _observers)] + [_ (monad.map @ (function (_ sink) + (do @ + [result (:: sink feed new-value)] + (case result + (#try.Success _) + (wrap []) + + (#try.Failure _) + (un-follow sink var)))) + observers)] (wrap [])) - (write! new-value (:abstraction var))))) + (write! new-value var)))) (def: #export (follow target) {#.doc "Creates a channel that will receive all changes to the value of the given var."} - (All [a] (-> (Var a) (IO (frp.Channel a)))) + (All [a] (-> (Var a) (IO [(Channel a) (Sink a)]))) (do io.monad - [#let [[channel source] (frp.channel []) - target (:representation target)] + [#let [[channel sink] (frp.channel [])] _ (atom.update (function (_ [value observers]) - [value (#.Cons (:: source feed) observers)]) - target)] - (wrap channel))) + [value (#.Cons sink observers)]) + (:representation target))] + (wrap [channel sink]))) ) (type: (Tx-Frame a) @@ -96,7 +107,7 @@ [tx value] #.None - (let [value (read!! var)] + (let [value (..read! var)] [(#.Cons [var value value] tx) value])))) @@ -129,16 +140,20 @@ []] #.None - [(#.Cons [var (read!! var) value] tx) + [(#.Cons [var (..read! var) value] tx) []]))) -(structure: #export functor (Functor STM) +(structure: #export functor + (Functor STM) + (def: (map f fa) (function (_ tx) (let [[tx' a] (fa tx)] [tx' (f a)])))) -(structure: #export apply (Apply STM) +(structure: #export apply + (Apply STM) + (def: &functor ..functor) (def: (apply ff fa) @@ -147,7 +162,9 @@ [tx'' a] (fa tx')] [tx'' (f a)])))) -(structure: #export monad (Monad STM) +(structure: #export monad + (Monad STM) + (def: &functor ..functor) (def: (wrap a) @@ -162,22 +179,22 @@ {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} (All [a] (-> (-> a a) (Var a) (STM [a a]))) (do ..monad - [a (read var) + [a (..read var) #let [a' (f a)] - _ (write a' var)] + _ (..write a' var)] (wrap [a a']))) (def: (can-commit? tx) (-> Tx Bit) (list.every? (function (_ [_var _original _current]) - (is? _original (read!! _var))) + (is? _original (..read! _var))) tx)) (def: (commit-var! [_var _original _current]) (-> (Ex [a] (Tx-Frame a)) (IO Any)) (if (is? _original _current) (io []) - (write! _current _var))) + (..write! _current _var))) (def: fresh-tx Tx (list)) |