(.module:
  [library
   [lux #*
    ["_" test (#+ Test)]
    [abstract
     [monad (#+ do)]
     [\\specification
      ["$." functor (#+ Injection Comparison)]
      ["$." apply]
      ["$." monad]]]
    [control
     ["." try]
     ["." exception]
     ["." io (#+ IO io)]]
    [data
     [collection
      ["." list ("#\." fold monoid)]
      ["." row (#+ Row)]]]
    [math
     ["." random]
     [number
      ["n" nat]]]]]
  [\\library
   ["." /
    [//
     ["." async (#+ Async) ("#\." monad)]
     ["." atom (#+ Atom atom)]]]])

## Wraps a plain value into a channel: the value is placed into an
## already-resolved async, which is then converted with /.of_async.
(def: injection
  (Injection /.Channel)
  (|>> async.resolved
       /.of_async))

## Compares two channels by polling each of them once.
## Yields the result of '==' on the two head elements only when both polls
## produce a (#.Some (#.Some [head _])) value; every other combination is
## reported as 'false'.
(def: comparison
  (Comparison /.Channel)
  (function (_ == left right)
    (io.run
     (do io.monad
       [?left (async.poll left)
        ?right (async.poll right)]
       (in (case [?left ?right]
             [(#.Some (#.Some [left _]))
              (#.Some (#.Some [right _]))]
             (== left right)

             _
             false))))))

## Collects elements from a channel into a list, one per remaining poll.
## When 'amount_of_polls' reaches 0, the sink's 'close' action is run and the
## list ends. If the channel produces #.None earlier, the list ends there
## (without calling 'close').
(def: (take_amount amount_of_polls [channel sink])
  (All [a] (-> Nat [(/.Channel a) (/.Sink a)] (Async (List a))))
  (case amount_of_polls
    0 (do async.monad
        [_ (async.future (\ sink close))]
        (in #.End))
    _ (do {! async.monad}
        [event channel]
        (case event
          #.None
          (in #.End)

          (#.Some [head tail])
          ## Continue from 'tail': awaiting 'channel' again would only
          ## re-read the same first element on every step, so the number of
          ## distinct events actually emitted would never be checked.
          (\ ! map (|>> (#.Item head))
             (take_amount (dec amount_of_polls) [tail sink]))))))

## Test-suite for the channel module referred to as '/'.
(def: #export test
  Test
  (<| (_.covering /._)
      (let [(^open "list\.") (list.equivalence n.equivalence)]
        (do {! random.monad}
          [inputs (random.list 5 random.nat)
           sample random.nat
           ## Three pairwise-different numbers for the /.distinct check.
           distinct/0 random.nat
           distinct/1 (|> random.nat (random.only (|>> (n.= distinct/0) not)))
           distinct/2 (|> random.nat (random.only (function (_ value)
                                                    (not (or (n.= distinct/0 value)
                                                             (n.= distinct/1 value))))))
           shift random.nat]
          ($_ _.and
              (_.for [/.functor]
                     ($functor.spec ..injection ..comparison /.functor))
              (_.for [/.apply]
                     ($apply.spec ..injection ..comparison /.apply))
              (_.for [/.monad]
                     ($monad.spec ..injection ..comparison /.monad))

              ## A value fed into the sink is the first element seen when
              ## polling the channel.
              (_.cover [/.Channel /.Sink /.channel]
                       (case (io.run
                              (do (try.with io.monad)
                                [#let [[channel sink] (/.channel [])]
                                 _ (\ sink feed sample)
                                 _ (\ sink close)]
                                (in channel)))
                         (#try.Success channel)
                         (io.run
                          (do io.monad
                            [?actual (async.poll channel)]
                            (in (case ?actual
                                  (#.Some (#.Some [actual _]))
                                  (n.= sample actual)

                                  _
                                  false))))

                         (#try.Failure error)
                         false))
              ## Feeding after 'close' must fail with the dedicated exception.
              (_.cover [/.channel_is_already_closed]
                       (case (io.run
                              (do (try.with io.monad)
                                [#let [[channel sink] (/.channel [])]
                                 _ (\ sink close)]
                                (\ sink feed sample)))
                         (#try.Success _)
                         false

                         (#try.Failure error)
                         (exception.match? /.channel_is_already_closed error)))
              (in (do async.monad
                    [output (|> sample
                                async.resolved
                                /.of_async
                                /.consume)]
                    (_.cover' [/.of_async /.consume]
                              (list\= (list sample)
                                      output))))
              (in (do async.monad
                    [output (|> inputs
                                (/.sequential 0)
                                /.consume)]
                    (_.cover' [/.sequential]
                              (list\= inputs
                                      output))))
              (in (do async.monad
                    [output (|> inputs
                                (/.sequential 0)
                                (/.only n.even?)
                                /.consume)]
                    (_.cover' [/.only]
                              (list\= (list.only n.even? inputs)
                                      output))))
              ## The subscriber records every value it receives into an atom
              ## and returns #.None (after resolving '?signal') once it has
              ## recorded as many values as there are 'inputs', even though
              ## the channel carries 'inputs' twice.
              (in (do {! async.monad}
                    [#let [[?signal !signal] (: [(async.Async Any) (async.Resolver Any)]
                                                (async.async []))
                           sink (: (Atom (Row Nat))
                                   (atom.atom row.empty))]
                     _ (async.future (/.subscribe (function (_ value)
                                                    (do {! io.monad}
                                                      [current (atom.read sink)
                                                       _ (atom.update (row.add value) sink)]
                                                      (if (n.< (list.size inputs)
                                                               (inc (row.size current)))
                                                        (in (#.Some []))
                                                        (do !
                                                          [_ (!signal [])]
                                                          (in #.None)))))
                                                  (/.sequential 0 (list\compose inputs inputs))))
                     _ ?signal
                     listened (|> sink
                                  atom.read
                                  async.future
                                  (\ ! map row.to_list))]
                    (_.cover' [/.Subscriber /.subscribe]
                              (list\= inputs listened))))
              (in (do async.monad
                    [actual (/.fold (function (_ input total)
                                      (async.resolved (n.+ input total)))
                                    0
                                    (/.sequential 0 inputs))]
                    (_.cover' [/.fold]
                              (n.= (list\fold n.+ 0 inputs)
                                   actual))))
              (in (do async.monad
                    [actual (|> inputs
                                (/.sequential 0)
                                (/.folds (function (_ input total)
                                           (async.resolved (n.+ input total)))
                                         0)
                                /.consume)]
                    (_.cover' [/.folds]
                              (list\= (list.folds n.+ 0 inputs)
                                      actual))))
              (in (do async.monad
                    [actual (|> (list distinct/0 distinct/0 distinct/0 distinct/1 distinct/2 distinct/2)
                                (/.sequential 0)
                                (/.distinct n.equivalence)
                                /.consume)]
                    (_.cover' [/.distinct]
                              (list\= (list distinct/0 distinct/1 distinct/2)
                                      actual))))
              ## Both the delay and the number of polls are drawn from 1..10.
              (do !
                [polling_delay (\ ! map (|>> (n.% 10) inc) random.nat)
                 amount_of_polls (\ ! map (|>> (n.% 10) inc) random.nat)]
                ($_ _.and
                    (in (do {! async.monad}
                          [actual (..take_amount amount_of_polls (/.poll polling_delay (: (IO Nat) (io.io sample))))
                           #let [correct_values! (list.every? (n.= sample) actual)
                                 enough_polls! (n.= amount_of_polls (list.size actual))]]
                          (_.cover' [/.poll]
                                    (and correct_values!
                                         enough_polls!))))
                    (in (do {! async.monad}
                          [actual (..take_amount amount_of_polls (/.periodic polling_delay))]
                          (_.cover' [/.periodic]
                                    (n.= amount_of_polls (list.size actual)))))))
              ## Iterates from [0 sample], emitting 'current' and adding
              ## 'shift' each step, until 'max_iterations' steps were made.
              (in (do async.monad
                    [#let [max_iterations 10]
                     actual (|> [0 sample]
                                (/.iterate (function (_ [iterations current])
                                             (async.resolved
                                              (if (n.< max_iterations iterations)
                                                (#.Some [[(inc iterations) (n.+ shift current)]
                                                         current])
                                                #.None))))
                                /.consume)]
                    (_.cover' [/.iterate]
                              (and (n.= max_iterations (list.size actual))
                                   (list\= (list.folds n.+ sample (list.repeat (dec max_iterations) shift))
                                           actual)))))
              )))))