diff options
Diffstat (limited to 'stdlib/source/lux/world/net/tcp.lux')
-rw-r--r-- | stdlib/source/lux/world/net/tcp.lux | 129 |
1 files changed, 129 insertions, 0 deletions
diff --git a/stdlib/source/lux/world/net/tcp.lux b/stdlib/source/lux/world/net/tcp.lux new file mode 100644 index 000000000..a0fa13c8a --- /dev/null +++ b/stdlib/source/lux/world/net/tcp.lux @@ -0,0 +1,129 @@ +(.module: + [lux #* + [host (#+ import:)] + ["@" target] + [abstract + monad] + [control + ["." io (#+ IO)] + [concurrency + ["." promise (#+ Promise Resolver)] + ["." frp (#+ Channel Sink)]] + [security + ["!" capability]]] + [data + ["." error (#+ Error)]] + [world + ["." binary (#+ Binary)]]] + ["." // (#+ Can-Read Can-Write Can-Close)]) + +(signature: #export (TCP !) + (: (Can-Read ! [Nat Binary]) + read) + + (: (Can-Write ! Binary) + write) + + (: (Can-Close !) + close)) + +(def: #export (async tcp) + (-> (TCP IO) (TCP Promise)) + (`` (structure (~~ (template [<capability> <forge>] + [(def: <capability> (<forge> (|>> (!.use (:: tcp <capability>)) promise.future)))] + + [read //.can-read] + [write //.can-write] + [close //.can-close]))))) + +(with-expansions [<for-jvm> (as-is (import: java/lang/AutoCloseable + (close [] #io #try void)) + + (import: java/io/Flushable + (flush [] #io #try void)) + + (import: java/io/InputStream + (read [[byte] int int] #io #try int)) + + (import: java/io/OutputStream + (write [[byte] int int] #io #try void)) + + (import: java/net/Socket + (new [String int] #io #try) + (getInputStream [] #try InputStream) + (getOutputStream [] #try OutputStream)) + + (import: java/net/ServerSocket + (new [int] #io #try) + (accept [] #io #try Socket)) + + (def: (tcp socket) + (-> Socket (Error (TCP IO))) + (do error.monad + [input (Socket::getInputStream socket) + output (Socket::getOutputStream socket)] + (wrap (: (TCP IO) + (structure (def: read + (//.can-read + (function (read size) + (do (error.with io.monad) + [#let [data (binary.create size)] + bytes-read (InputStream::read data +0 (.int size) input)] + (wrap [(.nat bytes-read) + data]))))) + + (def: write + (//.can-write + (function (write data) + (do (error.with io.monad) + [_ (OutputStream::write data +0 (.int (binary.size data)) + output)] + (Flushable::flush output))))) + + (def: close + (//.can-close + (function (close _) + (do (error.with io.monad) + [_ (AutoCloseable::close input) + _ (AutoCloseable::close output)] + (AutoCloseable::close socket)))))))))) + + (def: #export (client address port) + (-> //.Address //.Port (IO (Error (TCP IO)))) + (do (error.with io.monad) + [socket (Socket::new address (.int port))] + (io.io (tcp socket)))) + + (def: #export (server port) + (-> //.Port (IO (Error [(Resolver Any) + (Channel (TCP IO))]))) + (do (error.with io.monad) + [server (ServerSocket::new (.int port)) + #let [[close-signal close-resolver] (: [(Promise Any) (Resolver Any)] + (promise.promise [])) + _ (promise.await (function (_ _) + (AutoCloseable::close server)) + close-signal) + [output sink] (: [(Channel (TCP IO)) (Sink (TCP IO))] + (frp.channel [])) + _ (: (Promise Any) + (promise.future + (loop [_ []] + (do io.monad + [?client (do (error.with io.monad) + [socket (ServerSocket::accept server)] + (io.io (tcp socket)))] + (case ?client + (#error.Failure error) + (wrap []) + + (#error.Success client) + (do @ + [_ (:: sink feed client)] + (recur [])))))))]] + (wrap [close-resolver output]))))] + (`` (for {(~~ (static @.old)) + (as-is <for-jvm>) + + (~~ (static @.jvm)) + (as-is <for-jvm>)}))) |