Skip to content

Commit dbbd2a0

Browse files
committed
BAEL-6579: completable future's thread pool
1 parent dad0e8c commit dbbd2a0

2 files changed

Lines changed: 110 additions & 0 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.baeldung.concurrent.completablefuture.threadpool;
2+
3+
import java.util.concurrent.CompletableFuture;
4+
import java.util.concurrent.Executor;
5+
import java.util.concurrent.Executors;
6+
import java.util.function.Supplier;
7+
8+
public class CustomCompletableFuture<T> extends CompletableFuture<T> {
9+
private static final Executor executor = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "Custom-Single-Thread"));
10+
11+
public static <TYPE> CustomCompletableFuture<TYPE> supplyAsync(Supplier<TYPE> supplier) {
12+
CustomCompletableFuture<TYPE> future = new CustomCompletableFuture<>();
13+
executor.execute(() -> {
14+
try {
15+
future.complete(supplier.get());
16+
} catch (Exception ex) {
17+
future.completeExceptionally(ex);
18+
}
19+
});
20+
return future;
21+
}
22+
23+
@Override
24+
public Executor defaultExecutor() {
25+
return executor;
26+
}
27+
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package com.baeldung.concurrent.completablefuture.threadpool;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.Executor;
7+
import java.util.concurrent.Executors;
8+
9+
import org.junit.jupiter.api.Test;
10+
11+
public class CompletableFutureThreadPoolUnitTest {
12+
13+
@Test
14+
void whenUsingNonAsync_thenUsesMainThread() {
15+
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");
16+
17+
CompletableFuture<Integer> nameLength = name.thenApply(value -> {
18+
printCurrentThread();
19+
return value.length();
20+
});
21+
22+
assertThat(nameLength).isCompletedWithValue(8);
23+
}
24+
25+
@Test
26+
void whenUsingNonAsync_thenUsesCallersThread() throws InterruptedException {
27+
Runnable test = () -> {
28+
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");
29+
30+
CompletableFuture<Integer> nameLength = name.thenApply(value -> {
31+
printCurrentThread();
32+
return value.length();
33+
});
34+
35+
assertThat(nameLength).isCompletedWithValue(8);
36+
};
37+
38+
new Thread(test, "test-thread").start();
39+
Thread.sleep(100l);
40+
}
41+
42+
@Test
43+
void whenUsingAsync_thenUsesCommonPool() {
44+
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");
45+
46+
CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
47+
printCurrentThread();
48+
return value.length();
49+
});
50+
51+
assertThat(nameLength).isCompletedWithValue(8);
52+
}
53+
54+
@Test
55+
void whenUsingAsync_thenUsesCustomExecutor() {
56+
Executor testExecutor = Executors.newFixedThreadPool(5);
57+
CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");
58+
59+
CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
60+
printCurrentThread();
61+
return value.length();
62+
}, testExecutor);
63+
64+
assertThat(nameLength).isCompletedWithValue(8);
65+
}
66+
67+
@Test
68+
void whenOverridingDefaultThreadPool_thenUsesCustomExecutor() {
69+
CompletableFuture<String> name = CustomCompletableFuture.supplyAsync(() -> "Baeldung");
70+
71+
CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
72+
printCurrentThread();
73+
return value.length();
74+
});
75+
76+
assertThat(nameLength).isCompletedWithValue(8);
77+
}
78+
79+
private static void printCurrentThread() {
80+
System.out.println(Thread.currentThread().getName());
81+
}
82+
}

0 commit comments

Comments
 (0)