Skip to content

Commit b71501e

Browse files
committed
third attempt
1 parent 3b7c659 commit b71501e

File tree

2 files changed

+63
-38
lines changed

2 files changed

+63
-38
lines changed

driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,17 @@
1616
package com.mongodb.internal.async.function;
1717

1818
import com.mongodb.annotations.NotThreadSafe;
19+
import com.mongodb.assertions.Assertions;
20+
import com.mongodb.connection.AsyncCompletionHandler;
1921
import com.mongodb.internal.async.SingleResultCallback;
22+
import com.mongodb.internal.async.function.CallbackChain.Element;
2023
import com.mongodb.lang.Nullable;
2124

2225
import java.util.function.Supplier;
2326

27+
import static com.mongodb.assertions.Assertions.assertNotNull;
28+
import static com.mongodb.assertions.Assertions.assertNull;
29+
2430
/**
2531
* A decorator that implements automatic repeating of an {@link AsyncCallbackRunnable}.
2632
* {@link AsyncCallbackLoop} may execute the original asynchronous function multiple times sequentially,
@@ -39,10 +45,8 @@
3945
*/
4046
@NotThreadSafe
4147
public final class AsyncCallbackLoop implements AsyncCallbackRunnable {
42-
@Nullable
43-
private final CallbackChain chain;
4448
private final LoopState state;
45-
private final AsyncCallbackRunnable body;
49+
private final Body body;
4650

4751
/**
4852
* @param state The {@link LoopState} to be deemed as initial for the purpose of the new {@link AsyncCallbackLoop}.
@@ -53,43 +57,67 @@ public AsyncCallbackLoop(final LoopState state, final AsyncCallbackRunnable body
5357
}
5458

5559
public AsyncCallbackLoop(final boolean optimized, final LoopState state, final AsyncCallbackRunnable body) {
56-
this.chain = optimized ? new CallbackChain() : null;
5760
this.state = state;
58-
this.body = body;
61+
this.body = new Body(optimized, body);
5962
}
6063

6164
@Override
6265
public void run(final SingleResultCallback<Void> callback) {
6366
body.run(new LoopingCallback(callback));
6467
}
6568

69+
private final class Body {
70+
private final AsyncCallbackRunnable wrapped;
71+
@Nullable
72+
private final CallbackChain chain;
73+
74+
private Body(final boolean optimized, final AsyncCallbackRunnable body) {
75+
this.wrapped = body;
76+
this.chain = optimized ? new CallbackChain() : null;
77+
}
78+
79+
@Nullable
80+
Element run(final LoopingCallback loopingCallback) {
81+
Element[] mutableElement = new Element[1];
82+
wrapped.run((r, t) -> {
83+
Element nextCallbackToComplete = loopingCallback.onResult(r, t);
84+
if (!CallbackChain.execute(chain, nextCallbackToComplete)) {
85+
mutableElement[0] = nextCallbackToComplete;
86+
}
87+
});
88+
return mutableElement[0];
89+
}
90+
}
91+
6692
/**
6793
* This callback is allowed to be completed more than once.
6894
*/
6995
@NotThreadSafe
70-
private class LoopingCallback implements SingleResultCallback<Void> {
96+
private class LoopingCallback {
7197
private final SingleResultCallback<Void> wrapped;
7298

7399
LoopingCallback(final SingleResultCallback<Void> callback) {
74100
wrapped = callback;
75101
}
76102

77-
@Override
78-
public void onResult(@Nullable final Void result, @Nullable final Throwable t) {
103+
@Nullable
104+
public Element onResult(@Nullable final Void result, @Nullable final Throwable t) {
79105
if (t != null) {
80106
wrapped.onResult(null, t);
107+
return null;
81108
} else {
82109
boolean continueLooping;
83110
try {
84111
continueLooping = state.advance();
85112
} catch (Throwable e) {
86113
wrapped.onResult(null, e);
87-
return;
114+
return null;
88115
}
89116
if (continueLooping) {
90-
CallbackChain.execute(chain, () -> body.run(this));
117+
return () -> body.run(this);
91118
} else {
92119
wrapped.onResult(result, null);
120+
return null;
93121
}
94122
}
95123
}

driver-core/src/main/com/mongodb/internal/async/function/CallbackChain.java

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,52 +15,49 @@
1515
*/
1616
package com.mongodb.internal.async.function;
1717

18+
import com.mongodb.annotations.NotThreadSafe;
1819
import com.mongodb.lang.Nullable;
1920

2021
import java.util.concurrent.atomic.AtomicReference;
2122

22-
import static com.mongodb.assertions.Assertions.assertNotNull;
23-
import static com.mongodb.assertions.Assertions.assertNull;
24-
23+
@NotThreadSafe
2524
final class CallbackChain {
26-
@Nullable
27-
private Runnable next;
28-
private int runEnteredCounter;
29-
private final AtomicReference<Thread> thread;
25+
private int enteringCounter;
3026

3127
CallbackChain() {
32-
runEnteredCounter = 0;
33-
thread = new AtomicReference<>();
28+
enteringCounter = 0;
3429
}
3530

36-
static void execute(@Nullable final CallbackChain chain, final Runnable next) {
31+
static boolean execute(@Nullable final CallbackChain chain, @Nullable final Element element) {
32+
if (element == null) {
33+
return false;
34+
}
3735
if (chain != null) {
38-
chain.execute(next);
36+
return chain.execute(element);
3937
} else {
40-
next.run();
38+
element.execute();
39+
return true;
4140
}
4241
}
4342

44-
// VAKOTODO figure out thread safety
45-
private void execute(final Runnable next) {
46-
assertNotNull(next);
47-
assertNull(this.next);
48-
this.next = next;
49-
50-
// if (!thread.compareAndSet(null, Thread.currentThread())) {
51-
// assertTrue(Thread.currentThread() == thread.get());
52-
// }
53-
boolean recursive = ++runEnteredCounter > 1;
43+
private boolean execute(final Element element) {
44+
boolean reentered = ++enteringCounter > 1;
5445
try {
55-
if (recursive) {
56-
return;
46+
if (reentered) {
47+
return false;
5748
}
58-
for (Runnable localNext = next; localNext != null; localNext = this.next) {
59-
this.next = null;
60-
localNext.run();
49+
Element next = element.execute();
50+
while (next != null) {
51+
next = next.execute();
6152
}
6253
} finally {
63-
runEnteredCounter--;
54+
enteringCounter--;
6455
}
56+
return true;
57+
}
58+
59+
interface Element {
60+
@Nullable
61+
Element execute();
6562
}
6663
}

0 commit comments

Comments
 (0)