From 97329ec45dd93dc1008d3778c6173fdfbfbd7ab8 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Sat, 28 Jan 2017 23:53:11 -0400 Subject: - Expanded the lux/concurrency/frp module. - Some refactoring. --- stdlib/source/lux/concurrency/frp.lux | 133 ++++++++++++++++++++++++++++-- stdlib/source/lux/concurrency/promise.lux | 12 ++- 2 files changed, 137 insertions(+), 8 deletions(-) (limited to 'stdlib/source') diff --git a/stdlib/source/lux/concurrency/frp.lux b/stdlib/source/lux/concurrency/frp.lux index 3f25bb913..f84103e3f 100644 --- a/stdlib/source/lux/concurrency/frp.lux +++ b/stdlib/source/lux/concurrency/frp.lux @@ -6,7 +6,7 @@ eq) [io #- run] (codata function) - (data (coll [list]) + (data (coll [list "L/" Monoid]) text/format) [compiler] (macro ["s" syntax #+ syntax: Syntax])) @@ -123,7 +123,18 @@ [init' (f x init)] (fold f init' xs'))))) -(def: (no-dups' eq last-one xs) +(def: #export (folds f init xs) + {#;doc "A channel of folds."} + (All [a b] (-> (-> b a (&;Promise a)) a (Chan b) (Chan a))) + (do &;Monad + [?x+xs xs] + (case ?x+xs + #;None (wrap (#;Some [init (wrap #;None)])) + (#;Some [x xs']) (do @ + [init' (f x init)] + (folds f init' xs'))))) + +(def: (distinct' eq last-one xs) (All [a] (-> (Eq a) a (Chan a) (Chan a))) (let [(^open) eq] (do &;Monad @@ -131,10 +142,10 @@ (case ?x+xs #;None (wrap #;None) (#;Some [x xs']) (if (= x last-one) - (no-dups' eq last-one xs') - (wrap (#;Some [x (no-dups' eq x xs')]))))))) + (distinct' eq last-one xs') + (wrap (#;Some [x (distinct' eq x xs')]))))))) -(def: #export (no-dups eq xs) +(def: #export (distinct eq xs) {#;doc "Multiple consecutive equal values in the input channel will just be single value in the output channel."} (All [a] (-> (Eq a) (Chan a) (Chan a))) (let [(^open) eq] @@ -142,7 +153,7 @@ [?x+xs xs] (case ?x+xs #;None (wrap #;None) - (#;Some [x xs']) (wrap (#;Some [x (no-dups' eq x xs')])))))) + (#;Some [x xs']) (wrap (#;Some [x (distinct' eq x xs')])))))) (def: #export (consume xs) {#;doc "Reads the entirety of a channel's contents and returns them as a list."} @@ -158,12 +169,120 @@ [=xs (consume xs')] (wrap (#;Cons x =xs)))))) -(def: #export (as-chan p) +(def: #export (once p) (All [a] (-> (&;Promise a) (Chan a))) (do &;Monad [x p] (wrap (#;Some [x (wrap #;None)])))) +(def: #export (poll time action) + (All [a] (-> Nat (IO (Maybe a)) (Chan a))) + (do &;Monad + [?output (&;future action)] + (case ?output + #;None + (wrap #;None) + + (#;Some head) + (do @ + [_ (&;wait time)] + (wrap (#;Some [head (poll time action)])))))) + +(def: #export (periodic time value) + (All [a] (-> Nat a (Chan a))) + (do &;Monad + [] + (wrap (#;Some [value (do @ + [_ (&;wait time)] + (periodic time value))])))) + +(def: #export (sequential time xs) + (All [a] (-> Nat (List a) (Chan a))) + (do &;Monad + [] + (case xs + #;Nil + (wrap #;None) + + (#;Cons x xs') + (wrap (#;Some [x (do @ + [_ (&;wait time)] + (sequential time xs'))]))))) + +(def: #export (cycle time values) + (All [a] (-> Nat (List a) (Chan a))) + (do &;Monad + [] + (case values + #;Nil + (wrap #;None) + + _ + (loop [xs values] + (case xs + #;Nil + (recur values) + + (#;Cons x xs') + (wrap (#;Some [x (do @ + [_ (&;wait time)] + (recur xs'))]))))))) + +## Utils +(def: (tail xs) + (All [a] (-> (List a) (List a))) + (case xs + #;Nil + #;Nil + + (#;Cons _ xs') + xs')) + +(def: #export (sliding-window max inputs) + (All [a] (-> Nat (Chan a) (Chan (List a)))) + (let [(^open) &;Monad] + (folds (lambda [input window] + (let [window' (L/append window (list input))] + (wrap (if (n.<= max (list;size window')) + window' + (tail window'))))) + (list) + inputs))) + +(def: #export (iterate f init) + (All [a] (-> (-> a (&;Promise (Maybe a))) a (Chan a))) + (do &;Monad + [] + (wrap (#;Some [init (do @ + [?next (f init)] + (case ?next + #;None + (wrap #;None) + + (#;Some init') + (iterate f init')))])))) + +(def: #export (sample time inputs) + (All [a] (-> Nat (Chan a) (Chan a))) + (do &;Monad + [?h+t inputs] + (case ?h+t + #;None + (wrap #;None) + + (#;Some [value inputs']) + (do @ + [_ (&;wait time) + #let [next-inputs (loop [last-resolved-node inputs'] + (case (&;poll last-resolved-node) + (^=> (#;Some (#;Some [_ next-node])) + (&;resolved? next-node)) + (recur next-node) + + _ + last-resolved-node))]] + (wrap (#;Some [value (sample time next-inputs)])))))) + ## [Structures] (struct: #export _ (Functor Chan) (def: (map f xs) diff --git a/stdlib/source/lux/concurrency/promise.lux b/stdlib/source/lux/concurrency/promise.lux index 0dd8ebf00..3c10e785d 100644 --- a/stdlib/source/lux/concurrency/promise.lux +++ b/stdlib/source/lux/concurrency/promise.lux @@ -82,12 +82,22 @@ (wrap (list (` (promise' #;None)))))) (def: #export (poll promise) - {#;doc "Checks whether an Promise's value has already been resolved."} + {#;doc "Polls a Promise's value."} (All [a] (-> (Promise a) (Maybe a))) (|> (atom;get promise) io;run (get@ #value))) +(def: #export (resolved? promise) + {#;doc "Checks whether a Promise's value has already been resolved."} + (All [a] (-> (Promise a) Bool)) + (case (poll promise) + #;None + false + + (#;Some _) + true)) + (def: #export (resolve value promise) {#;doc "Sets an Promise's value if it hasn't been done yet."} (All [a] (-> a (Promise a) (IO Bool))) -- cgit v1.2.3