Skip to content

Commit f6cbb32

Browse files
strogiyotecAlmas Abdrazak
andauthored
* JAVA-5907 * JAVA-5907 use execute within executor service If we don't use the return value from executor then we should use `execute` instead of `submit` * format * revert error log for netty leak --------- Co-authored-by: Almas Abdrazak <abdrazak.almas@mongodb.com>
1 parent 16fecc4 commit f6cbb32

11 files changed

Lines changed: 43 additions & 34 deletions

File tree

bson/src/test/unit/util/ThreadTestHelpers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public static void executeAll(final Runnable... runnables) {
4141
CountDownLatch latch = new CountDownLatch(runnables.length);
4242
List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
4343
for (final Runnable runnable : runnables) {
44-
service.submit(() -> {
44+
service.execute(() -> {
4545
try {
4646
runnable.run();
4747
} catch (Throwable e) {

driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/GridFSMultiFileDownloadBenchmark.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void run() throws Exception {
9797
CountDownLatch latch = new CountDownLatch(50);
9898

9999
for (int i = 0; i < 50; i++) {
100-
gridFSService.submit(exportFile(latch, i));
100+
gridFSService.execute(exportFile(latch, i));
101101
}
102102

103103
latch.await(1, TimeUnit.MINUTES);
@@ -107,7 +107,7 @@ private Runnable exportFile(final CountDownLatch latch, final int fileId) {
107107
return () -> {
108108
UnsafeByteArrayOutputStream outputStream = new UnsafeByteArrayOutputStream(5242880);
109109
bucket.downloadToStream(GridFSMultiFileDownloadBenchmark.this.getFileName(fileId), outputStream);
110-
fileService.submit(() -> {
110+
fileService.execute(() -> {
111111
try {
112112
FileOutputStream fos = new FileOutputStream(new File(tempDirectory, String.format("%02d", fileId) + ".txt"));
113113
fos.write(outputStream.getByteArray());
@@ -124,7 +124,7 @@ private void importFiles() throws Exception {
124124
CountDownLatch latch = new CountDownLatch(50);
125125

126126
for (int i = 0; i < 50; i++) {
127-
fileService.submit(importFile(latch, i));
127+
fileService.execute(importFile(latch, i));
128128
}
129129

130130
latch.await(1, TimeUnit.MINUTES);

driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/GridFSMultiFileUploadBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void run() throws Exception {
7575
CountDownLatch latch = new CountDownLatch(50);
7676

7777
for (int i = 0; i < 50; i++) {
78-
fileService.submit(importFile(latch, i));
78+
fileService.execute(importFile(latch, i));
7979
}
8080

8181
latch.await(1, TimeUnit.MINUTES);

driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/MultiFileExportBenchmark.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void run() throws Exception {
109109
CountDownLatch latch = new CountDownLatch(100);
110110

111111
for (int i = 0; i < 100; i++) {
112-
documentReadingService.submit(exportJsonFile(i, latch));
112+
documentReadingService.execute(exportJsonFile(i, latch));
113113
}
114114

115115
latch.await(1, TimeUnit.MINUTES);
@@ -125,7 +125,7 @@ private Runnable exportJsonFile(final int fileId, final CountDownLatch latch) {
125125
List<RawBsonDocument> documents = collection.find(new BsonDocument("fileId", new BsonInt32(fileId)))
126126
.batchSize(5000)
127127
.into(new ArrayList<>(5000));
128-
fileWritingService.submit(writeJsonFile(fileId, documents, latch));
128+
fileWritingService.execute(writeJsonFile(fileId, documents, latch));
129129
};
130130
}
131131

@@ -154,7 +154,7 @@ private void importJsonFiles() throws InterruptedException {
154154

155155
for (int i = 0; i < 100; i++) {
156156
int fileId = i;
157-
importService.submit(() -> {
157+
importService.execute(() -> {
158158
String resourcePath = "parallel/ldjson_multi/ldjson" + String.format("%03d", fileId) + ".txt";
159159
try (BufferedReader reader = new BufferedReader(readFromRelativePath(resourcePath), 1024 * 64)) {
160160
String json;

driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/MultiFileImportBenchmark.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void run() throws InterruptedException {
8686
CountDownLatch latch = new CountDownLatch(500);
8787

8888
for (int i = 0; i < 100; i++) {
89-
fileReadingService.submit(importJsonFile(latch, i));
89+
fileReadingService.execute(importJsonFile(latch, i));
9090
}
9191

9292
latch.await(1, TimeUnit.MINUTES);
@@ -104,7 +104,7 @@ private Runnable importJsonFile(final CountDownLatch latch, final int fileId) {
104104
documents.add(document);
105105
if (documents.size() == 1000) {
106106
List<RawBsonDocument> documentsToInsert = documents;
107-
documentWritingService.submit(() -> {
107+
documentWritingService.execute(() -> {
108108
collection.insertMany(documentsToInsert, new InsertManyOptions().ordered(false));
109109
latch.countDown();
110110
});

driver-benchmarks/src/main/com/mongodb/benchmark/framework/MongoCryptBenchmarkRunner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public List<MongocryptBecnhmarkResult> run() throws InterruptedException {
177177
for (int i = 0; i < threadCount; i++) {
178178
DecryptTask decryptTask = new DecryptTask(mongoCrypt, encrypted, NUM_SECS, doneSignal);
179179
decryptTasks.add(decryptTask);
180-
executorService.submit(decryptTask);
180+
executorService.execute(decryptTask);
181181
}
182182

183183
// Await completion of all tasks. Tasks are expected to complete shortly after NUM_SECS. Time out `await` if time exceeds 2 * NUM_SECS.

driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1321,7 +1321,7 @@ private boolean initUnlessClosed() {
13211321
boolean result = true;
13221322
if (state == State.NEW) {
13231323
worker = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncGetter"));
1324-
worker.submit(() -> runAndLogUncaught(this::workerRun));
1324+
worker.execute(() -> runAndLogUncaught(this::workerRun));
13251325
state = State.INITIALIZED;
13261326
} else if (state == State.CLOSED) {
13271327
result = false;

driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannel.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ public <A> void read(ByteBuffer dst, A attach, CompletionHandler<Integer, ? supe
9898
new ByteBufferSet(dst),
9999
0,
100100
TimeUnit.MILLISECONDS,
101-
c -> group.submit(() -> handler.completed((int) c, attach)),
102-
e -> group.submit(() -> handler.failed(e, attach)));
101+
c -> group.execute(() -> handler.completed((int) c, attach)),
102+
e -> group.execute(() -> handler.failed(e, attach)));
103103
}
104104

105105
@Override
@@ -119,8 +119,8 @@ public <A> void read(
119119
new ByteBufferSet(dst),
120120
timeout,
121121
unit,
122-
c -> group.submit(() -> handler.completed((int) c, attach)),
123-
e -> group.submit(() -> handler.failed(e, attach)));
122+
c -> group.execute(() -> handler.completed((int) c, attach)),
123+
e -> group.execute(() -> handler.failed(e, attach)));
124124
}
125125

126126
@Override
@@ -145,8 +145,8 @@ public <A> void read(
145145
bufferSet,
146146
timeout,
147147
unit,
148-
c -> group.submit(() -> handler.completed(c, attach)),
149-
e -> group.submit(() -> handler.failed(e, attach)));
148+
c -> group.execute(() -> handler.completed(c, attach)),
149+
e -> group.execute(() -> handler.failed(e, attach)));
150150
}
151151

152152
@Override
@@ -185,8 +185,8 @@ public <A> void write(ByteBuffer src, A attach, CompletionHandler<Integer, ? sup
185185
new ByteBufferSet(src),
186186
0,
187187
TimeUnit.MILLISECONDS,
188-
c -> group.submit(() -> handler.completed((int) c, attach)),
189-
e -> group.submit(() -> handler.failed(e, attach)));
188+
c -> group.execute(() -> handler.completed((int) c, attach)),
189+
e -> group.execute(() -> handler.failed(e, attach)));
190190
}
191191

192192
@Override
@@ -205,8 +205,8 @@ public <A> void write(
205205
new ByteBufferSet(src),
206206
timeout,
207207
unit,
208-
c -> group.submit(() -> handler.completed((int) c, attach)),
209-
e -> group.submit(() -> handler.failed(e, attach)));
208+
c -> group.execute(() -> handler.completed((int) c, attach)),
209+
e -> group.execute(() -> handler.failed(e, attach)));
210210
}
211211

212212
@Override
@@ -228,8 +228,8 @@ public <A> void write(
228228
bufferSet,
229229
timeout,
230230
unit,
231-
c -> group.submit(() -> handler.completed(c, attach)),
232-
e -> group.submit(() -> handler.failed(e, attach)));
231+
c -> group.execute(() -> handler.completed(c, attach)),
232+
e -> group.execute(() -> handler.failed(e, attach)));
233233
}
234234

235235
@Override
@@ -251,11 +251,11 @@ public Future<Integer> write(ByteBuffer src) {
251251
}
252252

253253
private <A> void completeWithZeroInt(A attach, CompletionHandler<Integer, ? super A> handler) {
254-
group.submit(() -> handler.completed(0, attach));
254+
group.execute(() -> handler.completed(0, attach));
255255
}
256256

257257
private <A> void completeWithZeroLong(A attach, CompletionHandler<Long, ? super A> handler) {
258-
group.submit(() -> handler.completed(0L, attach));
258+
group.execute(() -> handler.completed(0L, attach));
259259
}
260260

261261
/**

driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.concurrent.ConcurrentHashMap;
4444
import java.util.concurrent.ConcurrentLinkedQueue;
4545
import java.util.concurrent.CountDownLatch;
46+
import java.util.concurrent.Executor;
4647
import java.util.concurrent.ExecutorService;
4748
import java.util.concurrent.Future;
4849
import java.util.concurrent.LinkedBlockingQueue;
@@ -65,7 +66,7 @@
6566
* instance of this class is a singleton-like object that manages a thread pool that makes it
6667
* possible to run a group of asynchronous channels.
6768
*/
68-
public class AsynchronousTlsChannelGroup {
69+
public class AsynchronousTlsChannelGroup implements Executor {
6970

7071
private static final Logger LOGGER = Loggers.getLogger("connection.tls");
7172

@@ -224,8 +225,16 @@ public AsynchronousTlsChannelGroup(@Nullable final ExecutorService executorServi
224225
selectorThread.start();
225226
}
226227

227-
void submit(final Runnable r) {
228-
executor.submit(r);
228+
229+
@Override
230+
public void execute(final Runnable r) {
231+
executor.execute(() -> {
232+
try {
233+
r.run();
234+
} catch (Throwable t) {
235+
LOGGER.error(null, t);
236+
}
237+
});
229238
}
230239

231240
RegisteredSocket registerSocket(TlsChannel reader, SocketChannel socketChannel) {

driver-core/src/test/functional/com/mongodb/internal/connection/DefaultConnectionPoolTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public void shouldThrowOnTimeout() throws InterruptedException {
127127

128128
// when
129129
TimeoutTrackingConnectionGetter connectionGetter = new TimeoutTrackingConnectionGetter(provider, timeoutSettings);
130-
cachedExecutor.submit(connectionGetter);
130+
cachedExecutor.execute(connectionGetter);
131131

132132
connectionGetter.getLatch().await();
133133

@@ -152,7 +152,7 @@ public void shouldNotUseMaxAwaitTimeMSWhenTimeoutMsIsSet() throws InterruptedExc
152152

153153
// when
154154
TimeoutTrackingConnectionGetter connectionGetter = new TimeoutTrackingConnectionGetter(provider, timeoutSettings);
155-
cachedExecutor.submit(connectionGetter);
155+
cachedExecutor.execute(connectionGetter);
156156

157157
sleep(70); // wait for more than maxWaitTimeMS but less than timeoutMs.
158158
internalConnection.close();

0 commit comments

Comments
 (0)