CompletableFuture 异步编程
面试高频考点:CompletableFuture 是 Java 8 引入的异步编程工具,是 Future 的增强版,支持函数式编程、链式调用和异步任务编排。
核心概念
什么是 CompletableFuture
CompletableFuture 是 Java 8 引入的一个实现了 Future 和 CompletionStage 接口的类,它提供了更强大的异步编程能力:
- 函数式编程:支持 Lambda 表达式
- 链式调用:可以串联多个异步操作
- 异常处理:提供了完善的异常处理机制
- 任务组合:可以组合多个异步任务
CompletableFuture 与 Future 对比
| 特性 | Future | CompletableFuture |
|---|---|---|
| 获取结果 | get() 阻塞 | 支持回调,非阻塞 |
| 异常处理 | 需要捕获 ExecutionException | 支持链式异常处理 |
| 任务编排 | 不支持 | 支持链式调用和组合 |
| 手动完成 | 不支持 | 支持 complete() 方法 |
| 函数式风格 | 不支持 | 完全支持 |
创建异步任务
runAsync - 无返回值
// 使用默认线程池(ForkJoinPool)
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
System.out.println("执行异步任务,无返回值");
});
// 指定自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("使用自定义线程池执行");
}, executor);supplyAsync - 有返回值
// 使用默认线程池
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
// 模拟耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "异步任务结果";
});
// 指定自定义线程池
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
return 100 + 200;
}, executor);
// 获取结果
String result = future1.get(); // 阻塞等待
System.out.println(result);回调方法
thenApply - 转换结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "hello";
}).thenApply(result -> {
return result.toUpperCase(); // 转换结果
}).thenApply(result -> {
return result + " WORLD"; // 再次转换
});
System.out.println(future.get()); // 输出: HELLO WORLDthenAccept - 消费结果
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return "hello world";
}).thenAccept(result -> {
// 消费结果,无返回值
System.out.println("处理结果: " + result);
});
future.get(); // 等待任务完成thenRun - 任务完成后执行
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
return "hello";
}).thenRun(() -> {
// 不关心上一个任务的结果
System.out.println("任务完成后的后续操作");
});异步回调方法
// 同步执行(在当前线程)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "hello")
.thenApply(r -> r.toUpperCase());
// 异步执行(在新线程)
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "hello")
.thenApplyAsync(r -> r.toUpperCase());
// 异步执行(指定线程池)
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "hello")
.thenApplyAsync(r -> r.toUpperCase(), executor);组合多个任务
thenCompose - 扁平化组合
// thenCompose 用于两个有依赖关系的任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "hello";
}).thenCompose(result -> {
// 第二个任务依赖第一个任务的结果
return CompletableFuture.supplyAsync(() -> {
return result + " world";
});
});
System.out.println(future.get()); // 输出: hello worldthenCombine - 独立任务组合
// thenCombine 用于两个独立任务的组合
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return 100;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return 200;
});
// 两个任务并行执行,然后合并结果
CompletableFuture<Integer> result = future1.thenCombine(future2, (r1, r2) -> {
return r1 + r2;
});
System.out.println(result.get()); // 输出: 300thenCombine vs thenCompose 区别
// thenCombine: 两个独立任务,各自执行后合并结果
future1.thenCombine(future2, (r1, r2) -> combine(r1, r2));
// thenCompose: 第二个任务依赖第一个任务的结果
future1.thenCompose(r1 -> createFuture2(r1));处理异常
exceptionally - 异常恢复
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("模拟异常");
}
return "正常结果";
}).exceptionally(ex -> {
System.out.println("捕获异常: " + ex.getMessage());
return "默认值"; // 返回默认值
});
System.out.println(future.get()); // 输出: 默认值handle - 统一处理
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
int result = 10 / 0; // 模拟异常
return "正常结果";
}).handle((result, ex) -> {
if (ex != null) {
System.out.println("发生异常: " + ex.getMessage());
return "错误恢复值";
}
return result + " - 处理完成";
});
System.out.println(future.get()); // 输出: 错误恢复值whenComplete - 完成时处理
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "hello";
}).whenComplete((result, ex) -> {
// 类似于 finally,不能改变结果
if (ex == null) {
System.out.println("任务完成,结果: " + result);
} else {
System.out.println("任务异常: " + ex.getMessage());
}
});异常处理方法对比
| 方法 | 作用 | 是否可改变结果 |
|---|---|---|
| exceptionally | 仅处理异常 | 是,返回默认值 |
| handle | 处理正常和异常 | 是 |
| whenComplete | 类似 finally | 否 |
多任务组合
allOf - 等待所有任务完成
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "任务1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "任务2";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
sleep(1500);
return "任务3";
});
// 等待所有任务完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
// 阻塞等待所有任务完成
allFutures.get();
// 获取各个任务的结果
System.out.println(future1.get()); // 任务1
System.out.println(future2.get()); // 任务2
System.out.println(future3.get()); // 任务3anyOf - 任一任务完成即返回
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(3000);
return "慢任务";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "快任务";
});
// 任一任务完成即返回
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
System.out.println(anyFuture.get()); // 输出: 快任务获取所有任务结果列表
List<CompletableFuture<String>> futures = Arrays.asList(
CompletableFuture.supplyAsync(() -> "结果1"),
CompletableFuture.supplyAsync(() -> "结果2"),
CompletableFuture.supplyAsync(() -> "结果3")
);
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
// 等待所有完成
allFutures.join();
// 收集所有结果
List<String> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println(results); // [结果1, 结果2, 结果3]线程池配置
默认线程池问题
// 默认使用 ForkJoinPool.commonPool()
// 问题:共享线程池,任务过多会相互影响
CompletableFuture.supplyAsync(() -> {
return "使用默认线程池";
});自定义线程池配置
// IO 密集型任务线程池
ThreadPoolExecutor ioExecutor = new ThreadPoolExecutor(
10, // 核心线程数
50, // 最大线程数
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("io-pool-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// CPU 密集型任务线程池
ThreadPoolExecutor cpuExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
30L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(50),
new ThreadFactoryBuilder().setNameFormat("cpu-pool-%d").build()
);
// 使用自定义线程池
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 执行任务
return "结果";
}, ioExecutor);根据任务类型选择线程池
// IO 密集型:线程数 = CPU核心数 * (1 + 等待时间/计算时间)
// 例如:数据库查询、网络请求
// CPU 密集型:线程数 = CPU核心数 + 1
// 例如:复杂计算、图像处理实际应用示例
并行调用多个服务
public class UserService {
// 获取用户基本信息
public CompletableFuture<User> getUserInfo(Long userId) {
return CompletableFuture.supplyAsync(() -> {
return userClient.getUser(userId);
}, ioExecutor);
}
// 获取用户订单
public CompletableFuture<List<Order>> getUserOrders(Long userId) {
return CompletableFuture.supplyAsync(() -> {
return orderClient.getOrders(userId);
}, ioExecutor);
}
// 获取用户积分
public CompletableFuture<Integer> getUserPoints(Long userId) {
return CompletableFuture.supplyAsync(() -> {
return pointsClient.getPoints(userId);
}, ioExecutor);
}
// 组合调用
public CompletableFuture<UserDetailVO> getUserDetail(Long userId) {
CompletableFuture<User> userFuture = getUserInfo(userId);
CompletableFuture<List<Order>> ordersFuture = getUserOrders(userId);
CompletableFuture<Integer> pointsFuture = getUserPoints(userId);
return CompletableFuture.allOf(userFuture, ordersFuture, pointsFuture)
.thenApply(v -> {
UserDetailVO vo = new UserDetailVO();
vo.setUser(userFuture.join());
vo.setOrders(ordersFuture.join());
vo.setPoints(pointsFuture.join());
return vo;
});
}
}超时处理
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
sleep(3000);
return "正常结果";
});
// Java 9+ 支持 orTimeout
try {
String result = future.orTimeout(2, TimeUnit.SECONDS)
.exceptionally(ex -> "超时默认值")
.get();
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}
// Java 8 兼容方案:使用 completeOnTimeout
// future.completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS);面试要点
问题1
Q: CompletableFuture 与 Future 有什么区别? A:
- 异步回调:Future 只能通过 get() 阻塞获取结果;CompletableFuture 支持回调,非阻塞
- 任务编排:Future 不支持任务编排;CompletableFuture 支持链式调用和组合
- 异常处理:Future 需要捕获 ExecutionException;CompletableFuture 支持链式异常处理
- 手动完成:CompletableFuture 可以手动完成(complete 方法)
- 函数式风格:CompletableFuture 完全支持 Lambda 和函数式编程
问题2
Q: thenApply 和 thenCompose 有什么区别? A:
- thenApply:同步转换结果,相当于 map 操作,返回新的结果
- thenCompose:扁平化组合,相当于 flatMap 操作,返回新的 CompletableFuture
- thenApply 适用于简单的结果转换
- thenCompose 适用于需要链式调用另一个异步任务的场景
// thenApply: 返回转换后的值
future.thenApply(r -> r.toUpperCase())
// thenCompose: 返回新的 CompletableFuture
future.thenCompose(r -> CompletableFuture.supplyAsync(() -> process(r)))问题3
Q: 如何处理 CompletableFuture 中的异常? A: 有三种主要方式:
- exceptionally:专门处理异常,返回默认值恢复
- handle:统一处理正常和异常情况,可以改变结果
- whenComplete:类似 finally,不能改变结果,用于日志记录等
问题4
Q: allOf 和 anyOf 有什么区别? A:
- allOf:等待所有任务完成才继续,适用于需要汇总多个结果的场景
- anyOf:任一任务完成即继续,适用于竞速场景,取最快的结果
问题5
Q: CompletableFuture 默认使用什么线程池?有什么问题? A: 默认使用 ForkJoinPool.commonPool(),存在以下问题:
- 共享线程池,任务过多会相互阻塞
- 线程数默认为 CPU 核心数 - 1,可能不够用
- IO 密集型任务会导致线程饥饿
解决方案:根据任务类型自定义线程池
问题6
Q: 如何实现 CompletableFuture 的超时控制? A:
// Java 9+ 方式
future.orTimeout(2, TimeUnit.SECONDS)
.exceptionally(ex -> "超时默认值");
// 或使用 completeOnTimeout
future.completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS);
// Java 8 兼容方式:配合 ScheduledExecutorService
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.schedule(() -> future.complete("超时默认值"), 2, TimeUnit.SECONDS);总结
CompletableFuture 是 Java 异步编程的核心工具:
| 场景 | 推荐方法 |
|---|---|
| 无返回值异步任务 | runAsync |
| 有返回值异步任务 | supplyAsync |
| 结果转换 | thenApply |
| 结果消费 | thenAccept |
| 依赖任务组合 | thenCompose |
| 独立任务组合 | thenCombine |
| 异常恢复 | exceptionally |
| 统一处理 | handle |
| 等待所有 | allOf |
| 任一完成 | anyOf |
最佳实践:
- 始终指定自定义线程池
- 合理设置超时时间
- 正确处理异常
- 根据任务类型选择合适的组合方法
掌握 CompletableFuture 是 Java 并发编程进阶的关键一步!