aboutsummaryrefslogtreecommitdiff
path: root/stdlib
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib')
-rw-r--r--stdlib/source/lux/concurrency/frp.lux133
-rw-r--r--stdlib/source/lux/concurrency/promise.lux12
-rw-r--r--stdlib/test/test/lux/concurrency/frp.lux4
3 files changed, 139 insertions, 10 deletions
diff --git a/stdlib/source/lux/concurrency/frp.lux b/stdlib/source/lux/concurrency/frp.lux
index 3f25bb913..f84103e3f 100644
--- a/stdlib/source/lux/concurrency/frp.lux
+++ b/stdlib/source/lux/concurrency/frp.lux
@@ -6,7 +6,7 @@
eq)
[io #- run]
(codata function)
- (data (coll [list])
+ (data (coll [list "L/" Monoid<List>])
text/format)
[compiler]
(macro ["s" syntax #+ syntax: Syntax]))
@@ -123,7 +123,18 @@
[init' (f x init)]
(fold f init' xs')))))
-(def: (no-dups' eq last-one xs)
+(def: #export (folds f init xs)
+ {#;doc "A channel of folds."}
+ (All [a b] (-> (-> b a (&;Promise a)) a (Chan b) (Chan a)))
+ (do &;Monad<Promise>
+ [?x+xs xs]
+ (case ?x+xs
+ #;None (wrap (#;Some [init (wrap #;None)]))
+ (#;Some [x xs']) (do @
+ [init' (f x init)]
+ (folds f init' xs')))))
+
+(def: (distinct' eq last-one xs)
(All [a] (-> (Eq a) a (Chan a) (Chan a)))
(let [(^open) eq]
(do &;Monad<Promise>
@@ -131,10 +142,10 @@
(case ?x+xs
#;None (wrap #;None)
(#;Some [x xs']) (if (= x last-one)
- (no-dups' eq last-one xs')
- (wrap (#;Some [x (no-dups' eq x xs')])))))))
+ (distinct' eq last-one xs')
+ (wrap (#;Some [x (distinct' eq x xs')])))))))
-(def: #export (no-dups eq xs)
+(def: #export (distinct eq xs)
{#;doc "Multiple consecutive equal values in the input channel will just be single value in the output channel."}
(All [a] (-> (Eq a) (Chan a) (Chan a)))
(let [(^open) eq]
@@ -142,7 +153,7 @@
[?x+xs xs]
(case ?x+xs
#;None (wrap #;None)
- (#;Some [x xs']) (wrap (#;Some [x (no-dups' eq x xs')]))))))
+ (#;Some [x xs']) (wrap (#;Some [x (distinct' eq x xs')]))))))
(def: #export (consume xs)
{#;doc "Reads the entirety of a channel's contents and returns them as a list."}
@@ -158,12 +169,120 @@
[=xs (consume xs')]
(wrap (#;Cons x =xs))))))
-(def: #export (as-chan p)
+(def: #export (once p)
(All [a] (-> (&;Promise a) (Chan a)))
(do &;Monad<Promise>
[x p]
(wrap (#;Some [x (wrap #;None)]))))
+(def: #export (poll time action)
+ (All [a] (-> Nat (IO (Maybe a)) (Chan a)))
+ (do &;Monad<Promise>
+ [?output (&;future action)]
+ (case ?output
+ #;None
+ (wrap #;None)
+
+ (#;Some head)
+ (do @
+ [_ (&;wait time)]
+ (wrap (#;Some [head (poll time action)]))))))
+
+(def: #export (periodic time value)
+ (All [a] (-> Nat a (Chan a)))
+ (do &;Monad<Promise>
+ []
+ (wrap (#;Some [value (do @
+ [_ (&;wait time)]
+ (periodic time value))]))))
+
+(def: #export (sequential time xs)
+ (All [a] (-> Nat (List a) (Chan a)))
+ (do &;Monad<Promise>
+ []
+ (case xs
+ #;Nil
+ (wrap #;None)
+
+ (#;Cons x xs')
+ (wrap (#;Some [x (do @
+ [_ (&;wait time)]
+ (sequential time xs'))])))))
+
+(def: #export (cycle time values)
+ (All [a] (-> Nat (List a) (Chan a)))
+ (do &;Monad<Promise>
+ []
+ (case values
+ #;Nil
+ (wrap #;None)
+
+ _
+ (loop [xs values]
+ (case xs
+ #;Nil
+ (recur values)
+
+ (#;Cons x xs')
+ (wrap (#;Some [x (do @
+ [_ (&;wait time)]
+ (recur xs'))])))))))
+
+## Utils
+(def: (tail xs)
+ (All [a] (-> (List a) (List a)))
+ (case xs
+ #;Nil
+ #;Nil
+
+ (#;Cons _ xs')
+ xs'))
+
+(def: #export (sliding-window max inputs)
+ (All [a] (-> Nat (Chan a) (Chan (List a))))
+ (let [(^open) &;Monad<Promise>]
+ (folds (lambda [input window]
+ (let [window' (L/append window (list input))]
+ (wrap (if (n.<= max (list;size window'))
+ window'
+ (tail window')))))
+ (list)
+ inputs)))
+
+(def: #export (iterate f init)
+ (All [a] (-> (-> a (&;Promise (Maybe a))) a (Chan a)))
+ (do &;Monad<Promise>
+ []
+ (wrap (#;Some [init (do @
+ [?next (f init)]
+ (case ?next
+ #;None
+ (wrap #;None)
+
+ (#;Some init')
+ (iterate f init')))]))))
+
+(def: #export (sample time inputs)
+ (All [a] (-> Nat (Chan a) (Chan a)))
+ (do &;Monad<Promise>
+ [?h+t inputs]
+ (case ?h+t
+ #;None
+ (wrap #;None)
+
+ (#;Some [value inputs'])
+ (do @
+ [_ (&;wait time)
+ #let [next-inputs (loop [last-resolved-node inputs']
+ (case (&;poll last-resolved-node)
+ (^=> (#;Some (#;Some [_ next-node]))
+ (&;resolved? next-node))
+ (recur next-node)
+
+ _
+ last-resolved-node))]]
+ (wrap (#;Some [value (sample time next-inputs)]))))))
+
## [Structures]
(struct: #export _ (Functor Chan)
(def: (map f xs)
diff --git a/stdlib/source/lux/concurrency/promise.lux b/stdlib/source/lux/concurrency/promise.lux
index 0dd8ebf00..3c10e785d 100644
--- a/stdlib/source/lux/concurrency/promise.lux
+++ b/stdlib/source/lux/concurrency/promise.lux
@@ -82,12 +82,22 @@
(wrap (list (` (promise' #;None))))))
(def: #export (poll promise)
- {#;doc "Checks whether an Promise's value has already been resolved."}
+ {#;doc "Polls a Promise's value."}
(All [a] (-> (Promise a) (Maybe a)))
(|> (atom;get promise)
io;run
(get@ #value)))
+(def: #export (resolved? promise)
+ {#;doc "Checks whether a Promise's value has already been resolved."}
+ (All [a] (-> (Promise a) Bool))
+ (case (poll promise)
+ #;None
+ false
+
+ (#;Some _)
+ true))
+
(def: #export (resolve value promise)
{#;doc "Sets an Promise's value if it hasn't been done yet."}
(All [a] (-> a (Promise a) (IO Bool)))
diff --git a/stdlib/test/test/lux/concurrency/frp.lux b/stdlib/test/test/lux/concurrency/frp.lux
index ec128b119..80f15ad3d 100644
--- a/stdlib/test/test/lux/concurrency/frp.lux
+++ b/stdlib/test/test/lux/concurrency/frp.lux
@@ -71,7 +71,7 @@
(i.= 15 output)))
(do Monad<Promise>
- [elems (&;consume (&;no-dups number;Eq<Int> (List->Chan (list 0 0 0 1 2 2 3 3 3 3 4 4 4 5 5))))]
+ [elems (&;consume (&;distinct number;Eq<Int> (List->Chan (list 0 0 0 1 2 2 3 3 3 3 4 4 4 5 5))))]
(assert "Can avoid immediate repetition in the channel."
(case elems
(^ (list 0 1 2 3 4 5))
@@ -81,7 +81,7 @@
false)))
(do Monad<Promise>
- [elems (&;consume (&;as-chan (:: promise;Monad<Promise> wrap 12345)))]
+ [elems (&;consume (&;once (:: promise;Monad<Promise> wrap 12345)))]
(assert "Can convert a promise into a single-value channel."
(case elems
(^ (list 12345))