From ea32f6d5500b5affa7f5b11cdc05b48ad4fe7a46 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Tue, 12 Dec 2017 08:28:55 -0400 Subject: - FRP channels are no longer asynchronously-built lists, but rather mediums for spreading data, that can be chained together to form networks for data distribution. --- stdlib/source/lux/concurrency/frp.lux | 375 ++++++++------------------------ stdlib/source/lux/concurrency/stm.lux | 43 +--- stdlib/source/lux/world/net/tcp.jvm.lux | 122 +++++------ stdlib/source/lux/world/net/udp.jvm.lux | 3 +- 4 files changed, 159 insertions(+), 384 deletions(-) (limited to 'stdlib/source') diff --git a/stdlib/source/lux/concurrency/frp.lux b/stdlib/source/lux/concurrency/frp.lux index 533d2a7e5..e7efcbf3d 100644 --- a/stdlib/source/lux/concurrency/frp.lux +++ b/stdlib/source/lux/concurrency/frp.lux @@ -1,310 +1,127 @@ (.module: lux - (lux (control ["F" functor] - ["A" applicative] - ["M" monad #+ do Monad] - [eq #+ Eq] - ["p" parser]) - [io #- run] - (data (coll [list "L/" Monoid]) - text/format) - [macro] - (macro ["s" syntax #+ syntax: Syntax])) - (// ["&" promise])) + (lux (control [functor #+ Functor] + [applicative #+ Applicative] + [monad #+ do Monad]) + [io #+ IO io] + (data (coll [list "list/" Monoid])) + (type abstract)) + (// [atom #+ Atom atom] + [promise #+ Promise])) ## [Types] -(type: #export (Channel a) - {#.doc "An asynchronous channel of values which may be closed. - - Reading from a channel does not remove the read piece of data, as it can still be accessed if you have an earlier node of the channel."} - (&.Promise (Maybe [a (Channel a)]))) - -## [Syntax] -(syntax: #export (channel) - {#.doc (doc "Makes an uninitialized Channel." - (channel))} - (wrap (list (` (&.promise #.None))))) +(abstract: #export (Channel a) + {#.doc "An asynchronous channel to distribute values."} + (Atom (List (-> a (IO Top)))) + + (def: #export (channel _) + (All [a] (-> Top (Channel a))) + (@abstraction (atom (list)))) + + (def: #export (listen listener (^@representation channel)) + (All [a] (-> (-> a (IO Top)) (Channel a) (IO Unit))) + (atom.update (|>> (#.Cons listener)) channel)) + + (def: #export (publish (^@representation channel) value) + {#.doc "Publish to a channel."} + (All [a] (-> (Channel a) a (IO Unit))) + (do io.Monad + [listeners (atom.read channel) + _ (monad.map @ (function [listener] (listener value)) listeners)] + (wrap []))) + ) ## [Values] -(def: #export (filter p xs) +(def: #export (filter predicate input) (All [a] (-> (-> a Bool) (Channel a) (Channel a))) - (do &.Monad - [?x+xs xs] - (case ?x+xs - #.None (wrap #.None) - (#.Some [x xs']) (if (p x) - (wrap (#.Some [x (filter p xs')])) - (filter p xs'))))) - -(def: #export (write value target) - {#.doc "Write to a channel, so long as it's still open."} - (All [a] (-> a (Channel a) (IO (Maybe (Channel a))))) - (case (&.poll target) - (^template [ ] - - (do Monad - [#let [new-tail (channel)] - done? (&.resolve (#.Some [value new-tail]) )] - (if done? - (wrap (#.Some new-tail)) - (write value )))) - ([#.None target] - [(#.Some (#.Some [_ target'])) target']) - - _ - (:: Monad wrap #.None) - )) - -(def: #export (close target) - (All [a] (-> (Channel a) (IO Bool))) - (case (&.poll target) - (^template [ ] - - (do Monad - [done? (&.resolve #.None )] - (if done? - (wrap true) - (close )))) - ([#.None target] - [(#.Some (#.Some [_ target'])) target']) - - _ - (:: Monad wrap false) - )) - -(def: (pipe' input output) - (All [a] (-> (Channel a) (Channel a) (&.Promise Unit))) - (do &.Monad - [?x+xs input] - (case ?x+xs - #.None (wrap []) - (#.Some [x input']) (case (io.run (write x output)) - #.None - (wrap []) - - (#.Some output') - (pipe' input' output'))))) + (let [output (channel [])] + (exec (io.run (listen (function [value] + (if (predicate value) + (publish output value) + (io []))) + input)) + output))) -(def: #export (pipe input output) +(def: #export (pipe output input) {#.doc "Copy/pipe the contents of a channel on to another."} - (All [a] (-> (Channel a) (Channel a) (&.Promise Unit))) - (do &.Monad - [_ (pipe' input output)] - (exec (io.run (close output)) - (wrap [])))) + (All [a] (-> (Channel a) (Channel a) (IO Unit))) + (listen (publish output) + input)) -(def: #export (merge xss) +(def: #export (merge inputs) {#.doc "Fuse all the elements in a list of channels by piping them onto a new output channel."} - (All [a] (-> (List (Channel a)) (Channel a))) - (let [output (channel)] - (exec (do &.Monad - [_ (M.map @ (function [input] (pipe' input output)) xss)] - (exec (io.run (close output)) - (wrap []))) + (All [a] (-> (List (Channel a)) (IO (Channel a)))) + (let [output (channel [])] + (do io.Monad + [_ (monad.map @ (pipe output) inputs)] + (wrap output)))) + +(def: #export (from-promise promise) + (All [a] (-> (Promise a) (Channel a))) + (let [output (channel [])] + (exec (promise.await (publish output) promise) output))) -(def: #export (fold f init xs) - {#.doc "Asynchronous fold over channels."} - (All [a b] (-> (-> b a (&.Promise a)) a (Channel b) (&.Promise a))) - (do &.Monad - [?x+xs xs] - (case ?x+xs - #.None (wrap init) - (#.Some [x xs']) (do @ - [init' (f x init)] - (fold f init' xs'))))) - -(def: #export (folds f init xs) - {#.doc "A channel of folds."} - (All [a b] (-> (-> b a (&.Promise a)) a (Channel b) (Channel a))) - (do &.Monad - [?x+xs xs] - (case ?x+xs - #.None (wrap (#.Some [init (wrap #.None)])) - (#.Some [x xs']) (do @ - [init' (f x init)] - (folds f init' xs'))))) - -(def: (distinct' eq last-one xs) - (All [a] (-> (Eq a) a (Channel a) (Channel a))) - (let [(^open) eq] - (do &.Monad - [?x+xs xs] - (case ?x+xs - #.None (wrap #.None) - (#.Some [x xs']) (if (= x last-one) - (distinct' eq last-one xs') - (wrap (#.Some [x (distinct' eq x xs')]))))))) - -(def: #export (distinct eq xs) - {#.doc "Multiple consecutive equal values in the input channel will just be single value in the output channel."} - (All [a] (-> (Eq a) (Channel a) (Channel a))) - (let [(^open) eq] - (do &.Monad - [?x+xs xs] - (case ?x+xs - #.None (wrap #.None) - (#.Some [x xs']) (wrap (#.Some [x (distinct' eq x xs')])))))) - -(def: #export (consume xs) - {#.doc "Reads the entirety of a channel's contents and returns them as a list."} - (All [a] (-> (Channel a) (&.Promise (List a)))) - (do &.Monad - [?x+xs' xs] - (case ?x+xs' - #.None - (wrap #.Nil) - - (#.Some [x xs']) - (do @ - [=xs (consume xs')] - (wrap (#.Cons x =xs)))))) - -(def: #export (once p) - (All [a] (-> (&.Promise a) (Channel a))) - (do &.Monad - [x p] - (wrap (#.Some [x (wrap #.None)])))) - (def: #export (poll time action) - (All [a] (-> Nat (IO (Maybe a)) (Channel a))) - (do &.Monad - [?output (&.future action)] - (case ?output - #.None - (wrap #.None) - - (#.Some head) - (do @ - [_ (&.wait time)] - (wrap (#.Some [head (poll time action)])))))) - -(def: #export (periodic time value) - (All [a] (-> Nat a (Channel a))) - (do &.Monad - [] - (wrap (#.Some [value (do @ - [_ (&.wait time)] - (periodic time value))])))) - -(def: #export (sequential time xs) - (All [a] (-> Nat (List a) (Channel a))) - (do &.Monad - [] - (case xs - #.Nil - (wrap #.None) - - (#.Cons x xs') - (wrap (#.Some [x (do @ - [_ (&.wait time)] - (sequential time xs'))]))))) - -(def: #export (cycle time values) - (All [a] (-> Nat (List a) (Channel a))) - (do &.Monad - [] - (case values - #.Nil - (wrap #.None) - - _ - (loop [xs values] - (case xs - #.Nil - (recur values) - - (#.Cons x xs') - (wrap (#.Some [x (do @ - [_ (&.wait time)] - (recur xs'))]))))))) - -## Utils -(def: (tail xs) - (All [a] (-> (List a) (List a))) - (case xs - #.Nil - #.Nil - - (#.Cons _ xs') - xs')) + (All [a] (-> Nat (IO a) (Channel a))) + (let [output (channel [])] + (exec (io.run + (loop [_ []] + (do io.Monad + [value action + _ (publish output value)] + (wrap (promise.await recur (promise.wait time)))))) + output))) -(def: #export (sliding-window max inputs) - (All [a] (-> Nat (Channel a) (Channel (List a)))) - (let [(^open) &.Monad] - (folds (function [input window] - (let [window' (L/compose window (list input))] - (wrap (if (n/<= max (list.size window')) - window' - (tail window'))))) - (list) - inputs))) +(def: #export (periodic time) + (All [a] (-> Nat (Channel a))) + (let [output (channel [])] + (exec (io.run + (loop [_ []] + (do io.Monad + [_ (publish output [])] + (wrap (promise.await recur (promise.wait time)))))) + output))) (def: #export (iterate f init) - (All [a] (-> (-> a (&.Promise (Maybe a))) a (Channel a))) - (do &.Monad - [] - (wrap (#.Some [init (do @ - [?next (f init)] - (case ?next - #.None - (wrap #.None) - - (#.Some init') - (iterate f init')))])))) - -(def: #export (sample time inputs) - (All [a] (-> Nat (Channel a) (Channel a))) - (do &.Monad - [?h+t inputs] - (case ?h+t - #.None - (wrap #.None) - - (#.Some [value inputs']) - (do @ - [_ (&.wait time) - #let [next-inputs (loop [last-resolved-node inputs'] - (case (&.poll last-resolved-node) - (^multi (#.Some (#.Some [_ next-node])) - (&.resolved? next-node)) - (recur next-node) - - _ - last-resolved-node))]] - (wrap (#.Some [value (sample time next-inputs)])))))) + (All [a] (-> (-> a (Promise a)) a (Channel a))) + (let [output (channel [])] + (exec (io.run + (loop [zero init] + (do io.Monad + [_ (publish output zero)] + (wrap (promise.await recur (f zero)))))) + output))) ## [Structures] -(struct: #export _ (F.Functor Channel) - (def: (map f xs) - (:: &.Functor map - (function [?x+xs] - (case ?x+xs - #.None #.None - (#.Some [x xs']) (#.Some [(f x) (map f xs')]))) - xs))) +(struct: #export _ (Functor Channel) + (def: (map f input) + (let [output (channel [])] + (exec (io.run (listen (|>> f (publish output)) + input)) + output)))) -(struct: #export _ (A.Applicative Channel) +(struct: #export _ (Applicative Channel) (def: functor Functor) (def: (wrap a) - (let [(^open) &.Monad] - (wrap (#.Some [a (wrap #.None)])))) + (let [output (channel [])] + (exec (io.run (publish output a)) + output))) (def: (apply ff fa) - (let [fb (channel)] - (exec (let [(^open) Functor] - (map (function [f] (pipe (map f fa) fb)) - ff)) - fb)))) + (let [output (channel [])] + (exec (io.run (listen (function [f] + (listen (|>> f (publish output)) + fa)) + ff)) + output)))) (struct: #export _ (Monad Channel) (def: applicative Applicative) (def: (join mma) - (let [output (channel)] - (exec (let [(^open) Functor] - (map (function [ma] - (pipe ma output)) - mma)) + (let [output (channel [])] + (exec (io.run (listen (listen (publish output)) + mma)) output)))) diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux index 9b1cabe01..cc609a055 100644 --- a/stdlib/source/lux/concurrency/stm.lux +++ b/stdlib/source/lux/concurrency/stm.lux @@ -6,10 +6,7 @@ [io #+ IO io] (data [product] [maybe] - [number "nat/" Codec] - [text] - (coll [list "list/" Functor Fold] - [dict #+ Dict])) + (coll [list "list/" Functor Fold])) (concurrency [atom #+ Atom atom] [promise #+ Promise promise] [frp "frp/" Functor]) @@ -17,12 +14,12 @@ (abstract: #export (Var a) {#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."} - (Atom [a (Dict Text (-> a (IO Unit)))]) + (Atom [a (List (-> a (IO Unit)))]) (def: #export (var value) {#.doc "Creates a new STM var, with a default value."} (All [a] (-> a (Var a))) - (@abstraction (atom.atom [value (dict.new text.Hash)]))) + (@abstraction (atom.atom [value (list)]))) (def: read!! (All [a] (-> (Var a) a)) @@ -42,43 +39,21 @@ succeeded? (atom.compare-and-swap old [new-value _observers] var)] (if succeeded? (do @ - [_ (|> _observers - dict.values - (monad.map @ (function [f] (f new-value))))] + [_ (monad.map @ (function [f] (f new-value)) _observers)] (wrap [])) (write! new-value (@abstraction var))))) (def: #export (follow (^@representation target)) {#.doc "Creates a channel that will receive all changes to the value of the given var."} (All [a] (-> (Var a) (IO (frp.Channel a)))) - (let [head (: (frp.Channel ($ +0)) (frp.channel)) - ## head (frp.channel) - channel-var (var head) - observer (function [label value] - (case (io.run (|> channel-var read!! (frp.write value))) - #.None - ## By closing the output Channel, the - ## observer becomes obsolete. - (atom.update (function [[value observers]] - [value (dict.remove label observers)]) - target) - - (#.Some tail') - (write! tail' channel-var)))] + (let [channel (: (frp.Channel ($ +0)) (frp.channel [])) + ## channel (frp.channel) + ] (do io.Monad [_ (atom.update (function [[value observers]] - (let [label (nat/encode (list/fold (function [key base] - (case (nat/decode key) - (#.Left _) - base - - (#.Right key-num) - (n/max key-num base))) - +0 - (dict.keys observers)))] - [value (dict.put label (observer label) observers)])) + [value (#.Cons (frp.publish channel) observers)]) target)] - (wrap head)))) + (wrap channel)))) ) (type: (Tx-Frame a) diff --git a/stdlib/source/lux/world/net/tcp.jvm.lux b/stdlib/source/lux/world/net/tcp.jvm.lux index fcd037578..e551e995d 100644 --- a/stdlib/source/lux/world/net/tcp.jvm.lux +++ b/stdlib/source/lux/world/net/tcp.jvm.lux @@ -1,13 +1,13 @@ (.module: lux (lux (control monad) - (concurrency ["P" promise] - ["T" task] + (concurrency [promise #+ Promise promise] + [task #+ Task] [frp]) (data ["e" error]) (type abstract) (world [blob #+ Blob]) - [io] + [io #+ Process] [host]) [//]) @@ -42,90 +42,74 @@ #out OutputStream} (def: #export (read data offset length self) - (let [in (get@ #in (@representation self))] - (P.future - (do (e.ErrorT io.Monad) - [bytes-read (InputStream::read [data (nat-to-int offset) (nat-to-int length)] - in)] - (wrap (int-to-nat bytes-read)))))) + (-> Blob Nat Nat TCP (Task Nat)) + (promise.future + (do io.Monad + [bytes-read (InputStream::read [data (nat-to-int offset) (nat-to-int length)] + (get@ #in (@representation self)))] + (wrap (int-to-nat bytes-read))))) (def: #export (write data offset length self) + (-> Blob Nat Nat TCP (Task Unit)) (let [out (get@ #out (@representation self))] - (P.future - (do (e.ErrorT io.Monad) + (promise.future + (do io.Monad [_ (OutputStream::write [data (nat-to-int offset) (nat-to-int length)] out)] (Flushable::flush [] out))))) (def: #export (close self) + (-> TCP (Task Unit)) (let [(^open) (@representation self)] - (P.future - (do (e.ErrorT io.Monad) + (promise.future + (do io.Monad [_ (AutoCloseable::close [] in) _ (AutoCloseable::close [] out)] (AutoCloseable::close [] socket))))) (def: (tcp-client socket) - (-> Socket (io.IO (e.Error TCP))) - (do (e.ErrorT io.Monad) + (-> Socket (Process TCP)) + (do io.Monad [input (Socket::getInputStream [] socket) output (Socket::getOutputStream [] socket)] (wrap (@abstraction {#socket socket #in input #out output})))) + ) - (def: #export (client address port) - (-> //.Address //.Port (T.Task TCP)) - (P.future - (do (e.ErrorT io.Monad) - [socket (Socket::new [address (nat-to-int port)])] - (tcp-client socket)))) - - (def: (await-server-release client-channel server) - (-> (frp.Channel TCP) ServerSocket (P.Promise Unit)) - (do P.Monad - [outcome client-channel] - (case outcome - ## Channel has been closed. - ## Must close associated server. - #.None - (P.future - (do io.Monad - [_ (AutoCloseable::close [] server)] - (wrap []))) - - ## A client was generated. - ## Nothing to be done... - (#.Some _) - (wrap [])))) +(def: #export (client address port) + (-> //.Address //.Port (Task TCP)) + (promise.future + (do io.Monad + [socket (Socket::new [address (nat-to-int port)])] + (tcp-client socket)))) - (def: #export (server port) - (-> //.Port (T.Task (frp.Channel TCP))) - (P.future - (do (e.ErrorT io.Monad) - [server (ServerSocket::new [(nat-to-int port)]) - #let [output (: (frp.Channel TCP) - (frp.channel)) - _ (: (P.Promise Bool) - (P.future - (loop [tail output] - (do io.Monad - [?client (do (e.ErrorT io.Monad) - [socket (ServerSocket::accept [] server)] - (tcp-client socket))] - (case ?client - (#e.Error error) - (frp.close tail) - - (#e.Success client) - (do @ - [?tail' (frp.write client tail)] - (case ?tail' - #.None - (wrap true) - - (#.Some tail') - (exec (await-server-release tail' server) - (recur tail')))))))))]] - (wrap output)))) - ) +(def: #export (server port) + (-> //.Port (Task [(Promise Unit) + (frp.Channel TCP)])) + (promise.future + (do (e.ErrorT io.Monad) + [server (ServerSocket::new [(nat-to-int port)]) + #let [signal (: (Promise Unit) + (promise #.None)) + _ (promise.await (function [_] + (AutoCloseable::close [] server)) + signal) + output (: (frp.Channel TCP) + (frp.channel [])) + _ (: (Promise Unit) + (promise.future + (loop [_ []] + (do io.Monad + [?client (do (e.ErrorT io.Monad) + [socket (ServerSocket::accept [] server)] + (tcp-client socket))] + (case ?client + (#e.Error error) + (wrap []) + + (#e.Success client) + (do @ + [_ (frp.publish output client)] + (recur [])))))))]] + (wrap [signal output])))) diff --git a/stdlib/source/lux/world/net/udp.jvm.lux b/stdlib/source/lux/world/net/udp.jvm.lux index caec294cd..c0aa54b77 100644 --- a/stdlib/source/lux/world/net/udp.jvm.lux +++ b/stdlib/source/lux/world/net/udp.jvm.lux @@ -3,8 +3,7 @@ (lux (control monad ["ex" exception #+ exception:]) (concurrency ["P" promise] - ["T" task] - [frp]) + ["T" task]) (data ["e" error] [maybe] (coll [array])) -- cgit v1.2.3