aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/world/net/tcp.jvm.lux
blob: fddd6a94ee0954e52db67a4ca5d5c2965fc51aae (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
(.module:
  lux
  (lux (control monad)
       (concurrency [promise #+ Promise promise]
                    [task #+ Task]
                    [frp])
       (data ["e" error])
       (type abstract)
       (world [blob #+ Blob])
       [io #+ Process]
       [host])
  [//])

## Host (JVM) bindings used by this module.
## Every member below is declared with the #io and #try flags; the code
## further down consumes their results inside io.Monad<Process>.

## Used to close the streams, the client socket and the server socket.
(host.import java/lang/AutoCloseable
  (close [] #io #try void))

## Used to flush the output stream after each write.
(host.import java/io/Flushable
  (flush [] #io #try void))

## read takes a byte array plus two ints (passed below as offset and length)
## and yields an int.
(host.import java/io/InputStream
  (read [(Array byte) int int] #io #try int))

## write takes a byte array plus two ints (passed below as offset and length).
(host.import java/io/OutputStream
  (write [(Array byte) int int] #io #try void))

## Client socket: constructed from a String and an int (passed below as
## address and port), and exposes its input/output streams.
(host.import java/net/Socket
  (new [String int] #io #try)
  (getInputStream [] #io #try InputStream)
  (getOutputStream [] #io #try OutputStream))

## Server socket: constructed from an int (passed below as the port), and
## hands out a Socket per accept call.
(host.import java/net/ServerSocket
  (new [int] #io #try)
  (accept [] #io #try Socket))

############################################################
############################################################
############################################################

## A TCP connection, represented by a host socket together with the
## input and output streams obtained from it.
(abstract: #export TCP {}
  {#socket Socket
   #in InputStream
   #out OutputStream}

  ## Reads from the connection's input stream into `data`, passing `offset`
  ## and `length` (converted to ints) to InputStream::read, and yields the
  ## int it returned, converted to a Nat.
  ## NOTE(review): java.io.InputStream.read reports end-of-stream as -1;
  ## confirm how `.nat` converts a negative count before relying on the result.
  (def: #export (read data offset length self)
    (-> Blob Nat Nat TCP (Task Nat))
    (let [source (get@ #in (@representation self))]
      (|> (do io.Monad<Process>
            [amount (InputStream::read [data (.int offset) (.int length)]
                                       source)]
            (wrap (.nat amount)))
          promise.future)))

  ## Writes from `data` to the connection's output stream, passing `offset`
  ## and `length` (converted to ints) to OutputStream::write, and then
  ## flushes that stream.
  (def: #export (write data offset length self)
    (-> Blob Nat Nat TCP (Task Any))
    (let [sink (get@ #out (@representation self))]
      (|> (do io.Monad<Process>
            [_ (OutputStream::write [data (.int offset) (.int length)]
                                    sink)]
            (Flushable::flush [] sink))
          promise.future)))

  ## Closes the input stream, then the output stream, then the socket,
  ## in that order, within a single Process.
  (def: #export (close self)
    (-> TCP (Task Any))
    (let [connection (@representation self)
          source (get@ #in connection)
          sink (get@ #out connection)
          socket (get@ #socket connection)]
      (|> (do io.Monad<Process>
            [_ (AutoCloseable::close [] source)
             _ (AutoCloseable::close [] sink)]
            (AutoCloseable::close [] socket))
          promise.future)))

  ## Wraps a host socket as a TCP value, fetching its input stream first
  ## and its output stream second.
  (def: (tcp-client socket)
    (-> Socket (Process TCP))
    (do io.Monad<Process>
      [source (Socket::getInputStream [] socket)
       sink (Socket::getOutputStream [] socket)]
      (wrap (@abstraction {#socket socket
                           #in source
                           #out sink}))))
  )

## Opens a host socket for `address` and `port` (the port converted to an
## int) and wraps it as a TCP connection, as a Task.
(def: #export (client address port)
  (-> //.Address //.Port (Task TCP))
  (|> (do io.Monad<Process>
        [connection (Socket::new [address (.int port)])]
        (tcp-client connection))
      promise.future))

## Opens a server socket on `port` and starts accepting connections.
## The resulting Task yields a pair:
##   * a promise (`signal`) on which a callback closing the server socket
##     is registered, and
##   * a channel on which every accepted connection is published as a TCP.
## NOTE(review): presumably the caller resolves `signal` to shut the server
## down; confirm against promise.await's semantics.
(def: #export (server port)
  (-> //.Port (Task [(Promise Any)
                     (frp.Channel TCP)]))
  (promise.future
   (do (e.ErrorT io.Monad<IO>)
     ## A failure to create the server socket becomes the Task's error.
     [server (ServerSocket::new [(.int port)])
      ## Created from #.None -- presumably an unresolved promise; confirm.
      #let [signal (: (Promise Any)
                      (promise #.None))
            ## Register the shutdown callback: it ignores the promise's value
            ## and closes the server socket.
            _ (promise.await (function (_ _)
                               (AutoCloseable::close [] server))
                             signal)
            output (: (frp.Channel TCP)
                      (frp.channel []))
            ## Separate future running the accept loop; its promise is discarded.
            _ (: (Promise Any)
                 (promise.future
                  (loop [_ []]
                    (do io.Monad<IO>
                      ## Accept one connection and wrap it; either step may fail.
                      [?client (do (e.ErrorT io.Monad<IO>)
                                 [socket (ServerSocket::accept [] server)]
                                 (tcp-client socket))]
                      (case ?client
                        ## Any error ends the loop; the error value is dropped
                        ## and nothing is published about it.
                        ## NOTE(review): if tcp-client fails after accept
                        ## succeeded, the accepted socket is not closed here.
                        (#e.Error error)
                        (wrap [])
                        
                        ## Publish the new connection, then accept the next one.
                        (#e.Success client)
                        (do @
                          [_ (frp.publish output client)]
                          (recur [])))))))]]
     (wrap [signal output]))))