From e5b99ce316436fbf38dd7c686e6a10f13c8b56d4 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Thu, 21 Mar 2019 20:02:50 -0400 Subject: Fixed a bug in the FRP/channel monad. --- stdlib/source/lux/control/concurrency/frp.lux | 111 ++++++++++++--------- stdlib/source/test/lux/control.lux | 9 +- stdlib/source/test/lux/control/concurrency/frp.lux | 18 ++-- 3 files changed, 81 insertions(+), 57 deletions(-) diff --git a/stdlib/source/lux/control/concurrency/frp.lux b/stdlib/source/lux/control/concurrency/frp.lux index a91a4f531..907ffa97d 100644 --- a/stdlib/source/lux/control/concurrency/frp.lux +++ b/stdlib/source/lux/control/concurrency/frp.lux @@ -9,15 +9,15 @@ ["." monad (#+ Monad do)] ["ex" exception (#+ exception:)]] [data - ["." maybe ("#;." functor)] + ["." maybe ("#@." functor)] ["." error (#+ Error)] [collection - ["." list ("#;." monoid)]]] + ["." list ("#@." monoid)]]] [type (#+ :share) abstract]] [// ["." atom (#+ Atom)] - ["." promise (#+ Promise) ("#;." functor)]]) + ["." promise (#+ Promise) ("#@." functor)]]) (type: #export (Channel a) {#.doc "An asynchronous channel to distribute values."} @@ -31,33 +31,33 @@ (: (-> a (IO (Error Any))) feed)) -(def: (source resolve) +(def: (sink resolve) (All [a] (-> (promise.Resolver (Maybe [a (Channel a)])) (Sink a))) - (let [source (atom.atom resolve)] + (let [sink (atom.atom resolve)] (structure (def: close (loop [_ []] (do io.monad - [current (atom.read source) + [current (atom.read sink) stopped? (current #.None)] (if stopped? - ## I closed the source. + ## I closed the sink. (wrap (ex.return [])) - ## Someone else interacted with the source. + ## Someone else interacted with the sink. (do @ - [latter (atom.read source)] + [latter (atom.read sink)] (if (is? current latter) - ## Someone else closed the source. + ## Someone else closed the sink. (wrap (ex.throw channel-is-already-closed [])) - ## Someone else fed the source while I was closing it. + ## Someone else fed the sink while I was closing it. (recur []))))))) (def: (feed value) (loop [_ []] (do io.monad - [current (atom.read source) + [current (atom.read sink) #let [[next resolve-next] (:share [a] {(promise.Resolver (Maybe [a (Channel a)])) current} @@ -66,43 +66,28 @@ (promise.promise [])})] fed? (current (#.Some [value next]))] (if fed? - ## I fed the source. + ## I fed the sink. (do @ - [_ (atom.compare-and-swap current resolve-next source)] + [_ (atom.compare-and-swap current resolve-next sink)] (wrap (ex.return []))) - ## Someone else interacted with the source. + ## Someone else interacted with the sink. (do @ - [latter (atom.read source)] + [latter (atom.read sink)] (if (is? current latter) - ## Someone else closed the source while I was feeding it. + ## Someone else closed the sink while I was feeding it. (wrap (ex.throw channel-is-already-closed [])) - ## Someone else fed the source. + ## Someone else fed the sink. (recur [])))))))))) (def: #export (channel _) (All [a] (-> Any [(Channel a) (Sink a)])) (let [[promise resolve] (promise.promise [])] - [promise (..source resolve)])) - -(def: #export (listen listener channel) - (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any))) - (io (exec (: (Promise Any) - (loop [channel channel] - (do promise.monad - [cons channel] - (case cons - (#.Some [head tail]) - (exec (io.run (listener head)) - (recur tail)) - - #.None - (wrap []))))) - []))) + [promise (..sink resolve)])) (structure: #export functor (Functor Channel) (def: (map f) - (promise;map - (maybe;map + (promise@map + (maybe@map (function (_ [head tail]) [(f head) (map f tail)]))))) @@ -120,18 +105,54 @@ _ (wrap #.None))))) +(def: empty Channel (promise.resolved #.None)) + (structure: #export monad (Monad Channel) (def: &functor ..functor) (def: (wrap a) - (promise.resolved (#.Some [a (promise.resolved #.None)]))) + (promise.resolved (#.Some [a ..empty]))) (def: (join mma) - (let [[output source] (channel [])] - (exec (io.run (..listen (..listen (:: source feed)) - mma)) + (let [[output sink] (channel [])] + (exec (: (Promise Any) + (loop [mma mma] + (do promise.monad + [?mma mma] + (case ?mma + (#.Some [ma mma']) + (do @ + [_ (loop [ma ma] + (do @ + [?ma ma] + (case ?ma + (#.Some [a ma']) + (exec (io.run (:: sink feed a)) + (recur ma')) + + #.None + (wrap []))))] + (recur mma')) + + #.None + (wrap (: Any (io.run (:: sink close)))))))) output)))) +(def: #export (listen listener channel) + (All [a] (-> (-> a (IO Any)) (Channel a) (IO Any))) + (io (exec (: (Promise Any) + (loop [channel channel] + (do promise.monad + [cons channel] + (case cons + (#.Some [head tail]) + (exec (io.run (listener head)) + (recur tail)) + + #.None + (wrap []))))) + []))) + (def: #export (filter pass? channel) (All [a] (-> (Predicate a) (Channel a) (Channel a))) (do promise.monad @@ -148,8 +169,8 @@ (def: #export (from-promise promise) (All [a] (-> (Promise a) (Channel a))) - (promise;map (function (_ value) - (#.Some [value (promise.resolved #.None)])) + (promise@map (function (_ value) + (#.Some [value ..empty])) promise)) (def: #export (fold f init channel) @@ -186,11 +207,11 @@ (def: #export (poll milli-seconds action) (All [a] (-> Nat (IO a) (Channel a))) - (let [[output source] (channel [])] + (let [[output sink] (channel [])] (exec (io.run (loop [_ []] (do io.monad [value action - _ (:: source feed value)] + _ (:: sink feed value)] (promise.await recur (promise.wait milli-seconds))))) output))) @@ -250,7 +271,7 @@ (All [a] (-> Nat (List a) (Channel a))) (case values #.Nil - (promise.resolved #.None) + ..empty (#.Cons head tail) (promise.resolved (#.Some [head (do promise.monad diff --git a/stdlib/source/test/lux/control.lux b/stdlib/source/test/lux/control.lux index 22bd9741c..8bfd57da0 100644 --- a/stdlib/source/test/lux/control.lux +++ b/stdlib/source/test/lux/control.lux @@ -25,13 +25,15 @@ ($_ _.and /actor.test /atom.test - /frp.test)) + /frp.test + )) (def: security Test ($_ _.and /privacy.test - /integrity.test)) + /integrity.test + )) (def: #export test Test @@ -48,4 +50,5 @@ /thread.test /writer.test ..concurrency - ..security)) + ..security + )) diff --git a/stdlib/source/test/lux/control/concurrency/frp.lux b/stdlib/source/test/lux/control/concurrency/frp.lux index ea4d7adad..92e4838a8 100644 --- a/stdlib/source/test/lux/control/concurrency/frp.lux +++ b/stdlib/source/test/lux/control/concurrency/frp.lux @@ -5,13 +5,13 @@ [control ["." monad (#+ do)] [concurrency - ["." promise ("#;." monad)] + ["." promise ("#@." monad)] ["." atom (#+ Atom atom)]]] [data [number ["." nat]] [collection - ["." list ("#;." functor)]]] + ["." list ("#@." functor)]]] [math ["r" random]]] {1 @@ -19,7 +19,7 @@ (def: #export test Test - (let [(^open "list;.") (list.equivalence nat.equivalence)] + (let [(^open "list@.") (list.equivalence nat.equivalence)] (do r.monad [inputs (r.list 5 r.nat) sample r.nat] @@ -30,7 +30,7 @@ (/.filter n/even?) /.consume)] (_.assert "Can filter a channel's elements." - (list;= (list.filter n/even? inputs) + (list@= (list.filter n/even? inputs) output)))) (wrap (do promise.monad [output (|> inputs @@ -38,22 +38,22 @@ (:: /.functor map inc) /.consume)] (_.assert "Functor goes over every element in a channel." - (list;= (list;map inc inputs) + (list@= (list@map inc inputs) output)))) (wrap (do promise.monad [output (/.consume (:: /.apply apply (/.sequential 0 (list inc)) (/.sequential 0 (list sample))))] (_.assert "Apply works over all channel values." - (list;= (list (inc sample)) + (list@= (list (inc sample)) output)))) (wrap (do promise.monad [output (/.consume (do /.monad - [f (/.from-promise (promise;wrap inc)) - a (/.from-promise (promise;wrap sample))] + [f (/.from-promise (promise@wrap inc)) + a (/.from-promise (promise@wrap sample))] (wrap (f a))))] (_.assert "Valid monad." - (list;= (list (inc sample)) + (list@= (list (inc sample)) output)))) )))) -- cgit v1.2.3