aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/library/lux/control/concurrency/atom.lux
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--stdlib/source/library/lux/control/concurrency/atom.lux103
1 files changed, 103 insertions, 0 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/atom.lux b/stdlib/source/library/lux/control/concurrency/atom.lux
new file mode 100644
index 000000000..057bfd5b2
--- /dev/null
+++ b/stdlib/source/library/lux/control/concurrency/atom.lux
@@ -0,0 +1,103 @@
+(.module:
+ [library
+ [lux #*
+ ["." ffi]
+ ["@" target]
+ [abstract
+ [monad (#+ do)]]
+ [control
+ ["." function]
+ ["." io (#- run) ("#\." functor)]]
+ [data
+ ["." product]
+ [collection
+ ["." array]]]
+ [type
+ abstract]]])
+
+(with_expansions [<jvm> (as_is (ffi.import: (java/util/concurrent/atomic/AtomicReference a)
+ ["#::."
+ (new [a])
+ (get [] a)
+ (compareAndSet [a a] boolean)]))]
+ (for {@.old <jvm>
+ @.jvm <jvm>}
+ (as_is)))
+
+(with_expansions [<new> (for {@.js "js array new"
+ @.python "python array new"
+ @.lua "lua array new"
+ @.ruby "ruby array new"
+ @.php "php array new"
+ @.scheme "scheme array new"}
+ (as_is))
+ <write> (for {@.js "js array write"
+ @.python "python array write"
+ @.lua "lua array write"
+ @.ruby "ruby array write"
+ @.php "php array write"
+ @.scheme "scheme array write"}
+ (as_is))
+
+ <read> (for {@.js "js array read"
+ @.python "python array read"
+ @.lua "lua array read"
+ @.ruby "ruby array read"
+ @.php "php array read"
+ @.scheme "scheme array read"}
+ (as_is))]
+ (abstract: #export (Atom a)
+ (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference a)]
+ (for {@.old <jvm>
+ @.jvm <jvm>}
+ (array.Array a)))
+
+ {#.doc "Atomic references that are safe to mutate concurrently."}
+
+ (def: #export (atom value)
+ (All [a] (-> a (Atom a)))
+ (:abstraction (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::new value)]
+ (for {@.old <jvm>
+ @.jvm <jvm>}
+ (<write> 0 value (<new> 1))))))
+
+ (def: #export (read atom)
+ (All [a] (-> (Atom a) (IO a)))
+ (io (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::get (:representation atom))]
+ (for {@.old <jvm>
+ @.jvm <jvm>}
+ (<read> 0 (:representation atom))))))
+
+ (def: #export (compare_and_swap current new atom)
+ {#.doc (doc "Only mutates an atom if you can present its current value."
+ "That guarantees that atom was not updated since you last read from it.")}
+ (All [a] (-> a a (Atom a) (IO Bit)))
+ (io (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::compareAndSet current new (:representation atom))]
+ (for {@.old <jvm>
+ @.jvm <jvm>}
+ (let [old (<read> 0 (:representation atom))]
+ (if (is? old current)
+ (exec (<write> 0 new (:representation atom))
+ true)
+ false))))))
+ ))
+
+(def: #export (update f atom)
+ {#.doc (doc "Updates an atom by applying a function to its current value."
+ "If it fails to update it (because some other process wrote to it first), it will retry until it succeeds."
+ "The retries will be done with the new values of the atom, as they show up.")}
+ (All [a] (-> (-> a a) (Atom a) (IO [a a])))
+ (loop [_ []]
+ (do io.monad
+ [old (read atom)
+ #let [new (f old)]
+ swapped? (..compare_and_swap old new atom)]
+ (if swapped?
+ (wrap [old new])
+ (recur [])))))
+
+(def: #export (write value atom)
+ (All [a] (-> a (Atom a) (IO a)))
+ (|> atom
+ (..update (function.constant value))
+ (io\map product.left)))