... Fallible asynchronous processes that exchange values through `frp` channels.
(.require
 [library
  [lux (.except try)
   [abstract
    [functor (.only Functor)]
    [monad (.only Monad do)]]
   [control
    ["[0]" try (.only Try) (.use "[1]#[0]" monad)]
    ["[0]" exception (.only Exception)]]]]
 [//
  ["[0]" async (.only Async) (.use "[1]#[0]" monad)]
  ["[0]" frp]])

... A process is an asynchronous computation whose outcome is a `Try`:
... either a successful value of type `a`, or a failure.
(type .public (Process a)
  (Async (Try a)))

... Aliases for the channel-related types of the `frp` module.
(type .public Channel' frp.Channel')
(type .public Channel frp.Channel)
(type .public Sink frp.Sink)

... Alias for `frp.channel`; per its signature, it yields a channel paired with its sink.
(def .public channel
  (All (_ a)
    (-> Any
        [(Channel a) (Sink a)]))
  frp.channel)

... Transforms the successful value of a process.
... The transformation is lifted through `Try` first, and then through `Async`.
(def .public functor
  (Functor Process)
  (implementation
   (def (each on_value)
     (async#each (try#each on_value)))))

(def .public monad
  (Monad Process)
  (implementation
   (def functor ..functor)

   ... Wraps a plain value as a successful `Try`, and then as an `Async`.
   (def (in value)
     (async#in (try#in value)))

   ... Flattens a process that produces another process.
   ... The outer process is sequenced with the `Async` monad, so `in` (below) is the `Async` one.
   (def (conjoint outer)
     (do async.monad
       [outcome outer]
       (when outcome
         ... The outer process succeeded: its result is the inner process itself.
         {try.#Success inner}
         inner

         ... The outer process failed: re-wrap the same error as the final outcome.
         {try.#Failure error}
         (in {try.#Failure error}))))))

... Produced by `read` when the channel yields `.#None` instead of an item.
(exception.def .public channel_has_been_closed)

... Takes the next item out of the channel `it`.
... On success, the process yields the item together with the remainder of the channel.
... When the channel yields `.#None`, the process fails with `channel_has_been_closed`.
(def .public (read it)
  (All (_ r w)
    (-> (Channel' r w)
        (Process [r (Channel' r w)])))
  (let [[process resolve!] (async.async [])]
    (exec
      ... The callback registration is handed to `async.future`; the value that call
      ... returns is discarded, and only `process` is given back to the caller.
      (async.future
       (async.upon! (function (_ item)
                      (resolve! (when item
                                  {.#Some [head tail]}
                                  {try.#Success [head tail]}

                                  {.#None}
                                  (exception.except ..channel_has_been_closed []))))
                    it))
      process)))

... Feeds `value` into `sink`, by passing the sink's `feed` action to `async.future`.
(def .public (write value sink)
  (All (_ w)
    (-> w (Sink w)
        (Process Any)))
  (async.future (of sink feed value)))

... Closes `sink`, by passing the sink's `close` action to `async.future`.
(def .public (close sink)
  (All (_ w)
    (-> (Sink w)
        (Process Any)))
  (async.future (of sink close)))

... Exposes the outcome of a process as an ordinary value.
... Whatever `Try` the given process produces is wrapped inside `try.#Success`.
(def .public try
  (All (_ a)
    (-> (Process a)
        (Process (Try a))))
  (async#each (function (_ outcome)
                {try.#Success outcome})))