blob: 442fd5f364960ef9dab900d6a9b4cb28626be44f (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
(.require
[library
[lux (.except try)
[abstract
[functor (.only Functor)]
[monad (.only Monad do)]]
[control
["[0]" try (.only Try) (.use "[1]#[0]" monad)]
["[0]" exception (.only Exception)]]]]
[//
["[0]" async (.only Async) (.use "[1]#[0]" monad)]
["[0]" frp]])
(type .public (Process a)
(Async (Try a)))
(type .public Channel' frp.Channel')
(type .public Channel frp.Channel)
(type .public Sink frp.Sink)
(def .public channel
(All (_ a) (-> Any [(Channel a) (Sink a)]))
frp.channel)
(def .public functor
(Functor Process)
(implementation
(def (each $)
(async#each (try#each $)))))
(def .public monad
(Monad Process)
(implementation
(def functor ..functor)
(def in (|>> try#in async#in))
(def (conjoint atatx)
(do async.monad
[tatx atatx]
(when tatx
{try.#Success atx}
atx
{try.#Failure error}
(in {try.#Failure error}))))))
(exception.def .public channel_has_been_closed)
(def .public (read it)
(All (_ r w)
(-> (Channel' r w) (Process [r (Channel' r w)])))
(let [[output resolver] (async.async [])]
(exec
(async.future
(async.upon! (function (_ head,tail)
(resolver (when head,tail
{.#Some [head tail]}
{try.#Success [head tail]}
{.#None}
(exception.except ..channel_has_been_closed []))))
it))
output)))
(def .public (write value sink)
(All (_ w)
(-> w (Sink w) (Process Any)))
(async.future (of sink feed value)))
(def .public (close sink)
(All (_ w)
(-> (Sink w) (Process Any)))
(async.future (of sink close)))
(def .public try
(All (_ a) (-> (Process a) (Process (Try a))))
(async#each (|>> {try.#Success})))
|