diff options
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/library/lux/control/concurrency/atom.lux | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/stdlib/source/library/lux/control/concurrency/atom.lux b/stdlib/source/library/lux/control/concurrency/atom.lux new file mode 100644 index 000000000..057bfd5b2 --- /dev/null +++ b/stdlib/source/library/lux/control/concurrency/atom.lux @@ -0,0 +1,103 @@ +(.module: + [library + [lux #* + ["." ffi] + ["@" target] + [abstract + [monad (#+ do)]] + [control + ["." function] + ["." io (#- run) ("#\." functor)]] + [data + ["." product] + [collection + ["." array]]] + [type + abstract]]]) + +(with_expansions [<jvm> (as_is (ffi.import: (java/util/concurrent/atomic/AtomicReference a) + ["#::." + (new [a]) + (get [] a) + (compareAndSet [a a] boolean)]))] + (for {@.old <jvm> + @.jvm <jvm>} + (as_is))) + +(with_expansions [<new> (for {@.js "js array new" + @.python "python array new" + @.lua "lua array new" + @.ruby "ruby array new" + @.php "php array new" + @.scheme "scheme array new"} + (as_is)) + <write> (for {@.js "js array write" + @.python "python array write" + @.lua "lua array write" + @.ruby "ruby array write" + @.php "php array write" + @.scheme "scheme array write"} + (as_is)) + + <read> (for {@.js "js array read" + @.python "python array read" + @.lua "lua array read" + @.ruby "ruby array read" + @.php "php array read" + @.scheme "scheme array read"} + (as_is))] + (abstract: #export (Atom a) + (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference a)] + (for {@.old <jvm> + @.jvm <jvm>} + (array.Array a))) + + {#.doc "Atomic references that are safe to mutate concurrently."} + + (def: #export (atom value) + (All [a] (-> a (Atom a))) + (:abstraction (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::new value)] + (for {@.old <jvm> + @.jvm <jvm>} + (<write> 0 value (<new> 1)))))) + + (def: #export (read atom) + (All [a] (-> (Atom a) (IO a))) + (io (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::get (:representation atom))] + (for {@.old <jvm> + @.jvm <jvm>} + (<read> 0 (:representation atom)))))) + + (def: #export (compare_and_swap current new atom) + {#.doc (doc "Only mutates an atom if you can present its current value." + "That guarantees that atom was not updated since you last read from it.")} + (All [a] (-> a a (Atom a) (IO Bit))) + (io (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::compareAndSet current new (:representation atom))] + (for {@.old <jvm> + @.jvm <jvm>} + (let [old (<read> 0 (:representation atom))] + (if (is? old current) + (exec (<write> 0 new (:representation atom)) + true) + false)))))) + )) + +(def: #export (update f atom) + {#.doc (doc "Updates an atom by applying a function to its current value." + "If it fails to update it (because some other process wrote to it first), it will retry until it succeeds." + "The retries will be done with the new values of the atom, as they show up.")} + (All [a] (-> (-> a a) (Atom a) (IO [a a]))) + (loop [_ []] + (do io.monad + [old (read atom) + #let [new (f old)] + swapped? (..compare_and_swap old new atom)] + (if swapped? + (wrap [old new]) + (recur []))))) + +(def: #export (write value atom) + (All [a] (-> a (Atom a) (IO a))) + (|> atom + (..update (function.constant value)) + (io\map product.left))) |