Skip to content

CompletableFuture方法使用

CompletableFuture 是在使用线程池的时候经常使用到的一个类,在实际使用的时候,发现在汇总的时候会有一些不同的操作。

我们直接来看一个示例:

//并行调用  
public Map<String, String> getBasicTranslationBatch(List<String> text) {  
    if (text == null || text.isEmpty()) {  
        return Map.of();  
    }  
  
    //对text进行去重处理  
    text = text.stream().distinct().collect(Collectors.toList());  
  
    int batchSize = 15;  
    List<CompletableFuture<Map<String, String>>> futures = new ArrayList<>();  
  
    for (int i = 0; i < text.size(); i += batchSize) {  
        int endIndex = Math.min(i + batchSize, text.size());  
        List<String> sublist = text.subList(i, endIndex);  
        CompletableFuture<Map<String, String>> future = CompletableFuture.supplyAsync(  
            () -> getBasicTranslationSafely(sublist),  
            findBasicTranslationExecutor  
        );  
        futures.add(future);  
    }  
  
    try {  
        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));  
        allOfFuture.get();  
    } catch (Exception e) {  
        log.error("Error in getBasicTranslationBatch: {}", e.getMessage(), e);  
        throw new GenericException(GenericException.Code.FAIL, "请求外部接口失败");  
    }  
    return futures.stream()  
        .map(CompletableFuture::join)  
        .flatMap(map -> map.entrySet().stream())  
        .collect(Collectors.toMap(  
            Map.Entry::getKey,  
            Map.Entry::getValue,  
            // 如果有重复的源文本,保留第一个翻译结果  
            (existing, replacement) -> existing  
        ));  
}

@Bean("findBasicTranslationExecutor")  
public Executor findBasicTranslationExecutor() {  
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
    // 场景:调用外部接口  
    // 设置最大线程数  
    executor.setMaxPoolSize(25);  
    // 设置核心线程数  
    executor.setCorePoolSize(25);  
    // 配置队列大小  
    executor.setQueueCapacity(500);  
    // 拒绝策略-由调用线程(提交任务的线程)处理该任务  
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  
    // 设置线程活跃时间(秒)  
    executor.setKeepAliveSeconds(60);  
    // 设置默认线程名称  
    executor.setThreadNamePrefix("findBasicTranslationExecutor");  
    // 等待所有任务结束后再关闭线程池  
    executor.setWaitForTasksToCompleteOnShutdown(true);  
    // 执行初始化  
    executor.initialize();  
  
    return TtlExecutors.getTtlExecutor(executor);  
}

解释一下上面 allOfFuture.get(); 这一行代码的动作

方式一

主线程合并结果(平衡方案)

  • allOfFuture.get():确保所有异步任务完成。主线程执行结果合并

这个动作是汇总异步操作返回的所有返回结果,除了上面这种写法之外,还有其他常见的两种

方式二

同步合并结果(轻量级)

  • 合并操作是同步的,适合简单快速的聚合操作
public Map<String, String> getBasicTranslationBatch(List<String> text) {
    // ... [批处理创建]
    try {
        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return allOfFuture.thenApply(v ->  // 同步结果合并
            futures.stream()
                .map(CompletableFuture::join)
                .flatMap(map -> map.entrySet().stream())
                .collect(Collectors.toMap(...))
        ).get(); // 阻塞获取最终结果
    } catch (Exception e) {
        // ... [异常处理]
    }
}

方式三

异步合并结果(重量级)

  • 开启一个新的线程,去做汇总操作
  • 聚合逻辑复杂耗时 建议使用这种方式
java
public List<ProductSkuSpecificationListResponse> findSkuSpecifications(...) {
    // ... [批处理创建]
    CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(...);
    CompletableFuture<List<T>> combinedResults = allOfFuture.thenCompose(v -> 
        CompletableFuture.supplyAsync(() -> {  // 开启新线程合并结果
            futures.stream()
                .map(CompletableFuture::join)
                .flatMap(list -> list.getList().stream())
                .collect(Collectors.toList())
        })
    );
    List<T> products = combinedResults.get(); // 阻塞等待
    return products;
}

看了一下相关的解释,看 平时使用上面的代码示例方式就好了;

方式一和方式二 的性能差距并不是很明显,可能一般会 在响应式编程链路中需要延续异步特性时使用

方式二;不过使用方式二的话,可能在线程池触发拒绝策略的时候会有问题(看具体拒绝策略)。