diff options
Diffstat (limited to 'stdlib/source/lux/control/concurrency/stm.lux')
-rw-r--r-- | stdlib/source/lux/control/concurrency/stm.lux | 273 |
1 files changed, 0 insertions, 273 deletions
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux deleted file mode 100644 index d375059a4..000000000 --- a/stdlib/source/lux/control/concurrency/stm.lux +++ /dev/null @@ -1,273 +0,0 @@ -(.module: - [lux #* - [abstract - [functor (#+ Functor)] - [apply (#+ Apply)] - ["." monad (#+ Monad do)]] - [control - ["." io (#+ IO io)] - ["." try]] - [data - ["." product] - ["." maybe] - [collection - ["." list]]] - [type - abstract]] - [// - ["." atom (#+ Atom atom)] - ["." promise (#+ Promise Resolver)] - ["." frp (#+ Channel Sink)]]) - -(type: (Observer a) - (-> a (IO Any))) - -(abstract: #export (Var a) - (Atom [a (List (Sink a))]) - - {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} - - (def: #export (var value) - {#.doc "Creates a new STM var, with a default value."} - (All [a] (-> a (Var a))) - (:abstraction (atom.atom [value (list)]))) - - (def: read! - (All [a] (-> (Var a) a)) - (|>> :representation atom.read io.run product.left)) - - (def: (un_follow sink var) - (All [a] (-> (Sink a) (Var a) (IO Any))) - (do io.monad - [_ (atom.update (function (_ [value observers]) - [value (list.filter (|>> (is? sink) not) observers)]) - (:representation var))] - (wrap []))) - - (def: (write! new_value var) - (All [a] (-> a (Var a) (IO Any))) - (do {! io.monad} - [#let [var' (:representation var)] - (^@ old [old_value observers]) (atom.read var') - succeeded? (atom.compare_and_swap old [new_value observers] var')] - (if succeeded? - (do ! - [_ (monad.map ! (function (_ sink) - (do ! - [result (\ sink feed new_value)] - (case result - (#try.Success _) - (wrap []) - - (#try.Failure _) - (un_follow sink var)))) - observers)] - (wrap [])) - (write! new_value var)))) - - (def: #export (follow target) - {#.doc "Creates a channel that will receive all changes to the value of the given var."} - (All [a] (-> (Var a) (IO [(Channel a) (Sink a)]))) - (do io.monad - [#let [[channel sink] (frp.channel [])] - _ (atom.update (function (_ [value observers]) - [value (#.Cons sink observers)]) - (:representation target))] - (wrap [channel sink]))) - ) - -(type: (Tx_Frame a) - {#var (Var a) - #original a - #current a}) - -(type: Tx - (List (Ex [a] (Tx_Frame a)))) - -(type: #export (STM a) - {#.doc "A computation which updates a transaction and produces a value."} - (-> Tx [Tx a])) - -(def: (find_var_value var tx) - (All [a] (-> (Var a) Tx (Maybe a))) - (|> tx - (list.find (function (_ [_var _original _current]) - (is? (:as (Var Any) var) - (:as (Var Any) _var)))) - (\ maybe.monad map (function (_ [_var _original _current]) - _current)) - (:assume) - )) - -(def: #export (read var) - (All [a] (-> (Var a) (STM a))) - (function (_ tx) - (case (find_var_value var tx) - (#.Some value) - [tx value] - - #.None - (let [value (..read! var)] - [(#.Cons [var value value] tx) - value])))) - -(def: (update_tx_value var value tx) - (All [a] (-> (Var a) a Tx Tx)) - (case tx - #.Nil - #.Nil - - (#.Cons [_var _original _current] tx') - (if (is? (:as (Var Any) var) - (:as (Var Any) _var)) - (#.Cons {#var (:as (Var Any) _var) - #original (:as Any _original) - #current (:as Any value)} - tx') - (#.Cons {#var _var - #original _original - #current _current} - (update_tx_value var value tx'))))) - -(def: #export (write value var) - {#.doc "Writes value to var."} - (All [a] (-> a (Var a) (STM Any))) - (function (_ tx) - (case (find_var_value var tx) - (#.Some _) - [(update_tx_value var value tx) - []] - - #.None - [(#.Cons [var (..read! var) value] tx) - []]))) - -(implementation: #export functor - (Functor STM) - - (def: (map f fa) - (function (_ tx) - (let [[tx' a] (fa tx)] - [tx' (f a)])))) - -(implementation: #export apply - (Apply STM) - - (def: &functor ..functor) - - (def: (apply ff fa) - (function (_ tx) - (let [[tx' f] (ff tx) - [tx'' a] (fa tx')] - [tx'' (f a)])))) - -(implementation: #export monad - (Monad STM) - - (def: &functor ..functor) - - (def: (wrap a) - (function (_ tx) - [tx a])) - - (def: (join mma) - (function (_ tx) - (let [[tx' ma] (mma tx)] - (ma tx'))))) - -(def: #export (update f var) - {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} - (All [a] (-> (-> a a) (Var a) (STM [a a]))) - (do ..monad - [a (..read var) - #let [a' (f a)] - _ (..write a' var)] - (wrap [a a']))) - -(def: (can_commit? tx) - (-> Tx Bit) - (list.every? (function (_ [_var _original _current]) - (is? _original (..read! _var))) - tx)) - -(def: (commit_var! [_var _original _current]) - (-> (Ex [a] (Tx_Frame a)) (IO Any)) - (if (is? _original _current) - (io []) - (..write! _current _var))) - -(def: fresh_tx Tx (list)) - -(type: (Commit a) - [(STM a) - (Promise a) - (Resolver a)]) - -(def: pending_commits - (Atom (Rec Commits - [(Promise [(Ex [a] (Commit a)) Commits]) - (Resolver [(Ex [a] (Commit a)) Commits])])) - (atom (promise.promise []))) - -(def: commit_processor_flag - (Atom Bit) - (atom #0)) - -(def: (issue_commit commit) - (All [a] (-> (Commit a) (IO Any))) - (let [entry [commit (promise.promise [])]] - (do {! io.monad} - [|commits|&resolve (atom.read pending_commits)] - (loop [[|commits| resolve] |commits|&resolve] - (do ! - [|commits| (promise.poll |commits|)] - (case |commits| - #.None - (do io.monad - [resolved? (resolve entry)] - (if resolved? - (atom.write (product.right entry) pending_commits) - (recur |commits|&resolve))) - - (#.Some [head tail]) - (recur tail))))))) - -(def: (process_commit commit) - (All [a] (-> (Commit a) (IO Any))) - (let [[stm_proc output resolve] commit - [finished_tx value] (stm_proc fresh_tx)] - (if (can_commit? finished_tx) - (do {! io.monad} - [_ (monad.map ! commit_var! finished_tx)] - (resolve value)) - (issue_commit commit)))) - -(def: init_processor! - (IO Any) - (do {! io.monad} - [flag (atom.read commit_processor_flag)] - (if flag - (wrap []) - (do ! - [was_first? (atom.compare_and_swap flag #1 commit_processor_flag)] - (if was_first? - (do ! - [[promise resolve] (atom.read pending_commits)] - (promise.await (function (recur [head [tail _resolve]]) - (do ! - [_ (process_commit head)] - (promise.await recur tail))) - promise)) - (wrap []))) - ))) - -(def: #export (commit stm_proc) - {#.doc (doc "Commits a transaction and returns its result (asynchronously)." - "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first." - "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")} - (All [a] (-> (STM a) (Promise a))) - (let [[output resolver] (promise.promise [])] - (exec (io.run (do io.monad - [_ init_processor!] - (issue_commit [stm_proc output resolver]))) - output))) |