diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency/stm.lux')
-rw-r--r-- | stdlib/source/lux/control/concurrency/stm.lux | 245 |
1 files changed, 245 insertions, 0 deletions
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux new file mode 100644 index 000000000..3203b2d52 --- /dev/null +++ b/stdlib/source/lux/control/concurrency/stm.lux @@ -0,0 +1,245 @@ +(.module: + [lux #* + [control + [functor (#+ Functor)] + [apply (#+ Apply)] + ["." monad (#+ do Monad)]] + ["." io (#+ IO io)] + [data + ["." product] + ["." maybe] + [collection + ["." list ("list/." Functor<List> Fold<List>)]]] + [type + abstract]] + [// + ["." atom (#+ Atom atom)] + ["." promise (#+ Promise promise)] + ["." frp ("frp/." Functor<Channel>)]]) + +(type: #export (Observer a) + (-> a (IO Any))) + +(abstract: #export (Var a) + {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} + + (Atom [a (List (Observer a))]) + + (def: #export (var value) + {#.doc "Creates a new STM var, with a default value."} + (All [a] (-> a (Var a))) + (:abstraction (atom.atom [value (list)]))) + + (def: read!! + (All [a] (-> (Var a) a)) + (|>> :representation atom.read io.run product.left)) + + (def: #export (read! (^:representation var)) + {#.doc "Reads var immediately, without going through a transaction."} + (All [a] (-> (Var a) (IO a))) + (|> var + atom.read + (:: io.Functor<IO> map product.left))) + + (def: (write! new-value (^:representation var)) + (All [a] (-> a (Var a) (IO Any))) + (do io.Monad<IO> + [(^@ old [_value _observers]) (atom.read var) + succeeded? (atom.compare-and-swap old [new-value _observers] var)] + (if succeeded? + (do @ + [_ (monad.map @ (function (_ f) (f new-value)) _observers)] + (wrap [])) + (write! new-value (:abstraction var))))) + + ## TODO: Remove when possible + (def: (helper|follow var) + (All [a] (-> (Var a) (frp.Channel a))) + (frp.channel [])) + (def: #export (follow target) + {#.doc "Creates a channel that will receive all changes to the value of the given var."} + (All [a] (-> (Var a) (IO (frp.Channel a)))) + (do io.Monad<IO> + [#let [channel (helper|follow target) + target (:representation target)] + _ (atom.update (function (_ [value observers]) + [value (#.Cons (frp.publish channel) observers)]) + target)] + (wrap channel))) + ) + +(type: (Tx-Frame a) + {#var (Var a) + #original a + #current a}) + +(type: Tx + (List (Ex [a] (Tx-Frame a)))) + +(type: #export (STM a) + {#.doc "A computation which updates a transaction and produces a value."} + (-> Tx [Tx a])) + +(def: (find-var-value var tx) + (All [a] (-> (Var a) Tx (Maybe a))) + (|> tx + (list.find (function (_ [_var _original _current]) + (is? (:coerce (Var Any) var) + (:coerce (Var Any) _var)))) + (:: maybe.Monad<Maybe> map (function (_ [_var _original _current]) + _current)) + (:assume) + )) + +(def: #export (read var) + (All [a] (-> (Var a) (STM a))) + (function (_ tx) + (case (find-var-value var tx) + (#.Some value) + [tx value] + + #.None + (let [value (read!! var)] + [(#.Cons [var value value] tx) + value])))) + +(def: (update-tx-value var value tx) + (All [a] (-> (Var a) a Tx Tx)) + (case tx + #.Nil + #.Nil + + (#.Cons [_var _original _current] tx') + (if (is? (:coerce (Var Any) var) + (:coerce (Var Any) _var)) + (#.Cons {#var (:coerce (Var Any) _var) + #original (:coerce Any _original) + #current (:coerce Any value)} + tx') + (#.Cons {#var _var + #original _original + #current _current} + (update-tx-value var value tx'))) + )) + +(def: #export (write value var) + {#.doc "Writes value to var."} + (All [a] (-> a (Var a) (STM Any))) + (function (_ tx) + (case (find-var-value var tx) + (#.Some _) + [(update-tx-value var value tx) + []] + + #.None + [(#.Cons [var (read!! var) value] tx) + []]))) + +(structure: #export _ (Functor STM) + (def: (map f fa) + (function (_ tx) + (let [[tx' a] (fa tx)] + [tx' (f a)])))) + +(structure: #export _ (Apply STM) + (def: functor Functor<STM>) + + (def: (apply ff fa) + (function (_ tx) + (let [[tx' f] (ff tx) + [tx'' a] (fa tx')] + [tx'' (f a)])))) + +(structure: #export _ (Monad STM) + (def: functor Functor<STM>) + + (def: (wrap a) + (function (_ tx) [tx a])) + + (def: (join mma) + (function (_ tx) + (let [[tx' ma] (mma tx)] + (ma tx'))))) + +(def: #export (update f var) + {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} + (All [a] (-> (-> a a) (Var a) (STM [a a]))) + (do Monad<STM> + [a (read var) + #let [a' (f a)] + _ (write a' var)] + (wrap [a a']))) + +(def: (can-commit? tx) + (-> Tx Bit) + (list.every? (function (_ [_var _original _current]) + (is? _original (read!! _var))) + tx)) + +(def: (commit-var! [_var _original _current]) + (-> (Ex [a] (Tx-Frame a)) Any) + (if (is? _original _current) + [] + (io.run (write! _current _var)))) + +(def: fresh-tx Tx (list)) + +(type: Commit (Ex [a] [(STM a) (Promise a)])) + +(def: pending-commits + (Atom (Rec Commits (Promise [Commit Commits]))) + (atom (promise #.None))) + +(def: commit-processor-flag + (Atom Bit) + (atom #0)) + +(def: (issue-commit commit) + (-> Commit (IO Any)) + (let [entry [commit (promise #.None)]] + (loop [|commits| (io.run (atom.read pending-commits))] + (case (promise.poll |commits|) + #.None + (do io.Monad<IO> + [resolved? (promise.resolve entry |commits|)] + (if resolved? + (atom.write (product.right entry) pending-commits) + (recur |commits|))) + + (#.Some [head tail]) + (recur tail))))) + +(def: (process-commit [stm-proc output]) + (-> [(STM Any) (Promise Any)] Any) + (let [[finished-tx value] (stm-proc fresh-tx)] + (io.run (if (can-commit? finished-tx) + (exec (list/map commit-var! finished-tx) + (promise.resolve value output)) + (issue-commit [stm-proc output]))))) + +(def: init-processor! + (IO Any) + (do io.Monad<IO> + [flag (atom.read commit-processor-flag)] + (if flag + (wrap []) + (do @ + [was-first? (atom.compare-and-swap flag #1 commit-processor-flag)] + (if was-first? + (exec (|> (io.run (atom.read pending-commits)) + (promise.await (function (recur [head tail]) + (io (exec (process-commit (:coerce [(STM Any) (Promise Any)] head)) + (promise.await recur tail)))))) + (wrap [])) + (wrap []))) + ))) + +(def: #export (commit stm-proc) + {#.doc (doc "Commits a transaction and returns its result (asynchronously)." + "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first." + "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")} + (All [a] (-> (STM a) (Promise a))) + (let [output (promise #.None)] + (exec (io.run init-processor!) + (io.run (issue-commit [stm-proc output])) + output))) |