Skip to content

Commit 9263265

Browse files
committed
first attempt
1 parent b33d52b commit 9263265

4 files changed

Lines changed: 287 additions & 2 deletions

File tree

driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,9 +253,16 @@ default AsyncRunnable thenRunRetryingWhile(
253253
* @see AsyncCallbackLoop
254254
*/
255255
default AsyncRunnable thenRunDoWhileLoop(final AsyncRunnable loopBodyRunnable, final BooleanSupplier whileCheck) {
256+
return thenRunDoWhileLoop(true, loopBodyRunnable, whileCheck);
257+
}
258+
259+
default AsyncRunnable thenRunDoWhileLoop(
260+
final boolean optimized,
261+
final AsyncRunnable loopBodyRunnable,
262+
final BooleanSupplier whileCheck) {
256263
return thenRun(finalCallback -> {
257264
LoopState loopState = new LoopState();
258-
new AsyncCallbackLoop(loopState, iterationCallback -> {
265+
new AsyncCallbackLoop(optimized, loopState, iterationCallback -> {
259266

260267
loopBodyRunnable.finish((result, t) -> {
261268
if (t != null) {

driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
*/
4040
@NotThreadSafe
4141
public final class AsyncCallbackLoop implements AsyncCallbackRunnable {
42+
@Nullable
43+
private final CallbackChain chain;
4244
private final LoopState state;
4345
private final AsyncCallbackRunnable body;
4446

@@ -47,13 +49,19 @@ public final class AsyncCallbackLoop implements AsyncCallbackRunnable {
4749
* @param body The body of the loop.
4850
*/
4951
public AsyncCallbackLoop(final LoopState state, final AsyncCallbackRunnable body) {
52+
this(true, state, body);
53+
}
54+
55+
public AsyncCallbackLoop(final boolean optimized, final LoopState state, final AsyncCallbackRunnable body) {
56+
this.chain = optimized ? new CallbackChain() : null;
5057
this.state = state;
5158
this.body = body;
5259
}
5360

5461
@Override
5562
public void run(final SingleResultCallback<Void> callback) {
5663
body.run(new LoopingCallback(callback));
64+
CallbackChain.run(chain);
5765
}
5866

5967
/**
@@ -80,7 +88,7 @@ public void onResult(@Nullable final Void result, @Nullable final Throwable t) {
8088
return;
8189
}
8290
if (continueLooping) {
83-
body.run(this);
91+
CallbackChain.addOrRun(chain, () -> body.run(this));
8492
} else {
8593
wrapped.onResult(result, null);
8694
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.internal.async.function;
17+
18+
import com.mongodb.lang.Nullable;
19+
20+
import static com.mongodb.assertions.Assertions.assertNotNull;
21+
import static com.mongodb.assertions.Assertions.assertNull;
22+
23+
public final class CallbackChain {
24+
@Nullable
25+
private Runnable next;
26+
27+
public CallbackChain() {
28+
}
29+
30+
public static void addOrRun(@Nullable final CallbackChain chain, final Runnable next) {
31+
if (chain != null) {
32+
chain.add(next);
33+
} else {
34+
next.run();
35+
}
36+
}
37+
38+
public static void run(final @Nullable CallbackChain chain) {
39+
if (chain != null) {
40+
chain.run();
41+
}
42+
}
43+
44+
private void add(final Runnable next) {
45+
assertNotNull(next);
46+
assertNull(this.next);
47+
this.next = next;
48+
}
49+
50+
private void run() {
51+
for (Runnable localNext = next; localNext != null; localNext = next) {
52+
next = null;
53+
localNext.run();
54+
}
55+
}
56+
}
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.internal.async;
17+
18+
import com.mongodb.internal.async.function.AsyncCallbackLoop;
19+
import com.mongodb.internal.async.function.LoopState;
20+
import com.mongodb.lang.Nullable;
21+
import org.junit.jupiter.params.ParameterizedTest;
22+
import org.junit.jupiter.params.provider.CsvSource;
23+
import org.junit.jupiter.params.provider.ValueSource;
24+
25+
import java.io.IOException;
26+
import java.io.PrintWriter;
27+
import java.io.StringWriter;
28+
import java.util.Objects;
29+
import java.util.concurrent.atomic.AtomicInteger;
30+
31+
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
32+
33+
class VakoTest {
34+
@ParameterizedTest
35+
@CsvSource({"false, 20", "true, 20"})
36+
void asyncCallbackLoop(final boolean optimized, final int iterations) {
37+
System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length);
38+
LoopState loopState = new LoopState();
39+
new AsyncCallbackLoop(optimized, loopState, c -> {
40+
int iteration = loopState.iteration();
41+
System.err.printf("iteration=%d, callStackDepth=%d%n", iteration, Thread.currentThread().getStackTrace().length);
42+
if (!loopState.breakAndCompleteIf(() -> iteration == (iterations - 1), c)) {
43+
c.complete(c);
44+
}
45+
}).run((r, t) -> {
46+
System.err.printf("test callback completed callStackDepth=%d, r=%s, t=%s%n",
47+
Thread.currentThread().getStackTrace().length, r, exceptionToString(t));
48+
});
49+
}
50+
51+
@ParameterizedTest
52+
@CsvSource({"false, 20", "true, 20"})
53+
void testA(final boolean optimized, final int counterValue) {
54+
System.err.printf("baselineStackDepth=%d%n", Thread.currentThread().getStackTrace().length);
55+
asyncMethod1A(optimized, new Counter(counterValue), (r, t) -> {
56+
System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n",
57+
Thread.currentThread().getStackTrace().length, r, exceptionToString(t));
58+
});
59+
}
60+
61+
private static void asyncMethod1A(final boolean optimized, final Counter counter, final SingleResultCallback<Void> callback) {
62+
beginAsync().thenRunDoWhileLoop(optimized, c -> {
63+
asyncMethod2A(counter, c);
64+
}, () -> !counter.done()).finish(callback);
65+
}
66+
67+
private static void asyncMethod2A(final Counter counter, final SingleResultCallback<Void> callback) {
68+
counter.countDown();
69+
callback.complete(callback);
70+
}
71+
72+
@ParameterizedTest
73+
@ValueSource(ints = {10})
74+
void testB(final int counterValue) {
75+
AtomicInteger stackDepthUnoptimized = new AtomicInteger();
76+
AtomicInteger stackDepthOptimized = new AtomicInteger();
77+
asyncMethod1B(false, new Counter(counterValue), (r, t) -> {
78+
stackDepthUnoptimized.set(Thread.currentThread().getStackTrace().length);
79+
System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n",
80+
stackDepthUnoptimized, r, exceptionToString(t));
81+
});
82+
asyncMethod1B(true, new Counter(counterValue), (r, t) -> {
83+
stackDepthOptimized.set(Thread.currentThread().getStackTrace().length);
84+
System.err.printf("test callback completed callStackDepth=%s, r=%s, t=%s%n",
85+
stackDepthOptimized, r, exceptionToString(t));
86+
});
87+
System.err.printf("test completed baselineStackDepth=%d, stackDepthUnoptimized=%s, stackDepthOptimized=%s%n",
88+
Thread.currentThread().getStackTrace().length, stackDepthOptimized, stackDepthUnoptimized);
89+
}
90+
91+
private static void asyncMethod1B(final boolean optimized, final Counter counter, final SingleResultCallback<Void> callback) {
92+
asyncMethod2B(counter, new Callback(optimized, counter, callback));
93+
}
94+
95+
private static void asyncMethod2B(final Counter counter, final SingleResultCallback<Void> callback) {
96+
counter.countDown();
97+
callback.complete(callback);
98+
}
99+
100+
private static final class Callback implements SingleResultCallback<Void> {
101+
private final boolean optimized;
102+
private Counter counter;
103+
private SingleResultCallback<Void> callback;
104+
105+
Callback(final boolean optimized, final Counter counter, final SingleResultCallback<Void> callback) {
106+
this.optimized = optimized;
107+
this.counter = counter;
108+
this.callback = callback;
109+
}
110+
111+
Counter takeCounter() {
112+
Counter localCounter = com.mongodb.assertions.Assertions.assertNotNull(counter);
113+
counter = null;
114+
return localCounter;
115+
}
116+
117+
void setCounter(final Counter counter) {
118+
com.mongodb.assertions.Assertions.assertNull(this.counter);
119+
this.counter = counter;
120+
}
121+
122+
SingleResultCallback<Void> takeCallback() {
123+
SingleResultCallback<Void> localCallback = com.mongodb.assertions.Assertions.assertNotNull(callback);
124+
callback = null;
125+
return localCallback;
126+
}
127+
128+
void setCallback(final SingleResultCallback<Void> callback) {
129+
com.mongodb.assertions.Assertions.assertNull(this.callback);
130+
this.callback = callback;
131+
}
132+
133+
@Override
134+
public void onResult(final Void result, final Throwable t) {
135+
SingleResultCallback<Void> localCallback = takeCallback();
136+
beginAsync().thenRun((c) -> {
137+
System.err.printf("thenRun%n");
138+
Counter localCounter = takeCounter();
139+
if (t != null) {
140+
System.err.printf("exception t=%s%n", exceptionToString(t));
141+
c.completeExceptionally(t);
142+
} else if (localCounter.done()) {
143+
c.complete(c);
144+
} else {
145+
asyncMethod2B(localCounter, new Callback(optimized, localCounter, localCallback));
146+
}
147+
}).finish((r, t2) -> {
148+
System.err.printf("finish r=%s, t=%s%n", r, exceptionToString(t2));
149+
localCallback.onResult(r, t);
150+
});
151+
}
152+
}
153+
154+
private static final class Counter {
155+
private int v;
156+
157+
Counter(final int v) {
158+
this.v = v;
159+
}
160+
161+
void countDown() {
162+
com.mongodb.assertions.Assertions.assertTrue(v > 0);
163+
v--;
164+
System.err.printf("counted %d->%d callStackDepth=%d %n", v + 1, v, Thread.currentThread().getStackTrace().length);
165+
}
166+
167+
boolean done() {
168+
if (v == 0) {
169+
System.err.printf("counting done callStackDepth=%d %n", Thread.currentThread().getStackTrace().length);
170+
return true;
171+
}
172+
return false;
173+
}
174+
}
175+
176+
private static String exceptionToString(@Nullable final Throwable t) {
177+
if (t == null) {
178+
return Objects.toString(null);
179+
}
180+
try (StringWriter sw = new StringWriter();
181+
PrintWriter pw = new PrintWriter(sw)) {
182+
t.printStackTrace(pw);
183+
pw.flush();
184+
return sw.toString();
185+
} catch (IOException e) {
186+
throw new RuntimeException(e);
187+
}
188+
}
189+
}
190+
191+
/*
192+
193+
c3.complete{
194+
//c3...
195+
c2.complete{
196+
//c2...
197+
c1.complete{
198+
//c1...
199+
}
200+
}
201+
202+
c3.complete{
203+
//c3...
204+
chain.add(c2)
205+
}
206+
chain.run() -> c2.complete{
207+
//c2...
208+
chain.add(c1)
209+
}
210+
chain.run() -> c1.complete{
211+
//c1...
212+
}
213+
214+
*/

0 commit comments

Comments
 (0)