aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/library/lux/control/concurrency/atom.lux
blob: 2a7fec64cd37a326a97b635da7d515f3b1108e04 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
(.require
 [library
  [lux (.except)
   ["[0]" ffi]
   [abstract
    [monad (.only do)]]
   [control
    ["[0]" function]
    ["[0]" io (.only IO) (.use "[1]#[0]" functor)]]
   [data
    ["[0]" product]
    [collection
     ["[0]" array
      ["[1]" \\unsafe]]]]
   [meta
    ["@" target]
    [type
     [primitive (.except)]
     ["[0]" variance (.only Mutable)]]]]])

... Host bindings used to back atoms on the JVM targets.
... For both `@.old` and `@.jvm`, this imports java.util.concurrent.atomic.AtomicReference
... together with its constructor, `get` and `compareAndSet`.
... The "[1]::[0]" template names the members as `<class>::<member>`
... (e.g. `java/util/concurrent/atomic/AtomicReference::get`, as used further down).
... Every other target expands to nothing: `(these)`.
(with_expansions [<jvm> (these (ffi.import (java/util/concurrent/atomic/AtomicReference a)
                                 "[1]::[0]"
                                 (new [a])
                                 (get [] a)
                                 (compareAndSet [a a] boolean)))]
  (for @.old <jvm>
       @.jvm <jvm>
       (these)))

... A mutable cell whose content can be read and conditionally replaced.
... Representation per target:
...   * `@.old` / `@.jvm`: a java.util.concurrent.atomic.AtomicReference.
...   * any other target: an array, of which only slot 0 is used.
(primitive .public (Atom'' a)
  (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference a)]
    (for @.old <jvm>
         @.jvm <jvm>
         (array.Array a)))

  ... An atom with distinct types for reading (r) and for writing (w).
  (type .public (Atom' r w)
    (Atom'' (Mutable r w)))

  ... An atom which is read and written at the same type.
  (type .public (Atom a)
    (Atom'' (Mutable a a)))

  ... Creates a new atom which initially holds `value`.
  ... On non-JVM targets, the value is stored in slot 0 of a freshly made 1-element array.
  (def .public (atom value)
    (All (_ a) (-> a (Atom a)))
    (abstraction
     (with_expansions [<jvm> (as_expected (java/util/concurrent/atomic/AtomicReference::new value))]
       (for @.old <jvm>
            @.jvm <jvm>
            (array.has! 0 (variance.write value) (array.empty 1))))))

  ... An IO action which yields the value currently held by `atom`.
  ... The `@.old` and fallback branches pass the stored value through `variance.read`;
  ... the `@.jvm` branch returns the result of `get` directly.
  (def .public (read! atom)
    (All (_ r w) (-> (Atom' r w) (IO r)))
    (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::get (representation atom))]
      (io.io (for @.old (variance.read <jvm>)
                  @.jvm <jvm>
                  (variance.read (array.item 0 (representation atom)))))))

  ... An IO action which stores `new` in `atom` only if the atom currently holds `current`.
  ... Yields whether the swap took place.
  ... * `@.old` / `@.jvm`: delegates to AtomicReference's `compareAndSet`.
  ...   The `@.old` branch first casts `current` to the type of `new` and wraps both
  ...   arguments with `variance.write`; the `@.jvm` branch passes them as they are.
  ... * fallback: compares the content of slot 0 against `current` using `same?`,
  ...   and on a match overwrites slot 0 and yields `true`; otherwise yields `false`.
  ...   NOTE(review): the fallback's check and write are two separate steps;
  ...   presumably the remaining targets do not need a single atomic operation here -- confirm.
  (def .public (compare_and_swap! current new atom)
    (All (_ r w) (-> r w (Atom' r w) (IO Bit)))
    (io.io (for @.old (ffi.of_boolean
                       (java/util/concurrent/atomic/AtomicReference::compareAndSet (variance.write
                                                                                    (`` (as (,, (type_of new))
                                                                                            current)))
                                                                                   (variance.write new)
                                                                                   (representation atom)))
                @.jvm (ffi.of_boolean
                       (java/util/concurrent/atomic/AtomicReference::compareAndSet current new (representation atom)))
                (if (|> (representation atom)
                        (array.item 0)
                        variance.read
                        (same? current))
                  (exec
                    (array.has! 0 (variance.write new) (representation atom))
                    true)
                  false))))
  )

... Replaces the content of `atom` with the result of applying `f` to it.
... Each attempt reads the current value, computes its replacement, and tries to
... install it with `compare_and_swap!`; a failed swap triggers another attempt
... (so `f` may be applied more than once).
... Yields the pair [value before the update, value after the update].
(def .public (update! f atom)
  (All (_ r w) (-> (-> r w) (Atom' r w) (IO [r w])))
  (loop (retry [_ []])
    (do io.monad
      [before (read! atom)
       .let [after (f before)]
       success? (compare_and_swap! before after atom)]
      (if (not success?)
        (retry [])
        (in [before after])))))

... Stores `value` in `atom`, whatever the atom currently holds.
... Implemented as an `update!` whose function ignores its input and returns `value`.
... Yields the value that was held before the write (the left half of `update!`'s pair).
(def .public (write! value atom)
  (All (_ r w) (-> w (Atom' r w) (IO r)))
  (io#each product.left
           (..update! (function.constant value) atom)))