(.using [library [lux {"-" and or} [abstract [functor {"+" Functor}] [apply {"+" Apply}] ["[0]" monad {"+" Monad do}]] [control ["[0]" pipe] ["[0]" function] ["[0]" io {"+" IO io}] ["[0]" maybe ("[1]#[0]" functor)]] [data ["[0]" product]] [macro ["^" pattern]] [type {"+" sharing} [abstract {"-" pattern}] ["[0]" variance {"+" Mutable}]]]] [// ["[0]" thread] ["[0]" atom {"+" Atom atom}]]) (abstract: .public (Async'' a) (Atom [(Maybe a) (List (-> a (IO Any)))]) (type: .public (Async' r w) (Async'' (Mutable r w))) (type: .public (Async a) (Async'' (Mutable a a))) (type: .public (Resolver w) (-> w (IO Bit))) ... Sets an async's value if it has not been done yet. (def: (resolver async) (All (_ r w) (-> (Async' r w) (Resolver w))) (function (resolve value) (let [async (representation async)] (do [! io.monad] [(^.let old [_value _observers]) (atom.read! async)] (case _value {.#Some _} (in #0) {.#None} (do ! [succeeded? (atom.compare_and_swap! old [{.#Some (variance.write value)} (list)] async)] (if succeeded? (do ! [_ (monad.each ! (function.on (variance.write value)) _observers)] (in #1)) (resolve value)))))))) (def: .public (resolved value) (All (_ a) (-> a (Async a))) (abstraction (atom [{.#Some (variance.write value)} (list)]))) (def: .public (async _) (All (_ r w) (-> Any [(Async' r w) (Resolver w)])) (let [async (abstraction (atom [{.#None} (list)]))] [async (..resolver async)])) (def: .public value (All (_ r w) (-> (Async' r w) (IO (Maybe r)))) (|>> representation atom.read! (# io.functor each (|>> product.left (maybe#each (|>> variance.read)))))) (def: .public (upon! f async) (All (_ r w) (-> (-> r (IO Any)) (Async' r w) (IO Any))) (do [! io.monad] [.let [async (representation async)] (^.let old [_value _observers]) (atom.read! async)] (case _value {.#Some value} (f (variance.read value)) {.#None} (do ! [swapped? (atom.compare_and_swap! old [_value {.#Item (|>> variance.read f) _observers}] async)] (if swapped? (in []) (upon! f (abstraction async))))))) ) (def: .public resolved? (All (_ r w) (-> (Async' r w) (IO Bit))) (|>> ..value (# io.functor each (|>> (pipe.case {.#None} #0 {.#Some _} #1))))) (implementation: .public functor (Functor Async) (def: (each f fa) (let [[fb resolve] (sharing [a b] (-> a b) f [(Async b) (Resolver b)] (..async []))] (exec (io.run! (..upon! (|>> f resolve) fa)) fb)))) (implementation: .public apply (Apply Async) (def: functor ..functor) (def: (on fa ff) (let [[fb resolve] (sharing [a b] (Async (-> a b)) ff [(Async b) (Resolver b)] (..async []))] (exec (io.run! (..upon! (function (_ f) (..upon! (|>> f resolve) fa)) ff)) fb)))) (implementation: .public monad (Monad Async) (def: functor ..functor) (def: in ..resolved) (def: (conjoint mma) (let [[ma resolve] (sharing [a] (Async (Async a)) mma [(Async a) (Resolver a)] (..async []))] (exec (io.run! (..upon! (..upon! resolve) mma)) ma)))) (def: .public (and left right) (All (_ lr lw rr rw) (-> (Async' lr lw) (Async' rr rw) (Async [lr rr]))) (let [[read! write!] (sharing [lr lw rr rw] [(Async' lr lw) (Async' rr rw)] [left right] [(Async [lr rr]) (Resolver [lr rr])] (..async [])) _ (io.run! (..upon! (function (_ left) (..upon! (function (_ right) (write! [left right])) right)) left))] read!)) (def: .public (or left right) (All (_ lr lw rr rw) (-> (Async' lr lw) (Async' rr rw) (Async (Or lr rr)))) (let [[left|right resolve] (sharing [lr lw rr rw] [(Async' lr lw) (Async' rr rw)] [left right] [(Async (Or lr rr)) (Resolver (Or lr rr))] (..async []))] (with_expansions [ (template [ ] [(io.run! (upon! (|>> {} resolve) ))] [left .#Left] [right .#Right] )] (exec left|right)))) (def: .public (either left right) (All (_ a lw rw) (-> (Async' a lw) (Async' a rw) (Async a))) (let [[left||right resolve] (sharing [a lw rw] [(Async' a lw) (Async' a rw)] [left right] [(Async a) (Resolver a)] (..async []))] (`` (exec (~~ (template [] [(io.run! (upon! resolve ))] [left] [right])) left||right)))) (def: .public (schedule! milli_seconds computation) (All (_ a) (-> Nat (IO a) (Async a))) (let [[!out resolve] (sharing [a] (IO a) computation [(Async a) (Resolver a)] (..async []))] (exec (|> (do io.monad [value computation] (resolve value)) (thread.schedule! milli_seconds) io.run!) !out))) (def: .public future (All (_ a) (-> (IO a) (Async a))) (..schedule! 0)) (def: .public (after milli_seconds value) (All (_ a) (-> Nat a (Async a))) (..schedule! milli_seconds (io value))) (def: .public (delay milli_seconds) (-> Nat (Async Any)) (..after milli_seconds [])) (def: .public (within milli_seconds async) (All (_ r w) (-> Nat (Async' r w) (Async (Maybe r)))) (..or (..delay milli_seconds) async))