(.module: [lux #* [abstract [functor (#+ Functor)] [apply (#+ Apply)] ["." monad (#+ do Monad)]] [control ["." io (#+ IO io)]] [data ["." product] ["." maybe] [collection ["." list]]] [type abstract]] [// ["." atom (#+ Atom atom)] ["." promise (#+ Promise Resolver)] ["." frp]]) (type: #export (Observer a) (-> a (IO Any))) (abstract: #export (Var a) {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} (Atom [a (List (Observer a))]) (def: #export (var value) {#.doc "Creates a new STM var, with a default value."} (All [a] (-> a (Var a))) (:abstraction (atom.atom [value (list)]))) (def: read!! (All [a] (-> (Var a) a)) (|>> :representation atom.read io.run product.left)) (def: #export (read! (^:representation var)) {#.doc "Reads var immediately, without going through a transaction."} (All [a] (-> (Var a) (IO a))) (|> var atom.read (:: io.functor map product.left))) (def: (write! new-value (^:representation var)) (All [a] (-> a (Var a) (IO Any))) (do io.monad [(^@ old [_value _observers]) (atom.read var) succeeded? (atom.compare-and-swap old [new-value _observers] var)] (if succeeded? (do @ [_ (monad.map @ (function (_ f) (f new-value)) _observers)] (wrap [])) (write! new-value (:abstraction var))))) (def: #export (follow target) {#.doc "Creates a channel that will receive all changes to the value of the given var."} (All [a] (-> (Var a) (IO (frp.Channel a)))) (do io.monad [#let [[channel source] (frp.channel []) target (:representation target)] _ (atom.update (function (_ [value observers]) [value (#.Cons (:: source feed) observers)]) target)] (wrap channel))) ) (type: (Tx-Frame a) {#var (Var a) #original a #current a}) (type: Tx (List (Ex [a] (Tx-Frame a)))) (type: #export (STM a) {#.doc "A computation which updates a transaction and produces a value."} (-> Tx [Tx a])) (def: (find-var-value var tx) (All [a] (-> (Var a) Tx (Maybe a))) (|> tx (list.find (function (_ [_var _original _current]) (is? (:coerce (Var Any) var) (:coerce (Var Any) _var)))) (:: maybe.monad map (function (_ [_var _original _current]) _current)) (:assume) )) (def: #export (read var) (All [a] (-> (Var a) (STM a))) (function (_ tx) (case (find-var-value var tx) (#.Some value) [tx value] #.None (let [value (read!! var)] [(#.Cons [var value value] tx) value])))) (def: (update-tx-value var value tx) (All [a] (-> (Var a) a Tx Tx)) (case tx #.Nil #.Nil (#.Cons [_var _original _current] tx') (if (is? (:coerce (Var Any) var) (:coerce (Var Any) _var)) (#.Cons {#var (:coerce (Var Any) _var) #original (:coerce Any _original) #current (:coerce Any value)} tx') (#.Cons {#var _var #original _original #current _current} (update-tx-value var value tx'))) )) (def: #export (write value var) {#.doc "Writes value to var."} (All [a] (-> a (Var a) (STM Any))) (function (_ tx) (case (find-var-value var tx) (#.Some _) [(update-tx-value var value tx) []] #.None [(#.Cons [var (read!! var) value] tx) []]))) (structure: #export functor (Functor STM) (def: (map f fa) (function (_ tx) (let [[tx' a] (fa tx)] [tx' (f a)])))) (structure: #export apply (Apply STM) (def: &functor ..functor) (def: (apply ff fa) (function (_ tx) (let [[tx' f] (ff tx) [tx'' a] (fa tx')] [tx'' (f a)])))) (structure: #export monad (Monad STM) (def: &functor ..functor) (def: (wrap a) (function (_ tx) [tx a])) (def: (join mma) (function (_ tx) (let [[tx' ma] (mma tx)] (ma tx'))))) (def: #export (update f var) {#.doc "Will update a Var's value, and return a tuple with the old and the new values."} (All [a] (-> (-> a a) (Var a) (STM [a a]))) (do ..monad [a (read var) #let [a' (f a)] _ (write a' var)] (wrap [a a']))) (def: (can-commit? tx) (-> Tx Bit) (list.every? (function (_ [_var _original _current]) (is? _original (read!! _var))) tx)) (def: (commit-var! [_var _original _current]) (-> (Ex [a] (Tx-Frame a)) (IO Any)) (if (is? _original _current) (io []) (write! _current _var))) (def: fresh-tx Tx (list)) (type: (Commit a) [(STM a) (Promise a) (Resolver a)]) (def: pending-commits (Atom (Rec Commits [(Promise [(Ex [a] (Commit a)) Commits]) (Resolver [(Ex [a] (Commit a)) Commits])])) (atom (promise.promise []))) (def: commit-processor-flag (Atom Bit) (atom #0)) (def: (issue-commit commit) (All [a] (-> (Commit a) (IO Any))) (let [entry [commit (promise.promise [])]] (do io.monad [|commits|&resolve (atom.read pending-commits)] (loop [[|commits| resolve] |commits|&resolve] (do @ [|commits| (promise.poll |commits|)] (case |commits| #.None (do io.monad [resolved? (resolve entry)] (if resolved? (atom.write (product.right entry) pending-commits) (recur |commits|&resolve))) (#.Some [head tail]) (recur tail))))))) (def: (process-commit commit) (All [a] (-> (Commit a) (IO Any))) (let [[stm-proc output resolve] commit [finished-tx value] (stm-proc fresh-tx)] (if (can-commit? finished-tx) (do io.monad [_ (monad.map @ commit-var! finished-tx)] (resolve value)) (issue-commit commit)))) (def: init-processor! (IO Any) (do io.monad [flag (atom.read commit-processor-flag)] (if flag (wrap []) (do @ [was-first? (atom.compare-and-swap flag #1 commit-processor-flag)] (if was-first? (do @ [[promise resolve] (atom.read pending-commits)] (promise.await (function (recur [head [tail _resolve]]) (do @ [_ (process-commit head)] (promise.await recur tail))) promise)) (wrap []))) ))) (def: #export (commit stm-proc) {#.doc (doc "Commits a transaction and returns its result (asynchronously)." "Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first." "For this reason, it's important to note that transactions must be free from side-effects, such as I/O.")} (All [a] (-> (STM a) (Promise a))) (let [[output resolver] (promise.promise [])] (exec (io.run (do io.monad [_ init-processor!] (issue-commit [stm-proc output resolver]))) output)))