diff options
Diffstat (limited to 'stdlib/source')
-rw-r--r-- | stdlib/source/lux/world/net.lux | 5 | ||||
-rw-r--r-- | stdlib/source/lux/world/net/tcp.jvm.lux | 130 | ||||
-rw-r--r-- | stdlib/source/lux/world/net/udp.jvm.lux | 99 |
3 files changed, 234 insertions, 0 deletions
diff --git a/stdlib/source/lux/world/net.lux b/stdlib/source/lux/world/net.lux new file mode 100644 index 000000000..c9e2829e5 --- /dev/null +++ b/stdlib/source/lux/world/net.lux @@ -0,0 +1,5 @@ +(;module: + lux) + +(type: #export Address Text) +(type: #export Port Nat) diff --git a/stdlib/source/lux/world/net/tcp.jvm.lux b/stdlib/source/lux/world/net/tcp.jvm.lux new file mode 100644 index 000000000..bb7a36c39 --- /dev/null +++ b/stdlib/source/lux/world/net/tcp.jvm.lux @@ -0,0 +1,130 @@ +(;module: + lux + (lux (control monad) + (concurrency ["P" promise] + ["T" task] + [frp]) + (data ["R" result]) + (type opaque) + (world [blob #+ Blob]) + [io] + [host #+ jvm-import]) + [..]) + +(jvm-import java.lang.AutoCloseable + (close [] #io #try void)) + +(jvm-import java.io.Flushable + (flush [] #io #try void)) + +(jvm-import java.io.InputStream + (read [Byte-Array int int] #io #try int)) + +(jvm-import java.io.OutputStream + (write [Byte-Array int int] #io #try void)) + +(jvm-import java.net.Socket + (new [String int] #io #try) + (getInputStream [] #io #try InputStream) + (getOutputStream [] #io #try OutputStream)) + +(jvm-import java.net.ServerSocket + (new [int] #io #try) + (accept [] #io #try Socket)) + +############################################################ +############################################################ +############################################################ + +(opaque: #export TCP {} + {#socket Socket + #in InputStream + #out OutputStream} + + (def: #export (read data offset length self) + (let [in (get@ #in (@repr self))] + (P;future + (do (R;ResultT io;Monad<IO>) + [bytes-read (InputStream.read [data (nat-to-int offset) (nat-to-int length)] + in)] + (wrap (int-to-nat bytes-read)))))) + + (def: #export (write data offset length self) + (let [out (get@ #out (@repr self))] + (P;future + (do (R;ResultT io;Monad<IO>) + [_ (OutputStream.write [data (nat-to-int offset) (nat-to-int length)] + out)] + (Flushable.flush [] out))))) + + (def: #export (close self) + (let [(^open) (@repr self)] + (P;future + (do (R;ResultT io;Monad<IO>) + [_ (AutoCloseable.close [] in) + _ (AutoCloseable.close [] out)] + (AutoCloseable.close [] socket))))) + + (def: (tcp-client socket) + (-> Socket (io;IO (R;Result TCP))) + (do (R;ResultT io;Monad<IO>) + [input (Socket.getInputStream [] socket) + output (Socket.getOutputStream [] socket)] + (wrap (@opaque {#socket socket + #in input + #out output})))) + + (def: #export (client address port) + (-> ..;Address ..;Port (T;Task TCP)) + (P;future + (do (R;ResultT io;Monad<IO>) + [socket (Socket.new [address (nat-to-int port)])] + (tcp-client socket)))) + + (def: (await-server-release client-channel server) + (-> (frp;Channel TCP) ServerSocket (P;Promise Unit)) + (do P;Monad<Promise> + [outcome client-channel] + (case outcome + ## Channel has been closed. + ## Must close associated server. + #;None + (P;future + (do io;Monad<IO> + [_ (AutoCloseable.close [] server)] + (wrap []))) + + ## A client was generated. + ## Nothing to be done... + (#;Some _) + (wrap [])))) + + (def: #export (server port) + (-> ..;Port (T;Task (frp;Channel TCP))) + (P;future + (do (R;ResultT io;Monad<IO>) + [server (ServerSocket.new [(nat-to-int port)]) + #let [output (frp;channel TCP) + _ (: (P;Promise Bool) + (P;future + (loop [tail output] + (do io;Monad<IO> + [?client (do (R;ResultT io;Monad<IO>) + [socket (ServerSocket.accept [] server)] + (tcp-client socket))] + (case ?client + (#R;Error error) + (frp;close tail) + + (#R;Success client) + (do @ + [?tail' (frp;write client tail)] + (case ?tail' + #;None + (wrap true) + + (#;Some tail') + (exec (await-server-release tail' server) + (recur tail')))))))))]] + (wrap output)))) + ) diff --git a/stdlib/source/lux/world/net/udp.jvm.lux b/stdlib/source/lux/world/net/udp.jvm.lux new file mode 100644 index 000000000..4bf95a03e --- /dev/null +++ b/stdlib/source/lux/world/net/udp.jvm.lux @@ -0,0 +1,99 @@ +(;module: + lux + (lux (control monad + ["ex" exception #+ exception:]) + (concurrency ["P" promise] + ["T" task] + [frp]) + (data ["R" result] + (coll [array])) + (type opaque) + (world [blob #+ Blob]) + [io] + [host #+ jvm-import]) + [..]) + +(jvm-import java.lang.AutoCloseable + (close [] #io #try void)) + +(jvm-import java.io.Flushable + (flush [] #io #try void)) + +(jvm-import java.net.InetAddress + (#static getAllByName [String] #io #try (Array InetAddress)) + (getHostAddress [] String)) + +(jvm-import java.net.DatagramPacket + (new #as new|send [Byte-Array int int InetAddress int]) + (new #as new|receive [Byte-Array int int]) + (getAddress [] InetAddress) + (getPort [] int) + (getLength [] int)) + +(jvm-import java.net.DatagramSocket + (new #as new|client [] #io #try) + (new #as new|server [int] #io #try) + (receive [DatagramPacket] #io #try void) + (send [DatagramPacket] #io #try void)) + +############################################################ +############################################################ +############################################################ + +(exception: #export Cannot-Resolve-Address) +(exception: #export Multiple-Candidate-Addresses) + +(def: (resolve address) + (-> ..;Address (io;IO (R;Result InetAddress))) + (do (R;ResultT io;Monad<IO>) + [addresses (InetAddress.getAllByName [address])] + (: (io;IO (R;Result InetAddress)) + (case (array;size addresses) + +0 (io;io (ex;throw Cannot-Resolve-Address address)) + +1 (wrap (assume (array;get +0 addresses))) + _ (io;io (ex;throw Multiple-Candidate-Addresses address)))))) + +(opaque: #export UDP {} + {#socket DatagramSocket} + + (def: #export (read data offset length self) + (-> Blob Nat Nat UDP (T;Task [Nat ..;Address ..;Port])) + (let [(^open) (@repr self) + packet (DatagramPacket.new|receive [data (nat-to-int offset) (nat-to-int length)])] + (P;future + (do (R;ResultT io;Monad<IO>) + [_ (DatagramSocket.receive [packet] socket) + #let [bytes-read (int-to-nat (DatagramPacket.getLength [] packet))]] + (wrap [bytes-read + (|> packet (DatagramPacket.getAddress []) (InetAddress.getHostAddress [])) + (int-to-nat (DatagramPacket.getPort [] packet))]))))) + + (def: #export (write address port data offset length self) + (-> ..;Address ..;Port Blob Nat Nat UDP (T;Task Unit)) + (P;future + (do (R;ResultT io;Monad<IO>) + [address (resolve address) + #let [(^open) (@repr self)]] + (DatagramSocket.send (DatagramPacket.new|send [data (nat-to-int offset) (nat-to-int length) address (nat-to-int port)]) + socket)))) + + (def: #export (close self) + (-> UDP (T;Task Unit)) + (let [(^open) (@repr self)] + (P;future + (AutoCloseable.close [] socket)))) + + (def: #export (client _) + (-> Unit (T;Task UDP)) + (P;future + (do (R;ResultT io;Monad<IO>) + [socket (DatagramSocket.new|client [])] + (wrap (@opaque (#socket socket)))))) + + (def: #export (server port) + (-> ..;Port (T;Task UDP)) + (P;future + (do (R;ResultT io;Monad<IO>) + [socket (DatagramSocket.new|server [(nat-to-int port)])] + (wrap (@opaque (#socket socket)))))) + ) |