aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/world/net/tcp.jvm.lux
blob: 4b111fcf71c1eb32cb0ab06539b73e4724a93823 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
(;module:
  lux
  (lux (control monad)
       (concurrency ["P" promise]
                    ["T" task]
                    [frp])
       (data ["e" error])
       (type opaque)
       (world [blob #+ Blob])
       [io]
       [host])
  [..])

(host;import java.lang.AutoCloseable
  (close [] #io #try void))

(host;import java.io.Flushable
  (flush [] #io #try void))

(host;import java.io.InputStream
  (read [(Array byte) int int] #io #try int))

(host;import java.io.OutputStream
  (write [(Array byte) int int] #io #try void))

(host;import java.net.Socket
  (new [String int] #io #try)
  (getInputStream [] #io #try InputStream)
  (getOutputStream [] #io #try OutputStream))

(host;import java.net.ServerSocket
  (new [int] #io #try)
  (accept [] #io #try Socket))

############################################################
############################################################
############################################################

## A TCP connection: a java.net.Socket bundled with its input and output
## streams.  The representation is only reachable through @repr/@opaque
## inside this block; everything else goes through the operations below.
(opaque: #export TCP {}
  {#socket Socket
   #in InputStream
   #out OutputStream}
  
  ## Reads into `data`, starting at index `offset`, asking the underlying
  ## InputStream for at most `length` bytes.
  ## Yields the count returned by InputStream.read, converted with int-to-nat.
  ## NOTE(review): java.io.InputStream.read reports end-of-stream as -1, and
  ## that value is passed through int-to-nat unchanged here; confirm how
  ## callers are expected to detect the end of the stream.
  (def: #export (read data offset length self)
    (-> Blob Nat Nat TCP (T;Task Nat))
    (let [in (get@ #in (@repr self))]
      (P;future
       (do (e;ErrorT io;Monad<IO>)
         [bytes-read (InputStream.read [data (nat-to-int offset) (nat-to-int length)]
                                       in)]
         (wrap (int-to-nat bytes-read))))))
  
  ## Writes `length` bytes of `data`, starting at index `offset`, to the
  ## connection and then flushes the output stream.
  ## A failure of the write skips the flush and becomes the task's error.
  (def: #export (write data offset length self)
    (-> Blob Nat Nat TCP (T;Task Unit))
    (let [out (get@ #out (@repr self))]
      (P;future
       (do (e;ErrorT io;Monad<IO>)
         [_ (OutputStream.write [data (nat-to-int offset) (nat-to-int length)]
                                out)]
         (Flushable.flush [] out)))))

  ## Closes the input stream, the output stream and the socket.
  ## All three closes are always attempted, so a failure on an earlier
  ## resource cannot leave a later one open.  The task reports the first
  ## failure encountered (in, then out, then socket), or success if none.
  (def: #export (close self)
    (-> TCP (T;Task Unit))
    (let [(^open) (@repr self)]
      (P;future
       ## Plain IO monad (not ErrorT) so that an error does not short-circuit
       ## the remaining closes.
       (do io;Monad<IO>
         [in-outcome (AutoCloseable.close [] in)
          out-outcome (AutoCloseable.close [] out)
          socket-outcome (AutoCloseable.close [] socket)]
         (wrap (case [in-outcome out-outcome]
                 [(#e;Success _) (#e;Success _)]
                 socket-outcome

                 [(#e;Error _) _]
                 in-outcome

                 _
                 out-outcome))))))

  ## Wraps an already-connected socket, fetching both of its streams.
  ## Fails if either stream cannot be obtained.
  (def: (tcp-client socket)
    (-> Socket (io;IO (e;Error TCP)))
    (do (e;ErrorT io;Monad<IO>)
      [input (Socket.getInputStream [] socket)
       output (Socket.getOutputStream [] socket)]
      (wrap (@opaque {#socket socket
                      #in input
                      #out output}))))

  ## Opens a connection to `address`:`port`.
  (def: #export (client address port)
    (-> ..;Address ..;Port (T;Task TCP))
    (P;future
     (do (e;ErrorT io;Monad<IO>)
       [socket (Socket.new [address (nat-to-int port)])]
       (tcp-client socket))))

  ## Waits for the given channel link to resolve and, if it turns out to have
  ## been closed, closes the server socket.  The outcome of that close is
  ## discarded.
  (def: (await-server-release client-channel server)
    (-> (frp;Channel TCP) ServerSocket (P;Promise Unit))
    (do P;Monad<Promise>
      [outcome client-channel]
      (case outcome
        ## Channel has been closed.
        ## Must close associated server.
        #;None
        (P;future
         (do io;Monad<IO>
           [_ (AutoCloseable.close [] server)]
           (wrap [])))

        ## A client was generated.
        ## Nothing to be done...
        (#;Some _)
        (wrap []))))

  ## Starts listening on `port` and yields a channel of accepted connections.
  ## Fails only if the server socket cannot be created.
  ## Connections are accepted in a background future until either accepting
  ## fails (the channel is then closed) or the channel is found to be closed.
  ## Closing the channel at any point releases the server socket.
  (def: #export (server port)
    (-> ..;Port (T;Task (frp;Channel TCP)))
    (P;future
     (do (e;ErrorT io;Monad<IO>)
       [server (ServerSocket.new [(nat-to-int port)])
        #let [output (frp;channel TCP)
              ## Watch the head of the channel too: without this, closing the
              ## channel (or failing) before the first client is delivered
              ## would never release the server socket, because the loop
              ## below only registers a watcher on each *new* tail.
              _ (await-server-release output server)
              _ (: (P;Promise Bool)
                   (P;future
                    (loop [tail output]
                      (do io;Monad<IO>
                        [?client (do (e;ErrorT io;Monad<IO>)
                                   [socket (ServerSocket.accept [] server)]
                                   (tcp-client socket))]
                        (case ?client
                          ## Accepting failed: end the stream of clients.
                          (#e;Error error)
                          (frp;close tail)
                          
                          (#e;Success client)
                          (do @
                            [?tail' (frp;write client tail)]
                            (case ?tail'
                              ## The channel was already closed, so nobody can
                              ## ever receive this client: close its socket
                              ## instead of leaking it, then stop accepting.
                              #;None
                              (do @
                                [_ (AutoCloseable.close [] (get@ #socket (@repr client)))]
                                (wrap true))
                              
                              ## Delivered: watch the new tail for closure and
                              ## go back to accepting.
                              (#;Some tail')
                              (exec (await-server-release tail' server)
                                (recur tail')))))))))]]
       (wrap output))))
  )