(.using
 [library
  [lux (.except list only)
   [abstract
    [equivalence (.only Equivalence)]
    [functor (.only Functor)]
    [apply (.only Apply)]
    ["[0]" monad (.only Monad do)]]
   [control
    ["[0]" maybe (.open: "[1]#[0]" functor)]
    ["[0]" try (.only Try)]
    ["[0]" exception (.only exception:)]
    ["[0]" io (.only IO io)]]
   [type (.only sharing)
    ["[0]" variance (.only Mutable)]]]]
 [//
  ["[0]" atom (.only Atom)]
  ["[0]" async (.only Async Async') (.open: "[1]#[0]" monad)]])

... A channel is an asynchronous linked list: awaiting it yields either
... {.#None} (the channel has ended) or {.#Some [head tail]}, where 'tail'
... is itself a channel for the remaining items.
(type: .public (Channel'' a)
  (Async (Maybe [a (Channel'' a)])))

... A channel whose items are wrapped in 'Mutable' with separate read (r)
... and write (w) type parameters.
(type: .public (Channel' r w)
  (Channel'' (Mutable r w)))

... A channel whose items are read and written at the same type.
(type: .public (Channel a)
  (Channel'' (Mutable a a)))

... Produced by a sink's 'close' and 'feed' when the channel was already closed.
(exception: .public channel_is_already_closed)

... The writing end of a channel.
... 'close' ends the channel; 'feed' appends one value to it.
... Both yield a 'Try' so the caller can detect an already-closed channel.
(type: .public (Sink w)
  (Interface
   (is (IO (Try Any))
       close)
   (is (-> w (IO (Try Any)))
       feed)))

... Builds a sink around the resolver of a channel's first cell.
... The atom always holds the resolver the sink will try to resolve next.
(def: (sink resolution)
  (All (_ a)
    (-> (async.Resolver (Maybe [(Mutable a a) (Channel a)]))
        (Sink a)))
  (let [sink (atom.atom resolution)]
    (implementation
     (def: close
       (loop (again [_ []])
         (do [! io.monad]
           [current (atom.read! sink)
            ... Closing means resolving the pending cell with {.#None}.
            stopped? (current {.#None})]
           (if stopped?
             ... I closed the sink.
             (in {try.#Success []})
             ... Someone else interacted with the sink.
             (do !
               [latter (atom.read! sink)]
               (if (same? current latter)
                 ... Someone else closed the sink.
                 (in (exception.except ..channel_is_already_closed []))
                 ... Someone else fed the sink while I was closing it.
                 (again [])))))))

     (def: (feed value)
       (loop (again [_ []])
         (do [! io.monad]
           [current (atom.read! sink)
            ... Prepare the cell that will follow the one being fed.
            ... 'sharing' only ties the type variable 'a' to that of 'current'.
            .let [[next resolve_next] (sharing [a]
                                        (async.Resolver (Maybe [(Mutable a a) (Channel a)]))
                                        current

                                        [(Channel a)
                                         (async.Resolver (Maybe [(Mutable a a) (Channel a)]))]
                                        (async.async []))]
            fed? (current {.#Some [(variance.write value) next]})]
           (if fed?
             ... I fed the sink.
             ... NOTE(review): between resolving 'current' above and this swap,
             ... the atom still holds 'current'; a concurrent caller failing to
             ... resolve it would take the "same resolver" branch below and
             ... report the channel as closed. Confirm whether that window matters.
             (do !
               [_ (atom.compare_and_swap! current resolve_next sink)]
               (in {try.#Success []}))
             ... Someone else interacted with the sink.
             (do !
               [latter (atom.read! sink)]
               (if (same? current latter)
                 ... Someone else closed the sink while I was feeding it.
                 (in (exception.except ..channel_is_already_closed []))
                 ... Someone else fed the sink.
                 (again []))))))))))

... Creates a fresh channel together with the sink that feeds it.
... The argument is ignored.
(def: .public (channel _)
  (All (_ a) (-> Any [(Channel a) (Sink a)]))
  (let [[async resolve] (async.async [])]
    [async (..sink resolve)]))

... Applies 'f' to every item of a channel, producing a new channel.
(def: .public functor
  (Functor Channel)
  (implementation
   (def: (each f)
     (async#each
      (maybe#each
       (function (_ [head tail])
         [(variance.write (f (variance.read head)))
          (each f tail)]))))))

... Pairs a channel of functions with a channel of arguments, item by item.
... The resulting channel ends as soon as either input channel ends.
(def: .public apply
  (Apply Channel)
  (implementation
   (def: functor ..functor)

   (def: (on fa ff)
     (do async.monad
       [item_f ff
        item_a fa]
       (case [item_f item_a]
         [{.#Some [head_f tail_f]} {.#Some [head_a tail_a]}]
         (in {.#Some [(variance.write ((variance.read head_f) (variance.read head_a)))
                      (on tail_a tail_f)]})

         _
         (in {.#None}))))))

... A channel that has already ended.
(def: empty
  Channel
  (async.resolved {.#None}))

(def: .public monad
  (Monad Channel)
  (implementation
   (def: functor ..functor)

   ... A channel holding exactly one item.
   (def: (in a)
     (async.resolved {.#Some [(variance.write a) ..empty]}))

   ... Flattens a channel of channels into a single output channel.
   ... Each inner channel is drained to its end before the next one is
   ... awaited; the output is closed once the outer channel ends.
   (def: (conjoint mma)
     (let [[output sink] (sharing [a]
                           (Channel (Channel a))
                           mma

                           [(Channel a) (Sink a)]
                           (channel []))]
       (exec
         (is (Async Any)
             (loop (again [mma mma])
               (do [! async.monad]
                 [?mma mma]
                 (case ?mma
                   {.#Some [ma mma']}
                   (do !
                     [_ (loop (again [ma ma])
                          (do !
                            [?ma (variance.read ma)]
                            (case ?ma
                              {.#Some [a ma']}
                              ... The outcome of 'feed' is discarded here.
                              (exec
                                (io.run! (at sink feed (variance.read a)))
                                (again (variance.write ma')))

                              {.#None}
                              (in []))))]
                     (again mma'))

                   {.#None}
                   (in (is Any (io.run! (at sink close))))))))
         output)))))

... A callback for channel items.
... Returning {.#Some _} keeps the subscription going; {.#None} ends it.
(type: .public (Subscriber a)
  (-> a (IO (Maybe Any))))

... Runs 'subscriber' on every item of 'channel' until either the channel
... ends or the subscriber returns {.#None}.
(def: .public (subscribe! subscriber channel)
  (All (_ r w) (-> (Subscriber r) (Channel' r w) (IO Any)))
  (io (exec
        (is (Async Any)
            (loop (again [channel channel])
              (do async.monad
                [item channel]
                (case item
                  {.#Some [head tail]}
                  (case (io.run! (subscriber (variance.read head)))
                    {.#Some _}
                    (again tail)

                    {.#None}
                    (in []))

                  {.#None}
                  (in [])))))
        [])))

... Keeps only the items of 'channel' for which 'pass?' holds.
(def: .public (only pass? channel)
  (All (_ a) (-> (-> a Bit) (Channel a) (Channel a)))
  (do async.monad
    [item channel]
    (case item
      {.#Some [head tail]}
      (let [tail' (only pass? tail)]
        (if (pass? (variance.read head))
          (in {.#Some [head tail']})
          ... Rejected item: the result is just the filtered tail.
          tail'))

      {.#None}
      (in {.#None}))))

... A one-item channel holding the eventual value of 'async'.
(def: .public (of_async async)
  (All (_ a) (-> (Async a) (Channel a)))
  (async#each (function (_ value)
                {.#Some [(variance.write value) ..empty]})
              async))

... Folds 'f' over every item of 'channel', starting from 'init'.
... The result is only available once the channel has ended.
(def: .public (mix f init channel)
  (All (_ a b)
    (-> (-> b a (Async a)) a (Channel b)
        (Async a)))
  (do [! async.monad]
    [item channel]
    (case item
      {.#None}
      (in init)

      {.#Some [head tail]}
      (do !
        [init' (f (variance.read head) init)]
        (mix f init' tail)))))

... Like 'mix', but yields a channel of accumulators: first 'init' itself,
... then the accumulator obtained after each item of 'channel'.
(def: .public (mixes f init channel)
  (All (_ a b)
    (-> (-> b a (Async a)) a (Channel b)
        (Channel a)))
  (<| async#in
      {.#Some}
      [(variance.write init)]
      (do [! async.monad]
        [item channel]
        (case item
          {.#None}
          (in {.#None})

          {.#Some [head tail]}
          (do !
            [init' (f (variance.read head) init)]
            (mixes f init' tail))))))

... Runs 'action' and feeds its result into a new channel, then repeats
... after every delay of 'milli_seconds'. The first run happens right away.
... Polling stops once feeding fails, i.e. after the sink has been closed.
(def: .public (poll milli_seconds action)
  (All (_ a)
    (-> Nat (IO a) [(Channel a) (Sink a)]))
  (let [[output sink] (channel [])]
    (exec
      (io.run! (loop (again [_ []])
                 (do io.monad
                   [value action
                    outcome (at sink feed value)]
                   (case outcome
                     ... The value was delivered: schedule the next round.
                     {try.#Success _}
                     (async.upon! again (async.delay milli_seconds))

                     ... The sink was closed: do not reschedule, so that
                     ... 'action' is not run forever against a dead channel.
                     {try.#Failure _}
                     (in [])))))
      [output sink])))

... A channel that receives a [] value at every interval of 'milli_seconds'.
(def: .public (periodic milli_seconds)
  (-> Nat [(Channel Any) (Sink Any)])
  (..poll milli_seconds (io [])))

... Unfolds a channel from a state: each {.#Some [state output]} produced by
... 'f' emits 'output' and continues from 'state'; {.#None} ends the channel.
(def: .public (iterations f init)
  (All (_ s o) (-> (-> s (Async (Maybe [s o]))) s (Channel o)))
  (do async.monad
    [?next (f init)]
    (in (case ?next
          {.#Some [state output]}
          {.#Some [(variance.write output) (iterations f state)]}

          {.#None}
          {.#None}))))

... Helper for 'distinct': skips items equal to 'previous', the last item kept.
(def: (distinct' equivalence previous channel)
  (All (_ a) (-> (Equivalence a) a (Channel a) (Channel a)))
  (do async.monad
    [item channel]
    (case item
      {.#Some [head tail]}
      (if (at equivalence = previous (variance.read head))
        (distinct' equivalence previous tail)
        (in {.#Some [head (distinct' equivalence (variance.read head) tail)]}))

      {.#None}
      (in {.#None}))))

... Removes consecutive repetitions from 'channel', according to 'equivalence'.
... The first item is always kept.
(def: .public (distinct equivalence channel)
  (All (_ a) (-> (Equivalence a) (Channel a) (Channel a)))
  (do async.monad
    [item channel]
    (in (case item
          {.#Some [head tail]}
          {.#Some [head (distinct' equivalence (variance.read head) tail)]}

          {.#None}
          {.#None}))))

... Gathers every item of 'channel' into a list, in order.
... The list is only available once the channel has ended.
(def: .public (list channel)
  (All (_ a) (-> (Channel a) (Async (List a))))
  (do [! async.monad]
    [item channel]
    (case item
      {.#Some [head tail]}
      (at ! each (|>> {.#Item (variance.read head)})
          (list tail))

      {.#None}
      (in {.#End}))))

... Emits the given values in order. The first value is available right away;
... every following cell (including the final end) comes after a delay of
... 'milli_seconds'.
(def: .public (sequential milli_seconds values)
  (All (_ a) (-> Nat (List a) (Channel a)))
  (case values
    {.#End}
    ..empty

    {.#Item head tail}
    (async.resolved {.#Some [(variance.write head)
                             (do async.monad
                               [_ (async.delay milli_seconds)]
                               (sequential milli_seconds tail))]})))