aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source
diff options
context:
space:
mode:
authorEduardo Julian2017-09-04 19:42:38 -0400
committerEduardo Julian2017-09-04 19:42:38 -0400
commit970e950aeb08179972d9e3460af3003771fc8511 (patch)
treebaecc5a1ece9ce0e6ac48478bb5cb87f2ab5b777 /stdlib/source
parentf08a9fb208a32ee8f450649095c4f8a0f05931da (diff)
- Implemented network I/O for TCP and UDP (in the JVM).
Diffstat (limited to 'stdlib/source')
-rw-r--r--stdlib/source/lux/world/net.lux5
-rw-r--r--stdlib/source/lux/world/net/tcp.jvm.lux130
-rw-r--r--stdlib/source/lux/world/net/udp.jvm.lux99
3 files changed, 234 insertions, 0 deletions
diff --git a/stdlib/source/lux/world/net.lux b/stdlib/source/lux/world/net.lux
new file mode 100644
index 000000000..c9e2829e5
--- /dev/null
+++ b/stdlib/source/lux/world/net.lux
@@ -0,0 +1,5 @@
+(;module:
+ lux)
+
+(type: #export Address Text)
+(type: #export Port Nat)
diff --git a/stdlib/source/lux/world/net/tcp.jvm.lux b/stdlib/source/lux/world/net/tcp.jvm.lux
new file mode 100644
index 000000000..bb7a36c39
--- /dev/null
+++ b/stdlib/source/lux/world/net/tcp.jvm.lux
@@ -0,0 +1,130 @@
+(;module:
+ lux
+ (lux (control monad)
+ (concurrency ["P" promise]
+ ["T" task]
+ [frp])
+ (data ["R" result])
+ (type opaque)
+ (world [blob #+ Blob])
+ [io]
+ [host #+ jvm-import])
+ [..])
+
+(jvm-import java.lang.AutoCloseable
+ (close [] #io #try void))
+
+(jvm-import java.io.Flushable
+ (flush [] #io #try void))
+
+(jvm-import java.io.InputStream
+ (read [Byte-Array int int] #io #try int))
+
+(jvm-import java.io.OutputStream
+ (write [Byte-Array int int] #io #try void))
+
+(jvm-import java.net.Socket
+ (new [String int] #io #try)
+ (getInputStream [] #io #try InputStream)
+ (getOutputStream [] #io #try OutputStream))
+
+(jvm-import java.net.ServerSocket
+ (new [int] #io #try)
+ (accept [] #io #try Socket))
+
+############################################################
+############################################################
+############################################################
+
+(opaque: #export TCP {}
+ {#socket Socket
+ #in InputStream
+ #out OutputStream}
+
+ (def: #export (read data offset length self)
+ (let [in (get@ #in (@repr self))]
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [bytes-read (InputStream.read [data (nat-to-int offset) (nat-to-int length)]
+ in)]
+ (wrap (int-to-nat bytes-read))))))
+
+ (def: #export (write data offset length self)
+ (let [out (get@ #out (@repr self))]
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [_ (OutputStream.write [data (nat-to-int offset) (nat-to-int length)]
+ out)]
+ (Flushable.flush [] out)))))
+
+ (def: #export (close self)
+ (let [(^open) (@repr self)]
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [_ (AutoCloseable.close [] in)
+ _ (AutoCloseable.close [] out)]
+ (AutoCloseable.close [] socket)))))
+
+ (def: (tcp-client socket)
+ (-> Socket (io;IO (R;Result TCP)))
+ (do (R;ResultT io;Monad<IO>)
+ [input (Socket.getInputStream [] socket)
+ output (Socket.getOutputStream [] socket)]
+ (wrap (@opaque {#socket socket
+ #in input
+ #out output}))))
+
+ (def: #export (client address port)
+ (-> ..;Address ..;Port (T;Task TCP))
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [socket (Socket.new [address (nat-to-int port)])]
+ (tcp-client socket))))
+
+ (def: (await-server-release client-channel server)
+ (-> (frp;Channel TCP) ServerSocket (P;Promise Unit))
+ (do P;Monad<Promise>
+ [outcome client-channel]
+ (case outcome
+ ## Channel has been closed.
+ ## Must close associated server.
+ #;None
+ (P;future
+ (do io;Monad<IO>
+ [_ (AutoCloseable.close [] server)]
+ (wrap [])))
+
+ ## A client was generated.
+ ## Nothing to be done...
+ (#;Some _)
+ (wrap []))))
+
+ (def: #export (server port)
+ (-> ..;Port (T;Task (frp;Channel TCP)))
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [server (ServerSocket.new [(nat-to-int port)])
+ #let [output (frp;channel TCP)
+ _ (: (P;Promise Bool)
+ (P;future
+ (loop [tail output]
+ (do io;Monad<IO>
+ [?client (do (R;ResultT io;Monad<IO>)
+ [socket (ServerSocket.accept [] server)]
+ (tcp-client socket))]
+ (case ?client
+ (#R;Error error)
+ (frp;close tail)
+
+ (#R;Success client)
+ (do @
+ [?tail' (frp;write client tail)]
+ (case ?tail'
+ #;None
+ (wrap true)
+
+ (#;Some tail')
+ (exec (await-server-release tail' server)
+ (recur tail')))))))))]]
+ (wrap output))))
+ )
diff --git a/stdlib/source/lux/world/net/udp.jvm.lux b/stdlib/source/lux/world/net/udp.jvm.lux
new file mode 100644
index 000000000..4bf95a03e
--- /dev/null
+++ b/stdlib/source/lux/world/net/udp.jvm.lux
@@ -0,0 +1,99 @@
+(;module:
+ lux
+ (lux (control monad
+ ["ex" exception #+ exception:])
+ (concurrency ["P" promise]
+ ["T" task]
+ [frp])
+ (data ["R" result]
+ (coll [array]))
+ (type opaque)
+ (world [blob #+ Blob])
+ [io]
+ [host #+ jvm-import])
+ [..])
+
+(jvm-import java.lang.AutoCloseable
+ (close [] #io #try void))
+
+(jvm-import java.io.Flushable
+ (flush [] #io #try void))
+
+(jvm-import java.net.InetAddress
+ (#static getAllByName [String] #io #try (Array InetAddress))
+ (getHostAddress [] String))
+
+(jvm-import java.net.DatagramPacket
+ (new #as new|send [Byte-Array int int InetAddress int])
+ (new #as new|receive [Byte-Array int int])
+ (getAddress [] InetAddress)
+ (getPort [] int)
+ (getLength [] int))
+
+(jvm-import java.net.DatagramSocket
+ (new #as new|client [] #io #try)
+ (new #as new|server [int] #io #try)
+ (receive [DatagramPacket] #io #try void)
+ (send [DatagramPacket] #io #try void))
+
+############################################################
+############################################################
+############################################################
+
+(exception: #export Cannot-Resolve-Address)
+(exception: #export Multiple-Candidate-Addresses)
+
+(def: (resolve address)
+ (-> ..;Address (io;IO (R;Result InetAddress)))
+ (do (R;ResultT io;Monad<IO>)
+ [addresses (InetAddress.getAllByName [address])]
+ (: (io;IO (R;Result InetAddress))
+ (case (array;size addresses)
+ +0 (io;io (ex;throw Cannot-Resolve-Address address))
+ +1 (wrap (assume (array;get +0 addresses)))
+ _ (io;io (ex;throw Multiple-Candidate-Addresses address))))))
+
+(opaque: #export UDP {}
+ {#socket DatagramSocket}
+
+ (def: #export (read data offset length self)
+ (-> Blob Nat Nat UDP (T;Task [Nat ..;Address ..;Port]))
+ (let [(^open) (@repr self)
+ packet (DatagramPacket.new|receive [data (nat-to-int offset) (nat-to-int length)])]
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [_ (DatagramSocket.receive [packet] socket)
+ #let [bytes-read (int-to-nat (DatagramPacket.getLength [] packet))]]
+ (wrap [bytes-read
+ (|> packet (DatagramPacket.getAddress []) (InetAddress.getHostAddress []))
+ (int-to-nat (DatagramPacket.getPort [] packet))])))))
+
+ (def: #export (write address port data offset length self)
+ (-> ..;Address ..;Port Blob Nat Nat UDP (T;Task Unit))
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [address (resolve address)
+ #let [(^open) (@repr self)]]
+ (DatagramSocket.send (DatagramPacket.new|send [data (nat-to-int offset) (nat-to-int length) address (nat-to-int port)])
+ socket))))
+
+ (def: #export (close self)
+ (-> UDP (T;Task Unit))
+ (let [(^open) (@repr self)]
+ (P;future
+ (AutoCloseable.close [] socket))))
+
+ (def: #export (client _)
+ (-> Unit (T;Task UDP))
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [socket (DatagramSocket.new|client [])]
+ (wrap (@opaque (#socket socket))))))
+
+ (def: #export (server port)
+ (-> ..;Port (T;Task UDP))
+ (P;future
+ (do (R;ResultT io;Monad<IO>)
+ [socket (DatagramSocket.new|server [(nat-to-int port)])]
+ (wrap (@opaque (#socket socket))))))
+ )