aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/lux/control/concurrency/atom.lux
blob: 54be96d767fdefd4d7e928d50f94b178cf1b3720 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
(.module:
  [lux #*
   ["." host]
   ["@" target]
   [abstract
    [monad (#+ do)]]
   [control
    ["." function]
    ["." io (#- run)]]
   [data
    [collection
     ["." array]]]
   [type
    abstract]])

## Host bindings for java.util.concurrent.atomic.AtomicReference.
## Both the @.old and @.jvm targets import the same three members:
## the one-argument constructor, 'get', and 'compareAndSet'.
## Any other target falls through to the empty (as-is) form below,
## so no host import is declared for it here.
(for {@.old
      (host.import: (java/util/concurrent/atomic/AtomicReference a)
        (new [a])
        (get [] a)
        (compareAndSet [a a] boolean))

      @.jvm
      (host.import: (java/util/concurrent/atomic/AtomicReference a)
        (new [a])
        (get [] a)
        (compareAndSet [a a] boolean))}
     
     (as-is))

## Abstract type for a mutable reference cell.
## Representation per target:
##   @.old / @.jvm -> java.util.concurrent.atomic.AtomicReference
##   @.js          -> an array ('atom' below allocates it with a single slot at index 0)
## No other target is listed in the representation.
(abstract: #export (Atom a)
  (for {@.old
        (java/util/concurrent/atomic/AtomicReference a)

        @.jvm
        (java/util/concurrent/atomic/AtomicReference a)

        @.js
        (array.Array a)
        })

  {#.doc "Atomic references that are safe to mutate concurrently."}

  ## Creates a new atom whose initial content is 'value'.
  ## On JS this allocates a 1-element array and writes 'value' at index 0.
  (def: #export (atom value)
    (All [a] (-> a (Atom a)))
    (:abstraction (for {@.old
                        (java/util/concurrent/atomic/AtomicReference::new value)

                        @.jvm
                        (java/util/concurrent/atomic/AtomicReference::new value)

                        @.js
                        ("js array write" 0 value ("js array new" 1))
                        })))

  ## Builds an IO action that yields the value currently held by 'atom'.
  ## On JS the value is read from index 0 of the backing array.
  (def: #export (read atom)
    (All [a] (-> (Atom a) (IO a)))
    (io (for {@.old
              (java/util/concurrent/atomic/AtomicReference::get (:representation atom))

              @.jvm
              (java/util/concurrent/atomic/AtomicReference::get (:representation atom))

              @.js
              ("js array read" 0 (:representation atom))
              })))

  ## Builds an IO action that stores 'new' in 'atom' only when the atom's content matches 'current'.
  ## The resulting Bit tells whether the store happened.
  ## JVM targets delegate to AtomicReference.compareAndSet.
  ## The JS branch does a read, an 'is?' comparison against 'current', and then a write, as separate steps.
  ## NOTE(review): presumably acceptable because JS runs this without preemption -- confirm.
  (def: #export (compare-and-swap current new atom)
    {#.doc (doc "Only mutates an atom if you can present its current value."
                "That guarantees that atom was not updated since you last read from it.")}
    (All [a] (-> a a (Atom a) (IO Bit)))
    (io (for {@.old
              (java/util/concurrent/atomic/AtomicReference::compareAndSet current new (:representation atom))

              @.jvm
              (java/util/concurrent/atomic/AtomicReference::compareAndSet current new (:representation atom))

              @.js
              (let [old ("js array read" 0 (:representation atom))]
                (if (is? old current)
                  (exec ("js array write" 0 new (:representation atom))
                    true)
                  false))})))
  )

(def: #export (update f atom)
  {#.doc (doc "Replaces the content of an atom with the result of applying a function to it."
              "When the swap is rejected (because another process changed the atom in the meantime), the whole attempt is repeated."
              "Every new attempt starts from whatever value the atom holds at that moment."
              "Yields the value that was finally stored.")}
  (All [a] (-> (-> a a) (Atom a) (IO a)))
  (do io.monad
    [before (read atom)
     #let [after (f before)]
     stored? (compare-and-swap before after atom)]
    (if stored?
      (wrap after)
      ## The swap was rejected: start over with a fresh read.
      (update f atom))))

(def: #export (write value atom)
  {#.doc "Stores a value in an atom, regardless of what the atom currently holds."}
  (All [a] (-> a (Atom a) (IO Any)))
  ## Writing is an update whose function ignores the previous content.
  (let [overwrite (function.constant value)]
    (..update overwrite atom)))