Appearance
CompletableFuture 是在使用线程池的时候经常使用到的一个类,在实际使用的时候,发现在汇总的时候会有一些不同的操作。
我们直接来看一个示例:
//并行调用
public Map<String, String> getBasicTranslationBatch(List<String> text) {
if (text == null || text.isEmpty()) {
return Map.of();
}
//对text进行去重处理
text = text.stream().distinct().collect(Collectors.toList());
int batchSize = 15;
List<CompletableFuture<Map<String, String>>> futures = new ArrayList<>();
for (int i = 0; i < text.size(); i += batchSize) {
int endIndex = Math.min(i + batchSize, text.size());
List<String> sublist = text.subList(i, endIndex);
CompletableFuture<Map<String, String>> future = CompletableFuture.supplyAsync(
() -> getBasicTranslationSafely(sublist),
findBasicTranslationExecutor
);
futures.add(future);
}
try {
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
allOfFuture.get();
} catch (Exception e) {
log.error("Error in getBasicTranslationBatch: {}", e.getMessage(), e);
throw new GenericException(GenericException.Code.FAIL, "请求外部接口失败");
}
return futures.stream()
.map(CompletableFuture::join)
.flatMap(map -> map.entrySet().stream())
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue,
// 如果有重复的源文本,保留第一个翻译结果
(existing, replacement) -> existing
));
}
@Bean("findBasicTranslationExecutor")
public Executor findBasicTranslationExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 场景:调用外部接口
// 设置最大线程数
executor.setMaxPoolSize(25);
// 设置核心线程数
executor.setCorePoolSize(25);
// 配置队列大小
executor.setQueueCapacity(500);
// 拒绝策略-由调用线程(提交任务的线程)处理该任务
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 设置线程活跃时间(秒)
executor.setKeepAliveSeconds(60);
// 设置默认线程名称
executor.setThreadNamePrefix("findBasicTranslationExecutor");
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
// 执行初始化
executor.initialize();
return TtlExecutors.getTtlExecutor(executor);
}解释一下上面 allOfFuture.get(); 这一行代码的动作
方式一
主线程合并结果(平衡方案)
allOfFuture.get():确保所有异步任务完成。主线程执行结果合并
这个动作是汇总异步操作返回的所有返回结果,除了上面这种写法之外,还有其他常见的两种
方式二
同步合并结果(轻量级)
- 合并操作是同步的,适合简单快速的聚合操作
public Map<String, String> getBasicTranslationBatch(List<String> text) {
// ... [批处理创建]
try {
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
return allOfFuture.thenApply(v -> // 同步结果合并
futures.stream()
.map(CompletableFuture::join)
.flatMap(map -> map.entrySet().stream())
.collect(Collectors.toMap(...))
).get(); // 阻塞获取最终结果
} catch (Exception e) {
// ... [异常处理]
}
}方式三
异步合并结果(重量级)
- 开启一个新的线程,去做汇总操作
- 聚合逻辑复杂耗时 建议使用这种方式
java
public List<ProductSkuSpecificationListResponse> findSkuSpecifications(...) {
// ... [批处理创建]
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(...);
CompletableFuture<List<T>> combinedResults = allOfFuture.thenCompose(v ->
CompletableFuture.supplyAsync(() -> { // 开启新线程合并结果
futures.stream()
.map(CompletableFuture::join)
.flatMap(list -> list.getList().stream())
.collect(Collectors.toList())
})
);
List<T> products = combinedResults.get(); // 阻塞等待
return products;
}看了一下相关的解释,看 平时使用上面的代码示例方式就好了;
方式一和方式二 的性能差距并不是很明显,可能一般会 在响应式编程链路中需要延续异步特性时使用
方式二;不过使用方式二的话,可能在线程池触发拒绝策略的时候会有问题(看具体拒绝策略)。