Kafka 分区机制
Kafka 通过分区实现高吞吐量和水平扩展,是分布式消息系统的核心设计。
一、分区基础概念
1. Topic 与 Partition
Topic: orders
│
├── Partition 0: [msg0, msg3, msg6, ...]
├── Partition 1: [msg1, msg4, msg7, ...]
└── Partition 2: [msg2, msg5, msg8, ...]- Topic:消息的逻辑分类
- Partition:消息的物理存储单元
- Offset:分区内消息的唯一标识,从 0 递增
2. 分区的作用
| 特性 | 说明 |
|---|---|
| 水平扩展 | 分区分布在多个 Broker 上 |
| 并行处理 | 多个消费者并行消费 |
| 容错 | 副本机制保证数据安全 |
| 顺序性 | 分区内消息有序 |
二、分区策略
1. 默认分区器
// 生产者发送消息
ProducerRecord<String, String> record =
new ProducerRecord<>("topic", key, value);
// 分区计算逻辑
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 1. 指定分区 → 使用指定分区
// 2. 有 key → hash(key) % partitionCount
// 3. 无 key → sticky partitioner(黏性分区器)
}2. 三种分区方式
| 方式 | 触发条件 | 特点 |
|---|---|---|
| 指定分区 | 构造时指定 partition | 精确控制 |
| Key 哈希 | 指定 key | 同一 key 到同一分区 |
| 轮询/黏性 | 无 key | 负载均衡 |
3. 自定义分区器
public class OrderPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
int partitionCount = cluster.partitionCountForTopic(topic);
// 按订单类型分区
if (key instanceof String) {
String orderKey = (String) key;
if (orderKey.startsWith("VIP_")) {
return 0; // VIP 订单到分区 0
}
}
return Math.abs(key.hashCode()) % partitionCount;
}
@Override
public void configure(Map<String, ?> configs) {}
@Override
public void close() {}
}三、消费者组与分区分配
1. 消费者组
Topic (3 个分区)
│
│ 消费者组 A
│ ├── Consumer-1: 消费 P0, P1
│ └── Consumer-2: 消费 P2
│
│ 消费者组 B
├── Consumer-3: 消费 P0
├── Consumer-4: 消费 P1
└── Consumer-5: 消费 P2特点:
- 同一消费者组内,每个分区只被一个消费者消费
- 不同消费者组独立消费,互不影响
2. 分区分配策略
Range 策略(默认):
分区: [P0, P1, P2, P3, P4, P5]
消费者: [C0, C1, C2]
分配结果:
C0: P0, P1 (2个)
C1: P2, P3 (2个)
C2: P4, P5 (2个)RoundRobin 策略:
分区: [P0, P1, P2, P3, P4, P5]
消费者: [C0, C1, C2]
分配结果:
C0: P0, P3
C1: P1, P4
C2: P2, P5Sticky 策略:
- 尽量保持原有分配
- 减少分区迁移
3. 消费者数量与分区数量关系
| 消费者数 | 分区数 | 结果 |
|---|---|---|
| 2 | 4 | 每个消费者 2 个分区 |
| 4 | 2 | 2 个消费者空闲 |
| 3 | 3 | 每个消费者 1 个分区 |
建议:消费者数量 ≤ 分区数量
四、分区副本机制
1. 副本类型
Partition 0:
┌─────────────────────────────────┐
│ Leader (Broker 1) │ ← 读写请求
│ [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] │
└─────────────────────────────────┘
↑ 同步
┌─────────┴─────────┐
↓ ↓
┌───────────┐ ┌───────────┐
│ Follower │ │ Follower │
│ (Broker 2)│ │ (Broker 3)│
└───────────┘ └───────────┘- Leader:处理读写请求
- Follower:同步 Leader 数据,故障时参与选举
2. ISR(In-Sync Replicas)
// ISR 配置
min.insync.replicas=2 // 最小同步副本数
unclean.leader.election.enable=false // 禁止非 ISR 副本当选 LeaderISR 动态调整:
- Follower 同步延迟超过阈值 → 移出 ISR
- Follower 追上 Leader → 加入 ISR
3. ACK 配置
// 生产者 ACK 配置
props.put("acks", "all"); // 0, 1, all(-1)| acks | 说明 | 可靠性 | 性能 |
|---|---|---|---|
| 0 | 不等待确认 | 低 | 最高 |
| 1 | Leader 确认 | 中 | 高 |
| all | ISR 全部确认 | 高 | 较低 |
五、消息顺序性
1. 分区内有序
Partition 0: [msg1, msg2, msg3, msg4]
消费顺序: msg1 → msg2 → msg3 → msg42. 分区间无序
Partition 0: [msg1, msg3]
Partition 1: [msg2, msg4]
可能消费顺序: msg1 → msg2 → msg3 → msg4
或: msg2 → msg1 → msg4 → msg33. 保证全局顺序
方案一:单分区
// 所有消息发到同一分区
new ProducerRecord<>("topic", 0, key, value); // 指定分区 0方案二:相同 Key
// 相同 key 的消息到同一分区
new ProducerRecord<>("topic", "order-123", value); // key 相同六、分区重平衡(Rebalance)
1. 触发条件
- 消费者加入/离开消费者组
- 消费者心跳超时
- 分区数量变化
- Topic 变化
2. 重平衡过程
1. 消费者发送 JoinGroup 请求
2. Coordinator 选择 Leader 消费者
3. Leader 制定分区分配方案
4. 所有消费者收到分配结果
5. 开始消费3. 避免 Rebalance
// 心跳配置
session.timeout.ms=10000 // 会话超时时间
heartbeat.interval.ms=3000 // 心跳间隔
max.poll.interval.ms=300000 // 两次 poll 最大间隔
// 处理时间过长会导致 Rebalance
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record); // 单条处理时间不宜过长
}
}七、分区数量规划
1. 影响因素
| 因素 | 影响 |
|---|---|
| 吞吐量需求 | 分区数越多,并行度越高 |
| 消费者数量 | 消费者数 ≤ 分区数 |
| Broker 数量 | 分区应均匀分布 |
| 文件句柄 | 分区数过多增加开销 |
2. 计算公式
分区数 = max(
目标吞吐量 / 单个分区最大吞吐量,
目标吞吐量 / 单个消费者最大吞吐量
)示例:
- 目标吞吐量:100 MB/s
- 单分区写入:20 MB/s
- 单消费者消费:10 MB/s
分区数 = max(100/20, 100/10) = max(5, 10) = 103. 最佳实践
- 测试环境:3-10 个分区
- 生产环境:根据吞吐量计算
- 单个 Broker 分区数 < 2000
八、面试高频问题
Q1: Kafka 为什么快?
- 顺序写:磁盘顺序写比随机写快
- 零拷贝:sendfile 减少数据拷贝
- 页缓存:利用 OS 缓存
- 批量处理:消息批量发送和压缩
Q2: 如何保证消息不丢失?
生产者:
- acks=all
- retries=3
- enable.idempotence=true
Broker:
- replication.factor>=3
- min.insync.replicas>=2
- unclean.leader.election.enable=false
消费者:
- enable.auto.commit=false
- 手动提交 offsetQ3: 分区数越多越好吗?
不是。缺点:
- 文件句柄增多
- Leader 选举时间增加
- 可用性降低
Q4: 如何实现消息去重?
生产者幂等:
props.put("enable.idempotence", true); // 开启幂等消费者去重:
- 唯一 ID + 数据库/Redis
- 业务幂等设计
Q5: Kafka 如何实现延迟消息?
Kafka 本身不支持延迟消息,常见方案:
- 定时任务扫描
- 外部调度器
- 使用支持延迟的消息队列(如 RocketMQ、Pulsar)
九、最佳实践
1. 生产者配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("enable.idempotence", true);
props.put("compression.type", "lz4");
props.put("batch.size", 16384);
props.put("linger.ms", 5);2. 消费者配置
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-group");
props.put("enable.auto.commit", false);
props.put("auto.offset.reset", "earliest");
props.put("max.poll.records", 500);
props.put("session.timeout.ms", 10000);3. 监控指标
- 消息积压(Lag)
- 分区分布
- ISR 数量
- 生产/消费速率
更新时间:2026年3月16日