aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/world/net/tcp.lux
diff options
context:
space:
mode:
Diffstat (limited to 'stdlib/source/lux/world/net/tcp.lux')
-rw-r--r--stdlib/source/lux/world/net/tcp.lux129
1 files changed, 129 insertions, 0 deletions
diff --git a/stdlib/source/lux/world/net/tcp.lux b/stdlib/source/lux/world/net/tcp.lux
new file mode 100644
index 000000000..a0fa13c8a
--- /dev/null
+++ b/stdlib/source/lux/world/net/tcp.lux
@@ -0,0 +1,129 @@
+(.module:
+ [lux #*
+ [host (#+ import:)]
+ ["@" target]
+ [abstract
+ monad]
+ [control
+ ["." io (#+ IO)]
+ [concurrency
+ ["." promise (#+ Promise Resolver)]
+ ["." frp (#+ Channel Sink)]]
+ [security
+ ["!" capability]]]
+ [data
+ ["." error (#+ Error)]]
+ [world
+ ["." binary (#+ Binary)]]]
+ ["." // (#+ Can-Read Can-Write Can-Close)])
+
+(signature: #export (TCP !)
+ (: (Can-Read ! [Nat Binary])
+ read)
+
+ (: (Can-Write ! Binary)
+ write)
+
+ (: (Can-Close !)
+ close))
+
+(def: #export (async tcp)
+ (-> (TCP IO) (TCP Promise))
+ (`` (structure (~~ (template [<capability> <forge>]
+ [(def: <capability> (<forge> (|>> (!.use (:: tcp <capability>)) promise.future)))]
+
+ [read //.can-read]
+ [write //.can-write]
+ [close //.can-close])))))
+
+(with-expansions [<for-jvm> (as-is (import: java/lang/AutoCloseable
+ (close [] #io #try void))
+
+ (import: java/io/Flushable
+ (flush [] #io #try void))
+
+ (import: java/io/InputStream
+ (read [[byte] int int] #io #try int))
+
+ (import: java/io/OutputStream
+ (write [[byte] int int] #io #try void))
+
+ (import: java/net/Socket
+ (new [String int] #io #try)
+ (getInputStream [] #try InputStream)
+ (getOutputStream [] #try OutputStream))
+
+ (import: java/net/ServerSocket
+ (new [int] #io #try)
+ (accept [] #io #try Socket))
+
+ (def: (tcp socket)
+ (-> Socket (Error (TCP IO)))
+ (do error.monad
+ [input (Socket::getInputStream socket)
+ output (Socket::getOutputStream socket)]
+ (wrap (: (TCP IO)
+ (structure (def: read
+ (//.can-read
+ (function (read size)
+ (do (error.with io.monad)
+ [#let [data (binary.create size)]
+ bytes-read (InputStream::read data +0 (.int size) input)]
+ (wrap [(.nat bytes-read)
+ data])))))
+
+ (def: write
+ (//.can-write
+ (function (write data)
+ (do (error.with io.monad)
+ [_ (OutputStream::write data +0 (.int (binary.size data))
+ output)]
+ (Flushable::flush output)))))
+
+ (def: close
+ (//.can-close
+ (function (close _)
+ (do (error.with io.monad)
+ [_ (AutoCloseable::close input)
+ _ (AutoCloseable::close output)]
+ (AutoCloseable::close socket))))))))))
+
+ (def: #export (client address port)
+ (-> //.Address //.Port (IO (Error (TCP IO))))
+ (do (error.with io.monad)
+ [socket (Socket::new address (.int port))]
+ (io.io (tcp socket))))
+
+ (def: #export (server port)
+ (-> //.Port (IO (Error [(Resolver Any)
+ (Channel (TCP IO))])))
+ (do (error.with io.monad)
+ [server (ServerSocket::new (.int port))
+ #let [[close-signal close-resolver] (: [(Promise Any) (Resolver Any)]
+ (promise.promise []))
+ _ (promise.await (function (_ _)
+ (AutoCloseable::close server))
+ close-signal)
+ [output sink] (: [(Channel (TCP IO)) (Sink (TCP IO))]
+ (frp.channel []))
+ _ (: (Promise Any)
+ (promise.future
+ (loop [_ []]
+ (do io.monad
+ [?client (do (error.with io.monad)
+ [socket (ServerSocket::accept server)]
+ (io.io (tcp socket)))]
+ (case ?client
+ (#error.Failure error)
+ (wrap [])
+
+ (#error.Success client)
+ (do @
+ [_ (:: sink feed client)]
+ (recur [])))))))]]
+ (wrap [close-resolver output]))))]
+ (`` (for {(~~ (static @.old))
+ (as-is <for-jvm>)
+
+ (~~ (static @.jvm))
+ (as-is <for-jvm>)})))