diff options
Diffstat (limited to '')
-rw-r--r-- | stdlib/source/lux/world/net/tcp.jvm.lux | 175 |
1 files changed, 89 insertions, 86 deletions
diff --git a/stdlib/source/lux/world/net/tcp.jvm.lux b/stdlib/source/lux/world/net/tcp.jvm.lux index 50191c407..329d256e0 100644 --- a/stdlib/source/lux/world/net/tcp.jvm.lux +++ b/stdlib/source/lux/world/net/tcp.jvm.lux @@ -1,19 +1,22 @@ (.module: [lux #* [control - monad] + monad + [security + ["." taint (#+ Dirty taint)]]] [concurrency ["." promise (#+ Promise promise)] [task (#+ Task)] ["." frp]] [data - ["e" error]] - [type - abstract] + ["." error (#+ Error)]] [world - [binary (#+ Binary)]] - ["." io (#+ Process)] - [host (#+ import:)]] + ["." binary (#+ Binary)]] + ["." io (#+ IO)] + [host (#+ import:)] + [platform + [compiler + ["." host]]]] ["." //]) (import: java/lang/AutoCloseable @@ -30,8 +33,8 @@ (import: java/net/Socket (new [String int] #io #try) - (getInputStream [] #io #try InputStream) - (getOutputStream [] #io #try OutputStream)) + (getInputStream [] #try InputStream) + (getOutputStream [] #try OutputStream)) (import: java/net/ServerSocket (new [int] #io #try) @@ -41,80 +44,80 @@ ############################################################ ############################################################ -(abstract: #export TCP {} - {#socket Socket - #in InputStream - #out OutputStream} - - (def: #export (read data offset length self) - (-> Binary Nat Nat TCP (Task Nat)) - (promise.future - (do io.Monad<Process> - [bytes-read (InputStream::read data (.int offset) (.int length) - (get@ #in (:representation self)))] - (wrap (.nat bytes-read))))) - - (def: #export (write data offset length self) - (-> Binary Nat Nat TCP (Task Any)) - (let [out (get@ #out (:representation self))] - (promise.future - (do io.Monad<Process> - [_ (OutputStream::write data (.int offset) (.int length) - out)] - (Flushable::flush out))))) - - (def: #export (close self) - (-> TCP (Task Any)) - (let [(^open ".") (:representation self)] - (promise.future - (do io.Monad<Process> - [_ (AutoCloseable::close in) - _ (AutoCloseable::close out)] - (AutoCloseable::close socket))))) - - (def: (tcp-client socket) - (-> Socket (Process TCP)) - (do io.Monad<Process> - [input (Socket::getInputStream socket) - output (Socket::getOutputStream socket)] - (wrap (:abstraction {#socket socket - #in input - #out output})))) - ) - -(def: #export (client address port) - (-> //.Address //.Port (Task TCP)) - (promise.future - (do io.Monad<Process> - [socket (Socket::new address (.int port))] - (tcp-client socket)))) - -(def: #export (server port) - (-> //.Port (Task [(Promise Any) - (frp.Channel TCP)])) - (promise.future - (do (e.ErrorT io.Monad<IO>) - [server (ServerSocket::new (.int port)) - #let [signal (: (Promise Any) - (promise #.None)) - _ (promise.await (function (_ _) - (AutoCloseable::close server)) - signal) - output (: (frp.Channel TCP) - (frp.channel [])) - _ (: (Promise Any) - (promise.future - (loop [_ []] - (do io.Monad<IO> - [?client (do (e.ErrorT io.Monad<IO>) - [socket (ServerSocket::accept server)] - (tcp-client socket))] - (case ?client - (#e.Error error) - (wrap []) - - (#e.Success client) - (do @ - [_ (frp.publish output client)] - (recur [])))))))]] - (wrap [signal output])))) +(signature: #export (TCP !) + (: (-> Nat (! (Error [Nat (Dirty Binary)]))) + read) + + (: (-> Binary (! (Error Any))) + write) + + (: (-> Any (! (Error Any))) + close)) + +(def: #export (async tcp) + (-> (TCP IO) (TCP Promise)) + (`` (structure (~~ (do-template [<capability>] + [(def: <capability> (|>> (:: tcp <capability>) promise.future))] + + [read] [write] [close]))))) + +(`` (for {(~~ (static host.jvm)) + (as-is (def: (tcp socket) + (-> Socket (Error (TCP IO))) + (do error.Monad<Error> + [input (Socket::getInputStream socket) + output (Socket::getOutputStream socket)] + (wrap (: (TCP IO) + (structure (def: (read size) + (do io.Monad<Process> + [#let [data (binary.create size)] + bytes-read (InputStream::read data +0 (.int size) input)] + (wrap [(.nat bytes-read) + (taint data)]))) + + (def: (write data) + (do io.Monad<Process> + [_ (OutputStream::write data +0 (.int (binary.size data)) + output)] + (Flushable::flush output))) + + (def: (close _) + (do io.Monad<Process> + [_ (AutoCloseable::close input) + _ (AutoCloseable::close output)] + (AutoCloseable::close socket)))))))) + + (def: #export (client address port) + (-> //.Address //.Port (IO (Error (TCP IO)))) + (do io.Monad<Process> + [socket (Socket::new address (.int port))] + (io.io (tcp socket)))) + + (def: #export (server port) + (-> //.Port (IO (Error [(Promise Any) + (frp.Channel (TCP IO))]))) + (do io.Monad<Process> + [server (ServerSocket::new (.int port)) + #let [close-signal (: (Promise Any) + (promise #.None)) + _ (promise.await (function (_ _) + (AutoCloseable::close server)) + close-signal) + output (: (frp.Channel (TCP IO)) + (frp.channel [])) + _ (: (Promise Any) + (promise.future + (loop [_ []] + (do io.Monad<IO> + [?client (do io.Monad<Process> + [socket (ServerSocket::accept server)] + (io.io (tcp socket)))] + (case ?client + (#error.Error error) + (wrap []) + + (#error.Success client) + (do @ + [_ (frp.publish output client)] + (recur [])))))))]] + (wrap [close-signal output]))))})) |