知识模块
☕ Java 知识模块
三、Java 并发编程
CompletableFuture

CompletableFuture 异步编程

面试高频考点:CompletableFuture 是 Java 8 引入的异步编程工具,是 Future 的增强版,支持函数式编程、链式调用和异步任务编排。

核心概念

什么是 CompletableFuture

CompletableFuture 是 Java 8 引入的一个实现了 Future 和 CompletionStage 接口的类,它提供了更强大的异步编程能力:

  • 函数式编程:支持 Lambda 表达式
  • 链式调用:可以串联多个异步操作
  • 异常处理:提供了完善的异常处理机制
  • 任务组合:可以组合多个异步任务

CompletableFuture 与 Future 对比

特性FutureCompletableFuture
获取结果get() 阻塞支持回调,非阻塞
异常处理需要捕获 ExecutionException支持链式异常处理
任务编排不支持支持链式调用和组合
手动完成不支持支持 complete() 方法
函数式风格不支持完全支持

创建异步任务

runAsync - 无返回值

// 使用默认线程池(ForkJoinPool)
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
    System.out.println("执行异步任务,无返回值");
});
 
// 指定自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
    System.out.println("使用自定义线程池执行");
}, executor);

supplyAsync - 有返回值

// 使用默认线程池
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    // 模拟耗时操作
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return "异步任务结果";
});
 
// 指定自定义线程池
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    return 100 + 200;
}, executor);
 
// 获取结果
String result = future1.get();  // 阻塞等待
System.out.println(result);

回调方法

thenApply - 转换结果

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "hello";
}).thenApply(result -> {
    return result.toUpperCase();  // 转换结果
}).thenApply(result -> {
    return result + " WORLD";     // 再次转换
});
 
System.out.println(future.get());  // 输出: HELLO WORLD

thenAccept - 消费结果

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
    return "hello world";
}).thenAccept(result -> {
    // 消费结果,无返回值
    System.out.println("处理结果: " + result);
});
 
future.get();  // 等待任务完成

thenRun - 任务完成后执行

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
    return "hello";
}).thenRun(() -> {
    // 不关心上一个任务的结果
    System.out.println("任务完成后的后续操作");
});

异步回调方法

// 同步执行(在当前线程)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "hello")
    .thenApply(r -> r.toUpperCase());
 
// 异步执行(在新线程)
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "hello")
    .thenApplyAsync(r -> r.toUpperCase());
 
// 异步执行(指定线程池)
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "hello")
    .thenApplyAsync(r -> r.toUpperCase(), executor);

组合多个任务

thenCompose - 扁平化组合

// thenCompose 用于两个有依赖关系的任务
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "hello";
}).thenCompose(result -> {
    // 第二个任务依赖第一个任务的结果
    return CompletableFuture.supplyAsync(() -> {
        return result + " world";
    });
});
 
System.out.println(future.get());  // 输出: hello world

thenCombine - 独立任务组合

// thenCombine 用于两个独立任务的组合
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
    sleep(1000);
    return 100;
});
 
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
    sleep(1000);
    return 200;
});
 
// 两个任务并行执行,然后合并结果
CompletableFuture<Integer> result = future1.thenCombine(future2, (r1, r2) -> {
    return r1 + r2;
});
 
System.out.println(result.get());  // 输出: 300

thenCombine vs thenCompose 区别

// thenCombine: 两个独立任务,各自执行后合并结果
future1.thenCombine(future2, (r1, r2) -> combine(r1, r2));
 
// thenCompose: 第二个任务依赖第一个任务的结果
future1.thenCompose(r1 -> createFuture2(r1));

处理异常

exceptionally - 异常恢复

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    if (true) {
        throw new RuntimeException("模拟异常");
    }
    return "正常结果";
}).exceptionally(ex -> {
    System.out.println("捕获异常: " + ex.getMessage());
    return "默认值";  // 返回默认值
});
 
System.out.println(future.get());  // 输出: 默认值

handle - 统一处理

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    int result = 10 / 0;  // 模拟异常
    return "正常结果";
}).handle((result, ex) -> {
    if (ex != null) {
        System.out.println("发生异常: " + ex.getMessage());
        return "错误恢复值";
    }
    return result + " - 处理完成";
});
 
System.out.println(future.get());  // 输出: 错误恢复值

whenComplete - 完成时处理

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return "hello";
}).whenComplete((result, ex) -> {
    // 类似于 finally,不能改变结果
    if (ex == null) {
        System.out.println("任务完成,结果: " + result);
    } else {
        System.out.println("任务异常: " + ex.getMessage());
    }
});

异常处理方法对比

方法作用是否可改变结果
exceptionally仅处理异常是,返回默认值
handle处理正常和异常
whenComplete类似 finally

多任务组合

allOf - 等待所有任务完成

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleep(1000);
    return "任务1";
});
 
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    sleep(2000);
    return "任务2";
});
 
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    sleep(1500);
    return "任务3";
});
 
// 等待所有任务完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
 
// 阻塞等待所有任务完成
allFutures.get();
 
// 获取各个任务的结果
System.out.println(future1.get());  // 任务1
System.out.println(future2.get());  // 任务2
System.out.println(future3.get());  // 任务3

anyOf - 任一任务完成即返回

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleep(3000);
    return "慢任务";
});
 
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    sleep(1000);
    return "快任务";
});
 
// 任一任务完成即返回
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);
 
System.out.println(anyFuture.get());  // 输出: 快任务

获取所有任务结果列表

List<CompletableFuture<String>> futures = Arrays.asList(
    CompletableFuture.supplyAsync(() -> "结果1"),
    CompletableFuture.supplyAsync(() -> "结果2"),
    CompletableFuture.supplyAsync(() -> "结果3")
);
 
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
    futures.toArray(new CompletableFuture[0])
);
 
// 等待所有完成
allFutures.join();
 
// 收集所有结果
List<String> results = futures.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toList());
 
System.out.println(results);  // [结果1, 结果2, 结果3]

线程池配置

默认线程池问题

// 默认使用 ForkJoinPool.commonPool()
// 问题:共享线程池,任务过多会相互影响
CompletableFuture.supplyAsync(() -> {
    return "使用默认线程池";
});

自定义线程池配置

// IO 密集型任务线程池
ThreadPoolExecutor ioExecutor = new ThreadPoolExecutor(
    10,  // 核心线程数
    50,  // 最大线程数
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100),
    new ThreadFactoryBuilder().setNameFormat("io-pool-%d").build(),
    new ThreadPoolExecutor.CallerRunsPolicy()
);
 
// CPU 密集型任务线程池
ThreadPoolExecutor cpuExecutor = new ThreadPoolExecutor(
    Runtime.getRuntime().availableProcessors(),
    Runtime.getRuntime().availableProcessors() * 2,
    30L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(50),
    new ThreadFactoryBuilder().setNameFormat("cpu-pool-%d").build()
);
 
// 使用自定义线程池
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 执行任务
    return "结果";
}, ioExecutor);

根据任务类型选择线程池

// IO 密集型:线程数 = CPU核心数 * (1 + 等待时间/计算时间)
// 例如:数据库查询、网络请求
 
// CPU 密集型:线程数 = CPU核心数 + 1
// 例如:复杂计算、图像处理

实际应用示例

并行调用多个服务

public class UserService {
    
    // 获取用户基本信息
    public CompletableFuture<User> getUserInfo(Long userId) {
        return CompletableFuture.supplyAsync(() -> {
            return userClient.getUser(userId);
        }, ioExecutor);
    }
    
    // 获取用户订单
    public CompletableFuture<List<Order>> getUserOrders(Long userId) {
        return CompletableFuture.supplyAsync(() -> {
            return orderClient.getOrders(userId);
        }, ioExecutor);
    }
    
    // 获取用户积分
    public CompletableFuture<Integer> getUserPoints(Long userId) {
        return CompletableFuture.supplyAsync(() -> {
            return pointsClient.getPoints(userId);
        }, ioExecutor);
    }
    
    // 组合调用
    public CompletableFuture<UserDetailVO> getUserDetail(Long userId) {
        CompletableFuture<User> userFuture = getUserInfo(userId);
        CompletableFuture<List<Order>> ordersFuture = getUserOrders(userId);
        CompletableFuture<Integer> pointsFuture = getUserPoints(userId);
        
        return CompletableFuture.allOf(userFuture, ordersFuture, pointsFuture)
            .thenApply(v -> {
                UserDetailVO vo = new UserDetailVO();
                vo.setUser(userFuture.join());
                vo.setOrders(ordersFuture.join());
                vo.setPoints(pointsFuture.join());
                return vo;
            });
    }
}

超时处理

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    sleep(3000);
    return "正常结果";
});
 
// Java 9+ 支持 orTimeout
try {
    String result = future.orTimeout(2, TimeUnit.SECONDS)
        .exceptionally(ex -> "超时默认值")
        .get();
    System.out.println(result);
} catch (Exception e) {
    e.printStackTrace();
}
 
// Java 8 兼容方案:使用 completeOnTimeout
// future.completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS);

面试要点

问题1

Q: CompletableFuture 与 Future 有什么区别? A:

  1. 异步回调:Future 只能通过 get() 阻塞获取结果;CompletableFuture 支持回调,非阻塞
  2. 任务编排:Future 不支持任务编排;CompletableFuture 支持链式调用和组合
  3. 异常处理:Future 需要捕获 ExecutionException;CompletableFuture 支持链式异常处理
  4. 手动完成:CompletableFuture 可以手动完成(complete 方法)
  5. 函数式风格:CompletableFuture 完全支持 Lambda 和函数式编程

问题2

Q: thenApply 和 thenCompose 有什么区别? A:

  • thenApply:同步转换结果,相当于 map 操作,返回新的结果
  • thenCompose:扁平化组合,相当于 flatMap 操作,返回新的 CompletableFuture
  • thenApply 适用于简单的结果转换
  • thenCompose 适用于需要链式调用另一个异步任务的场景
// thenApply: 返回转换后的值
future.thenApply(r -> r.toUpperCase())
 
// thenCompose: 返回新的 CompletableFuture
future.thenCompose(r -> CompletableFuture.supplyAsync(() -> process(r)))

问题3

Q: 如何处理 CompletableFuture 中的异常? A: 有三种主要方式:

  1. exceptionally:专门处理异常,返回默认值恢复
  2. handle:统一处理正常和异常情况,可以改变结果
  3. whenComplete:类似 finally,不能改变结果,用于日志记录等

问题4

Q: allOf 和 anyOf 有什么区别? A:

  • allOf:等待所有任务完成才继续,适用于需要汇总多个结果的场景
  • anyOf:任一任务完成即继续,适用于竞速场景,取最快的结果

问题5

Q: CompletableFuture 默认使用什么线程池?有什么问题? A: 默认使用 ForkJoinPool.commonPool(),存在以下问题:

  1. 共享线程池,任务过多会相互阻塞
  2. 线程数默认为 CPU 核心数 - 1,可能不够用
  3. IO 密集型任务会导致线程饥饿

解决方案:根据任务类型自定义线程池

问题6

Q: 如何实现 CompletableFuture 的超时控制? A:

// Java 9+ 方式
future.orTimeout(2, TimeUnit.SECONDS)
    .exceptionally(ex -> "超时默认值");
 
// 或使用 completeOnTimeout
future.completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS);
 
// Java 8 兼容方式:配合 ScheduledExecutorService
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.schedule(() -> future.complete("超时默认值"), 2, TimeUnit.SECONDS);

总结

CompletableFuture 是 Java 异步编程的核心工具:

场景推荐方法
无返回值异步任务runAsync
有返回值异步任务supplyAsync
结果转换thenApply
结果消费thenAccept
依赖任务组合thenCompose
独立任务组合thenCombine
异常恢复exceptionally
统一处理handle
等待所有allOf
任一完成anyOf

最佳实践

  1. 始终指定自定义线程池
  2. 合理设置超时时间
  3. 正确处理异常
  4. 根据任务类型选择合适的组合方法

掌握 CompletableFuture 是 Java 并发编程进阶的关键一步!