From 970e950aeb08179972d9e3460af3003771fc8511 Mon Sep 17 00:00:00 2001 From: Eduardo Julian Date: Mon, 4 Sep 2017 19:42:38 -0400 Subject: - Implemented network I/O for TCP and UDP (in the JVM). --- stdlib/source/lux/world/net.lux | 5 ++ stdlib/source/lux/world/net/tcp.jvm.lux | 130 ++++++++++++++++++++++++++++++++ stdlib/source/lux/world/net/udp.jvm.lux | 99 ++++++++++++++++++++++++ stdlib/test/test/lux/world/net/tcp.lux | 70 +++++++++++++++++ stdlib/test/test/lux/world/net/udp.lux | 70 +++++++++++++++++ stdlib/test/tests.lux | 4 +- 6 files changed, 377 insertions(+), 1 deletion(-) create mode 100644 stdlib/source/lux/world/net.lux create mode 100644 stdlib/source/lux/world/net/tcp.jvm.lux create mode 100644 stdlib/source/lux/world/net/udp.jvm.lux create mode 100644 stdlib/test/test/lux/world/net/tcp.lux create mode 100644 stdlib/test/test/lux/world/net/udp.lux diff --git a/stdlib/source/lux/world/net.lux b/stdlib/source/lux/world/net.lux new file mode 100644 index 000000000..c9e2829e5 --- /dev/null +++ b/stdlib/source/lux/world/net.lux @@ -0,0 +1,5 @@ +(;module: + lux) + +(type: #export Address Text) +(type: #export Port Nat) diff --git a/stdlib/source/lux/world/net/tcp.jvm.lux b/stdlib/source/lux/world/net/tcp.jvm.lux new file mode 100644 index 000000000..bb7a36c39 --- /dev/null +++ b/stdlib/source/lux/world/net/tcp.jvm.lux @@ -0,0 +1,130 @@ +(;module: + lux + (lux (control monad) + (concurrency ["P" promise] + ["T" task] + [frp]) + (data ["R" result]) + (type opaque) + (world [blob #+ Blob]) + [io] + [host #+ jvm-import]) + [..]) + +(jvm-import java.lang.AutoCloseable + (close [] #io #try void)) + +(jvm-import java.io.Flushable + (flush [] #io #try void)) + +(jvm-import java.io.InputStream + (read [Byte-Array int int] #io #try int)) + +(jvm-import java.io.OutputStream + (write [Byte-Array int int] #io #try void)) + +(jvm-import java.net.Socket + (new [String int] #io #try) + (getInputStream [] #io #try InputStream) + (getOutputStream [] #io #try OutputStream)) + +(jvm-import java.net.ServerSocket + (new [int] #io #try) + (accept [] #io #try Socket)) + +############################################################ +############################################################ +############################################################ + +(opaque: #export TCP {} + {#socket Socket + #in InputStream + #out OutputStream} + + (def: #export (read data offset length self) + (let [in (get@ #in (@repr self))] + (P;future + (do (R;ResultT io;Monad) + [bytes-read (InputStream.read [data (nat-to-int offset) (nat-to-int length)] + in)] + (wrap (int-to-nat bytes-read)))))) + + (def: #export (write data offset length self) + (let [out (get@ #out (@repr self))] + (P;future + (do (R;ResultT io;Monad) + [_ (OutputStream.write [data (nat-to-int offset) (nat-to-int length)] + out)] + (Flushable.flush [] out))))) + + (def: #export (close self) + (let [(^open) (@repr self)] + (P;future + (do (R;ResultT io;Monad) + [_ (AutoCloseable.close [] in) + _ (AutoCloseable.close [] out)] + (AutoCloseable.close [] socket))))) + + (def: (tcp-client socket) + (-> Socket (io;IO (R;Result TCP))) + (do (R;ResultT io;Monad) + [input (Socket.getInputStream [] socket) + output (Socket.getOutputStream [] socket)] + (wrap (@opaque {#socket socket + #in input + #out output})))) + + (def: #export (client address port) + (-> ..;Address ..;Port (T;Task TCP)) + (P;future + (do (R;ResultT io;Monad) + [socket (Socket.new [address (nat-to-int port)])] + (tcp-client socket)))) + + (def: (await-server-release client-channel server) + (-> (frp;Channel TCP) ServerSocket (P;Promise Unit)) + (do P;Monad + [outcome client-channel] + (case outcome + ## Channel has been closed. + ## Must close associated server. + #;None + (P;future + (do io;Monad + [_ (AutoCloseable.close [] server)] + (wrap []))) + + ## A client was generated. + ## Nothing to be done... + (#;Some _) + (wrap [])))) + + (def: #export (server port) + (-> ..;Port (T;Task (frp;Channel TCP))) + (P;future + (do (R;ResultT io;Monad) + [server (ServerSocket.new [(nat-to-int port)]) + #let [output (frp;channel TCP) + _ (: (P;Promise Bool) + (P;future + (loop [tail output] + (do io;Monad + [?client (do (R;ResultT io;Monad) + [socket (ServerSocket.accept [] server)] + (tcp-client socket))] + (case ?client + (#R;Error error) + (frp;close tail) + + (#R;Success client) + (do @ + [?tail' (frp;write client tail)] + (case ?tail' + #;None + (wrap true) + + (#;Some tail') + (exec (await-server-release tail' server) + (recur tail')))))))))]] + (wrap output)))) + ) diff --git a/stdlib/source/lux/world/net/udp.jvm.lux b/stdlib/source/lux/world/net/udp.jvm.lux new file mode 100644 index 000000000..4bf95a03e --- /dev/null +++ b/stdlib/source/lux/world/net/udp.jvm.lux @@ -0,0 +1,99 @@ +(;module: + lux + (lux (control monad + ["ex" exception #+ exception:]) + (concurrency ["P" promise] + ["T" task] + [frp]) + (data ["R" result] + (coll [array])) + (type opaque) + (world [blob #+ Blob]) + [io] + [host #+ jvm-import]) + [..]) + +(jvm-import java.lang.AutoCloseable + (close [] #io #try void)) + +(jvm-import java.io.Flushable + (flush [] #io #try void)) + +(jvm-import java.net.InetAddress + (#static getAllByName [String] #io #try (Array InetAddress)) + (getHostAddress [] String)) + +(jvm-import java.net.DatagramPacket + (new #as new|send [Byte-Array int int InetAddress int]) + (new #as new|receive [Byte-Array int int]) + (getAddress [] InetAddress) + (getPort [] int) + (getLength [] int)) + +(jvm-import java.net.DatagramSocket + (new #as new|client [] #io #try) + (new #as new|server [int] #io #try) + (receive [DatagramPacket] #io #try void) + (send [DatagramPacket] #io #try void)) + +############################################################ +############################################################ +############################################################ + +(exception: #export Cannot-Resolve-Address) +(exception: #export Multiple-Candidate-Addresses) + +(def: (resolve address) + (-> ..;Address (io;IO (R;Result InetAddress))) + (do (R;ResultT io;Monad) + [addresses (InetAddress.getAllByName [address])] + (: (io;IO (R;Result InetAddress)) + (case (array;size addresses) + +0 (io;io (ex;throw Cannot-Resolve-Address address)) + +1 (wrap (assume (array;get +0 addresses))) + _ (io;io (ex;throw Multiple-Candidate-Addresses address)))))) + +(opaque: #export UDP {} + {#socket DatagramSocket} + + (def: #export (read data offset length self) + (-> Blob Nat Nat UDP (T;Task [Nat ..;Address ..;Port])) + (let [(^open) (@repr self) + packet (DatagramPacket.new|receive [data (nat-to-int offset) (nat-to-int length)])] + (P;future + (do (R;ResultT io;Monad) + [_ (DatagramSocket.receive [packet] socket) + #let [bytes-read (int-to-nat (DatagramPacket.getLength [] packet))]] + (wrap [bytes-read + (|> packet (DatagramPacket.getAddress []) (InetAddress.getHostAddress [])) + (int-to-nat (DatagramPacket.getPort [] packet))]))))) + + (def: #export (write address port data offset length self) + (-> ..;Address ..;Port Blob Nat Nat UDP (T;Task Unit)) + (P;future + (do (R;ResultT io;Monad) + [address (resolve address) + #let [(^open) (@repr self)]] + (DatagramSocket.send (DatagramPacket.new|send [data (nat-to-int offset) (nat-to-int length) address (nat-to-int port)]) + socket)))) + + (def: #export (close self) + (-> UDP (T;Task Unit)) + (let [(^open) (@repr self)] + (P;future + (AutoCloseable.close [] socket)))) + + (def: #export (client _) + (-> Unit (T;Task UDP)) + (P;future + (do (R;ResultT io;Monad) + [socket (DatagramSocket.new|client [])] + (wrap (@opaque (#socket socket)))))) + + (def: #export (server port) + (-> ..;Port (T;Task UDP)) + (P;future + (do (R;ResultT io;Monad) + [socket (DatagramSocket.new|server [(nat-to-int port)])] + (wrap (@opaque (#socket socket)))))) + ) diff --git a/stdlib/test/test/lux/world/net/tcp.lux b/stdlib/test/test/lux/world/net/tcp.lux new file mode 100644 index 000000000..4c975b4dd --- /dev/null +++ b/stdlib/test/test/lux/world/net/tcp.lux @@ -0,0 +1,70 @@ +(;module: + lux + (lux [io] + (control [monad #+ do] + ["ex" exception #+ exception:]) + (concurrency ["P" promise] + ["T" task] + [frp]) + (data ["R" result] + [text] + text/format) + (world [blob] + [net] + (net ["@" tcp])) + ["r" math/random]) + lux/test + (../.. ["_;" blob])) + +(def: localhost net;Address "127.0.0.1") +(def: port + (r;Random net;Port) + (|> r;nat + (:: r;Monad map + (|>. (n.% +1000) + (n.+ +8000))))) + +(exception: Empty-Channel) + +(def: (head channel) + (All [a] (-> (frp;Channel a) (T;Task a))) + (do P;Monad + [head+tail channel] + (case head+tail + (#;Some [head tail]) + (wrap (ex;return head)) + + #;None + (wrap (ex;throw Empty-Channel ""))))) + +(context: "TCP networking." + #times +1 + [port ;;port + size (|> r;nat (:: @ map (|>. (n.% +100) (n.max +10)))) + from (_blob;blob size) + to (_blob;blob size) + #let [temp (blob;create size)]] + ($_ seq + (do P;Monad + [result (do T;Monad + [server (@;server port) + client (@;client localhost port) + #################### + _ (@;write from +0 size client) + socket (head server) + bytes-from (@;read temp +0 size socket) + #let [from-worked? (and (n.= size bytes-from) + (:: blob;Eq = from temp))] + #################### + _ (@;write to +0 size socket) + bytes-to (@;read temp +0 size client) + #let [to-worked? (and (n.= size bytes-to) + (:: blob;Eq = to temp))] + #################### + _ (@;close client) + _ (T;from-promise (P;future (frp;close server)))] + (wrap (and from-worked? + to-worked?)))] + (test "Can communicate between client and server." + (R;default false result))) + )) diff --git a/stdlib/test/test/lux/world/net/udp.lux b/stdlib/test/test/lux/world/net/udp.lux new file mode 100644 index 000000000..8af67bcd5 --- /dev/null +++ b/stdlib/test/test/lux/world/net/udp.lux @@ -0,0 +1,70 @@ +(;module: + lux + (lux [io] + (control [monad #+ do] + ["ex" exception #+ exception:]) + (concurrency ["P" promise] + ["T" task] + [frp]) + (data ["R" result] + [text] + text/format) + (world [blob] + [net] + (net ["@" udp])) + ["r" math/random]) + lux/test + (../.. ["_;" blob])) + +(def: localhost net;Address "127.0.0.1") +(def: port + (r;Random net;Port) + (|> r;nat + (:: r;Monad map + (|>. (n.% +1000) + (n.+ +8000))))) + +(exception: Empty-Channel) + +(def: (head channel) + (All [a] (-> (frp;Channel a) (T;Task a))) + (do P;Monad + [head+tail channel] + (case head+tail + (#;Some [head tail]) + (wrap (ex;return head)) + + #;None + (wrap (ex;throw Empty-Channel ""))))) + +(context: "UDP networking." + #times +1 + [port ;;port + size (|> r;nat (:: @ map (|>. (n.% +100) (n.max +10)))) + from (_blob;blob size) + to (_blob;blob size) + #let [temp (blob;create size)]] + ($_ seq + (do P;Monad + [result (do T;Monad + [server (@;server port) + client (@;client []) + #################### + _ (@;write localhost port from +0 size client) + [bytes-from from-address from-port] (@;read temp +0 size server) + #let [from-worked? (and (n.= size bytes-from) + (:: blob;Eq = from temp))] + #################### + _ (@;write from-address from-port to +0 size server) + [bytes-to to-address to-port] (@;read temp +0 size client) + #let [to-worked? (and (n.= size bytes-to) + (:: blob;Eq = to temp) + (n.= port to-port))] + #################### + _ (@;close client) + _ (@;close server)] + (wrap (and from-worked? + to-worked?)))] + (test "Can communicate between client and server." + (R;default false result))) + )) diff --git a/stdlib/test/tests.lux b/stdlib/test/tests.lux index ee503c94c..fffe409c4 100644 --- a/stdlib/test/tests.lux +++ b/stdlib/test/tests.lux @@ -71,7 +71,9 @@ ["_;" auto] ["_;" object]) (world ["_;" blob] - ["_;" fs]) + ["_;" fs] + (net ["_;" tcp] + ["_;" udp])) )) (lux (control [contract] [concatenative]) -- cgit v1.2.3