aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source
diff options
context:
space:
mode:
authorEduardo Julian2017-12-12 08:28:55 -0400
committerEduardo Julian2017-12-12 08:28:55 -0400
commitea32f6d5500b5affa7f5b11cdc05b48ad4fe7a46 (patch)
treec1f8a884fe7f3b0b3d5acc29389f2003a371a394 /stdlib/source
parent859c7485cd0e9ebe8d456ed58238bdec849bd6e1 (diff)
- FRP channels are no longer asynchronously-built lists, but rather mediums for spreading data, that can be chained together to form networks for data distribution.
Diffstat (limited to 'stdlib/source')
-rw-r--r--stdlib/source/lux/concurrency/frp.lux375
-rw-r--r--stdlib/source/lux/concurrency/stm.lux43
-rw-r--r--stdlib/source/lux/world/net/tcp.jvm.lux122
-rw-r--r--stdlib/source/lux/world/net/udp.jvm.lux3
4 files changed, 159 insertions, 384 deletions
diff --git a/stdlib/source/lux/concurrency/frp.lux b/stdlib/source/lux/concurrency/frp.lux
index 533d2a7e5..e7efcbf3d 100644
--- a/stdlib/source/lux/concurrency/frp.lux
+++ b/stdlib/source/lux/concurrency/frp.lux
@@ -1,310 +1,127 @@
(.module:
lux
- (lux (control ["F" functor]
- ["A" applicative]
- ["M" monad #+ do Monad]
- [eq #+ Eq]
- ["p" parser])
- [io #- run]
- (data (coll [list "L/" Monoid<List>])
- text/format)
- [macro]
- (macro ["s" syntax #+ syntax: Syntax]))
- (// ["&" promise]))
+ (lux (control [functor #+ Functor]
+ [applicative #+ Applicative]
+ [monad #+ do Monad])
+ [io #+ IO io]
+ (data (coll [list "list/" Monoid<List>]))
+ (type abstract))
+ (// [atom #+ Atom atom]
+ [promise #+ Promise]))
## [Types]
-(type: #export (Channel a)
- {#.doc "An asynchronous channel of values which may be closed.
-
- Reading from a channel does not remove the read piece of data, as it can still be accessed if you have an earlier node of the channel."}
- (&.Promise (Maybe [a (Channel a)])))
-
-## [Syntax]
-(syntax: #export (channel)
- {#.doc (doc "Makes an uninitialized Channel."
- (channel))}
- (wrap (list (` (&.promise #.None)))))
+(abstract: #export (Channel a)
+ {#.doc "An asynchronous channel to distribute values."}
+ (Atom (List (-> a (IO Top))))
+
+ (def: #export (channel _)
+ (All [a] (-> Top (Channel a)))
+ (@abstraction (atom (list))))
+
+ (def: #export (listen listener (^@representation channel))
+ (All [a] (-> (-> a (IO Top)) (Channel a) (IO Unit)))
+ (atom.update (|>> (#.Cons listener)) channel))
+
+ (def: #export (publish (^@representation channel) value)
+ {#.doc "Publish to a channel."}
+ (All [a] (-> (Channel a) a (IO Unit)))
+ (do io.Monad<IO>
+ [listeners (atom.read channel)
+ _ (monad.map @ (function [listener] (listener value)) listeners)]
+ (wrap [])))
+ )
## [Values]
-(def: #export (filter p xs)
+(def: #export (filter predicate input)
(All [a] (-> (-> a Bool) (Channel a) (Channel a)))
- (do &.Monad<Promise>
- [?x+xs xs]
- (case ?x+xs
- #.None (wrap #.None)
- (#.Some [x xs']) (if (p x)
- (wrap (#.Some [x (filter p xs')]))
- (filter p xs')))))
-
-(def: #export (write value target)
- {#.doc "Write to a channel, so long as it's still open."}
- (All [a] (-> a (Channel a) (IO (Maybe (Channel a)))))
- (case (&.poll target)
- (^template [<case> <channel-to-write>]
- <case>
- (do Monad<IO>
- [#let [new-tail (channel)]
- done? (&.resolve (#.Some [value new-tail]) <channel-to-write>)]
- (if done?
- (wrap (#.Some new-tail))
- (write value <channel-to-write>))))
- ([#.None target]
- [(#.Some (#.Some [_ target'])) target'])
-
- _
- (:: Monad<IO> wrap #.None)
- ))
-
-(def: #export (close target)
- (All [a] (-> (Channel a) (IO Bool)))
- (case (&.poll target)
- (^template [<case> <channel-to-write>]
- <case>
- (do Monad<IO>
- [done? (&.resolve #.None <channel-to-write>)]
- (if done?
- (wrap true)
- (close <channel-to-write>))))
- ([#.None target]
- [(#.Some (#.Some [_ target'])) target'])
-
- _
- (:: Monad<IO> wrap false)
- ))
-
-(def: (pipe' input output)
- (All [a] (-> (Channel a) (Channel a) (&.Promise Unit)))
- (do &.Monad<Promise>
- [?x+xs input]
- (case ?x+xs
- #.None (wrap [])
- (#.Some [x input']) (case (io.run (write x output))
- #.None
- (wrap [])
-
- (#.Some output')
- (pipe' input' output')))))
+ (let [output (channel [])]
+ (exec (io.run (listen (function [value]
+ (if (predicate value)
+ (publish output value)
+ (io [])))
+ input))
+ output)))
-(def: #export (pipe input output)
+(def: #export (pipe output input)
{#.doc "Copy/pipe the contents of a channel on to another."}
- (All [a] (-> (Channel a) (Channel a) (&.Promise Unit)))
- (do &.Monad<Promise>
- [_ (pipe' input output)]
- (exec (io.run (close output))
- (wrap []))))
+ (All [a] (-> (Channel a) (Channel a) (IO Unit)))
+ (listen (publish output)
+ input))
-(def: #export (merge xss)
+(def: #export (merge inputs)
{#.doc "Fuse all the elements in a list of channels by piping them onto a new output channel."}
- (All [a] (-> (List (Channel a)) (Channel a)))
- (let [output (channel)]
- (exec (do &.Monad<Promise>
- [_ (M.map @ (function [input] (pipe' input output)) xss)]
- (exec (io.run (close output))
- (wrap [])))
+ (All [a] (-> (List (Channel a)) (IO (Channel a))))
+ (let [output (channel [])]
+ (do io.Monad<IO>
+ [_ (monad.map @ (pipe output) inputs)]
+ (wrap output))))
+
+(def: #export (from-promise promise)
+ (All [a] (-> (Promise a) (Channel a)))
+ (let [output (channel [])]
+ (exec (promise.await (publish output) promise)
output)))
-(def: #export (fold f init xs)
- {#.doc "Asynchronous fold over channels."}
- (All [a b] (-> (-> b a (&.Promise a)) a (Channel b) (&.Promise a)))
- (do &.Monad<Promise>
- [?x+xs xs]
- (case ?x+xs
- #.None (wrap init)
- (#.Some [x xs']) (do @
- [init' (f x init)]
- (fold f init' xs')))))
-
-(def: #export (folds f init xs)
- {#.doc "A channel of folds."}
- (All [a b] (-> (-> b a (&.Promise a)) a (Channel b) (Channel a)))
- (do &.Monad<Promise>
- [?x+xs xs]
- (case ?x+xs
- #.None (wrap (#.Some [init (wrap #.None)]))
- (#.Some [x xs']) (do @
- [init' (f x init)]
- (folds f init' xs')))))
-
-(def: (distinct' eq last-one xs)
- (All [a] (-> (Eq a) a (Channel a) (Channel a)))
- (let [(^open) eq]
- (do &.Monad<Promise>
- [?x+xs xs]
- (case ?x+xs
- #.None (wrap #.None)
- (#.Some [x xs']) (if (= x last-one)
- (distinct' eq last-one xs')
- (wrap (#.Some [x (distinct' eq x xs')])))))))
-
-(def: #export (distinct eq xs)
- {#.doc "Multiple consecutive equal values in the input channel will just be single value in the output channel."}
- (All [a] (-> (Eq a) (Channel a) (Channel a)))
- (let [(^open) eq]
- (do &.Monad<Promise>
- [?x+xs xs]
- (case ?x+xs
- #.None (wrap #.None)
- (#.Some [x xs']) (wrap (#.Some [x (distinct' eq x xs')]))))))
-
-(def: #export (consume xs)
- {#.doc "Reads the entirety of a channel's contents and returns them as a list."}
- (All [a] (-> (Channel a) (&.Promise (List a))))
- (do &.Monad<Promise>
- [?x+xs' xs]
- (case ?x+xs'
- #.None
- (wrap #.Nil)
-
- (#.Some [x xs'])
- (do @
- [=xs (consume xs')]
- (wrap (#.Cons x =xs))))))
-
-(def: #export (once p)
- (All [a] (-> (&.Promise a) (Channel a)))
- (do &.Monad<Promise>
- [x p]
- (wrap (#.Some [x (wrap #.None)]))))
-
(def: #export (poll time action)
- (All [a] (-> Nat (IO (Maybe a)) (Channel a)))
- (do &.Monad<Promise>
- [?output (&.future action)]
- (case ?output
- #.None
- (wrap #.None)
-
- (#.Some head)
- (do @
- [_ (&.wait time)]
- (wrap (#.Some [head (poll time action)]))))))
-
-(def: #export (periodic time value)
- (All [a] (-> Nat a (Channel a)))
- (do &.Monad<Promise>
- []
- (wrap (#.Some [value (do @
- [_ (&.wait time)]
- (periodic time value))]))))
-
-(def: #export (sequential time xs)
- (All [a] (-> Nat (List a) (Channel a)))
- (do &.Monad<Promise>
- []
- (case xs
- #.Nil
- (wrap #.None)
-
- (#.Cons x xs')
- (wrap (#.Some [x (do @
- [_ (&.wait time)]
- (sequential time xs'))])))))
-
-(def: #export (cycle time values)
- (All [a] (-> Nat (List a) (Channel a)))
- (do &.Monad<Promise>
- []
- (case values
- #.Nil
- (wrap #.None)
-
- _
- (loop [xs values]
- (case xs
- #.Nil
- (recur values)
-
- (#.Cons x xs')
- (wrap (#.Some [x (do @
- [_ (&.wait time)]
- (recur xs'))])))))))
-
-## Utils
-(def: (tail xs)
- (All [a] (-> (List a) (List a)))
- (case xs
- #.Nil
- #.Nil
-
- (#.Cons _ xs')
- xs'))
+ (All [a] (-> Nat (IO a) (Channel a)))
+ (let [output (channel [])]
+ (exec (io.run
+ (loop [_ []]
+ (do io.Monad<IO>
+ [value action
+ _ (publish output value)]
+ (wrap (promise.await recur (promise.wait time))))))
+ output)))
-(def: #export (sliding-window max inputs)
- (All [a] (-> Nat (Channel a) (Channel (List a))))
- (let [(^open) &.Monad<Promise>]
- (folds (function [input window]
- (let [window' (L/compose window (list input))]
- (wrap (if (n/<= max (list.size window'))
- window'
- (tail window')))))
- (list)
- inputs)))
+(def: #export (periodic time)
+ (All [a] (-> Nat (Channel a)))
+ (let [output (channel [])]
+ (exec (io.run
+ (loop [_ []]
+ (do io.Monad<IO>
+ [_ (publish output [])]
+ (wrap (promise.await recur (promise.wait time))))))
+ output)))
(def: #export (iterate f init)
- (All [a] (-> (-> a (&.Promise (Maybe a))) a (Channel a)))
- (do &.Monad<Promise>
- []
- (wrap (#.Some [init (do @
- [?next (f init)]
- (case ?next
- #.None
- (wrap #.None)
-
- (#.Some init')
- (iterate f init')))]))))
-
-(def: #export (sample time inputs)
- (All [a] (-> Nat (Channel a) (Channel a)))
- (do &.Monad<Promise>
- [?h+t inputs]
- (case ?h+t
- #.None
- (wrap #.None)
-
- (#.Some [value inputs'])
- (do @
- [_ (&.wait time)
- #let [next-inputs (loop [last-resolved-node inputs']
- (case (&.poll last-resolved-node)
- (^multi (#.Some (#.Some [_ next-node]))
- (&.resolved? next-node))
- (recur next-node)
-
- _
- last-resolved-node))]]
- (wrap (#.Some [value (sample time next-inputs)]))))))
+ (All [a] (-> (-> a (Promise a)) a (Channel a)))
+ (let [output (channel [])]
+ (exec (io.run
+ (loop [zero init]
+ (do io.Monad<IO>
+ [_ (publish output zero)]
+ (wrap (promise.await recur (f zero))))))
+ output)))
## [Structures]
-(struct: #export _ (F.Functor Channel)
- (def: (map f xs)
- (:: &.Functor<Promise> map
- (function [?x+xs]
- (case ?x+xs
- #.None #.None
- (#.Some [x xs']) (#.Some [(f x) (map f xs')])))
- xs)))
+(struct: #export _ (Functor Channel)
+ (def: (map f input)
+ (let [output (channel [])]
+ (exec (io.run (listen (|>> f (publish output))
+ input))
+ output))))
-(struct: #export _ (A.Applicative Channel)
+(struct: #export _ (Applicative Channel)
(def: functor Functor<Channel>)
(def: (wrap a)
- (let [(^open) &.Monad<Promise>]
- (wrap (#.Some [a (wrap #.None)]))))
+ (let [output (channel [])]
+ (exec (io.run (publish output a))
+ output)))
(def: (apply ff fa)
- (let [fb (channel)]
- (exec (let [(^open) Functor<Channel>]
- (map (function [f] (pipe (map f fa) fb))
- ff))
- fb))))
+ (let [output (channel [])]
+ (exec (io.run (listen (function [f]
+ (listen (|>> f (publish output))
+ fa))
+ ff))
+ output))))
(struct: #export _ (Monad Channel)
(def: applicative Applicative<Channel>)
(def: (join mma)
- (let [output (channel)]
- (exec (let [(^open) Functor<Channel>]
- (map (function [ma]
- (pipe ma output))
- mma))
+ (let [output (channel [])]
+ (exec (io.run (listen (listen (publish output))
+ mma))
output))))
diff --git a/stdlib/source/lux/concurrency/stm.lux b/stdlib/source/lux/concurrency/stm.lux
index 9b1cabe01..cc609a055 100644
--- a/stdlib/source/lux/concurrency/stm.lux
+++ b/stdlib/source/lux/concurrency/stm.lux
@@ -6,10 +6,7 @@
[io #+ IO io]
(data [product]
[maybe]
- [number "nat/" Codec<Text,Nat>]
- [text]
- (coll [list "list/" Functor<List> Fold<List>]
- [dict #+ Dict]))
+ (coll [list "list/" Functor<List> Fold<List>]))
(concurrency [atom #+ Atom atom]
[promise #+ Promise promise]
[frp "frp/" Functor<Channel>])
@@ -17,12 +14,12 @@
(abstract: #export (Var a)
{#.doc "A mutable cell containing a value, and observers that will be alerted of any change to it."}
- (Atom [a (Dict Text (-> a (IO Unit)))])
+ (Atom [a (List (-> a (IO Unit)))])
(def: #export (var value)
{#.doc "Creates a new STM var, with a default value."}
(All [a] (-> a (Var a)))
- (@abstraction (atom.atom [value (dict.new text.Hash<Text>)])))
+ (@abstraction (atom.atom [value (list)])))
(def: read!!
(All [a] (-> (Var a) a))
@@ -42,43 +39,21 @@
succeeded? (atom.compare-and-swap old [new-value _observers] var)]
(if succeeded?
(do @
- [_ (|> _observers
- dict.values
- (monad.map @ (function [f] (f new-value))))]
+ [_ (monad.map @ (function [f] (f new-value)) _observers)]
(wrap []))
(write! new-value (@abstraction var)))))
(def: #export (follow (^@representation target))
{#.doc "Creates a channel that will receive all changes to the value of the given var."}
(All [a] (-> (Var a) (IO (frp.Channel a))))
- (let [head (: (frp.Channel ($ +0)) (frp.channel))
- ## head (frp.channel)
- channel-var (var head)
- observer (function [label value]
- (case (io.run (|> channel-var read!! (frp.write value)))
- #.None
- ## By closing the output Channel, the
- ## observer becomes obsolete.
- (atom.update (function [[value observers]]
- [value (dict.remove label observers)])
- target)
-
- (#.Some tail')
- (write! tail' channel-var)))]
+ (let [channel (: (frp.Channel ($ +0)) (frp.channel []))
+ ## channel (frp.channel)
+ ]
(do io.Monad<IO>
[_ (atom.update (function [[value observers]]
- (let [label (nat/encode (list/fold (function [key base]
- (case (nat/decode key)
- (#.Left _)
- base
-
- (#.Right key-num)
- (n/max key-num base)))
- +0
- (dict.keys observers)))]
- [value (dict.put label (observer label) observers)]))
+ [value (#.Cons (frp.publish channel) observers)])
target)]
- (wrap head))))
+ (wrap channel))))
)
(type: (Tx-Frame a)
diff --git a/stdlib/source/lux/world/net/tcp.jvm.lux b/stdlib/source/lux/world/net/tcp.jvm.lux
index fcd037578..e551e995d 100644
--- a/stdlib/source/lux/world/net/tcp.jvm.lux
+++ b/stdlib/source/lux/world/net/tcp.jvm.lux
@@ -1,13 +1,13 @@
(.module:
lux
(lux (control monad)
- (concurrency ["P" promise]
- ["T" task]
+ (concurrency [promise #+ Promise promise]
+ [task #+ Task]
[frp])
(data ["e" error])
(type abstract)
(world [blob #+ Blob])
- [io]
+ [io #+ Process]
[host])
[//])
@@ -42,90 +42,74 @@
#out OutputStream}
(def: #export (read data offset length self)
- (let [in (get@ #in (@representation self))]
- (P.future
- (do (e.ErrorT io.Monad<IO>)
- [bytes-read (InputStream::read [data (nat-to-int offset) (nat-to-int length)]
- in)]
- (wrap (int-to-nat bytes-read))))))
+ (-> Blob Nat Nat TCP (Task Nat))
+ (promise.future
+ (do io.Monad<Process>
+ [bytes-read (InputStream::read [data (nat-to-int offset) (nat-to-int length)]
+ (get@ #in (@representation self)))]
+ (wrap (int-to-nat bytes-read)))))
(def: #export (write data offset length self)
+ (-> Blob Nat Nat TCP (Task Unit))
(let [out (get@ #out (@representation self))]
- (P.future
- (do (e.ErrorT io.Monad<IO>)
+ (promise.future
+ (do io.Monad<Process>
[_ (OutputStream::write [data (nat-to-int offset) (nat-to-int length)]
out)]
(Flushable::flush [] out)))))
(def: #export (close self)
+ (-> TCP (Task Unit))
(let [(^open) (@representation self)]
- (P.future
- (do (e.ErrorT io.Monad<IO>)
+ (promise.future
+ (do io.Monad<Process>
[_ (AutoCloseable::close [] in)
_ (AutoCloseable::close [] out)]
(AutoCloseable::close [] socket)))))
(def: (tcp-client socket)
- (-> Socket (io.IO (e.Error TCP)))
- (do (e.ErrorT io.Monad<IO>)
+ (-> Socket (Process TCP))
+ (do io.Monad<Process>
[input (Socket::getInputStream [] socket)
output (Socket::getOutputStream [] socket)]
(wrap (@abstraction {#socket socket
#in input
#out output}))))
+ )
- (def: #export (client address port)
- (-> //.Address //.Port (T.Task TCP))
- (P.future
- (do (e.ErrorT io.Monad<IO>)
- [socket (Socket::new [address (nat-to-int port)])]
- (tcp-client socket))))
-
- (def: (await-server-release client-channel server)
- (-> (frp.Channel TCP) ServerSocket (P.Promise Unit))
- (do P.Monad<Promise>
- [outcome client-channel]
- (case outcome
- ## Channel has been closed.
- ## Must close associated server.
- #.None
- (P.future
- (do io.Monad<IO>
- [_ (AutoCloseable::close [] server)]
- (wrap [])))
-
- ## A client was generated.
- ## Nothing to be done...
- (#.Some _)
- (wrap []))))
+(def: #export (client address port)
+ (-> //.Address //.Port (Task TCP))
+ (promise.future
+ (do io.Monad<Process>
+ [socket (Socket::new [address (nat-to-int port)])]
+ (tcp-client socket))))
- (def: #export (server port)
- (-> //.Port (T.Task (frp.Channel TCP)))
- (P.future
- (do (e.ErrorT io.Monad<IO>)
- [server (ServerSocket::new [(nat-to-int port)])
- #let [output (: (frp.Channel TCP)
- (frp.channel))
- _ (: (P.Promise Bool)
- (P.future
- (loop [tail output]
- (do io.Monad<IO>
- [?client (do (e.ErrorT io.Monad<IO>)
- [socket (ServerSocket::accept [] server)]
- (tcp-client socket))]
- (case ?client
- (#e.Error error)
- (frp.close tail)
-
- (#e.Success client)
- (do @
- [?tail' (frp.write client tail)]
- (case ?tail'
- #.None
- (wrap true)
-
- (#.Some tail')
- (exec (await-server-release tail' server)
- (recur tail')))))))))]]
- (wrap output))))
- )
+(def: #export (server port)
+ (-> //.Port (Task [(Promise Unit)
+ (frp.Channel TCP)]))
+ (promise.future
+ (do (e.ErrorT io.Monad<IO>)
+ [server (ServerSocket::new [(nat-to-int port)])
+ #let [signal (: (Promise Unit)
+ (promise #.None))
+ _ (promise.await (function [_]
+ (AutoCloseable::close [] server))
+ signal)
+ output (: (frp.Channel TCP)
+ (frp.channel []))
+ _ (: (Promise Unit)
+ (promise.future
+ (loop [_ []]
+ (do io.Monad<IO>
+ [?client (do (e.ErrorT io.Monad<IO>)
+ [socket (ServerSocket::accept [] server)]
+ (tcp-client socket))]
+ (case ?client
+ (#e.Error error)
+ (wrap [])
+
+ (#e.Success client)
+ (do @
+ [_ (frp.publish output client)]
+ (recur [])))))))]]
+ (wrap [signal output]))))
diff --git a/stdlib/source/lux/world/net/udp.jvm.lux b/stdlib/source/lux/world/net/udp.jvm.lux
index caec294cd..c0aa54b77 100644
--- a/stdlib/source/lux/world/net/udp.jvm.lux
+++ b/stdlib/source/lux/world/net/udp.jvm.lux
@@ -3,8 +3,7 @@
(lux (control monad
["ex" exception #+ exception:])
(concurrency ["P" promise]
- ["T" task]
- [frp])
+ ["T" task])
(data ["e" error]
[maybe]
(coll [array]))