aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/world/net/tcp.lux
blob: d9011a5cbb2b186273fdf61e8d62578d70a77456 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
(.module:
  [lux #*
   [host (#+ import:)]
   ["@" target]
   [abstract
    monad]
   [control
    ["." io (#+ IO)]
    [concurrency
     ["." promise (#+ Promise Resolver)]
     ["." frp (#+ Channel Sink)]]
    [security
     ["!" capability]]]
   [data
    ["." error (#+ Error)]]
   [world
    ["." binary (#+ Binary)]]]
  ["." // (#+ Can-Read Can-Write Can-Close)])

## The capabilities offered by a TCP connection.
## `!` is the effect the capabilities run in (this file uses IO and Promise).
(signature: #export (TCP !)
  ## Given the requested size, yields a Nat count paired with a Binary buffer.
  (: (Can-Read ! [Nat Binary]) read)

  ## Sends a Binary over the connection.
  (: (Can-Write ! Binary) write)

  ## Shuts the connection down.
  (: (Can-Close !) close))

## Turns a blocking (IO) TCP structure into an asynchronous (Promise) one.
## Each capability is re-forged so that using it runs the matching IO
## capability of `tcp` through `promise.future`.
(def: #export (async tcp)
  (-> (TCP IO) (TCP Promise))
  (structure
   (def: read
     (//.can-read
      (function (_ input)
        (promise.future (!.use (:: tcp read) input)))))

   (def: write
     (//.can-write
      (function (_ input)
        (promise.future (!.use (:: tcp write) input)))))

   (def: close
     (//.can-close
      (function (_ input)
        (promise.future (!.use (:: tcp close) input)))))))

## JVM implementation of the TCP client and server.
## Everything inside `<for-jvm>` is spliced in once per JVM-based target by the
## `for` form at the bottom of this `with-expansions`.
(with-expansions [<for-jvm> (as-is (import: #long java/lang/String)

                                   ## Host bindings used below. Members marked `#io #try` are used
                                   ## inside `(error.with io.monad)`; members marked only `#try`
                                   ## are used inside `error.monad`.
                                   (import: #long java/lang/AutoCloseable
                                     (close [] #io #try void))

                                   (import: #long java/io/Flushable
                                     (flush [] #io #try void))

                                   ## read(buffer, offset, length)
                                   (import: #long java/io/InputStream
                                     (read [[byte] int int] #io #try int))

                                   ## write(buffer, offset, length)
                                   (import: #long java/io/OutputStream
                                     (write [[byte] int int] #io #try void))

                                   ## Client-side socket: constructed from a host name and a port.
                                   (import: #long java/net/Socket
                                     (new [java/lang/String int] #io #try)
                                     (getInputStream [] #try java/io/InputStream)
                                     (getOutputStream [] #try java/io/OutputStream))

                                   ## Listening socket: constructed from a port; `accept` yields client sockets.
                                   (import: #long java/net/ServerSocket
                                     (new [int] #io #try)
                                     (accept [] #io #try java/net/Socket))

                                   ## Builds a blocking (IO) TCP structure on top of a JVM socket.
                                   ## Fails if either of the socket's streams cannot be obtained.
                                   (def: (tcp socket)
                                     (-> java/net/Socket (Error (TCP IO)))
                                     (do error.monad
                                       [input (java/net/Socket::getInputStream socket)
                                        output (java/net/Socket::getOutputStream socket)]
                                       (wrap (: (TCP IO)
                                                (structure (def: read
                                                             (//.can-read
                                                              (function (read size)
                                                                (do (error.with io.monad)
                                                                  [#let [data (binary.create size)]
                                                                   bytes-read (java/io/InputStream::read data +0 (.int size) input)]
                                                                  ## The JVM signals end-of-stream with -1.
                                                                  ## Converting that straight to Nat would yield a
                                                                  ## huge bogus byte count, so it is reported as
                                                                  ## 0 bytes read instead.
                                                                  (wrap [(case bytes-read
                                                                           -1 0
                                                                           _ (.nat bytes-read))
                                                                         data])))))

                                                           (def: write
                                                             (//.can-write
                                                              (function (write data)
                                                                (do (error.with io.monad)
                                                                  [_ (java/io/OutputStream::write data +0 (.int (binary.size data))
                                                                                                  output)]
                                                                  ## Flush right away after every write.
                                                                  (java/io/Flushable::flush output)))))

                                                           (def: close
                                                             (//.can-close
                                                              (function (close _)
                                                                ## Closes both streams, then the socket itself.
                                                                ## The first failure aborts the remaining steps.
                                                                (do (error.with io.monad)
                                                                  [_ (java/lang/AutoCloseable::close input)
                                                                   _ (java/lang/AutoCloseable::close output)]
                                                                  (java/lang/AutoCloseable::close socket))))))))))

                                   ## Opens a socket to `address`:`port` and wraps it as a blocking TCP structure.
                                   ## Yields a failure if the socket cannot be created or wrapped.
                                   (def: #export (client address port)
                                     (-> //.Address //.Port (IO (Error (TCP IO))))
                                     (do (error.with io.monad)
                                       [connection (java/net/Socket::new address (.int port))]
                                       (|> connection tcp io.io)))

                                   ## Starts listening on `port` and accepts connections in the background.
                                   ## On success, yields:
                                   ##   * a resolver: resolving it closes the listening socket.
                                   ##   * a channel: every accepted connection is fed into it.
                                   ## The accept loop stops at the first failure (either from accepting or
                                   ## from wrapping the accepted socket).
                                   (def: #export (server port)
                                     (-> //.Port (IO (Error [(Resolver Any)
                                                             (Channel (TCP IO))])))
                                     (do (error.with io.monad)
                                       [server (java/net/ServerSocket::new (.int port))
                                        #let [[close-signal close-resolver] (: [(Promise Any) (Resolver Any)]
                                                                               (promise.promise []))
                                              ## Once the caller resolves the promise, close the listening socket.
                                              _ (promise.await (function (_ _)
                                                                 (java/lang/AutoCloseable::close server))
                                                               close-signal)
                                              [output sink] (: [(Channel (TCP IO)) (Sink (TCP IO))]
                                                               (frp.channel []))
                                              _ (: (Promise Any)
                                                   (promise.future
                                                    (loop [_ []]
                                                      (do io.monad
                                                        [?client (do (error.with io.monad)
                                                                   [socket (java/net/ServerSocket::accept server)]
                                                                   (io.io (tcp socket)))]
                                                        (case ?client
                                                          ## No more connections will ever be fed, so close the
                                                          ## sink; otherwise consumers of the channel would
                                                          ## wait forever for a next element.
                                                          (#error.Failure error)
                                                          (do @
                                                            [_ (:: sink close)]
                                                            (wrap []))

                                                          (#error.Success client)
                                                          (do @
                                                            [_ (:: sink feed client)]
                                                            (recur [])))))))]]
                                       (wrap [close-resolver output]))))]
  ## Pick the implementation for the current compilation target.
  ## Both the `@.old` and `@.jvm` targets use the same `<for-jvm>` code.
  (`` (for {(~~ (static @.old))
            (as-is <for-jvm>)

            (~~ (static @.jvm))
            (as-is <for-jvm>)})))