aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEduardo Julian2017-09-04 19:42:38 -0400
committerEduardo Julian2017-09-04 19:42:38 -0400
commit970e950aeb08179972d9e3460af3003771fc8511 (patch)
treebaecc5a1ece9ce0e6ac48478bb5cb87f2ab5b777
parentf08a9fb208a32ee8f450649095c4f8a0f05931da (diff)
- Implemented network I/O for TCP and UDP (in the JVM).
Diffstat (limited to '')
-rw-r--r--stdlib/source/lux/world/net.lux5
-rw-r--r--stdlib/source/lux/world/net/tcp.jvm.lux130
-rw-r--r--stdlib/source/lux/world/net/udp.jvm.lux99
-rw-r--r--stdlib/test/test/lux/world/net/tcp.lux70
-rw-r--r--stdlib/test/test/lux/world/net/udp.lux70
-rw-r--r--stdlib/test/tests.lux4
6 files changed, 377 insertions, 1 deletions
diff --git a/stdlib/source/lux/world/net.lux b/stdlib/source/lux/world/net.lux
new file mode 100644
index 000000000..c9e2829e5
--- /dev/null
+++ b/stdlib/source/lux/world/net.lux
@@ -0,0 +1,5 @@
+(;module:
+ lux)
+
+(type: #export Address Text)
+(type: #export Port Nat)
diff --git a/stdlib/source/lux/world/net/tcp.jvm.lux b/stdlib/source/lux/world/net/tcp.jvm.lux
new file mode 100644
index 000000000..bb7a36c39
--- /dev/null
+++ b/stdlib/source/lux/world/net/tcp.jvm.lux
@@ -0,0 +1,130 @@
+(;module:
+ lux
+ (lux (control monad)
+ (concurrency ["P" promise]
+ ["T" task]
+ [frp])
+ (data ["R" result])
+ (type opaque)
+ (world [blob #+ Blob])
+ [io]
+ [host #+ jvm-import])
+ [..])
+
+(jvm-import java.lang.AutoCloseable
+ (close [] #io #try void))
+
+(jvm-import java.io.Flushable
+ (flush [] #io #try void))
+
+(jvm-import java.io.InputStream
+ (read [Byte-Array int int] #io #try int))
+
+(jvm-import java.io.OutputStream
+ (write [Byte-Array int int] #io #try void))
+
+(jvm-import java.net.Socket
+ (new [String int] #io #try)
+ (getInputStream [] #io #try InputStream)
+ (getOutputStream [] #io #try OutputStream))
+
+(jvm-import java.net.ServerSocket
+ (new [int] #io #try)
+ (accept [] #io #try Socket))
+
+############################################################
+############################################################
+############################################################
+
+(opaque: #export TCP {}
+ {#socket Socket
+ #in InputStream
+ #out OutputStream}
+
+ (def: #export (read data offset length self)
+ (let [in (get@ #in (@repr self))]
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [bytes-read (InputStream.read [data (nat-to-int offset) (nat-to-int length)]
+ in)]
+ (wrap (int-to-nat bytes-read))))))
+
+ (def: #export (write data offset length self)
+ (let [out (get@ #out (@repr self))]
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [_ (OutputStream.write [data (nat-to-int offset) (nat-to-int length)]
+ out)]
+ (Flushable.flush [] out)))))
+
+ (def: #export (close self)
+ (let [(^open) (@repr self)]
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [_ (AutoCloseable.close [] in)
+ _ (AutoCloseable.close [] out)]
+ (AutoCloseable.close [] socket)))))
+
+ (def: (tcp-client socket)
+ (-> Socket (io;IO (R;Result TCP)))
+ (do (R;ResultT io;Monad<IO>)
+ [input (Socket.getInputStream [] socket)
+ output (Socket.getOutputStream [] socket)]
+ (wrap (@opaque {#socket socket
+ #in input
+ #out output}))))
+
+ (def: #export (client address port)
+ (-> ..;Address ..;Port (T;Task TCP))
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [socket (Socket.new [address (nat-to-int port)])]
+ (tcp-client socket))))
+
+ (def: (await-server-release client-channel server)
+ (-> (frp;Channel TCP) ServerSocket (P;Promise Unit))
+ (do P;Monad<Promise>
+ [outcome client-channel]
+ (case outcome
+ ## Channel has been closed.
+ ## Must close associated server.
+ #;None
+ (P;future
+ (do io;Monad<IO>
+ [_ (AutoCloseable.close [] server)]
+ (wrap [])))
+
+ ## A client was generated.
+ ## Nothing to be done...
+ (#;Some _)
+ (wrap []))))
+
+ (def: #export (server port)
+ (-> ..;Port (T;Task (frp;Channel TCP)))
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [server (ServerSocket.new [(nat-to-int port)])
+ #let [output (frp;channel TCP)
+ _ (: (P;Promise Bool)
+ (P;future
+ (loop [tail output]
+ (do io;Monad<IO>
+ [?client (do (R;ResultT io;Monad<IO>)
+ [socket (ServerSocket.accept [] server)]
+ (tcp-client socket))]
+ (case ?client
+ (#R;Error error)
+ (frp;close tail)
+
+ (#R;Success client)
+ (do @
+ [?tail' (frp;write client tail)]
+ (case ?tail'
+ #;None
+ (wrap true)
+
+ (#;Some tail')
+ (exec (await-server-release tail' server)
+ (recur tail')))))))))]]
+ (wrap output))))
+ )
diff --git a/stdlib/source/lux/world/net/udp.jvm.lux b/stdlib/source/lux/world/net/udp.jvm.lux
new file mode 100644
index 000000000..4bf95a03e
--- /dev/null
+++ b/stdlib/source/lux/world/net/udp.jvm.lux
@@ -0,0 +1,99 @@
+(;module:
+ lux
+ (lux (control monad
+ ["ex" exception #+ exception:])
+ (concurrency ["P" promise]
+ ["T" task]
+ [frp])
+ (data ["R" result]
+ (coll [array]))
+ (type opaque)
+ (world [blob #+ Blob])
+ [io]
+ [host #+ jvm-import])
+ [..])
+
+(jvm-import java.lang.AutoCloseable
+ (close [] #io #try void))
+
+(jvm-import java.io.Flushable
+ (flush [] #io #try void))
+
+(jvm-import java.net.InetAddress
+ (#static getAllByName [String] #io #try (Array InetAddress))
+ (getHostAddress [] String))
+
+(jvm-import java.net.DatagramPacket
+ (new #as new|send [Byte-Array int int InetAddress int])
+ (new #as new|receive [Byte-Array int int])
+ (getAddress [] InetAddress)
+ (getPort [] int)
+ (getLength [] int))
+
+(jvm-import java.net.DatagramSocket
+ (new #as new|client [] #io #try)
+ (new #as new|server [int] #io #try)
+ (receive [DatagramPacket] #io #try void)
+ (send [DatagramPacket] #io #try void))
+
+############################################################
+############################################################
+############################################################
+
+(exception: #export Cannot-Resolve-Address)
+(exception: #export Multiple-Candidate-Addresses)
+
+(def: (resolve address)
+ (-> ..;Address (io;IO (R;Result InetAddress)))
+ (do (R;ResultT io;Monad<IO>)
+ [addresses (InetAddress.getAllByName [address])]
+ (: (io;IO (R;Result InetAddress))
+ (case (array;size addresses)
+ +0 (io;io (ex;throw Cannot-Resolve-Address address))
+ +1 (wrap (assume (array;get +0 addresses)))
+ _ (io;io (ex;throw Multiple-Candidate-Addresses address))))))
+
+(opaque: #export UDP {}
+ {#socket DatagramSocket}
+
+ (def: #export (read data offset length self)
+ (-> Blob Nat Nat UDP (T;Task [Nat ..;Address ..;Port]))
+ (let [(^open) (@repr self)
+ packet (DatagramPacket.new|receive [data (nat-to-int offset) (nat-to-int length)])]
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [_ (DatagramSocket.receive [packet] socket)
+ #let [bytes-read (int-to-nat (DatagramPacket.getLength [] packet))]]
+ (wrap [bytes-read
+ (|> packet (DatagramPacket.getAddress []) (InetAddress.getHostAddress []))
+ (int-to-nat (DatagramPacket.getPort [] packet))])))))
+
+ (def: #export (write address port data offset length self)
+ (-> ..;Address ..;Port Blob Nat Nat UDP (T;Task Unit))
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [address (resolve address)
+ #let [(^open) (@repr self)]]
+ (DatagramSocket.send (DatagramPacket.new|send [data (nat-to-int offset) (nat-to-int length) address (nat-to-int port)])
+ socket))))
+
+ (def: #export (close self)
+ (-> UDP (T;Task Unit))
+ (let [(^open) (@repr self)]
+ (P;future
+ (AutoCloseable.close [] socket))))
+
+ (def: #export (client _)
+ (-> Unit (T;Task UDP))
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [socket (DatagramSocket.new|client [])]
+ (wrap (@opaque (#socket socket))))))
+
+ (def: #export (server port)
+ (-> ..;Port (T;Task UDP))
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [socket (DatagramSocket.new|server [(nat-to-int port)])]
+ (wrap (@opaque (#socket socket))))))
+ )
diff --git a/stdlib/test/test/lux/world/net/tcp.lux b/stdlib/test/test/lux/world/net/tcp.lux
new file mode 100644
index 000000000..4c975b4dd
--- /dev/null
+++ b/stdlib/test/test/lux/world/net/tcp.lux
@@ -0,0 +1,70 @@
+(;module:
+ lux
+ (lux [io]
+ (control [monad #+ do]
+ ["ex" exception #+ exception:])
+ (concurrency ["P" promise]
+ ["T" task]
+ [frp])
+ (data ["R" result]
+ [text]
+ text/format)
+ (world [blob]
+ [net]
+ (net ["@" tcp]))
+ ["r" math/random])
+ lux/test
+ (../.. ["_;" blob]))
+
+(def: localhost net;Address "127.0.0.1")
+(def: port
+ (r;Random net;Port)
+ (|> r;nat
+ (:: r;Monad<Random> map
+ (|>. (n.% +1000)
+ (n.+ +8000)))))
+
+(exception: Empty-Channel)
+
+(def: (head channel)
+ (All [a] (-> (frp;Channel a) (T;Task a)))
+ (do P;Monad<Promise>
+ [head+tail channel]
+ (case head+tail
+ (#;Some [head tail])
+ (wrap (ex;return head))
+
+ #;None
+ (wrap (ex;throw Empty-Channel "")))))
+
+(context: "TCP networking."
+ #times +1
+ [port ;;port
+ size (|> r;nat (:: @ map (|>. (n.% +100) (n.max +10))))
+ from (_blob;blob size)
+ to (_blob;blob size)
+ #let [temp (blob;create size)]]
+ ($_ seq
+ (do P;Monad<Promise>
+ [result (do T;Monad<Task>
+ [server (@;server port)
+ client (@;client localhost port)
+ ####################
+ _ (@;write from +0 size client)
+ socket (head server)
+ bytes-from (@;read temp +0 size socket)
+ #let [from-worked? (and (n.= size bytes-from)
+ (:: blob;Eq<Blob> = from temp))]
+ ####################
+ _ (@;write to +0 size socket)
+ bytes-to (@;read temp +0 size client)
+ #let [to-worked? (and (n.= size bytes-to)
+ (:: blob;Eq<Blob> = to temp))]
+ ####################
+ _ (@;close client)
+ _ (T;from-promise (P;future (frp;close server)))]
+ (wrap (and from-worked?
+ to-worked?)))]
+ (test "Can communicate between client and server."
+ (R;default false result)))
+ ))
diff --git a/stdlib/test/test/lux/world/net/udp.lux b/stdlib/test/test/lux/world/net/udp.lux
new file mode 100644
index 000000000..8af67bcd5
--- /dev/null
+++ b/stdlib/test/test/lux/world/net/udp.lux
@@ -0,0 +1,70 @@
+(;module:
+ lux
+ (lux [io]
+ (control [monad #+ do]
+ ["ex" exception #+ exception:])
+ (concurrency ["P" promise]
+ ["T" task]
+ [frp])
+ (data ["R" result]
+ [text]
+ text/format)
+ (world [blob]
+ [net]
+ (net ["@" udp]))
+ ["r" math/random])
+ lux/test
+ (../.. ["_;" blob]))
+
+(def: localhost net;Address "127.0.0.1")
+(def: port
+ (r;Random net;Port)
+ (|> r;nat
+ (:: r;Monad<Random> map
+ (|>. (n.% +1000)
+ (n.+ +8000)))))
+
+(exception: Empty-Channel)
+
+(def: (head channel)
+ (All [a] (-> (frp;Channel a) (T;Task a)))
+ (do P;Monad<Promise>
+ [head+tail channel]
+ (case head+tail
+ (#;Some [head tail])
+ (wrap (ex;return head))
+
+ #;None
+ (wrap (ex;throw Empty-Channel "")))))
+
+(context: "UDP networking."
+ #times +1
+ [port ;;port
+ size (|> r;nat (:: @ map (|>. (n.% +100) (n.max +10))))
+ from (_blob;blob size)
+ to (_blob;blob size)
+ #let [temp (blob;create size)]]
+ ($_ seq
+ (do P;Monad<Promise>
+ [result (do T;Monad<Task>
+ [server (@;server port)
+ client (@;client [])
+ ####################
+ _ (@;write localhost port from +0 size client)
+ [bytes-from from-address from-port] (@;read temp +0 size server)
+ #let [from-worked? (and (n.= size bytes-from)
+ (:: blob;Eq<Blob> = from temp))]
+ ####################
+ _ (@;write from-address from-port to +0 size server)
+ [bytes-to to-address to-port] (@;read temp +0 size client)
+ #let [to-worked? (and (n.= size bytes-to)
+ (:: blob;Eq<Blob> = to temp)
+ (n.= port to-port))]
+ ####################
+ _ (@;close client)
+ _ (@;close server)]
+ (wrap (and from-worked?
+ to-worked?)))]
+ (test "Can communicate between client and server."
+ (R;default false result)))
+ ))
diff --git a/stdlib/test/tests.lux b/stdlib/test/tests.lux
index ee503c94c..fffe409c4 100644
--- a/stdlib/test/tests.lux
+++ b/stdlib/test/tests.lux
@@ -71,7 +71,9 @@
["_;" auto]
["_;" object])
(world ["_;" blob]
- ["_;" fs])
+ ["_;" fs]
+ (net ["_;" tcp]
+ ["_;" udp]))
))
(lux (control [contract]
[concatenative])