aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/world/net/tcp.jvm.lux
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--stdlib/source/lux/world/net/tcp.jvm.lux175
1 files changed, 89 insertions, 86 deletions
diff --git a/stdlib/source/lux/world/net/tcp.jvm.lux b/stdlib/source/lux/world/net/tcp.jvm.lux
index 50191c407..329d256e0 100644
--- a/stdlib/source/lux/world/net/tcp.jvm.lux
+++ b/stdlib/source/lux/world/net/tcp.jvm.lux
@@ -1,19 +1,22 @@
(.module:
[lux #*
[control
- monad]
+ monad
+ [security
+ ["." taint (#+ Dirty taint)]]]
[concurrency
["." promise (#+ Promise promise)]
[task (#+ Task)]
["." frp]]
[data
- ["e" error]]
- [type
- abstract]
+ ["." error (#+ Error)]]
[world
- [binary (#+ Binary)]]
- ["." io (#+ Process)]
- [host (#+ import:)]]
+ ["." binary (#+ Binary)]]
+ ["." io (#+ IO)]
+ [host (#+ import:)]
+ [platform
+ [compiler
+ ["." host]]]]
["." //])
(import: java/lang/AutoCloseable
@@ -30,8 +33,8 @@
(import: java/net/Socket
(new [String int] #io #try)
- (getInputStream [] #io #try InputStream)
- (getOutputStream [] #io #try OutputStream))
+ (getInputStream [] #try InputStream)
+ (getOutputStream [] #try OutputStream))
(import: java/net/ServerSocket
(new [int] #io #try)
@@ -41,80 +44,80 @@
############################################################
############################################################
-(abstract: #export TCP {}
- {#socket Socket
- #in InputStream
- #out OutputStream}
-
- (def: #export (read data offset length self)
- (-> Binary Nat Nat TCP (Task Nat))
- (promise.future
- (do io.Monad<Process>
- [bytes-read (InputStream::read data (.int offset) (.int length)
- (get@ #in (:representation self)))]
- (wrap (.nat bytes-read)))))
-
- (def: #export (write data offset length self)
- (-> Binary Nat Nat TCP (Task Any))
- (let [out (get@ #out (:representation self))]
- (promise.future
- (do io.Monad<Process>
- [_ (OutputStream::write data (.int offset) (.int length)
- out)]
- (Flushable::flush out)))))
-
- (def: #export (close self)
- (-> TCP (Task Any))
- (let [(^open ".") (:representation self)]
- (promise.future
- (do io.Monad<Process>
- [_ (AutoCloseable::close in)
- _ (AutoCloseable::close out)]
- (AutoCloseable::close socket)))))
-
- (def: (tcp-client socket)
- (-> Socket (Process TCP))
- (do io.Monad<Process>
- [input (Socket::getInputStream socket)
- output (Socket::getOutputStream socket)]
- (wrap (:abstraction {#socket socket
- #in input
- #out output}))))
- )
-
-(def: #export (client address port)
- (-> //.Address //.Port (Task TCP))
- (promise.future
- (do io.Monad<Process>
- [socket (Socket::new address (.int port))]
- (tcp-client socket))))
-
-(def: #export (server port)
- (-> //.Port (Task [(Promise Any)
- (frp.Channel TCP)]))
- (promise.future
- (do (e.ErrorT io.Monad<IO>)
- [server (ServerSocket::new (.int port))
- #let [signal (: (Promise Any)
- (promise #.None))
- _ (promise.await (function (_ _)
- (AutoCloseable::close server))
- signal)
- output (: (frp.Channel TCP)
- (frp.channel []))
- _ (: (Promise Any)
- (promise.future
- (loop [_ []]
- (do io.Monad<IO>
- [?client (do (e.ErrorT io.Monad<IO>)
- [socket (ServerSocket::accept server)]
- (tcp-client socket))]
- (case ?client
- (#e.Error error)
- (wrap [])
-
- (#e.Success client)
- (do @
- [_ (frp.publish output client)]
- (recur [])))))))]]
- (wrap [signal output]))))
+(signature: #export (TCP !)
+ (: (-> Nat (! (Error [Nat (Dirty Binary)])))
+ read)
+
+ (: (-> Binary (! (Error Any)))
+ write)
+
+ (: (-> Any (! (Error Any)))
+ close))
+
+(def: #export (async tcp)
+ (-> (TCP IO) (TCP Promise))
+ (`` (structure (~~ (do-template [<capability>]
+ [(def: <capability> (|>> (:: tcp <capability>) promise.future))]
+
+ [read] [write] [close])))))
+
+(`` (for {(~~ (static host.jvm))
+ (as-is (def: (tcp socket)
+ (-> Socket (Error (TCP IO)))
+ (do error.Monad<Error>
+ [input (Socket::getInputStream socket)
+ output (Socket::getOutputStream socket)]
+ (wrap (: (TCP IO)
+ (structure (def: (read size)
+ (do io.Monad<Process>
+ [#let [data (binary.create size)]
+ bytes-read (InputStream::read data +0 (.int size) input)]
+ (wrap [(.nat bytes-read)
+ (taint data)])))
+
+ (def: (write data)
+ (do io.Monad<Process>
+ [_ (OutputStream::write data +0 (.int (binary.size data))
+ output)]
+ (Flushable::flush output)))
+
+ (def: (close _)
+ (do io.Monad<Process>
+ [_ (AutoCloseable::close input)
+ _ (AutoCloseable::close output)]
+ (AutoCloseable::close socket))))))))
+
+ (def: #export (client address port)
+ (-> //.Address //.Port (IO (Error (TCP IO))))
+ (do io.Monad<Process>
+ [socket (Socket::new address (.int port))]
+ (io.io (tcp socket))))
+
+ (def: #export (server port)
+ (-> //.Port (IO (Error [(Promise Any)
+ (frp.Channel (TCP IO))])))
+ (do io.Monad<Process>
+ [server (ServerSocket::new (.int port))
+ #let [close-signal (: (Promise Any)
+ (promise #.None))
+ _ (promise.await (function (_ _)
+ (AutoCloseable::close server))
+ close-signal)
+ output (: (frp.Channel (TCP IO))
+ (frp.channel []))
+ _ (: (Promise Any)
+ (promise.future
+ (loop [_ []]
+ (do io.Monad<IO>
+ [?client (do io.Monad<Process>
+ [socket (ServerSocket::accept server)]
+ (io.io (tcp socket)))]
+ (case ?client
+ (#error.Error error)
+ (wrap [])
+
+ (#error.Success client)
+ (do @
+ [_ (frp.publish output client)]
+ (recur [])))))))]]
+ (wrap [close-signal output]))))}))