aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/library/lux/control/concurrency/csp.lux
blob: 442fd5f364960ef9dab900d6a9b4cb28626be44f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
(.require
 [library
  [lux (.except try)
   [abstract
    [functor (.only Functor)]
    [monad (.only Monad do)]]
   [control
    ["[0]" try (.only Try) (.use "[1]#[0]" monad)]
    ["[0]" exception (.only Exception)]]]]
 [//
  ["[0]" async (.only Async) (.use "[1]#[0]" monad)]
  ["[0]" frp]])

(type .public (Process a)
  (Async (Try a)))

(type .public Channel' frp.Channel')
(type .public Channel frp.Channel)
(type .public Sink frp.Sink)

(def .public channel
  (All (_ a) (-> Any [(Channel a) (Sink a)]))
  frp.channel)

(def .public functor
  (Functor Process)
  (implementation
   (def (each $)
     (async#each (try#each $)))))

(def .public monad
  (Monad Process)
  (implementation
   (def functor ..functor)
   (def in (|>> try#in async#in))
   (def (conjoint atatx)
     (do async.monad
       [tatx atatx]
       (when tatx
         {try.#Success atx}
         atx
         
         {try.#Failure error}
         (in {try.#Failure error}))))))

(exception.def .public channel_has_been_closed)

(def .public (read it)
  (All (_ r w)
    (-> (Channel' r w) (Process [r (Channel' r w)])))
  (let [[output resolver] (async.async [])]
    (exec
      (async.future
       (async.upon! (function (_ head,tail)
                      (resolver (when head,tail
                                  {.#Some [head tail]}
                                  {try.#Success [head tail]}
                                  
                                  {.#None}
                                  (exception.except ..channel_has_been_closed []))))
                    it))
      output)))

(def .public (write value sink)
  (All (_ w)
    (-> w (Sink w) (Process Any)))
  (async.future (of sink feed value)))

(def .public (close sink)
  (All (_ w)
    (-> (Sink w) (Process Any)))
  (async.future (of sink close)))

(def .public try
  (All (_ a) (-> (Process a) (Process (Try a))))
  (async#each (|>> {try.#Success})))