aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency/stm.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/control/concurrency/stm.lux')
-rw-r--r--stdlib/source/lux/control/concurrency/stm.lux273
1 files changed, 0 insertions, 273 deletions
diff --git a/stdlib/source/lux/control/concurrency/stm.lux b/stdlib/source/lux/control/concurrency/stm.lux
deleted file mode 100644
index d375059a4..000000000
--- a/stdlib/source/lux/control/concurrency/stm.lux
+++ /dev/null
@@ -1,273 +0,0 @@
-(.module:
- [lux #*
- [abstract
- [functor (#+ Functor)]
- [apply (#+ Apply)]
- ["." monad (#+ Monad do)]]
- [control
- ["." io (#+ IO io)]
- ["." try]]
- [data
- ["." product]
- ["." maybe]
- [collection
- ["." list]]]
- [type
- abstract]]
- [//
- ["." atom (#+ Atom atom)]
- ["." promise (#+ Promise Resolver)]
- ["." frp (#+ Channel Sink)]])
-
-(type: (Observer a)
- (-> a (IO Any)))
-
-(abstract: #export (Var a)
- (Atom [a (List (Sink a))])
-
- {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."}
-
- (def: #export (var value)
- {#.doc "Creates a new STM var, with a default value."}
- (All [a] (-> a (Var a)))
- (:abstraction (atom.atom [value (list)])))
-
- (def: read!
- (All [a] (-> (Var a) a))
- (|>> :representation atom.read io.run product.left))
-
- (def: (un_follow sink var)
- (All [a] (-> (Sink a) (Var a) (IO Any)))
- (do io.monad
- [_ (atom.update (function (_ [value observers])
- [value (list.filter (|>> (is? sink) not) observers)])
- (:representation var))]
- (wrap [])))
-
- (def: (write! new_value var)
- (All [a] (-> a (Var a) (IO Any)))
- (do {! io.monad}
- [#let [var' (:representation var)]
- (^@ old [old_value observers]) (atom.read var')
- succeeded? (atom.compare_and_swap old [new_value observers] var')]
- (if succeeded?
- (do !
- [_ (monad.map ! (function (_ sink)
- (do !
- [result (\ sink feed new_value)]
- (case result
- (#try.Success _)
- (wrap [])
-
- (#try.Failure _)
- (un_follow sink var))))
- observers)]
- (wrap []))
- (write! new_value var))))
-
- (def: #export (follow target)
- {#.doc "Creates a channel that will receive all changes to the value of the given var."}
- (All [a] (-> (Var a) (IO [(Channel a) (Sink a)])))
- (do io.monad
- [#let [[channel sink] (frp.channel [])]
- _ (atom.update (function (_ [value observers])
- [value (#.Cons sink observers)])
- (:representation target))]
- (wrap [channel sink])))
- )
-
-(type: (Tx_Frame a)
- {#var (Var a)
- #original a
- #current a})
-
-(type: Tx
- (List (Ex [a] (Tx_Frame a))))
-
-(type: #export (STM a)
- {#.doc "A computation which updates a transaction and produces a value."}
- (-> Tx [Tx a]))
-
-(def: (find_var_value var tx)
- (All [a] (-> (Var a) Tx (Maybe a)))
- (|> tx
- (list.find (function (_ [_var _original _current])
- (is? (:as (Var Any) var)
- (:as (Var Any) _var))))
- (\ maybe.monad map (function (_ [_var _original _current])
- _current))
- (:assume)
- ))
-
-(def: #export (read var)
- (All [a] (-> (Var a) (STM a)))
- (function (_ tx)
- (case (find_var_value var tx)
- (#.Some value)
- [tx value]
-
- #.None
- (let [value (..read! var)]
- [(#.Cons [var value value] tx)
- value]))))
-
-(def: (update_tx_value var value tx)
- (All [a] (-> (Var a) a Tx Tx))
- (case tx
- #.Nil
- #.Nil
-
- (#.Cons [_var _original _current] tx')
- (if (is? (:as (Var Any) var)
- (:as (Var Any) _var))
- (#.Cons {#var (:as (Var Any) _var)
- #original (:as Any _original)
- #current (:as Any value)}
- tx')
- (#.Cons {#var _var
- #original _original
- #current _current}
- (update_tx_value var value tx')))))
-
-(def: #export (write value var)
- {#.doc "Writes value to var."}
- (All [a] (-> a (Var a) (STM Any)))
- (function (_ tx)
- (case (find_var_value var tx)
- (#.Some _)
- [(update_tx_value var value tx)
- []]
-
- #.None
- [(#.Cons [var (..read! var) value] tx)
- []])))
-
-(implementation: #export functor
- (Functor STM)
-
- (def: (map f fa)
- (function (_ tx)
- (let [[tx' a] (fa tx)]
- [tx' (f a)]))))
-
-(implementation: #export apply
- (Apply STM)
-
- (def: &functor ..functor)
-
- (def: (apply ff fa)
- (function (_ tx)
- (let [[tx' f] (ff tx)
- [tx'' a] (fa tx')]
- [tx'' (f a)]))))
-
-(implementation: #export monad
- (Monad STM)
-
- (def: &functor ..functor)
-
- (def: (wrap a)
- (function (_ tx)
- [tx a]))
-
- (def: (join mma)
- (function (_ tx)
- (let [[tx' ma] (mma tx)]
- (ma tx')))))
-
-(def: #export (update f var)
- {#.doc "Will update a Var's value, and return a tuple with the old and the new values."}
- (All [a] (-> (-> a a) (Var a) (STM [a a])))
- (do ..monad
- [a (..read var)
- #let [a' (f a)]
- _ (..write a' var)]
- (wrap [a a'])))
-
-(def: (can_commit? tx)
- (-> Tx Bit)
- (list.every? (function (_ [_var _original _current])
- (is? _original (..read! _var)))
- tx))
-
-(def: (commit_var! [_var _original _current])
- (-> (Ex [a] (Tx_Frame a)) (IO Any))
- (if (is? _original _current)
- (io [])
- (..write! _current _var)))
-
-(def: fresh_tx Tx (list))
-
-(type: (Commit a)
- [(STM a)
- (Promise a)
- (Resolver a)])
-
-(def: pending_commits
- (Atom (Rec Commits
- [(Promise [(Ex [a] (Commit a)) Commits])
- (Resolver [(Ex [a] (Commit a)) Commits])]))
- (atom (promise.promise [])))
-
-(def: commit_processor_flag
- (Atom Bit)
- (atom #0))
-
-(def: (issue_commit commit)
- (All [a] (-> (Commit a) (IO Any)))
- (let [entry [commit (promise.promise [])]]
- (do {! io.monad}
- [|commits|&resolve (atom.read pending_commits)]
- (loop [[|commits| resolve] |commits|&resolve]
- (do !
- [|commits| (promise.poll |commits|)]
- (case |commits|
- #.None
- (do io.monad
- [resolved? (resolve entry)]
- (if resolved?
- (atom.write (product.right entry) pending_commits)
- (recur |commits|&resolve)))
-
- (#.Some [head tail])
- (recur tail)))))))
-
-(def: (process_commit commit)
- (All [a] (-> (Commit a) (IO Any)))
- (let [[stm_proc output resolve] commit
- [finished_tx value] (stm_proc fresh_tx)]
- (if (can_commit? finished_tx)
- (do {! io.monad}
- [_ (monad.map ! commit_var! finished_tx)]
- (resolve value))
- (issue_commit commit))))
-
-(def: init_processor!
- (IO Any)
- (do {! io.monad}
- [flag (atom.read commit_processor_flag)]
- (if flag
- (wrap [])
- (do !
- [was_first? (atom.compare_and_swap flag #1 commit_processor_flag)]
- (if was_first?
- (do !
- [[promise resolve] (atom.read pending_commits)]
- (promise.await (function (recur [head [tail _resolve]])
- (do !
- [_ (process_commit head)]
- (promise.await recur tail)))
- promise))
- (wrap [])))
- )))
-
-(def: #export (commit stm_proc)
- {#.doc (doc "Commits a transaction and returns its result (asynchronously)."
- "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first."
- "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")}
- (All [a] (-> (STM a) (Promise a)))
- (let [[output resolver] (promise.promise [])]
- (exec (io.run (do io.monad
- [_ init_processor!]
- (issue_commit [stm_proc output resolver])))
- output)))