Skip to content

Commit 0020c7e

Browse files
committed
improve the test
1 parent 4ab2ad0 commit 0020c7e

2 files changed

Lines changed: 394 additions & 265 deletions

File tree

Lines changed: 394 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,394 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.internal.async;
17+
18+
import com.mongodb.internal.async.function.AsyncCallbackLoop;
19+
import com.mongodb.internal.async.function.LoopState;
20+
import com.mongodb.internal.time.StartTime;
21+
import com.mongodb.lang.Nullable;
22+
import org.junit.jupiter.params.ParameterizedTest;
23+
import org.junit.jupiter.params.provider.CsvSource;
24+
25+
import java.io.IOException;
26+
import java.io.PrintWriter;
27+
import java.io.StringWriter;
28+
import java.time.Duration;
29+
import java.util.Objects;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.ExecutionException;
32+
import java.util.concurrent.Future;
33+
import java.util.concurrent.ScheduledThreadPoolExecutor;
34+
import java.util.concurrent.ThreadLocalRandom;
35+
import java.util.concurrent.TimeUnit;
36+
37+
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
38+
import static org.junit.jupiter.api.Assertions.assertTrue;
39+
40+
class AsyncLoopTest {
41+
private static final int MAX_STACK_DEPTH = 500;
42+
43+
@ParameterizedTest
44+
@CsvSource({
45+
"10"
46+
})
47+
void testDemo(final int iterations) throws Exception {
48+
System.err.printf("baselineStackDepth=%d%n%n", Thread.currentThread().getStackTrace().length);
49+
CompletableFuture<Void> join = new CompletableFuture<>();
50+
LoopState loopState = new LoopState();
51+
new AsyncCallbackLoop(loopState, c -> {
52+
int iteration = loopState.iteration();
53+
System.err.printf("iteration=%d, callStackDepth=%d%n", iteration, Thread.currentThread().getStackTrace().length);
54+
if (!loopState.breakAndCompleteIf(() -> iteration == (iterations - 1), c)) {
55+
c.complete(c);
56+
}
57+
}).run((r, t) -> {
58+
System.err.printf("test callback completed callStackDepth=%d, r=%s, t=%s%n",
59+
Thread.currentThread().getStackTrace().length, r, exceptionToString(t));
60+
complete(join, r, t);
61+
});
62+
join.get();
63+
System.err.printf("%nDONE%n%n");
64+
}
65+
66+
private enum IterationExecutionType {
67+
SYNC_SAME_THREAD,
68+
SYNC_DIFFERENT_THREAD,
69+
ASYNC,
70+
MIXED_SYNC_SAME_THREAD_AND_ASYNC
71+
}
72+
73+
private enum Verbocity {
74+
VERBOSE,
75+
COMPACT;
76+
77+
/**
78+
* Every {@value}s message is printed.
79+
*/
80+
private static final int COMPACTNESS = 50_000;
81+
}
82+
83+
private enum ThreadManagement {
84+
NEW_THREAD_PER_TASK,
85+
REUSE_THREADS
86+
}
87+
88+
@ParameterizedTest()
89+
@CsvSource({
90+
"250_000, 0, SYNC_SAME_THREAD, 0, COMPACT, 0, REUSE_THREADS",
91+
"250_000, 0, ASYNC, 0, COMPACT, 0, NEW_THREAD_PER_TASK",
92+
"250_000, 0, ASYNC, 0, COMPACT, 1, REUSE_THREADS",
93+
"250_000, 0, ASYNC, 0, COMPACT, 2, REUSE_THREADS",
94+
"250_000, 0, MIXED_SYNC_SAME_THREAD_AND_ASYNC, 0, COMPACT, 0, NEW_THREAD_PER_TASK",
95+
"250_000, 0, MIXED_SYNC_SAME_THREAD_AND_ASYNC, 0, COMPACT, 1, REUSE_THREADS",
96+
"4, 0, ASYNC, 4, VERBOSE, 1, REUSE_THREADS",
97+
"4, 4, ASYNC, 0, VERBOSE, 1, REUSE_THREADS",
98+
"250_000, 0, SYNC_DIFFERENT_THREAD, 0, COMPACT, 0, NEW_THREAD_PER_TASK",
99+
"250_000, 0, SYNC_DIFFERENT_THREAD, 0, COMPACT, 1, REUSE_THREADS",
100+
})
101+
void thenRunDoWhileLoopTest(
102+
final int counterInitialValue,
103+
final int blockSyncPartOfIterationTotalSeconds,
104+
final IterationExecutionType executionType,
105+
final int delayAsyncExecutionTotalSeconds,
106+
final Verbocity verbocity,
107+
final int executorSize,
108+
final ThreadManagement threadManagement) throws Exception {
109+
Duration blockSyncPartOfIterationTotalDuration = Duration.ofSeconds(blockSyncPartOfIterationTotalSeconds);
110+
if (executionType.equals(IterationExecutionType.SYNC_DIFFERENT_THREAD)) {
111+
com.mongodb.assertions.Assertions.assertTrue(
112+
(executorSize > 0 && threadManagement.equals(ThreadManagement.REUSE_THREADS))
113+
|| (executorSize == 0 && threadManagement.equals(ThreadManagement.NEW_THREAD_PER_TASK)));
114+
}
115+
if (executionType.equals(IterationExecutionType.SYNC_SAME_THREAD)) {
116+
com.mongodb.assertions.Assertions.assertTrue(executorSize == 0);
117+
com.mongodb.assertions.Assertions.assertTrue(threadManagement.equals(ThreadManagement.REUSE_THREADS));
118+
}
119+
if (!executionType.equals(IterationExecutionType.ASYNC)) {
120+
com.mongodb.assertions.Assertions.assertTrue(delayAsyncExecutionTotalSeconds == 0);
121+
}
122+
if (threadManagement.equals(ThreadManagement.NEW_THREAD_PER_TASK)) {
123+
com.mongodb.assertions.Assertions.assertTrue(executorSize == 0);
124+
}
125+
Duration delayAsyncExecutionTotalDuration = Duration.ofSeconds(delayAsyncExecutionTotalSeconds);
126+
ScheduledExecutor executor = executorSize == 0 ? null : new ScheduledExecutor(executorSize, threadManagement);
127+
try {
128+
System.err.printf("baselineStackDepth=%d%n%n", Thread.currentThread().getStackTrace().length);
129+
StartTime start = StartTime.now();
130+
CompletableFuture<Void> join = new CompletableFuture<>();
131+
asyncLoop(new Counter(counterInitialValue, verbocity),
132+
blockSyncPartOfIterationTotalDuration, executionType, delayAsyncExecutionTotalDuration, verbocity, executor,
133+
(r, t) -> {
134+
int stackDepth = Thread.currentThread().getStackTrace().length;
135+
System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n",
136+
stackDepth, r, exceptionToString(t));
137+
assertTrue(stackDepth <= MAX_STACK_DEPTH);
138+
complete(join, r, t);
139+
});
140+
System.err.printf("\tasyncLoop method completed in %s%n", start.elapsed());
141+
join.get();
142+
System.err.printf("%nDONE%n%n");
143+
} finally {
144+
if (executor != null) {
145+
executor.shutdownNow();
146+
com.mongodb.assertions.Assertions.assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));
147+
}
148+
}
149+
}
150+
151+
private static void asyncLoop(
152+
final Counter counter,
153+
final Duration blockSyncPartOfIterationTotalDuration,
154+
final IterationExecutionType executionType,
155+
final Duration delayAsyncExecutionTotalDuration,
156+
final Verbocity verbocity,
157+
@Nullable
158+
final ScheduledExecutor executor,
159+
final SingleResultCallback<Void> callback) {
160+
beginAsync().thenRunDoWhileLoop(c -> {
161+
sleep(blockSyncPartOfIterationTotalDuration.dividedBy(counter.initial()));
162+
StartTime start = StartTime.now();
163+
asyncPartOfIteration(counter, executionType, delayAsyncExecutionTotalDuration, verbocity, executor, c);
164+
if (verbocity.equals(Verbocity.VERBOSE)) {
165+
System.err.printf("\tasyncPartOfIteration method completed in %s%n", start.elapsed());
166+
}
167+
}, () -> !counter.done()).finish(callback);
168+
}
169+
170+
private static void asyncPartOfIteration(
171+
final Counter counter,
172+
final IterationExecutionType executionType,
173+
final Duration delayAsyncExecutionTotalDuration,
174+
final Verbocity verbocity,
175+
@Nullable
176+
final ScheduledExecutor executor,
177+
final SingleResultCallback<Void> callback) {
178+
Runnable asyncPartOfIteration = () -> {
179+
counter.countDown();
180+
StartTime start = StartTime.now();
181+
callback.complete(callback);
182+
if (verbocity.equals(Verbocity.VERBOSE)) {
183+
System.err.printf("\tasyncPartOfIteration callback.complete method completed in %s%n", start.elapsed());
184+
}
185+
};
186+
switch (executionType) {
187+
case SYNC_SAME_THREAD: {
188+
asyncPartOfIteration.run();
189+
break;
190+
}
191+
case SYNC_DIFFERENT_THREAD: {
192+
if (executor == null) {
193+
Thread thread = new Thread(asyncPartOfIteration);
194+
thread.start();
195+
join(thread);
196+
} else {
197+
join(executor.submit(asyncPartOfIteration));
198+
}
199+
break;
200+
}
201+
case ASYNC: {
202+
if (executor == null) {
203+
Thread thread = new Thread(() -> {
204+
sleep(delayAsyncExecutionTotalDuration.dividedBy(counter.initial()));
205+
asyncPartOfIteration.run();
206+
});
207+
thread.start();
208+
} else {
209+
com.mongodb.assertions.Assertions.assertNotNull(executor).schedule(asyncPartOfIteration,
210+
delayAsyncExecutionTotalDuration.dividedBy(counter.initial()).toNanos(), TimeUnit.NANOSECONDS);
211+
}
212+
break;
213+
}
214+
case MIXED_SYNC_SAME_THREAD_AND_ASYNC: {
215+
if (ThreadLocalRandom.current().nextBoolean()) {
216+
asyncPartOfIteration.run();
217+
} else {
218+
if (executor == null) {
219+
Thread thread = new Thread(() -> {
220+
sleep(delayAsyncExecutionTotalDuration.dividedBy(counter.initial()));
221+
asyncPartOfIteration.run();
222+
});
223+
thread.start();
224+
} else {
225+
com.mongodb.assertions.Assertions.assertNotNull(executor).schedule(asyncPartOfIteration,
226+
delayAsyncExecutionTotalDuration.dividedBy(counter.initial()).toNanos(), TimeUnit.NANOSECONDS);
227+
}
228+
}
229+
break;
230+
}
231+
default: {
232+
com.mongodb.assertions.Assertions.fail(executionType.toString());
233+
}
234+
}
235+
}
236+
237+
private static final class Counter {
238+
private final int initial;
239+
private int current;
240+
private boolean doneReturnedTrue;
241+
private final Verbocity verbocity;
242+
243+
Counter(final int initial, final Verbocity verbocity) {
244+
this.initial = initial;
245+
this.current = initial;
246+
this.doneReturnedTrue = false;
247+
this.verbocity = verbocity;
248+
}
249+
250+
int initial() {
251+
return initial;
252+
}
253+
254+
void countDown() {
255+
com.mongodb.assertions.Assertions.assertTrue(current > 0);
256+
int previous = current;
257+
int decremented = --current;
258+
if (verbocity.equals(Verbocity.VERBOSE) || decremented % Verbocity.COMPACTNESS == 0) {
259+
int stackDepth = Thread.currentThread().getStackTrace().length;
260+
assertTrue(stackDepth <= MAX_STACK_DEPTH);
261+
System.err.printf("counted %d->%d tid=%d callStackDepth=%d %n",
262+
previous, decremented, Thread.currentThread().getId(), stackDepth);
263+
}
264+
}
265+
266+
boolean done() {
267+
if (current == 0) {
268+
com.mongodb.assertions.Assertions.assertFalse(doneReturnedTrue);
269+
int stackDepth = Thread.currentThread().getStackTrace().length;
270+
assertTrue(stackDepth <= MAX_STACK_DEPTH);
271+
System.err.printf("counting done callStackDepth=%d %n", stackDepth);
272+
doneReturnedTrue = true;
273+
return true;
274+
}
275+
return false;
276+
}
277+
}
278+
279+
private static String exceptionToString(@Nullable final Throwable t) {
280+
if (t == null) {
281+
return Objects.toString(null);
282+
}
283+
try (StringWriter sw = new StringWriter();
284+
PrintWriter pw = new PrintWriter(sw)) {
285+
// t.printStackTrace(pw);
286+
pw.println(t);
287+
pw.flush();
288+
return sw.toString();
289+
} catch (IOException e) {
290+
throw new RuntimeException(e);
291+
}
292+
}
293+
294+
private static <T> void complete(final CompletableFuture<T> future, @Nullable final T result, @Nullable final Throwable t) {
295+
if (t != null) {
296+
future.completeExceptionally(t);
297+
} else {
298+
future.complete(result);
299+
}
300+
}
301+
302+
private static void join(final Thread thread) {
303+
try {
304+
thread.join();
305+
} catch (InterruptedException e) {
306+
Thread.currentThread().interrupt();
307+
throw new RuntimeException(e);
308+
}
309+
}
310+
311+
private static void join(final Future<?> future) {
312+
try {
313+
future.get();
314+
} catch (InterruptedException e) {
315+
Thread.currentThread().interrupt();
316+
throw new RuntimeException(e);
317+
} catch (ExecutionException e) {
318+
throw new RuntimeException(e);
319+
}
320+
}
321+
322+
private static void sleep(final Duration duration) {
323+
if (duration.isZero()) {
324+
return;
325+
}
326+
try {
327+
long durationNsPart = duration.getNano();
328+
long durationMsPartFromNsPart = TimeUnit.MILLISECONDS.convert(duration.getNano(), TimeUnit.NANOSECONDS);
329+
long sleepMs = TimeUnit.MILLISECONDS.convert(duration.getSeconds(), TimeUnit.SECONDS) + durationMsPartFromNsPart;
330+
int sleepNs = Math.toIntExact(durationNsPart - TimeUnit.NANOSECONDS.convert(durationMsPartFromNsPart, TimeUnit.MILLISECONDS));
331+
Thread.sleep(sleepMs, sleepNs);
332+
} catch (InterruptedException e) {
333+
Thread.currentThread().interrupt();
334+
throw new RuntimeException(e);
335+
}
336+
}
337+
338+
/**
339+
* This {@link ScheduledThreadPoolExecutor} propagates exceptions that caused termination of a task execution,
340+
* causing the thread that executed the task to be terminated.
341+
*/
342+
private static final class ScheduledExecutor extends ScheduledThreadPoolExecutor {
343+
ScheduledExecutor(final int size, final ThreadManagement threadManagement) {
344+
super(size, r -> {
345+
Thread thread = new Thread(() -> {
346+
r.run();
347+
if (threadManagement.equals(ThreadManagement.NEW_THREAD_PER_TASK)) {
348+
terminateCurrentThread();
349+
}
350+
});
351+
thread.setUncaughtExceptionHandler((t, e) -> {
352+
if (e instanceof ThreadTerminationException) {
353+
return;
354+
}
355+
t.getThreadGroup().uncaughtException(t, e);
356+
});
357+
return thread;
358+
});
359+
}
360+
361+
private static void terminateCurrentThread() {
362+
throw ThreadTerminationException.INSTANCE;
363+
}
364+
365+
@Override
366+
protected void afterExecute(final Runnable r, final Throwable t) {
367+
if (t instanceof ThreadTerminationException) {
368+
throw (ThreadTerminationException) t;
369+
} else if (r instanceof Future<?>) {
370+
Future<?> future = (Future<?>) r;
371+
if (future.isDone()) {
372+
try {
373+
future.get();
374+
} catch (ExecutionException e) {
375+
Throwable cause = e.getCause();
376+
if (cause instanceof ThreadTerminationException) {
377+
throw (ThreadTerminationException) cause;
378+
}
379+
} catch (Throwable e) {
380+
// do nothing, we are not swallowing `e`, btw
381+
}
382+
}
383+
}
384+
}
385+
386+
private static final class ThreadTerminationException extends RuntimeException {
387+
static final ThreadTerminationException INSTANCE = new ThreadTerminationException();
388+
389+
private ThreadTerminationException() {
390+
super(null, null, false, false);
391+
}
392+
}
393+
}
394+
}

0 commit comments

Comments
 (0)