Skip to content

Commit 3b7c659

Browse files
committed
second attempt
1 parent 9263265 commit 3b7c659

File tree

4 files changed

+107
-144
lines changed

4 files changed

+107
-144
lines changed

driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ public AsyncCallbackLoop(final boolean optimized, final LoopState state, final A
6161
@Override
6262
public void run(final SingleResultCallback<Void> callback) {
6363
body.run(new LoopingCallback(callback));
64-
CallbackChain.run(chain);
6564
}
6665

6766
/**
@@ -88,7 +87,7 @@ public void onResult(@Nullable final Void result, @Nullable final Throwable t) {
8887
return;
8988
}
9089
if (continueLooping) {
91-
CallbackChain.addOrRun(chain, () -> body.run(this));
90+
CallbackChain.execute(chain, () -> body.run(this));
9291
} else {
9392
wrapped.onResult(result, null);
9493
}

driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,40 +17,50 @@
1717

1818
import com.mongodb.lang.Nullable;
1919

20+
import java.util.concurrent.atomic.AtomicReference;
21+
2022
import static com.mongodb.assertions.Assertions.assertNotNull;
2123
import static com.mongodb.assertions.Assertions.assertNull;
2224

23-
public final class CallbackChain {
25+
final class CallbackChain {
2426
@Nullable
2527
private Runnable next;
28+
private int runEnteredCounter;
29+
private final AtomicReference<Thread> thread;
2630

27-
public CallbackChain() {
31+
CallbackChain() {
32+
runEnteredCounter = 0;
33+
thread = new AtomicReference<>();
2834
}
2935

30-
public static void addOrRun(@Nullable final CallbackChain chain, final Runnable next) {
36+
static void execute(@Nullable final CallbackChain chain, final Runnable next) {
3137
if (chain != null) {
32-
chain.add(next);
38+
chain.execute(next);
3339
} else {
3440
next.run();
3541
}
3642
}
3743

38-
public static void run(final @Nullable CallbackChain chain) {
39-
if (chain != null) {
40-
chain.run();
41-
}
42-
}
43-
44-
private void add(final Runnable next) {
44+
// VAKOTODO figure out thread safety
45+
private void execute(final Runnable next) {
4546
assertNotNull(next);
4647
assertNull(this.next);
4748
this.next = next;
48-
}
4949

50-
private void run() {
51-
for (Runnable localNext = next; localNext != null; localNext = next) {
52-
next = null;
53-
localNext.run();
50+
// if (!thread.compareAndSet(null, Thread.currentThread())) {
51+
// assertTrue(Thread.currentThread() == thread.get());
52+
// }
53+
boolean recursive = ++runEnteredCounter > 1;
54+
try {
55+
if (recursive) {
56+
return;
57+
}
58+
for (Runnable localNext = next; localNext != null; localNext = this.next) {
59+
this.next = null;
60+
localNext.run();
61+
}
62+
} finally {
63+
runEnteredCounter--;
5464
}
5565
}
5666
}

driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -990,4 +990,23 @@ void testDerivation() {
990990
}).finish(callback);
991991
});
992992
}
993+
994+
@Test
995+
void testThenRunDoWhileLoop() {
996+
assertBehavesSameVariations(8,
997+
() -> {
998+
int i = 0;
999+
do {
1000+
i++;
1001+
sync(i);
1002+
} while (i < 3 && plainTest(i));
1003+
},
1004+
(callback) -> {
1005+
final int[] i = new int[1];
1006+
beginAsync().thenRunDoWhileLoop((c) -> {
1007+
i[0]++;
1008+
async(i[0], c);
1009+
}, () -> i[0] < 3 && plainTest(i[0])).finish(callback);
1010+
});
1011+
}
9931012
}

driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java

Lines changed: 61 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,27 @@
1717

1818
import com.mongodb.internal.async.function.AsyncCallbackLoop;
1919
import com.mongodb.internal.async.function.LoopState;
20+
import com.mongodb.internal.time.StartTime;
2021
import com.mongodb.lang.Nullable;
2122
import org.junit.jupiter.params.ParameterizedTest;
2223
import org.junit.jupiter.params.provider.CsvSource;
23-
import org.junit.jupiter.params.provider.ValueSource;
2424

2525
import java.io.IOException;
2626
import java.io.PrintWriter;
2727
import java.io.StringWriter;
2828
import java.util.Objects;
29-
import java.util.concurrent.atomic.AtomicInteger;
29+
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.ForkJoinPool;
31+
import java.util.concurrent.TimeUnit;
3032

3133
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
3234

3335
class VakoTest {
3436
@ParameterizedTest
35-
@CsvSource({"false, 20", "true, 20"})
36-
void asyncCallbackLoop(final boolean optimized, final int iterations) {
37+
@CsvSource({"false, 30", "true, 30"})
38+
void asyncCallbackLoop(final boolean optimized, final int iterations) throws Exception {
3739
System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length);
40+
CompletableFuture<Void> join = new CompletableFuture<>();
3841
LoopState loopState = new LoopState();
3942
new AsyncCallbackLoop(optimized, loopState, c -> {
4043
int iteration = loopState.iteration();
@@ -45,127 +48,83 @@ void asyncCallbackLoop(final boolean optimized, final int iterations) {
4548
}).run((r, t) -> {
4649
System.err.printf("test callback completed callStackDepth=%d, r=%s, t=%s%n",
4750
Thread.currentThread().getStackTrace().length, r, exceptionToString(t));
51+
if (t != null) {
52+
join.completeExceptionally(t);
53+
} else {
54+
join.complete(r);
55+
}
4856
});
57+
join.get();
4958
}
5059

5160
@ParameterizedTest
52-
@CsvSource({"false, 20", "true, 20"})
53-
void testA(final boolean optimized, final int counterValue) {
61+
@CsvSource({
62+
"false, false, 30", "false, true, 30",
63+
"true, false, 30", "true, true, 30"})
64+
void testThenRunDoWhileLoop(final boolean optimized, final boolean separateThread, final int counterInitialValue) throws Exception {
65+
StartTime start = StartTime.now();
5466
System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length);
55-
asyncMethod1A(optimized, new Counter(counterValue), (r, t) -> {
67+
CompletableFuture<Void> join = new CompletableFuture<>();
68+
asyncMethod1(optimized, separateThread, new Counter(counterInitialValue), (r, t) -> {
5669
System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n",
5770
Thread.currentThread().getStackTrace().length, r, exceptionToString(t));
71+
if (t != null) {
72+
join.completeExceptionally(t);
73+
} else {
74+
join.complete(r);
75+
}
5876
});
77+
System.err.printf("asyncMethod1 executed in %s%n", start.elapsed());
78+
join.get();
5979
}
6080

61-
private static void asyncMethod1A(final boolean optimized, final Counter counter, final SingleResultCallback<Void> callback) {
81+
private static void asyncMethod1(final boolean optimized, final boolean separateThread,
82+
final Counter counter, final SingleResultCallback<Void> callback) {
6283
beginAsync().thenRunDoWhileLoop(optimized, c -> {
63-
asyncMethod2A(counter, c);
84+
asyncMethod2(separateThread, counter, c);
6485
}, () -> !counter.done()).finish(callback);
6586
}
6687

67-
private static void asyncMethod2A(final Counter counter, final SingleResultCallback<Void> callback) {
68-
counter.countDown();
69-
callback.complete(callback);
70-
}
71-
72-
@ParameterizedTest
73-
@ValueSource(ints = {10})
74-
void testB(final int counterValue) {
75-
AtomicInteger stackDepthUnoptimized = new AtomicInteger();
76-
AtomicInteger stackDepthOptimized = new AtomicInteger();
77-
asyncMethod1B(false, new Counter(counterValue), (r, t) -> {
78-
stackDepthUnoptimized.set(Thread.currentThread().getStackTrace().length);
79-
System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n",
80-
stackDepthUnoptimized, r, exceptionToString(t));
81-
});
82-
asyncMethod1B(true, new Counter(counterValue), (r, t) -> {
83-
stackDepthOptimized.set(Thread.currentThread().getStackTrace().length);
84-
System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n",
85-
stackDepthOptimized, r, exceptionToString(t));
86-
});
87-
System.err.printf("test completed baselineStackDepth=%d, stackDepthUnoptimized=%s, stackDepthOptimized=%s%n",
88-
Thread.currentThread().getStackTrace().length, stackDepthOptimized, stackDepthUnoptimized);
89-
}
90-
91-
private static void asyncMethod1B(final boolean optimized, final Counter counter, final SingleResultCallback<Void> callback) {
92-
asyncMethod2B(counter, new Callback(optimized, counter, callback));
93-
}
94-
95-
private static void asyncMethod2B(final Counter counter, final SingleResultCallback<Void> callback) {
96-
counter.countDown();
97-
callback.complete(callback);
98-
}
99-
100-
private static final class Callback implements SingleResultCallback<Void> {
101-
private final boolean optimized;
102-
private Counter counter;
103-
private SingleResultCallback<Void> callback;
104-
105-
Callback(final boolean optimized, final Counter counter, final SingleResultCallback<Void> callback) {
106-
this.optimized = optimized;
107-
this.counter = counter;
108-
this.callback = callback;
109-
}
110-
111-
Counter takeCounter() {
112-
Counter localCounter = com.mongodb.assertions.Assertions.assertNotNull(counter);
113-
counter = null;
114-
return localCounter;
115-
}
116-
117-
void setCounter(final Counter counter) {
118-
com.mongodb.assertions.Assertions.assertNull(this.counter);
119-
this.counter = counter;
120-
}
121-
122-
SingleResultCallback<Void> takeCallback() {
123-
SingleResultCallback<Void> localCallback = com.mongodb.assertions.Assertions.assertNotNull(callback);
124-
callback = null;
125-
return localCallback;
126-
}
127-
128-
void setCallback(final SingleResultCallback<Void> callback) {
129-
com.mongodb.assertions.Assertions.assertNull(this.callback);
130-
this.callback = callback;
131-
}
132-
133-
@Override
134-
public void onResult(final Void result, final Throwable t) {
135-
SingleResultCallback<Void> localCallback = takeCallback();
136-
beginAsync().thenRun((c) -> {
137-
System.err.printf("thenRun%n");
138-
Counter localCounter = takeCounter();
139-
if (t != null) {
140-
System.err.printf("exception t=%s%n", exceptionToString(t));
141-
c.completeExceptionally(t);
142-
} else if (localCounter.done()) {
143-
c.complete(c);
144-
} else {
145-
asyncMethod2B(localCounter, new Callback(optimized, localCounter, localCallback));
146-
}
147-
}).finish((r, t2) -> {
148-
System.err.printf("finish r=%s, t=%s%n", r, exceptionToString(t2));
149-
localCallback.onResult(r, t);
150-
});
88+
private static void asyncMethod2(final boolean separateThread, final Counter counter, final SingleResultCallback<Void> callback) {
89+
Runnable action = () -> {
90+
try {
91+
Thread.sleep(TimeUnit.SECONDS.toMillis(1) / counter.initial());
92+
} catch (InterruptedException e) {
93+
Thread.currentThread().interrupt();
94+
throw new RuntimeException(e);
95+
}
96+
counter.countDown();
97+
callback.complete(callback);
98+
};
99+
if (separateThread) {
100+
ForkJoinPool.commonPool().execute(action);
101+
} else {
102+
action.run();
151103
}
152104
}
153105

154106
private static final class Counter {
155-
private int v;
107+
private final int initial;
108+
private int current;
109+
110+
Counter(final int initial) {
111+
this.initial = initial;
112+
this.current = initial;
113+
}
156114

157-
Counter(final int v) {
158-
this.v = v;
115+
int initial() {
116+
return initial;
159117
}
160118

161119
void countDown() {
162-
com.mongodb.assertions.Assertions.assertTrue(v > 0);
163-
v--;
164-
System.err.printf("counted %d->%d callStackDepth=%d %n", v + 1, v, Thread.currentThread().getStackTrace().length);
120+
com.mongodb.assertions.Assertions.assertTrue(current > 0);
121+
int previous = current;
122+
int decremented = --current;
123+
System.err.printf("counted %d->%d callStackDepth=%d %n", previous, decremented, Thread.currentThread().getStackTrace().length);
165124
}
166125

167126
boolean done() {
168-
if (v == 0) {
127+
if (current == 0) {
169128
System.err.printf("counting done callStackDepth=%d %n", Thread.currentThread().getStackTrace().length);
170129
return true;
171130
}
@@ -179,36 +138,12 @@ private static String exceptionToString(@Nullable final Throwable t) {
179138
}
180139
try (StringWriter sw = new StringWriter();
181140
PrintWriter pw = new PrintWriter(sw)) {
182-
t.printStackTrace(pw);
141+
// t.printStackTrace(pw);
142+
pw.println(t);
183143
pw.flush();
184144
return sw.toString();
185145
} catch (IOException e) {
186146
throw new RuntimeException(e);
187147
}
188148
}
189149
}
190-
191-
/*
192-
193-
c3.complete{
194-
//c3...
195-
c2.complete{
196-
//c2...
197-
c1.complete{
198-
//c1...
199-
}
200-
}
201-
202-
c3.complete{
203-
//c3...
204-
chain.add(c2)
205-
}
206-
chain.run() -> c2.complete{
207-
//c2...
208-
chain.add(c1)
209-
}
210-
chain.run() -> c1.complete{
211-
//c1...
212-
}
213-
214-
*/

0 commit comments

Comments
 (0)