diff options
Diffstat (limited to 'stdlib/source/lux/concurrency/stm.lux')
-rw-r--r-- | stdlib/source/lux/concurrency/stm.lux | 237 |
1 files changed, 237 insertions, 0 deletions
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux new file mode 100644 index 000000000..80633a41e --- /dev/null +++ b/stdlib/source/lux/concurrency/stm.lux @@ -0,0 +1,237 @@ +## Copyright (c) Eduardo Julian. All rights reserved. +## This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +## If a copy of the MPL was not distributed with this file, +## You can obtain one at http://mozilla.org/MPL/2.0/. + +(;module: + lux + (lux (control functor + applicative + monad) + (codata [io #- run]) + (data (struct [list #* "List/" Functor<List>] + [dict #+ Dict]) + [product] + [text] + text/format) + host + [compiler] + (macro [ast] + ["s" syntax #+ syntax: Syntax]) + (concurrency [atom #+ Atom atom] + [promise #+ Promise "Promise/" Monad<Promise>] + [frp]) + )) + +(type: (Var-State a) + {#value a + #observers (Dict Text (-> a (IO Unit)))}) + +(type: #export (Var a) + (Atom (Var-State a))) + +(type: (Tx-Frame a) + {#var (Var a) + #original a + #current a}) + +(type: Tx + (List (Ex [a] (Tx-Frame a)))) + +(type: #export (STM a) + (-> Tx [Tx a])) + +(def: #export (var value) + (All [a] (-> a (Var a))) + (atom;atom {#value value + #observers (dict;new text;Hash<Text>)})) + +(def: raw-read + (All [a] (-> (Var a) a)) + (|>. atom;get io;run (get@ #value))) + +(def: (find-var-value var tx) + (All [a] (-> (Var a) Tx (Maybe a))) + (:! (Maybe ($ 0)) + (find (: (-> (Ex [a] (Tx-Frame a)) + (Maybe Unit)) + (lambda [[_var _original _current]] + (:! (Maybe Unit) + (if (== (:! (Var Unit) var) + (:! (Var Unit) _var)) + (#;Some _current) + #;None)))) + tx))) + +(def: #export (read var) + (All [a] (-> (Var a) (STM a))) + (lambda [tx] + (case (find-var-value var tx) + (#;Some value) + [tx value] + + #;None + (let [value (raw-read var)] + [(#;Cons [var value value] tx) + value])))) + +(def: #export (read! var) + {#;doc "Reads var immediately, without going through a transaction."} + (All [a] (-> (Var a) (IO a))) + (|> var + atom;get + (:: Functor<IO> map (get@ #value)))) + +(def: (update-tx-value var value tx) + (All [a] (-> (Var a) a Tx Tx)) + (case tx + #;Nil + #;Nil + + (#;Cons [_var _original _current] tx') + (if (== (:! (Var ($ 0)) var) + (:! (Var ($ 0)) _var)) + (#;Cons [(:! (Var ($ 0)) _var) + (:! ($ 0) _original) + (:! ($ 0) _current)] + tx') + (#;Cons [_var _original _current] + (update-tx-value var value tx'))) + )) + +(def: #export (write value var) + (All [a] (-> a (Var a) (STM Unit))) + (lambda [tx] + (case (find-var-value var tx) + (#;Some _) + [(update-tx-value var value tx) + []] + + #;None + [(#;Cons [var (raw-read var) value] tx) + []]))) + +(def: #export (write! new-value var) + {#;doc "Writes value to var immediately, without going through a transaction."} + (All [a] (-> a (Var a) (IO Unit))) + (do Monad<IO> + [old (atom;get var) + #let [old-value (get@ #value old) + new (set@ #value new-value old)] + succeeded? (atom;compare-and-swap old new var)] + (if succeeded? + (do @ + [_ (|> old + (get@ #observers) + dict;values + (mapM @ (lambda [f] (f new-value))))] + (wrap [])) + (write! new-value var)))) + +(def: #export (unfollow label target) + (All [a] (-> Text (Var a) (IO Unit))) + (do Monad<IO> + [[value observers] (atom;get target)] + (atom;set [value (dict;remove label observers)] + target))) + +(def: #export (follow label target) + {#;doc "Creates a channel (identified by a given text) that will receive all changes to the value of the given var."} + (All [a] (-> Text (Var a) (IO (frp;Chan a)))) + (let [head (frp;chan ($ 0)) + chan-var (var head) + observer (lambda [value] + (case (io;run (|> chan-var raw-read (frp;write value))) + #;None + ## By closing the output Chan, the + ## observer becomes obsolete. + (unfollow label chan-var) + + (#;Some tail') + (write! tail' chan-var)))] + (do Monad<IO> + [_ (atom;update (lambda [[value observers]] + [value (dict;put label observer observers)]) + target)] + (wrap head)))) + +(struct: #export _ (Functor STM) + (def: (map f fa) + (lambda [tx] + (let [[tx' a] (fa tx)] + [tx' (f a)])))) + +(struct: #export _ (Applicative STM) + (def: functor Functor<STM>) + + (def: (wrap a) + (lambda [tx] [tx a])) + + (def: (apply ff fa) + (lambda [tx] + (let [[tx' f] (ff tx) + [tx'' a] (fa tx')] + [tx'' (f a)])))) + +(struct: #export _ (Monad STM) + (def: applicative Applicative<STM>) + + (def: (join mma) + (lambda [tx] + (let [[tx' ma] (mma tx)] + (ma tx'))))) + +(def: #export (update! f var) + (All [a] (-> (-> a a) (Var a) (Promise [a a]))) + (promise;future (io (loop [_ []] + (let [(^@ state [value observers]) (io;run (atom;get var)) + value' (f value)] + (if (io;run (atom;compare-and-swap state + [value' observers] + var)) + [value value'] + (recur []))))))) + +(def: #export (update f var) + (All [a] (-> (-> a a) (Var a) (STM [a a]))) + (do Monad<STM> + [a (read var) + #let [a' (f a)] + _ (write a' var)] + (wrap [a a']))) + +(def: (can-commit? tx) + (-> Tx Bool) + (every? (lambda [[_var _original _current]] + (== _original (raw-read _var))) + tx)) + +(def: (commit-var [_var _original _current]) + (-> (Ex [a] (Tx-Frame a)) Unit) + (if (== _original _current) + [] + (io;run (write! _current _var)))) + +(def: fresh-tx Tx (list)) + +(def: (commit' output stm-proc) + (All [a] (-> (Promise a) (STM a) (Promise Unit))) + (promise;future (io (let [[finished-tx value] (stm-proc fresh-tx)] + (if (can-commit? finished-tx) + (exec (List/map commit-var finished-tx) + (io;run (promise;resolve value output)) + []) + (exec (commit' output stm-proc) + [])) + )))) + +(def: #export (commit stm-proc) + {#;doc "Commits a transaction and returns its result (asynchronously). + + Note that a transaction may be re-run an indeterminate number of times if other transactions involving the same variables successfully commit first. + + For this reason, it's important to note that transactions must be free from side-effects, such as I/O."} + (All [a] (-> (STM a) (Promise a))) + (let [output (promise;promise)] + (exec (commit' output stm-proc) + output))) |