Skip to content

Commit 271c0bb

Browse files
committed
third attempt
1 parent 3b7c659 commit 271c0bb

File tree

2 files changed

+58
-40
lines changed

2 files changed

+58
-40
lines changed

driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.mongodb.annotations.NotThreadSafe;
1919
import com.mongodb.internal.async.SingleResultCallback;
20+
import com.mongodb.internal.async.function.CallbackChain.Element;
2021
import com.mongodb.lang.Nullable;
2122

2223
import java.util.function.Supplier;
@@ -39,10 +40,8 @@
3940
*/
4041
@NotThreadSafe
4142
public final class AsyncCallbackLoop implements AsyncCallbackRunnable {
42-
@Nullable
43-
private final CallbackChain chain;
4443
private final LoopState state;
45-
private final AsyncCallbackRunnable body;
44+
private final Body body;
4645

4746
/**
4847
* @param state The {@link LoopState} to be deemed as initial for the purpose of the new {@link AsyncCallbackLoop}.
@@ -53,43 +52,67 @@ public AsyncCallbackLoop(final LoopState state, final AsyncCallbackRunnable body
5352
}
5453

5554
public AsyncCallbackLoop(final boolean optimized, final LoopState state, final AsyncCallbackRunnable body) {
56-
this.chain = optimized ? new CallbackChain() : null;
5755
this.state = state;
58-
this.body = body;
56+
this.body = new Body(optimized, body);
5957
}
6058

6159
@Override
6260
public void run(final SingleResultCallback<Void> callback) {
6361
body.run(new LoopingCallback(callback));
6462
}
6563

64+
private static final class Body {
65+
private final AsyncCallbackRunnable wrapped;
66+
@Nullable
67+
private final CallbackChain chain;
68+
69+
private Body(final boolean optimized, final AsyncCallbackRunnable body) {
70+
this.wrapped = body;
71+
this.chain = optimized ? new CallbackChain() : null;
72+
}
73+
74+
@Nullable
75+
Element run(final LoopingCallback loopingCallback) {
76+
Element[] mutableElement = new Element[1];
77+
wrapped.run((r, t) -> {
78+
Element nextCallbackToComplete = loopingCallback.onResult(r, t);
79+
if (!CallbackChain.execute(chain, nextCallbackToComplete)) {
80+
mutableElement[0] = nextCallbackToComplete;
81+
}
82+
});
83+
return mutableElement[0];
84+
}
85+
}
86+
6687
/**
6788
* This callback is allowed to be completed more than once.
6889
*/
6990
@NotThreadSafe
70-
private class LoopingCallback implements SingleResultCallback<Void> {
91+
private class LoopingCallback {
7192
private final SingleResultCallback<Void> wrapped;
7293

7394
LoopingCallback(final SingleResultCallback<Void> callback) {
7495
wrapped = callback;
7596
}
7697

77-
@Override
78-
public void onResult(@Nullable final Void result, @Nullable final Throwable t) {
98+
@Nullable
99+
public Element onResult(@Nullable final Void result, @Nullable final Throwable t) {
79100
if (t != null) {
80101
wrapped.onResult(null, t);
102+
return null;
81103
} else {
82104
boolean continueLooping;
83105
try {
84106
continueLooping = state.advance();
85107
} catch (Throwable e) {
86108
wrapped.onResult(null, e);
87-
return;
109+
return null;
88110
}
89111
if (continueLooping) {
90-
CallbackChain.execute(chain, () -> body.run(this));
112+
return () -> body.run(this);
91113
} else {
92114
wrapped.onResult(result, null);
115+
return null;
93116
}
94117
}
95118
}

driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,52 +15,47 @@
1515
*/
1616
package com.mongodb.internal.async.function;
1717

18+
import com.mongodb.annotations.NotThreadSafe;
1819
import com.mongodb.lang.Nullable;
1920

20-
import java.util.concurrent.atomic.AtomicReference;
21-
22-
import static com.mongodb.assertions.Assertions.assertNotNull;
23-
import static com.mongodb.assertions.Assertions.assertNull;
24-
21+
@NotThreadSafe
2522
final class CallbackChain {
26-
@Nullable
27-
private Runnable next;
28-
private int runEnteredCounter;
29-
private final AtomicReference<Thread> thread;
23+
private int enteringCounter;
3024

3125
CallbackChain() {
32-
runEnteredCounter = 0;
33-
thread = new AtomicReference<>();
26+
enteringCounter = 0;
3427
}
3528

36-
static void execute(@Nullable final CallbackChain chain, final Runnable next) {
29+
static boolean execute(@Nullable final CallbackChain chain, @Nullable final Element element) {
30+
if (element == null) {
31+
return false;
32+
}
3733
if (chain != null) {
38-
chain.execute(next);
34+
return chain.execute(element);
3935
} else {
40-
next.run();
36+
element.execute();
37+
return true;
4138
}
4239
}
4340

44-
// VAKOTODO figure out thread safety
45-
private void execute(final Runnable next) {
46-
assertNotNull(next);
47-
assertNull(this.next);
48-
this.next = next;
49-
50-
// if (!thread.compareAndSet(null, Thread.currentThread())) {
51-
// assertTrue(Thread.currentThread() == thread.get());
52-
// }
53-
boolean recursive = ++runEnteredCounter > 1;
41+
private boolean execute(final Element element) {
42+
boolean reentered = ++enteringCounter > 1;
5443
try {
55-
if (recursive) {
56-
return;
44+
if (reentered) {
45+
return false;
5746
}
58-
for (Runnable localNext = next; localNext != null; localNext = this.next) {
59-
this.next = null;
60-
localNext.run();
47+
Element next = element.execute();
48+
while (next != null) {
49+
next = next.execute();
6150
}
6251
} finally {
63-
runEnteredCounter--;
52+
enteringCounter--;
6453
}
54+
return true;
55+
}
56+
57+
interface Element {
58+
@Nullable
59+
Element execute();
6560
}
6661
}

0 commit comments

Comments
 (0)