aboutsummaryrefslogtreecommitdiff
path: root/stdlib/source/library/lux/control/concurrency/atom.lux
blob: a607fa8d34080abda088bafc289044ce9e2bdc8c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
(.module:
  [library
   [lux #*
    ["." ffi]
    ["@" target]
    [abstract
     [monad (#+ do)]]
    [control
     ["." function]
     ["." io (#- run) ("#\." functor)]]
    [data
     ["." product]
     [collection
      ["." array]]]
    [type
     abstract]]])

(with_expansions [<jvm> (as_is (ffi.import: (java/util/concurrent/atomic/AtomicReference a)
                                 ["#::."
                                  (new [a])
                                  (get [] a)
                                  (compareAndSet [a a] boolean)]))]
  (for {@.old <jvm>
        @.jvm <jvm>}
       (as_is)))

(with_expansions [<new> (for {@.js "js array new"
                              @.python "python array new"
                              @.lua "lua array new"
                              @.ruby "ruby array new"
                              @.php "php array new"
                              @.scheme "scheme array new"}
                             (as_is))
                  <write> (for {@.js "js array write"
                                @.python "python array write"
                                @.lua "lua array write"
                                @.ruby "ruby array write"
                                @.php "php array write"
                                @.scheme "scheme array write"}
                               (as_is))
                  
                  <read> (for {@.js "js array read"
                               @.python "python array read"
                               @.lua "lua array read"
                               @.ruby "ruby array read"
                               @.php "php array read"
                               @.scheme "scheme array read"}
                              (as_is))]
  (abstract: .public (Atom a)
    {#.doc "Atomic references that are safe to mutate concurrently."}

    (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference a)]
      (for {@.old <jvm>
            @.jvm <jvm>}
           (array.Array a)))

    (def: .public (atom value)
      (All [a] (-> a (Atom a)))
      (:abstraction (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::new value)]
                      (for {@.old <jvm>
                            @.jvm <jvm>}
                           (<write> 0 value (<new> 1))))))

    (def: .public (read! atom)
      (All [a] (-> (Atom a) (IO a)))
      (io (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::get (:representation atom))]
            (for {@.old <jvm>
                  @.jvm <jvm>}
                 (<read> 0 (:representation atom))))))

    (def: .public (compare_and_swap! current new atom)
      {#.doc (doc "Only mutates an atom if you can present its current value."
                  "That guarantees that atom was not updated since you last read from it.")}
      (All [a] (-> a a (Atom a) (IO Bit)))
      (io (with_expansions [<jvm> (java/util/concurrent/atomic/AtomicReference::compareAndSet current new (:representation atom))]
            (for {@.old <jvm>
                  @.jvm <jvm>}
                 (let [old (<read> 0 (:representation atom))]
                   (if (is? old current)
                     (exec (<write> 0 new (:representation atom))
                       true)
                     false))))))
    ))

(def: .public (update! f atom)
  {#.doc (doc "Updates an atom by applying a function to its current value."
              "If it fails to update it (because some other process wrote to it first), it will retry until it succeeds."
              "The retries will be done with the new values of the atom, as they show up.")}
  (All [a] (-> (-> a a) (Atom a) (IO [a a])))
  (loop [_ []]
    (do io.monad
      [old (read! atom)
       .let [new (f old)]
       swapped? (compare_and_swap! old new atom)]
      (if swapped?
        (in [old new])
        (recur [])))))

(def: .public (write! value atom)
  {#.doc (doc "Writes the given value to an atom."
              "If it fails to write it (because some other process wrote to it first), it will retry until it succeeds.")}
  (All [a] (-> a (Atom a) (IO a)))
  (|> atom
      (..update! (function.constant value))
      (io\map product.left)))