blob: fddd6a94ee0954e52db67a4ca5d5c2965fc51aae (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
(.module:
lux
(lux (control monad)
(concurrency [promise #+ Promise promise]
[task #+ Task]
[frp])
(data ["e" error])
(type abstract)
(world [blob #+ Blob])
[io #+ Process]
[host])
[//])
(host.import java/lang/AutoCloseable
(close [] #io #try void))
(host.import java/io/Flushable
(flush [] #io #try void))
(host.import java/io/InputStream
(read [(Array byte) int int] #io #try int))
(host.import java/io/OutputStream
(write [(Array byte) int int] #io #try void))
(host.import java/net/Socket
(new [String int] #io #try)
(getInputStream [] #io #try InputStream)
(getOutputStream [] #io #try OutputStream))
(host.import java/net/ServerSocket
(new [int] #io #try)
(accept [] #io #try Socket))
############################################################
############################################################
############################################################
(abstract: #export TCP {}
  ## Representation: the underlying Java socket plus the input and
  ## output streams obtained from it (see 'tcp-client').
  {#socket Socket
   #in InputStream
   #out OutputStream}

  ## Reads up to 'length' bytes from the connection into 'data',
  ## starting at position 'offset' of the blob.
  ## Yields the number of bytes that were actually read.
  ## Fails if the underlying read fails, or if the end of the stream
  ## has been reached.
  (def: #export (read data offset length self)
    (-> Blob Nat Nat TCP (Task Nat))
    (promise.future
     (do io.Monad<Process>
       [bytes-read (InputStream::read [data (.int offset) (.int length)]
                                      (get@ #in (@representation self)))
        #let [amount (.nat bytes-read)]]
       ## InputStream.read signals end-of-stream with -1, which '.nat'
       ## turns into an enormous Nat. A legitimate count can never be
       ## larger than the requested 'length', so anything above it is
       ## reported as an error instead of a bogus byte count.
       (if (n/> length amount)
         (io.io (#e.Error "Reached the end of the TCP stream."))
         (wrap amount)))))

  ## Writes 'length' bytes of 'data', starting at position 'offset',
  ## to the connection, and flushes the output stream afterwards.
  (def: #export (write data offset length self)
    (-> Blob Nat Nat TCP (Task Any))
    (let [out (get@ #out (@representation self))]
      (promise.future
       (do io.Monad<Process>
         [_ (OutputStream::write [data (.int offset) (.int length)]
                                 out)]
         (Flushable::flush [] out)))))

  ## Closes the input stream, the output stream and the socket, in that order.
  ## The steps are sequenced in the Process monad, so a failing step
  ## aborts the ones after it.
  (def: #export (close self)
    (-> TCP (Task Any))
    (let [(^open) (@representation self)]
      (promise.future
       (do io.Monad<Process>
         [_ (AutoCloseable::close [] in)
          _ (AutoCloseable::close [] out)]
         (AutoCloseable::close [] socket)))))

  ## Wraps an already-connected socket as a TCP value, fetching its
  ## input and output streams up-front.
  (def: (tcp-client socket)
    (-> Socket (Process TCP))
    (do io.Monad<Process>
      [input (Socket::getInputStream [] socket)
       output (Socket::getOutputStream [] socket)]
      (wrap (@abstraction {#socket socket
                           #in input
                           #out output}))))
  )
## Connects to 'address' on 'port' and yields the resulting TCP connection.
## The task fails if the socket cannot be created or if its streams
## cannot be obtained.
(def: #export (client address port)
  (-> //.Address //.Port (Task TCP))
  (let [connecting (: (Process TCP)
                      (do io.Monad<Process>
                        [connection (Socket::new [address (.int port)])]
                        (tcp-client connection)))]
    (promise.future connecting)))
## Opens a server socket on 'port' and starts accepting connections.
## Yields a pair of:
## * a promise ('signal') whose callback closes the server socket;
## * a channel on which every accepted connection is published as a TCP value.
## The task fails if the server socket cannot be created.
(def: #export (server port)
(-> //.Port (Task [(Promise Any)
(frp.Channel TCP)]))
(promise.future
(do (e.ErrorT io.Monad<IO>)
[server (ServerSocket::new [(.int port)])
## 'signal' is created without a value (presumably unresolved; confirm
## against the promise module).
#let [signal (: (Promise Any)
(promise #.None))
## Registers a callback on 'signal' that closes the server socket;
## presumably it runs once 'signal' gets resolved by the caller.
_ (promise.await (function (_ _)
(AutoCloseable::close [] server))
signal)
output (: (frp.Channel TCP)
(frp.channel []))
## Accept loop, started as its own future so it does not block
## the task that returns [signal output].
_ (: (Promise Any)
(promise.future
(loop [_ []]
(do io.Monad<IO>
## Accepting the socket and wrapping it share one error scope:
## a failure in either step produces (#e.Error ...).
[?client (do (e.ErrorT io.Monad<IO>)
[socket (ServerSocket::accept [] server)]
(tcp-client socket))]
(case ?client
## NOTE(review): any failure ends the loop and the error is discarded.
## This is presumably how the loop stops after the server socket is
## closed, but a failure while wrapping a single client stops it too.
(#e.Error error)
(wrap [])
## Publish the new connection and go back to accepting.
(#e.Success client)
(do @
[_ (frp.publish output client)]
(recur [])))))))]]
(wrap [signal output]))))
|