summaryrefslogtreecommitdiff
path: root/test/719-varhandle-concurrency/src/Main.java
blob: 577cdbf07e80710993dfd19f559e9f47795885f7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.lang.invoke.VarHandle;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * Runs tests to validate the concurrency guarantees of VarHandle.
 *
 * The tests involve having a lot of tasks and significantly fewer threads. The tasks are stored on
 * a queue and each thread tries to grab a task from the queue using operations like
 * VarHandle.compareAndSet(). If the operation works as specified, then each task would only be
 * handled in a single thread, exactly once.
 *
 * The tasks just add atomically a specified integer to a total. If the total is different from the
 * expected one, then either some tasks were run multiple times (on multiple threads), or some task
 * were not run at all (skipped by all threads).
 */
public class Main {
  private static final VarHandle QA;
  static {
      QA = MethodHandles.arrayElementVarHandle(TestTask[].class);
  }

  private static final int TASK_COUNT = 10000;
  private static final int THREAD_COUNT = 20;
  /* Each test may need several retries before a concurrent failure is seen. In the past, for a
   * known bug, between 5 and 10 retries were sufficient. Use RETRIES to configure how many
   * iterations to retry for each test scenario. However, to avoid the test running for too long,
   * for example with gcstress, set a cap duration in MAX_RETRIES_DURATION. With this at least one
   * iteration would run, but there could be fewer retries if each of them takes too long. */
  private static final int RETRIES = 50;
  // b/235431387: timeout reduced from 1 minute
  private static final Duration MAX_RETRIES_DURATION = Duration.ofSeconds(15);

  public static void main(String[] args) throws Throwable {
    testConcurrentProcessing(new CompareAndExchangeRunnerFactory(), "compareAndExchange");
    testConcurrentProcessing(new CompareAndSetRunnerFactory(), "compareAndSet");
    testConcurrentProcessing(new WeakCompareAndSetRunnerFactory(), "weakCompareAndSet");
  }

  private static void testConcurrentProcessing(RunnerFactory factory,
          String testName) throws Throwable {
    final Duration startTs = Duration.ofNanos(System.nanoTime());
    final Duration endTs = startTs.plus(MAX_RETRIES_DURATION);
    for (int i = 0; i < RETRIES; ++i) {
      concurrentProcessingTestIteration(factory, i, testName);
      Duration now = Duration.ofNanos(System.nanoTime());
      if (0 < now.compareTo(endTs)) {
          break;
      }
    }
  }

  private static void concurrentProcessingTestIteration(RunnerFactory factory,
          int iteration, String testName) throws Throwable {
      final TestTask[] tasks = new TestTask[TASK_COUNT];
      final AtomicInteger result = new AtomicInteger();

      for (int i = 0; i < TASK_COUNT; ++i) {
          tasks[i] = new TestTask(Integer.valueOf(i+1), result::addAndGet);
      }

      Thread[] threads = new Thread[THREAD_COUNT];
      for (int i = 0; i < THREAD_COUNT; ++i) {
          threads[i] = factory.createRunner(tasks);
      }

      for (int i = 0; i < THREAD_COUNT; ++i) {
          threads[i].start();
      }

      for (int i = 0; i < THREAD_COUNT; ++i) {
          threads[i].join();
      }

      check(result.get(), TASK_COUNT * (TASK_COUNT + 1) / 2,
              testName + " test result not as expected", iteration);
  }

  /**
   * Processes the task queue until there are no tasks left.
   *
   * The actual task-grabbing mechanism is implemented in subclasses through grabTask(). This allows
   * testing various mechanisms, like compareAndSet() and compareAndExchange().
   */
  private static abstract class TaskRunner extends Thread {

      protected final TestTask[] tasks;

      TaskRunner(TestTask[] tasks) {
          this.tasks = tasks;
      }

      @Override
      public void run() {
          int i = 0;
          while (i < TASK_COUNT) {
              TestTask t = (TestTask) QA.get(tasks, i);
              if (t == null) {
                  ++i;
                  continue;
              }
              if (!grabTask(t, i)) {
                  continue;
              }
              ++i;
              VarHandle.releaseFence();
              t.exec();
          }
      }

      /**
       * Grabs the next task from the queue in an atomic way.
       * 
       * Once a task is retrieved successfully, the queue should no longer hold a reference to it.
       * This would be done, for example, by swapping the task with a null value.
       *
       * @param t The task to get from the queue
       * @param i The index where the task is found
       *
       * @return {@code true} if the task has been retrieved and is not available to any other
       * threads. Otherwise {@code false}. If {@code false} is returned, then either the task was no
       * longer present on the queue due to another thread grabbing it, or, in case of spurious
       * failure, the task is still available and no other thread managed to grab it.
       */
      protected abstract boolean grabTask(TestTask t, int i);
  }

  private static class TaskRunnerWithCompareAndExchange extends TaskRunner {

      TaskRunnerWithCompareAndExchange(TestTask[] tasks) {
          super(tasks);
      }

      @Override
      protected boolean grabTask(TestTask t, int i) {
          return (t == QA.compareAndExchange(tasks, i, t, null));
      }
  }

  private static class TaskRunnerWithCompareAndSet extends TaskRunner {

      TaskRunnerWithCompareAndSet(TestTask[] tasks) {
          super(tasks);
      }

      @Override
      protected boolean grabTask(TestTask t, int i) {
          return QA.compareAndSet(tasks, i, t, null);
      }
  }

  private static class TaskRunnerWithWeakCompareAndSet extends TaskRunner {

      TaskRunnerWithWeakCompareAndSet(TestTask[] tasks) {
          super(tasks);
      }

      @Override
      protected boolean grabTask(TestTask t, int i) {
          return QA.weakCompareAndSet(tasks, i, t, null);
      }
  }


  private interface RunnerFactory {
      Thread createRunner(TestTask[] tasks);
  }

  private static class CompareAndExchangeRunnerFactory implements RunnerFactory {
      @Override
      public Thread createRunner(TestTask[] tasks) {
          return new TaskRunnerWithCompareAndExchange(tasks);
      }
  }

  private static class CompareAndSetRunnerFactory implements RunnerFactory {
      @Override
      public Thread createRunner(TestTask[] tasks) {
          return new TaskRunnerWithCompareAndSet(tasks);
      }
  }

  private static class WeakCompareAndSetRunnerFactory implements RunnerFactory {
      @Override
      public Thread createRunner(TestTask[] tasks) {
          return new TaskRunnerWithWeakCompareAndSet(tasks);
      }
  }

  private static class TestTask {
      private final Integer ord;
      private final Consumer<Integer> action;

      TestTask(Integer ord, Consumer<Integer> action) {
          this.ord = ord;
          this.action = action;
      }

      public void exec() {
          action.accept(ord);
      }
  }

  private static void check(int actual, int expected, String msg, int iteration) {
    if (actual != expected) {
      System.err.println(String.format("[iteration %d] %s : %d != %d",
                  iteration, msg, actual, expected));
      System.exit(1);
    }
  }
}