##  Copyright (c) Eduardo Julian. All rights reserved.
##  This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
##  If a copy of the MPL was not distributed with this file,
##  You can obtain one at http://mozilla.org/MPL/2.0/.

(;module:
  lux
  (lux (control functor
                applicative
                monad
                eq)
       (codata [io #- run]
               function)
       (data (struct [list])
             text/format)
       [compiler]
       (macro ["s" syntax #+ syntax: Syntax]))
  (.. ["&" promise]))

## [Types]
## A channel is a promise of either the end of the stream (#;None) or a
## value paired with the rest of the channel.
(type: #export (Chan a)
  {#;doc "An asynchronous channel of values which may be closed. Reading from a channel does not remove the read piece of data, as it can still be accessed if you have an earlier node of the channel."}
  (&;Promise (Maybe [a (Chan a)])))

## [Syntax]
## Expands to a fresh promise, annotated as a Chan of the given type when
## a type is supplied.
(syntax: #export (chan [?type (s;opt s;any)])
  {#;doc (doc "Makes an uninitialized Chan (in this case, of Unit)."
              (chan Unit)

              "The type is optional."
              (chan))}
  (case ?type
    (#;Some type)
    (wrap (list (` (: (Chan (~ type))
                      (&;promise)))))

    #;None
    (wrap (list (` (&;promise))))))

## [Values]
## Produces a channel holding only the elements of xs for which p returns
## true. The output ends when xs ends.
(def: #export (filter p xs)
  (All [a] (-> (-> a Bool) (Chan a) (Chan a)))
  (do &;Monad<Promise>
    [?x+xs xs]
    (case ?x+xs
      #;None
      (wrap #;None)

      (#;Some [x xs'])
      (if (p x)
        (wrap (#;Some [x (filter p xs')]))
        (filter p xs')))))

## Polls the given node. If it is unresolved, or resolved with a value,
## tries to resolve the candidate node with [value new-tail]. When the
## resolution fails (someone else resolved it first), retries from that
## node. Any other poll result (a node resolved with #;None) yields #;None.
## On success, the new tail is returned so that the caller can keep writing.
(def: #export (write value chan)
  {#;doc "Write to a channel, so long as it's still open."}
  (All [a] (-> a (Chan a) (IO (Maybe (Chan a)))))
  (case (&;poll chan)
    (^template [<case> <chan-to-write>]
      <case>
      (do Monad<IO>
        [#let [new-tail (&;promise)]
         done? (&;resolve (#;Some [value new-tail]) <chan-to-write>)]
        (if done?
          (wrap (#;Some new-tail))
          (write value <chan-to-write>))))
    ([#;None                      chan]
     [(#;Some (#;Some [_ chan'])) chan'])

    _
    (:: Monad<IO> wrap #;None)
    ))

## Walks the channel the same way write does, but resolves the first
## resolvable node with #;None. Returns true when this call performed the
## resolution, and false when the polled node was already resolved with #;None.
(def: #export (close chan)
  (All [a] (-> (Chan a) (IO Bool)))
  (case (&;poll chan)
    (^template [<case> <chan-to-close>]
      <case>
      (do Monad<IO>
        [done? (&;resolve #;None <chan-to-close>)]
        (if done?
          (wrap true)
          (close <chan-to-close>))))
    ([#;None                      chan]
     [(#;Some (#;Some [_ chan'])) chan'])

    _
    (:: Monad<IO> wrap false)
    ))

## Copies every element of input onto output, without closing output.
## Stops when input ends or when a write yields #;None.
(def: (pipe' input output)
  (All [a] (-> (Chan a) (Chan a) (&;Promise Unit)))
  (do &;Monad<Promise>
    [?x+xs input]
    (case ?x+xs
      #;None
      (wrap [])

      (#;Some [x input'])
      (case (io;run (write x output))
        #;None
        (wrap [])

        (#;Some output')
        (pipe' input' output')))))

## Same as pipe', but closes output once the copy is over.
(def: #export (pipe input output)
  {#;doc "Copy/pipe the contents of a channel on to another."}
  (All [a] (-> (Chan a) (Chan a) (&;Promise Unit)))
  (do &;Monad<Promise>
    [_ (pipe' input output)]
    (exec (io;run (close output))
      (wrap []))))

## NOTE(review): the pipes are sequenced through mapM, so the inputs look
## like they are drained one after another rather than interleaved --
## confirm this is the intended behavior.
(def: #export (merge xss)
  {#;doc "Fuse all the elements in a list of channels by piping them onto a new output channel."}
  (All [a] (-> (List (Chan a)) (Chan a)))
  (let [output (chan ($ +0))]
    (exec (do &;Monad<Promise>
            [_ (mapM @ (lambda [input] (pipe' input output)) xss)]
            (exec (io;run (close output))
              (wrap [])))
      output)))

## Feeds each element, together with the accumulated value, to f and
## continues with the result. Yields the accumulator when the channel ends.
(def: #export (fold f init xs)
  {#;doc "Asynchronous fold over channels."}
  (All [a b] (-> (-> b a (&;Promise a)) a (Chan b) (&;Promise a)))
  (do &;Monad<Promise>
    [?x+xs xs]
    (case ?x+xs
      #;None
      (wrap init)

      (#;Some [x xs'])
      (do @
        [init' (f x init)]
        (fold f init' xs')))))

## Skips every element equal (according to eq) to last-one, and emits the
## first different element, which becomes the new reference value.
(def: (no-dups' eq last-one xs)
  (All [a] (-> (Eq a) a (Chan a) (Chan a)))
  (let [(^open) eq]
    (do &;Monad<Promise>
      [?x+xs xs]
      (case ?x+xs
        #;None
        (wrap #;None)

        (#;Some [x xs'])
        (if (= x last-one)
          (no-dups' eq last-one xs')
          (wrap (#;Some [x (no-dups' eq x xs')])))))))

## The first element is always emitted; no-dups' handles the rest.
## (The structure is not opened here because nothing from it is used
## directly; it is only forwarded to no-dups'.)
(def: #export (no-dups eq xs)
  {#;doc "Multiple consecutive equal values in the input channel will just be single value in the output channel."}
  (All [a] (-> (Eq a) (Chan a) (Chan a)))
  (do &;Monad<Promise>
    [?x+xs xs]
    (case ?x+xs
      #;None
      (wrap #;None)

      (#;Some [x xs'])
      (wrap (#;Some [x (no-dups' eq x xs')])))))

## The resulting promise is only resolved once the channel has ended.
(def: #export (consume xs)
  {#;doc "Reads the entirety of a channel's contents and returns them as a list."}
  (All [a] (-> (Chan a) (&;Promise (List a))))
  (do &;Monad<Promise>
    [?x+xs' xs]
    (case ?x+xs'
      #;None
      (wrap #;Nil)

      (#;Some [x xs'])
      (do @
        [=xs (consume xs')]
        (wrap (#;Cons x =xs))))))
## Turns a promise into a channel that yields the promised value once and
## then ends.
(def: #export (as-chan p)
  (All [a] (-> (&;Promise a) (Chan a)))
  (do &;Monad<Promise>
    [x p]
    (wrap (#;Some [x (wrap #;None)]))))

## [Structures]
## Maps f over every element of the channel, node by node, by mapping over
## the underlying promise.
(struct: #export _ (Functor Chan)
  (def: (map f xs)
    (:: &;Functor<Promise> map
        (lambda [?x+xs]
          (case ?x+xs
            #;None
            #;None

            (#;Some [x xs'])
            (#;Some [(f x) (map f xs')])))
        xs)))

(struct: #export _ (Applicative Chan)
  (def: functor Functor<Chan>)

  ## A channel holding the single value a, followed by the end of the channel.
  (def: (wrap a)
    (let [(^open) &;Monad<Promise>]
      (wrap (#;Some [a (wrap #;None)]))))

  ## For every function arriving on ff, pipes (map f fa) onto the fresh
  ## channel fb.
  ## NOTE(review): pipe closes fb when its source ends, so results for
  ## functions other than the first one to finish may be cut off -- confirm
  ## whether that is intended.
  (def: (apply ff fa)
    (let [fb (chan ($ +1))]
      (exec (let [(^open) Functor<Chan>]
              (map (lambda [f] (pipe (map f fa) fb))
                   ff))
        fb))))

(struct: #export _ (Monad Chan)
  (def: applicative Applicative<Chan>)

  ## Pipes every inner channel arriving on mma onto a fresh output channel.
  ## NOTE(review): pipe closes output as soon as one inner channel is
  ## exhausted, so the remaining inner channels may be cut off; output also
  ## looks like it is never closed when mma produces no channels -- confirm
  ## whether that is intended.
  (def: (join mma)
    (let [output (chan ($ +0))]
      (exec (let [(^open) Functor<Chan>]
              (map (lambda [ma] (pipe ma output))
                   mma))
        output))))