Skip to content

Commit 2c48995

Browse files
vbabaninrozza
andauthored
Adjust timeout handling in client-side operations to account for RTT variations (#1793)
JAVA-5375 --------- Co-authored-by: Ross Lawley <ross@mongodb.com>
1 parent 07b53b9 commit 2c48995

7 files changed

Lines changed: 239 additions & 116 deletions

File tree

driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -331,9 +331,10 @@ static Stream<Arguments> shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS()
331331
);
332332
}
333333

334-
@ParameterizedTest
335-
@MethodSource
336334
@DisplayName("should choose timeoutMS when timeoutMS is less than connectTimeoutMS")
335+
@ParameterizedTest(name = "should choose timeoutMS when timeoutMS is less than connectTimeoutMS. "
336+
+ "Parameters: connectTimeoutMS: {0}, timeoutMS: {1}, expected: {2}")
337+
@MethodSource
337338
void shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS(final Long connectTimeoutMS,
338339
final Long timeoutMS,
339340
final long expected) {
@@ -345,7 +346,7 @@ void shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS(final Long connectTim
345346
0));
346347

347348
long calculatedTimeoutMS = timeoutContext.getConnectTimeoutMs();
348-
assertTrue(expected - calculatedTimeoutMS <= 1);
349+
assertTrue(expected - calculatedTimeoutMS <= 2);
349350
}
350351

351352
private TimeoutContextTest() {

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/TimeoutHelper.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,29 @@ public static <T> MongoCollection<T> collectionWithTimeout(final MongoCollection
5555

5656
public static <T> Mono<MongoCollection<T>> collectionWithTimeoutMono(final MongoCollection<T> collection,
5757
@Nullable final Timeout timeout) {
58+
return collectionWithTimeoutMono(collection, timeout, DEFAULT_TIMEOUT_MESSAGE);
59+
}
60+
61+
public static <T> Mono<MongoCollection<T>> collectionWithTimeoutMono(final MongoCollection<T> collection,
62+
@Nullable final Timeout timeout,
63+
final String message) {
5864
try {
59-
return Mono.just(collectionWithTimeout(collection, timeout));
65+
return Mono.just(collectionWithTimeout(collection, timeout, message));
6066
} catch (MongoOperationTimeoutException e) {
6167
return Mono.error(e);
6268
}
6369
}
6470

6571
public static <T> Mono<MongoCollection<T>> collectionWithTimeoutDeferred(final MongoCollection<T> collection,
6672
@Nullable final Timeout timeout) {
67-
return Mono.defer(() -> collectionWithTimeoutMono(collection, timeout));
73+
return collectionWithTimeoutDeferred(collection, timeout, DEFAULT_TIMEOUT_MESSAGE);
6874
}
6975

76+
public static <T> Mono<MongoCollection<T>> collectionWithTimeoutDeferred(final MongoCollection<T> collection,
77+
@Nullable final Timeout timeout,
78+
final String message) {
79+
return Mono.defer(() -> collectionWithTimeoutMono(collection, timeout, message));
80+
}
7081

7182
public static MongoDatabase databaseWithTimeout(final MongoDatabase database,
7283
@Nullable final Timeout timeout) {

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/gridfs/GridFSUploadPublisherImpl.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@
5454
*/
5555
public final class GridFSUploadPublisherImpl implements GridFSUploadPublisher<Void> {
5656

57-
private static final String TIMEOUT_ERROR_MESSAGE = "Saving chunks exceeded the timeout limit.";
57+
private static final String TIMEOUT_ERROR_MESSAGE_CHUNKS_SAVING = "Saving chunks exceeded the timeout limit.";
58+
private static final String TIMEOUT_ERROR_MESSAGE_UPLOAD_CANCELLATION = "Upload cancellation exceeded the timeout limit.";
5859
private static final Document PROJECTION = new Document("_id", 1);
5960
private static final Document FILES_INDEX = new Document("filename", 1).append("uploadDate", 1);
6061
private static final Document CHUNKS_INDEX = new Document("files_id", 1).append("n", 1);
@@ -226,8 +227,8 @@ private Mono<Long> createSaveChunksMono(final AtomicBoolean terminated, @Nullabl
226227
.append("data", data);
227228

228229
Publisher<InsertOneResult> insertOnePublisher = clientSession == null
229-
? collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE).insertOne(chunkDocument)
230-
: collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE)
230+
? collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE_CHUNKS_SAVING).insertOne(chunkDocument)
231+
: collectionWithTimeout(chunksCollection, timeout, TIMEOUT_ERROR_MESSAGE_CHUNKS_SAVING)
231232
.insertOne(clientSession, chunkDocument);
232233

233234
return Mono.from(insertOnePublisher).thenReturn(data.length());
@@ -270,7 +271,8 @@ private Mono<InsertOneResult> createSaveFileDataMono(final AtomicBoolean termina
270271
}
271272

272273
private Mono<DeleteResult> createCancellationMono(final AtomicBoolean terminated, @Nullable final Timeout timeout) {
273-
Mono<MongoCollection<Document>> chunksCollectionMono = collectionWithTimeoutDeferred(chunksCollection, timeout);
274+
Mono<MongoCollection<Document>> chunksCollectionMono = collectionWithTimeoutDeferred(chunksCollection, timeout,
275+
TIMEOUT_ERROR_MESSAGE_UPLOAD_CANCELLATION);
274276
if (terminated.compareAndSet(false, true)) {
275277
if (clientSession != null) {
276278
return chunksCollectionMono.flatMap(collection -> Mono.from(collection

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java

Lines changed: 105 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,13 @@
1616

1717
package com.mongodb.reactivestreams.client;
1818

19-
import com.mongodb.ClusterFixture;
2019
import com.mongodb.MongoClientSettings;
2120
import com.mongodb.MongoCommandException;
2221
import com.mongodb.MongoNamespace;
2322
import com.mongodb.MongoOperationTimeoutException;
2423
import com.mongodb.ReadPreference;
2524
import com.mongodb.WriteConcern;
2625
import com.mongodb.client.AbstractClientSideOperationsTimeoutProseTest;
27-
import com.mongodb.client.model.CreateCollectionOptions;
2826
import com.mongodb.client.model.changestream.FullDocument;
2927
import com.mongodb.event.CommandFailedEvent;
3028
import com.mongodb.event.CommandStartedEvent;
@@ -43,6 +41,7 @@
4341
import org.junit.jupiter.api.Test;
4442
import reactor.core.publisher.Flux;
4543
import reactor.core.publisher.Hooks;
44+
import reactor.core.publisher.Mono;
4645
import reactor.test.StepVerifier;
4746

4847
import java.nio.ByteBuffer;
@@ -58,12 +57,16 @@
5857

5958
import static com.mongodb.ClusterFixture.TIMEOUT_DURATION;
6059
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet;
60+
import static com.mongodb.ClusterFixture.isStandalone;
6161
import static com.mongodb.ClusterFixture.serverVersionAtLeast;
6262
import static com.mongodb.ClusterFixture.sleep;
63+
import static com.mongodb.assertions.Assertions.assertTrue;
6364
import static java.util.Collections.singletonList;
65+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
6466
import static org.junit.jupiter.api.Assertions.assertEquals;
6567
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
6668
import static org.junit.jupiter.api.Assertions.assertNotNull;
69+
import static org.junit.jupiter.api.Assumptions.assumeFalse;
6770
import static org.junit.jupiter.api.Assumptions.assumeTrue;
6871

6972

@@ -104,7 +107,6 @@ protected boolean isAsync() {
104107
@Override
105108
public void testGridFSUploadViaOpenUploadStreamTimeout() {
106109
assumeTrue(serverVersionAtLeast(4, 4));
107-
long rtt = ClusterFixture.getPrimaryRTT();
108110

109111
//given
110112
collectionHelper.runAdminCommand("{"
@@ -113,12 +115,12 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() {
113115
+ " data: {"
114116
+ " failCommands: [\"insert\"],"
115117
+ " blockConnection: true,"
116-
+ " blockTimeMS: " + (rtt + 405)
118+
+ " blockTimeMS: " + 600
117119
+ " }"
118120
+ "}");
119121

120122
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
121-
.timeout(rtt + 400, TimeUnit.MILLISECONDS))) {
123+
.timeout(600, TimeUnit.MILLISECONDS))) {
122124
MongoDatabase database = client.getDatabase(gridFsFileNamespace.getDatabaseName());
123125
GridFSBucket gridFsBucket = createReaciveGridFsBucket(database, GRID_FS_BUCKET_NAME);
124126

@@ -158,7 +160,6 @@ public void testGridFSUploadViaOpenUploadStreamTimeout() {
158160
@Override
159161
public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, InterruptedException, TimeoutException {
160162
assumeTrue(serverVersionAtLeast(4, 4));
161-
long rtt = ClusterFixture.getPrimaryRTT();
162163

163164
//given
164165
CompletableFuture<Throwable> droppedErrorFuture = new CompletableFuture<>();
@@ -170,12 +171,12 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I
170171
+ " data: {"
171172
+ " failCommands: [\"delete\"],"
172173
+ " blockConnection: true,"
173-
+ " blockTimeMS: " + (rtt + 405)
174+
+ " blockTimeMS: " + 405
174175
+ " }"
175176
+ "}");
176177

177178
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
178-
.timeout(rtt + 400, TimeUnit.MILLISECONDS))) {
179+
.timeout(400, TimeUnit.MILLISECONDS))) {
179180
MongoDatabase database = client.getDatabase(gridFsFileNamespace.getDatabaseName());
180181
GridFSBucket gridFsBucket = createReaciveGridFsBucket(database, GRID_FS_BUCKET_NAME);
181182

@@ -198,12 +199,25 @@ public void testAbortingGridFsUploadStreamTimeout() throws ExecutionException, I
198199
//then
199200
Throwable droppedError = droppedErrorFuture.get(TIMEOUT_DURATION.toMillis(), TimeUnit.MILLISECONDS);
200201
Throwable commandError = droppedError.getCause();
201-
assertInstanceOf(MongoOperationTimeoutException.class, commandError);
202202

203203
CommandFailedEvent deleteFailedEvent = commandListener.getCommandFailedEvent("delete");
204204
assertNotNull(deleteFailedEvent);
205205

206-
assertEquals(commandError, commandListener.getCommandFailedEvent("delete").getThrowable());
206+
CommandStartedEvent deleteStartedEvent = commandListener.getCommandStartedEvent("delete");
207+
assertTrue(deleteStartedEvent.getCommand().containsKey("maxTimeMS"), "Expected delete command to have maxTimeMS");
208+
long deleteMaxTimeMS = deleteStartedEvent
209+
.getCommand()
210+
.get("maxTimeMS")
211+
.asNumber()
212+
.longValue();
213+
214+
assertTrue(deleteMaxTimeMS <= 420
215+
// some leeway for timing variations, when compression is used it is often less then 300.
216+
// Without it, it is more than 300.
217+
&& deleteMaxTimeMS >= 150,
218+
"Expected maxTimeMS for delete command to be between 150s and 420ms, " + "but was: " + deleteMaxTimeMS + "ms");
219+
assertEquals(commandError, deleteFailedEvent.getThrowable());
220+
207221
// When subscription is cancelled, we should not receive any more events.
208222
testSubscriber.assertNoTerminalEvent();
209223
}
@@ -219,9 +233,8 @@ public void testTimeoutMSAppliesToFullResumeAttemptInNextCall() {
219233
assumeTrue(isDiscoverableReplicaSet());
220234

221235
//given
222-
long rtt = ClusterFixture.getPrimaryRTT();
223236
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
224-
.timeout(rtt + 500, TimeUnit.MILLISECONDS))) {
237+
.timeout(500, TimeUnit.MILLISECONDS))) {
225238

226239
MongoNamespace namespace = generateNamespace();
227240
MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName())
@@ -273,9 +286,8 @@ public void testTimeoutMSAppliedToInitialAggregate() {
273286
assumeTrue(isDiscoverableReplicaSet());
274287

275288
//given
276-
long rtt = ClusterFixture.getPrimaryRTT();
277289
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
278-
.timeout(rtt + 200, TimeUnit.MILLISECONDS))) {
290+
.timeout(200, TimeUnit.MILLISECONDS))) {
279291

280292
MongoNamespace namespace = generateNamespace();
281293
MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName())
@@ -290,7 +302,7 @@ public void testTimeoutMSAppliedToInitialAggregate() {
290302
+ " data: {"
291303
+ " failCommands: [\"aggregate\" ],"
292304
+ " blockConnection: true,"
293-
+ " blockTimeMS: " + (rtt + 201)
305+
+ " blockTimeMS: " + 201
294306
+ " }"
295307
+ "}");
296308

@@ -321,13 +333,10 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() {
321333

322334
//given
323335
BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0);
324-
collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions());
325336
sleep(2000);
326337

327-
328-
long rtt = ClusterFixture.getPrimaryRTT();
329338
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
330-
.timeout(rtt + 300, TimeUnit.MILLISECONDS))) {
339+
.timeout(500, TimeUnit.MILLISECONDS))) {
331340

332341
MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName())
333342
.getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary());
@@ -338,7 +347,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsNotSet() {
338347
+ " data: {"
339348
+ " failCommands: [\"getMore\", \"aggregate\"],"
340349
+ " blockConnection: true,"
341-
+ " blockTimeMS: " + (rtt + 200)
350+
+ " blockTimeMS: " + 200
342351
+ " }"
343352
+ "}");
344353

@@ -389,12 +398,10 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() {
389398

390399
//given
391400
BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0);
392-
collectionHelper.create(namespace.getCollectionName(), new CreateCollectionOptions());
393401
sleep(2000);
394402

395-
long rtt = ClusterFixture.getPrimaryRTT();
396403
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
397-
.timeout(rtt + 300, TimeUnit.MILLISECONDS))) {
404+
.timeout(500, TimeUnit.MILLISECONDS))) {
398405

399406
MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName())
400407
.getCollection(namespace.getCollectionName())
@@ -406,7 +413,7 @@ public void testTimeoutMsRefreshedForGetMoreWhenMaxAwaitTimeMsSet() {
406413
+ " data: {"
407414
+ " failCommands: [\"aggregate\", \"getMore\"],"
408415
+ " blockConnection: true,"
409-
+ " blockTimeMS: " + (rtt + 200)
416+
+ " blockTimeMS: " + 200
410417
+ " }"
411418
+ "}");
412419

@@ -449,9 +456,8 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt
449456
assumeTrue(isDiscoverableReplicaSet());
450457

451458
//given
452-
long rtt = ClusterFixture.getPrimaryRTT();
453459
try (MongoClient client = createReactiveClient(getMongoClientSettingsBuilder()
454-
.timeout(rtt + 2500, TimeUnit.MILLISECONDS))) {
460+
.timeout(2500, TimeUnit.MILLISECONDS))) {
455461

456462
MongoCollection<Document> collection = client.getDatabase(namespace.getDatabaseName())
457463
.getCollection(namespace.getCollectionName()).withReadPreference(ReadPreference.primary());
@@ -468,7 +474,78 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt
468474
List<CommandStartedEvent> commandStartedEvents = commandListener.getCommandStartedEvents();
469475
assertCommandStartedEventsInOder(Arrays.asList("aggregate", "getMore", "getMore", "getMore", "killCursors"),
470476
commandStartedEvents);
471-
assertOnlyOneCommandTimeoutFailure("getMore");
477+
478+
}
479+
}
480+
481+
@DisplayName("9. End Session. The timeout specified via the MongoClient timeoutMS option")
482+
@Test
483+
@Override
484+
public void test9EndSessionClientTimeout() {
485+
assumeTrue(serverVersionAtLeast(4, 4));
486+
assumeFalse(isStandalone());
487+
488+
collectionHelper.runAdminCommand("{"
489+
+ " configureFailPoint: \"failCommand\","
490+
+ " mode: { times: 1 },"
491+
+ " data: {"
492+
+ " failCommands: [\"abortTransaction\"],"
493+
+ " blockConnection: true,"
494+
+ " blockTimeMS: " + 400
495+
+ " }"
496+
+ "}");
497+
498+
try (MongoClient mongoClient = createReactiveClient(getMongoClientSettingsBuilder().retryWrites(false)
499+
.timeout(300, TimeUnit.MILLISECONDS))) {
500+
MongoCollection<Document> collection = mongoClient.getDatabase(namespace.getDatabaseName())
501+
.getCollection(namespace.getCollectionName());
502+
503+
try (ClientSession session = Mono.from(mongoClient.startSession()).block()) {
504+
session.startTransaction();
505+
Mono.from(collection.insertOne(session, new Document("x", 1))).block();
506+
}
507+
508+
sleep(postSessionCloseSleep());
509+
CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> commandListener.getCommandFailedEvent("abortTransaction"));
510+
long elapsedTime = abortTransactionEvent.getElapsedTime(TimeUnit.MILLISECONDS);
511+
assertInstanceOf(MongoOperationTimeoutException.class, abortTransactionEvent.getThrowable());
512+
assertTrue(elapsedTime <= 400, "Took too long to time out, elapsedMS: " + elapsedTime);
513+
}
514+
}
515+
516+
@Test
517+
@DisplayName("9. End Session. The timeout specified via the ClientSession defaultTimeoutMS option")
518+
@Override
519+
public void test9EndSessionSessionTimeout() {
520+
assumeTrue(serverVersionAtLeast(4, 4));
521+
assumeFalse(isStandalone());
522+
523+
collectionHelper.runAdminCommand("{"
524+
+ " configureFailPoint: \"failCommand\","
525+
+ " mode: { times: 1 },"
526+
+ " data: {"
527+
+ " failCommands: [\"abortTransaction\"],"
528+
+ " blockConnection: true,"
529+
+ " blockTimeMS: " + 400
530+
+ " }"
531+
+ "}");
532+
533+
try (MongoClient mongoClient = createReactiveClient(getMongoClientSettingsBuilder())) {
534+
MongoCollection<Document> collection = mongoClient.getDatabase(namespace.getDatabaseName())
535+
.getCollection(namespace.getCollectionName());
536+
537+
try (ClientSession session = Mono.from(mongoClient.startSession(com.mongodb.ClientSessionOptions.builder()
538+
.defaultTimeout(300, TimeUnit.MILLISECONDS).build())).block()) {
539+
540+
session.startTransaction();
541+
Mono.from(collection.insertOne(session, new Document("x", 1))).block();
542+
}
543+
544+
sleep(postSessionCloseSleep());
545+
CommandFailedEvent abortTransactionEvent = assertDoesNotThrow(() -> commandListener.getCommandFailedEvent("abortTransaction"));
546+
long elapsedTime = abortTransactionEvent.getElapsedTime(TimeUnit.MILLISECONDS);
547+
assertInstanceOf(MongoOperationTimeoutException.class, abortTransactionEvent.getThrowable());
548+
assertTrue(elapsedTime <= 400, "Took too long to time out, elapsedMS: " + elapsedTime);
472549
}
473550
}
474551

@@ -512,6 +589,6 @@ public void tearDown() throws InterruptedException {
512589

513590
@Override
514591
protected int postSessionCloseSleep() {
515-
return 256;
592+
return 1000;
516593
}
517594
}

0 commit comments

Comments
 (0)