aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/concurrency/stm.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/concurrency/stm.lux')
-rw-r--r--stdlib/source/lux/concurrency/stm.lux237
1 files changed, 237 insertions, 0 deletions
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux
new file mode 100644
index 000000000..80633a41e
--- /dev/null
+++ b/stdlib/source/lux/concurrency/stm.lux
@@ -0,0 +1,237 @@
+## Copyright (c) Eduardo Julian. All rights reserved.
+## This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
+## If a copy of the MPL was not distributed with this file,
+## You can obtain one at http://mozilla.org/MPL/2.0/.
+
+(;module:
+ lux
+ (lux (control functor
+ applicative
+ monad)
+ (codata [io #- run])
+ (data (struct [list #* "List/" Functor<List>]
+ [dict #+ Dict])
+ [product]
+ [text]
+ text/format)
+ host
+ [compiler]
+ (macro [ast]
+ ["s" syntax #+ syntax: Syntax])
+ (concurrency [atom #+ Atom atom]
+ [promise #+ Promise "Promise/" Monad<Promise>]
+ [frp])
+ ))
+
+(type: (Var-State a)
+ {#value a
+ #observers (Dict Text (-> a (IO Unit)))})
+
+(type: #export (Var a)
+ (Atom (Var-State a)))
+
+(type: (Tx-Frame a)
+ {#var (Var a)
+ #original a
+ #current a})
+
+(type: Tx
+ (List (Ex [a] (Tx-Frame a))))
+
+(type: #export (STM a)
+ (-> Tx [Tx a]))
+
+(def: #export (var value)
+ (All [a] (-> a (Var a)))
+ (atom;atom {#value value
+ #observers (dict;new text;Hash<Text>)}))
+
+(def: raw-read
+ (All [a] (-> (Var a) a))
+ (|>. atom;get io;run (get@ #value)))
+
+(def: (find-var-value var tx)
+ (All [a] (-> (Var a) Tx (Maybe a)))
+ (:! (Maybe ($ 0))
+ (find (: (-> (Ex [a] (Tx-Frame a))
+ (Maybe Unit))
+ (lambda [[_var _original _current]]
+ (:! (Maybe Unit)
+ (if (== (:! (Var Unit) var)
+ (:! (Var Unit) _var))
+ (#;Some _current)
+ #;None))))
+ tx)))
+
+(def: #export (read var)
+ (All [a] (-> (Var a) (STM a)))
+ (lambda [tx]
+ (case (find-var-value var tx)
+ (#;Some value)
+ [tx value]
+
+ #;None
+ (let [value (raw-read var)]
+ [(#;Cons [var value value] tx)
+ value]))))
+
+(def: #export (read! var)
+ {#;doc "Reads var immediately, without going through a transaction."}
+ (All [a] (-> (Var a) (IO a)))
+ (|> var
+ atom;get
+ (:: Functor<IO> map (get@ #value))))
+
+(def: (update-tx-value var value tx)
+ (All [a] (-> (Var a) a Tx Tx))
+ (case tx
+ #;Nil
+ #;Nil
+
+ (#;Cons [_var _original _current] tx')
+ (if (== (:! (Var ($ 0)) var)
+ (:! (Var ($ 0)) _var))
+ (#;Cons [(:! (Var ($ 0)) _var)
+ (:! ($ 0) _original)
+ (:! ($ 0) _current)]
+ tx')
+ (#;Cons [_var _original _current]
+ (update-tx-value var value tx')))
+ ))
+
+(def: #export (write value var)
+ (All [a] (-> a (Var a) (STM Unit)))
+ (lambda [tx]
+ (case (find-var-value var tx)
+ (#;Some _)
+ [(update-tx-value var value tx)
+ []]
+
+ #;None
+ [(#;Cons [var (raw-read var) value] tx)
+ []])))
+
+(def: #export (write! new-value var)
+ {#;doc "Writes value to var immediately, without going through a transaction."}
+ (All [a] (-> a (Var a) (IO Unit)))
+ (do Monad<IO>
+ [old (atom;get var)
+ #let [old-value (get@ #value old)
+ new (set@ #value new-value old)]
+ succeeded? (atom;compare-and-swap old new var)]
+ (if succeeded?
+ (do @
+ [_ (|> old
+ (get@ #observers)
+ dict;values
+ (mapM @ (lambda [f] (f new-value))))]
+ (wrap []))
+ (write! new-value var))))
+
+(def: #export (unfollow label target)
+ (All [a] (-> Text (Var a) (IO Unit)))
+ (do Monad<IO>
+ [[value observers] (atom;get target)]
+ (atom;set [value (dict;remove label observers)]
+ target)))
+
+(def: #export (follow label target)
+ {#;doc "Creates a channel (identified by a given text) that will receive all changes to the value of the given var."}
+ (All [a] (-> Text (Var a) (IO (frp;Chan a))))
+ (let [head (frp;chan ($ 0))
+ chan-var (var head)
+ observer (lambda [value]
+ (case (io;run (|> chan-var raw-read (frp;write value)))
+ #;None
+ ## By closing the output Chan, the
+ ## observer becomes obsolete.
+ (unfollow label chan-var)
+
+ (#;Some tail')
+ (write! tail' chan-var)))]
+ (do Monad<IO>
+ [_ (atom;update (lambda [[value observers]]
+ [value (dict;put label observer observers)])
+ target)]
+ (wrap head))))
+
+(struct: #export _ (Functor STM)
+ (def: (map f fa)
+ (lambda [tx]
+ (let [[tx' a] (fa tx)]
+ [tx' (f a)]))))
+
+(struct: #export _ (Applicative STM)
+ (def: functor Functor<STM>)
+
+ (def: (wrap a)
+ (lambda [tx] [tx a]))
+
+ (def: (apply ff fa)
+ (lambda [tx]
+ (let [[tx' f] (ff tx)
+ [tx'' a] (fa tx')]
+ [tx'' (f a)]))))
+
+(struct: #export _ (Monad STM)
+ (def: applicative Applicative<STM>)
+
+ (def: (join mma)
+ (lambda [tx]
+ (let [[tx' ma] (mma tx)]
+ (ma tx')))))
+
+(def: #export (update! f var)
+ (All [a] (-> (-> a a) (Var a) (Promise [a a])))
+ (promise;future (io (loop [_ []]
+ (let [(^@ state [value observers]) (io;run (atom;get var))
+ value' (f value)]
+ (if (io;run (atom;compare-and-swap state
+ [value' observers]
+ var))
+ [value value']
+ (recur [])))))))
+
+(def: #export (update f var)
+ (All [a] (-> (-> a a) (Var a) (STM [a a])))
+ (do Monad<STM>
+ [a (read var)
+ #let [a' (f a)]
+ _ (write a' var)]
+ (wrap [a a'])))
+
+(def: (can-commit? tx)
+ (-> Tx Bool)
+ (every? (lambda [[_var _original _current]]
+ (== _original (raw-read _var)))
+ tx))
+
+(def: (commit-var [_var _original _current])
+ (-> (Ex [a] (Tx-Frame a)) Unit)
+ (if (== _original _current)
+ []
+ (io;run (write! _current _var))))
+
+(def: fresh-tx Tx (list))
+
+(def: (commit' output stm-proc)
+ (All [a] (-> (Promise a) (STM a) (Promise Unit)))
+ (promise;future (io (let [[finished-tx value] (stm-proc fresh-tx)]
+ (if (can-commit? finished-tx)
+ (exec (List/map commit-var finished-tx)
+ (io;run (promise;resolve value output))
+ [])
+ (exec (commit' output stm-proc)
+ []))
+ ))))
+
+(def: #export (commit stm-proc)
+ {#;doc "Commits a transaction and returns its result (asynchronously).
+
+ Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first.
+
+ For this reason, it's important to note that transactions must be free from side-effects, such as I/O."}
+ (All [a] (-> (STM a) (Promise a)))
+ (let [output (promise;promise)]
+ (exec (commit' output stm-proc)
+ output)))