aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency/stm.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/control/concurrency/stm.lux')
-rw-r--r--stdlib/source/lux/control/concurrency/stm.lux245
1 files changed, 245 insertions, 0 deletions
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux
new file mode 100644
index 000000000..3203b2d52
--- /dev/null
+++ b/stdlib/source/lux/control/concurrency/stm.lux
@@ -0,0 +1,245 @@
+(.module:
+ [lux #*
+ [control
+ [functor (#+ Functor)]
+ [apply (#+ Apply)]
+ ["." monad (#+ do Monad)]]
+ ["." io (#+ IO io)]
+ [data
+ ["." product]
+ ["." maybe]
+ [collection
+ ["." list ("list/." Functor<List> Fold<List>)]]]
+ [type
+ abstract]]
+ [//
+ ["." atom (#+ Atom atom)]
+ ["." promise (#+ Promise promise)]
+ ["." frp ("frp/." Functor<Channel>)]])
+
+(type: #export (Observer a)
+ (-> a (IO Any)))
+
+(abstract: #export (Var a)
+ {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."}
+
+ (Atom [a (List (Observer a))])
+
+ (def: #export (var value)
+ {#.doc "Creates a new STM var, with a default value."}
+ (All [a] (-> a (Var a)))
+ (:abstraction (atom.atom [value (list)])))
+
+ (def: read!!
+ (All [a] (-> (Var a) a))
+ (|>> :representation atom.read io.run product.left))
+
+ (def: #export (read! (^:representation var))
+ {#.doc "Reads var immediately, without going through a transaction."}
+ (All [a] (-> (Var a) (IO a)))
+ (|> var
+ atom.read
+ (:: io.Functor<IO> map product.left)))
+
+ (def: (write! new-value (^:representation var))
+ (All [a] (-> a (Var a) (IO Any)))
+ (do io.Monad<IO>
+ [(^@ old [_value _observers]) (atom.read var)
+ succeeded? (atom.compare-and-swap old [new-value _observers] var)]
+ (if succeeded?
+ (do @
+ [_ (monad.map @ (function (_ f) (f new-value)) _observers)]
+ (wrap []))
+ (write! new-value (:abstraction var)))))
+
+ ## TODO: Remove when possible
+ (def: (helper|follow var)
+ (All [a] (-> (Var a) (frp.Channel a)))
+ (frp.channel []))
+ (def: #export (follow target)
+ {#.doc "Creates a channel that will receive all changes to the value of the given var."}
+ (All [a] (-> (Var a) (IO (frp.Channel a))))
+ (do io.Monad<IO>
+ [#let [channel (helper|follow target)
+ target (:representation target)]
+ _ (atom.update (function (_ [value observers])
+ [value (#.Cons (frp.publish channel) observers)])
+ target)]
+ (wrap channel)))
+ )
+
+(type: (Tx-Frame a)
+ {#var (Var a)
+ #original a
+ #current a})
+
+(type: Tx
+ (List (Ex [a] (Tx-Frame a))))
+
+(type: #export (STM a)
+ {#.doc "A computation which updates a transaction and produces a value."}
+ (-> Tx [Tx a]))
+
+(def: (find-var-value var tx)
+ (All [a] (-> (Var a) Tx (Maybe a)))
+ (|> tx
+ (list.find (function (_ [_var _original _current])
+ (is? (:coerce (Var Any) var)
+ (:coerce (Var Any) _var))))
+ (:: maybe.Monad<Maybe> map (function (_ [_var _original _current])
+ _current))
+ (:assume)
+ ))
+
+(def: #export (read var)
+ (All [a] (-> (Var a) (STM a)))
+ (function (_ tx)
+ (case (find-var-value var tx)
+ (#.Some value)
+ [tx value]
+
+ #.None
+ (let [value (read!! var)]
+ [(#.Cons [var value value] tx)
+ value]))))
+
+(def: (update-tx-value var value tx)
+ (All [a] (-> (Var a) a Tx Tx))
+ (case tx
+ #.Nil
+ #.Nil
+
+ (#.Cons [_var _original _current] tx')
+ (if (is? (:coerce (Var Any) var)
+ (:coerce (Var Any) _var))
+ (#.Cons {#var (:coerce (Var Any) _var)
+ #original (:coerce Any _original)
+ #current (:coerce Any value)}
+ tx')
+ (#.Cons {#var _var
+ #original _original
+ #current _current}
+ (update-tx-value var value tx')))
+ ))
+
+(def: #export (write value var)
+ {#.doc "Writes value to var."}
+ (All [a] (-> a (Var a) (STM Any)))
+ (function (_ tx)
+ (case (find-var-value var tx)
+ (#.Some _)
+ [(update-tx-value var value tx)
+ []]
+
+ #.None
+ [(#.Cons [var (read!! var) value] tx)
+ []])))
+
+(structure: #export _ (Functor STM)
+ (def: (map f fa)
+ (function (_ tx)
+ (let [[tx' a] (fa tx)]
+ [tx' (f a)]))))
+
+(structure: #export _ (Apply STM)
+ (def: functor Functor<STM>)
+
+ (def: (apply ff fa)
+ (function (_ tx)
+ (let [[tx' f] (ff tx)
+ [tx'' a] (fa tx')]
+ [tx'' (f a)]))))
+
+(structure: #export _ (Monad STM)
+ (def: functor Functor<STM>)
+
+ (def: (wrap a)
+ (function (_ tx) [tx a]))
+
+ (def: (join mma)
+ (function (_ tx)
+ (let [[tx' ma] (mma tx)]
+ (ma tx')))))
+
+(def: #export (update f var)
+ {#.doc "Will update a Var's value, and return a tuple with the old and the new values."}
+ (All [a] (-> (-> a a) (Var a) (STM [a a])))
+ (do Monad<STM>
+ [a (read var)
+ #let [a' (f a)]
+ _ (write a' var)]
+ (wrap [a a'])))
+
+(def: (can-commit? tx)
+ (-> Tx Bit)
+ (list.every? (function (_ [_var _original _current])
+ (is? _original (read!! _var)))
+ tx))
+
+(def: (commit-var! [_var _original _current])
+ (-> (Ex [a] (Tx-Frame a)) Any)
+ (if (is? _original _current)
+ []
+ (io.run (write! _current _var))))
+
+(def: fresh-tx Tx (list))
+
+(type: Commit (Ex [a] [(STM a) (Promise a)]))
+
+(def: pending-commits
+ (Atom (Rec Commits (Promise [Commit Commits])))
+ (atom (promise #.None)))
+
+(def: commit-processor-flag
+ (Atom Bit)
+ (atom #0))
+
+(def: (issue-commit commit)
+ (-> Commit (IO Any))
+ (let [entry [commit (promise #.None)]]
+ (loop [|commits| (io.run (atom.read pending-commits))]
+ (case (promise.poll |commits|)
+ #.None
+ (do io.Monad<IO>
+ [resolved? (promise.resolve entry |commits|)]
+ (if resolved?
+ (atom.write (product.right entry) pending-commits)
+ (recur |commits|)))
+
+ (#.Some [head tail])
+ (recur tail)))))
+
+(def: (process-commit [stm-proc output])
+ (-> [(STM Any) (Promise Any)] Any)
+ (let [[finished-tx value] (stm-proc fresh-tx)]
+ (io.run (if (can-commit? finished-tx)
+ (exec (list/map commit-var! finished-tx)
+ (promise.resolve value output))
+ (issue-commit [stm-proc output])))))
+
+(def: init-processor!
+ (IO Any)
+ (do io.Monad<IO>
+ [flag (atom.read commit-processor-flag)]
+ (if flag
+ (wrap [])
+ (do @
+ [was-first? (atom.compare-and-swap flag #1 commit-processor-flag)]
+ (if was-first?
+ (exec (|> (io.run (atom.read pending-commits))
+ (promise.await (function (recur [head tail])
+ (io (exec (process-commit (:coerce [(STM Any) (Promise Any)] head))
+ (promise.await recur tail))))))
+ (wrap []))
+ (wrap [])))
+ )))
+
+(def: #export (commit stm-proc)
+ {#.doc (doc "Commits a transaction and returns its result (asynchronously)."
+ "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first."
+ "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")}
+ (All [a] (-> (STM a) (Promise a)))
+ (let [output (promise #.None)]
+ (exec (io.run init-processor!)
+ (io.run (issue-commit [stm-proc output]))
+ output)))