Skip to content

Commit e70e375

Browse files
committed
further refactor and improve the code
1 parent d418612 commit e70e375

2 files changed

Lines changed: 118 additions & 124 deletions

File tree

driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackLoop.java

Lines changed: 110 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -39,143 +39,136 @@
3939
*/
4040
@NotThreadSafe
4141
public final class AsyncCallbackLoop implements AsyncCallbackRunnable {
42-
private final Body body;
42+
private final LoopState state;
43+
private final AsyncCallbackRunnable body;
44+
private final ThreadLocal<SameThreadDetectionStatus> sameThreadDetector;
4345

4446
/**
4547
* @param state The {@link LoopState} to be deemed as initial for the purpose of the new {@link AsyncCallbackLoop}.
4648
* @param body The body of the loop.
4749
*/
4850
public AsyncCallbackLoop(final LoopState state, final AsyncCallbackRunnable body) {
49-
this.body = new Body(state, body);
51+
this.body = body;
52+
this.state = state;
53+
sameThreadDetector = ThreadLocal.withInitial(() -> SameThreadDetectionStatus.NEGATIVE);
5054
}
5155

5256
@Override
5357
public void run(final SingleResultCallback<Void> callback) {
54-
body.run(false, callback);
58+
run(false, callback);
5559
}
5660

57-
private static final class Body {
58-
private final AsyncCallbackRunnable body;
59-
private final LoopState state;
60-
private final ThreadLocal<SameThreadDetectionStatus> sameThreadDetector;
61-
62-
private enum SameThreadDetectionStatus {
63-
NEGATIVE,
64-
PROBING,
65-
POSITIVE
66-
}
67-
68-
private Body(final LoopState state, final AsyncCallbackRunnable body) {
69-
this.body = body;
70-
this.state = state;
71-
sameThreadDetector = ThreadLocal.withInitial(() -> SameThreadDetectionStatus.NEGATIVE);
72-
}
73-
74-
/**
75-
* Initiates a new iteration of the loop by invoking
76-
* {@link #body}{@code .}{@link AsyncCallbackRunnable#run(SingleResultCallback) run}.
77-
* The initiated iteration may be executed either synchronously or asynchronously with the method that initiated it:
78-
* <ul>
79-
* <li>synchronous execution—completion of the initiated iteration happens-before the method completion;</li>
80-
* <li>asynchronous execution—the aforementioned relation does not exist.</li>
81-
* </ul>
82-
*
83-
* <p>If another iteration is needed, it is initiated from the callback passed to
84-
* {@link #body}{@code .}{@link AsyncCallbackRunnable#run(SingleResultCallback) run}
85-
* by invoking {@link #run(boolean, SingleResultCallback)}.
86-
* Completing the initiated iteration is {@linkplain SingleResultCallback#onResult(Object, Throwable) invoking} the callback.
87-
* Thus, it is guaranteed that all iterations are executed sequentially with each other
88-
* (that is, completion of one iteration happens-before initiation of the next one)
89-
* regardless of them being executed synchronously or asynchronously with the method that initiated them.
90-
*
91-
* <p>Initiating any but the {@linkplain LoopState#isFirstIteration() first} iteration is done using trampolining,
92-
* which allows us to do it iteratively rather than recursively, if iterations are executed synchronously,
93-
* and ensures stack usage does not increase with the number of iterations.
94-
*
95-
* @return {@code true} iff it is known that another iteration must be initiated.
96-
* This information is used only for trampolining, and is available only if the iteration executed synchronously.
97-
*
98-
* <p>It is impossible to detect whether an iteration is executed synchronously.
99-
* It is, however, possible to detect whether an iteration is executed in the same thread as the method that initiated it,
100-
* and we use it as proxy indicator of synchronous execution. Unfortunately, this means we do not support and behave incorrectly
101-
* if an iteration is executed synchronously but in a thread different from the one in which the method that
102-
* initiated the iteration was invoked.
103-
*
104-
* <p>The above limitation should not be a problem in practice:
105-
* <ul>
106-
* <li>the only way to execute an iteration synchronously but in a different thread is to block the thread that
107-
* initiated the iteration by waiting for completion of the iteration by that other thread;</li>
108-
* <li>blocking a thread is forbidden in asynchronous code, and we do not do it;</li>
109-
* <li>therefore, we would not have an iteration that is executed synchronously but in a different thread.</li>
110-
* </ul>
111-
*/
112-
boolean run(final boolean trampolining, final SingleResultCallback<Void> afterLoopCallback) {
113-
// The `trampoliningResult` variable must be used only if the initiated iteration is executed synchronously with
114-
// the current method, which must be detected separately.
115-
//
116-
// It may be tempting to detect whether the iteration was executed synchronously by reading from the variable
117-
// and observing a write that is part of the callback execution. However, if the iteration is executed asynchronously with
118-
// the current method, then the aforementioned conflicting write and read actions are not ordered by
119-
// the happens-before relation, the execution contains a data race and the read is allowed to observe the write.
120-
// If such observation happens when the iteration is executed asynchronously, then we have a false positive.
121-
// Furthermore, depending on the nature of the value read, it may not be trustworthy.
122-
boolean[] trampoliningResult = {false};
123-
sameThreadDetector.set(SameThreadDetectionStatus.PROBING);
124-
body.run((r, t) -> {
125-
if (completeIfNeeded(afterLoopCallback, r, t)) {
126-
// If we are trampolining, then here we bounce up, trampolining completes and so is the whole loop;
127-
// otherwise, the whole loop simply completes.
61+
/**
62+
* Initiates a new iteration of the loop by invoking
63+
* {@link #body}{@code .}{@link AsyncCallbackRunnable#run(SingleResultCallback) run}.
64+
* The initiated iteration may be executed either synchronously or asynchronously with the method that initiated it:
65+
* <ul>
66+
* <li>synchronous execution—completion of the initiated iteration happens-before the method completion;</li>
67+
* <li>asynchronous execution—the aforementioned relation does not exist.</li>
68+
* </ul>
69+
*
70+
* <p>If another iteration is needed, it is initiated from the callback passed to
71+
* {@link #body}{@code .}{@link AsyncCallbackRunnable#run(SingleResultCallback) run}
72+
* by invoking {@link #run(boolean, SingleResultCallback)}.
73+
* Completing the initiated iteration is {@linkplain SingleResultCallback#onResult(Object, Throwable) invoking} the callback.
74+
* Thus, it is guaranteed that all iterations are executed sequentially with each other
75+
* (that is, completion of one iteration happens-before initiation of the next one)
76+
* regardless of them being executed synchronously or asynchronously with the method that initiated them.
77+
*
78+
* <p>Initiating any but the {@linkplain LoopState#isFirstIteration() first} iteration is done using trampolining,
79+
* which allows us to do it iteratively rather than recursively, if iterations are executed synchronously,
80+
* and ensures stack usage does not increase with the number of iterations.
81+
*
82+
* @return {@code true} iff it is known that another iteration must be initiated.
83+
* This information is used only for trampolining, and is available only if the iteration executed synchronously.
84+
*
85+
* <p>It is impossible to detect whether an iteration is executed synchronously.
86+
* It is, however, possible to detect whether an iteration is executed in the same thread as the method that initiated it,
87+
* and we use this as a proxy indicator of synchronous execution. Unfortunately, this means we do not support / behave incorrectly
88+
* if an iteration is executed synchronously but in a thread different from the one in which the method that
89+
* initiated the iteration was invoked.
90+
*
91+
* <p>The above limitation should not be a problem in practice:
92+
* <ul>
93+
* <li>the only way to execute an iteration synchronously but in a different thread is to block the thread that
94+
* initiated the iteration by waiting for completion of the iteration by that other thread;</li>
95+
* <li>blocking a thread is forbidden in asynchronous code, and we do not do it;</li>
96+
* <li>therefore, we would not have an iteration that is executed synchronously but in a different thread.</li>
97+
* </ul>
98+
*/
99+
boolean run(final boolean trampolining, final SingleResultCallback<Void> afterLoopCallback) {
100+
// The `trampoliningResult` variable must be used only if the initiated iteration is executed synchronously with
101+
// the current method, which must be detected separately.
102+
//
103+
// It may be tempting to detect whether the iteration was executed synchronously by reading from the variable
104+
// and observing a write that is part of the callback execution. However, if the iteration is executed asynchronously with
105+
// the current method, then the aforementioned conflicting write and read actions are not ordered by
106+
// the happens-before relation, the execution contains a data race and the read is allowed to observe the write.
107+
// If such observation happens when the iteration is executed asynchronously, then we have a false positive.
108+
// Furthermore, depending on the nature of the value read, it may not be trustworthy.
109+
boolean[] trampoliningResult = {false};
110+
sameThreadDetector.set(SameThreadDetectionStatus.PROBING);
111+
body.run((r, t) -> {
112+
if (completeIfNeeded(afterLoopCallback, r, t)) {
113+
// Bounce if we are trampolining and the iteration was executed synchronously,
114+
// trampolining completes and so is the whole loop;
115+
// otherwise, the whole loop simply completes.
116+
return;
117+
}
118+
if (trampolining) {
119+
boolean sameThread = sameThreadDetector.get().equals(SameThreadDetectionStatus.PROBING);
120+
if (sameThread) {
121+
// Bounce if we are trampolining and the iteration was executed synchronously;
122+
// otherwise proceed to initiate trampolining.
123+
sameThreadDetector.set(SameThreadDetectionStatus.POSITIVE);
124+
trampoliningResult[0] = true;
128125
return;
126+
} else {
127+
sameThreadDetector.remove();
129128
}
130-
if (trampolining) {
131-
boolean sameThread = sameThreadDetector.get().equals(SameThreadDetectionStatus.PROBING);
132-
if (sameThread) {
133-
// Bounce up if we are trampolining and the iteration was executed synchronously;
134-
// otherwise proceed to begin trampolining.
135-
sameThreadDetector.set(SameThreadDetectionStatus.POSITIVE);
136-
trampoliningResult[0] = true;
137-
return;
138-
} else {
139-
sameThreadDetector.remove();
140-
}
141-
}
142-
// trampolining
143-
boolean anotherIterationNeeded;
144-
do {
145-
anotherIterationNeeded = run(true, afterLoopCallback);
146-
} while (anotherIterationNeeded);
147-
});
148-
try {
149-
return sameThreadDetector.get().equals(SameThreadDetectionStatus.POSITIVE) && trampoliningResult[0];
150-
} finally {
151-
sameThreadDetector.remove();
152129
}
130+
// initiate trampolining
131+
boolean anotherIterationNeeded;
132+
do {
133+
anotherIterationNeeded = run(true, afterLoopCallback);
134+
} while (anotherIterationNeeded);
135+
});
136+
try {
137+
return sameThreadDetector.get().equals(SameThreadDetectionStatus.POSITIVE) && trampoliningResult[0];
138+
} finally {
139+
sameThreadDetector.remove();
153140
}
141+
}
154142

155-
/**
156-
* @return {@code true} iff the {@code afterLoopCallback} was
157-
* {@linkplain SingleResultCallback#onResult(Object, Throwable) completed}.
158-
*/
159-
private boolean completeIfNeeded(final SingleResultCallback<Void> afterLoopCallback,
160-
@Nullable final Void result, @Nullable final Throwable t) {
161-
if (t != null) {
162-
afterLoopCallback.onResult(null, t);
143+
/**
144+
* @return {@code true} iff the {@code afterLoopCallback} was
145+
* {@linkplain SingleResultCallback#onResult(Object, Throwable) completed}.
146+
*/
147+
private boolean completeIfNeeded(final SingleResultCallback<Void> afterLoopCallback,
148+
@Nullable final Void result, @Nullable final Throwable t) {
149+
if (t != null) {
150+
afterLoopCallback.onResult(null, t);
151+
return true;
152+
} else {
153+
boolean anotherIterationNeeded;
154+
try {
155+
anotherIterationNeeded = state.advance();
156+
} catch (Throwable e) {
157+
afterLoopCallback.onResult(null, e);
163158
return true;
159+
}
160+
if (anotherIterationNeeded) {
161+
return false;
164162
} else {
165-
boolean anotherIterationNeeded;
166-
try {
167-
anotherIterationNeeded = state.advance();
168-
} catch (Throwable e) {
169-
afterLoopCallback.onResult(null, e);
170-
return true;
171-
}
172-
if (anotherIterationNeeded) {
173-
return false;
174-
} else {
175-
afterLoopCallback.onResult(result, null);
176-
return true;
177-
}
163+
afterLoopCallback.onResult(result, null);
164+
return true;
178165
}
179166
}
180167
}
168+
169+
private enum SameThreadDetectionStatus {
170+
NEGATIVE,
171+
PROBING,
172+
POSITIVE
173+
}
181174
}

driver-core/src/test/unit/com/mongodb/internal/async/VakoTest.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,12 @@ private enum IterationExecutionType {
8383

8484
@ParameterizedTest()
8585
@CsvSource({
86-
"10, 0, SYNC_SAME_THREAD, 0, true",
87-
// "10, 0, SYNC_DIFFERENT_THREAD, 0, true",
88-
"10, 0, ASYNC, 4, true",
89-
"10, 4, ASYNC, 0, true",
86+
"1_000_000, 0, SYNC_SAME_THREAD, 0, false",
87+
// "1_000_000, 0, SYNC_DIFFERENT_THREAD, 0, false",
88+
"1_000_000, 0, ASYNC, 0, false",
9089
"1_000_000, 0, MIXED_SYNC_SAME_AND_ASYNC, 0, false",
90+
"4, 0, ASYNC, 4, true",
91+
"4, 4, ASYNC, 0, true",
9192
})
9293
void testThenRunDoWhileLoop(
9394
final int counterInitialValue,
@@ -109,7 +110,7 @@ void testThenRunDoWhileLoop(
109110
Thread.currentThread().getStackTrace().length, r, exceptionToString(t));
110111
complete(join, r, t);
111112
});
112-
System.err.printf("\tasyncLoop returned in %s%n", start.elapsed());
113+
System.err.printf("\tasyncLoop method completed in %s%n", start.elapsed());
113114
join.get();
114115
System.err.printf("%n%nDONE%n%n");
115116
}
@@ -126,7 +127,7 @@ private static void asyncLoop(
126127
StartTime start = StartTime.now();
127128
asyncPartOfIteration(counter, executionType, delayAsyncExecutionTotalDuration, verbose, c);
128129
if (verbose) {
129-
System.err.printf("\tasyncPartOfIteration returned in %s%n", start.elapsed());
130+
System.err.printf("\tasyncPartOfIteration method completed in %s%n", start.elapsed());
130131
}
131132
}, () -> !counter.done()).finish(callback);
132133
}
@@ -142,7 +143,7 @@ private static void asyncPartOfIteration(
142143
StartTime start = StartTime.now();
143144
callback.complete(callback);
144145
if (verbose) {
145-
System.err.printf("\tasyncPartOfIteration callback.complete returned in %s%n", start.elapsed());
146+
System.err.printf("\tasyncPartOfIteration callback.complete method completed in %s%n", start.elapsed());
146147
}
147148
};
148149
switch (executionType) {

0 commit comments

Comments
 (0)