知识模块
☕ Java 知识模块
十、中间件
Kafka 分区

Kafka 分区机制

Kafka 通过分区实现高吞吐量和水平扩展,是分布式消息系统的核心设计。

一、分区基础概念

1. Topic 与 Partition

Topic: orders

    ├── Partition 0: [msg0, msg3, msg6, ...]
    ├── Partition 1: [msg1, msg4, msg7, ...]
    └── Partition 2: [msg2, msg5, msg8, ...]
  • Topic:消息的逻辑分类
  • Partition:消息的物理存储单元
  • Offset:分区内消息的唯一标识,从 0 递增

2. 分区的作用

特性说明
水平扩展分区分布在多个 Broker 上
并行处理多个消费者并行消费
容错副本机制保证数据安全
顺序性分区内消息有序

二、分区策略

1. 默认分区器

// 生产者发送消息
ProducerRecord<String, String> record = 
    new ProducerRecord<>("topic", key, value);
 
// 分区计算逻辑
public int partition(String topic, Object key, byte[] keyBytes, 
                     Object value, byte[] valueBytes, Cluster cluster) {
    // 1. 指定分区 → 使用指定分区
    // 2. 有 key → hash(key) % partitionCount
    // 3. 无 key → sticky partitioner(黏性分区器)
}

2. 三种分区方式

方式触发条件特点
指定分区构造时指定 partition精确控制
Key 哈希指定 key同一 key 到同一分区
轮询/黏性无 key负载均衡

3. 自定义分区器

public class OrderPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int partitionCount = cluster.partitionCountForTopic(topic);
        
        // 按订单类型分区
        if (key instanceof String) {
            String orderKey = (String) key;
            if (orderKey.startsWith("VIP_")) {
                return 0;  // VIP 订单到分区 0
            }
        }
        
        return Math.abs(key.hashCode()) % partitionCount;
    }
    
    @Override
    public void configure(Map<String, ?> configs) {}
    
    @Override
    public void close() {}
}

三、消费者组与分区分配

1. 消费者组

Topic (3 个分区)

    │    消费者组 A
    │    ├── Consumer-1: 消费 P0, P1
    │    └── Consumer-2: 消费 P2

    │    消费者组 B
    ├── Consumer-3: 消费 P0
    ├── Consumer-4: 消费 P1
    └── Consumer-5: 消费 P2

特点

  • 同一消费者组内,每个分区只被一个消费者消费
  • 不同消费者组独立消费,互不影响

2. 分区分配策略

Range 策略(默认)

分区: [P0, P1, P2, P3, P4, P5]
消费者: [C0, C1, C2]

分配结果:
C0: P0, P1  (2个)
C1: P2, P3  (2个)
C2: P4, P5  (2个)

RoundRobin 策略

分区: [P0, P1, P2, P3, P4, P5]
消费者: [C0, C1, C2]

分配结果:
C0: P0, P3
C1: P1, P4
C2: P2, P5

Sticky 策略

  • 尽量保持原有分配
  • 减少分区迁移

3. 消费者数量与分区数量关系

消费者数分区数结果
24每个消费者 2 个分区
422 个消费者空闲
33每个消费者 1 个分区

建议:消费者数量 ≤ 分区数量

四、分区副本机制

1. 副本类型

Partition 0:
    ┌─────────────────────────────────┐
    │ Leader (Broker 1)               │ ← 读写请求
    │ [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]  │
    └─────────────────────────────────┘
              ↑ 同步
    ┌─────────┴─────────┐
    ↓                   ↓
┌───────────┐     ┌───────────┐
│ Follower  │     │ Follower  │
│ (Broker 2)│     │ (Broker 3)│
└───────────┘     └───────────┘
  • Leader:处理读写请求
  • Follower:同步 Leader 数据,故障时参与选举

2. ISR(In-Sync Replicas)

// ISR 配置
min.insync.replicas=2     // 最小同步副本数
unclean.leader.election.enable=false  // 禁止非 ISR 副本当选 Leader

ISR 动态调整

  • Follower 同步延迟超过阈值 → 移出 ISR
  • Follower 追上 Leader → 加入 ISR

3. ACK 配置

// 生产者 ACK 配置
props.put("acks", "all");  // 0, 1, all(-1)
acks说明可靠性性能
0不等待确认最高
1Leader 确认
allISR 全部确认较低

五、消息顺序性

1. 分区内有序

Partition 0: [msg1, msg2, msg3, msg4]
              消费顺序: msg1 → msg2 → msg3 → msg4

2. 分区间无序

Partition 0: [msg1, msg3]
Partition 1: [msg2, msg4]

可能消费顺序: msg1 → msg2 → msg3 → msg4
          或: msg2 → msg1 → msg4 → msg3

3. 保证全局顺序

方案一:单分区

// 所有消息发到同一分区
new ProducerRecord<>("topic", 0, key, value);  // 指定分区 0

方案二:相同 Key

// 相同 key 的消息到同一分区
new ProducerRecord<>("topic", "order-123", value);  // key 相同

六、分区重平衡(Rebalance)

1. 触发条件

  • 消费者加入/离开消费者组
  • 消费者心跳超时
  • 分区数量变化
  • Topic 变化

2. 重平衡过程

1. 消费者发送 JoinGroup 请求
2. Coordinator 选择 Leader 消费者
3. Leader 制定分区分配方案
4. 所有消费者收到分配结果
5. 开始消费

3. 避免 Rebalance

// 心跳配置
session.timeout.ms=10000      // 会话超时时间
heartbeat.interval.ms=3000    // 心跳间隔
max.poll.interval.ms=300000   // 两次 poll 最大间隔
 
// 处理时间过长会导致 Rebalance
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record);  // 单条处理时间不宜过长
    }
}

七、分区数量规划

1. 影响因素

因素影响
吞吐量需求分区数越多,并行度越高
消费者数量消费者数 ≤ 分区数
Broker 数量分区应均匀分布
文件句柄分区数过多增加开销

2. 计算公式

分区数 = max(
    目标吞吐量 / 单个分区最大吞吐量,
    目标吞吐量 / 单个消费者最大吞吐量
)

示例

  • 目标吞吐量:100 MB/s
  • 单分区写入:20 MB/s
  • 单消费者消费:10 MB/s
分区数 = max(100/20, 100/10) = max(5, 10) = 10

3. 最佳实践

  • 测试环境:3-10 个分区
  • 生产环境:根据吞吐量计算
  • 单个 Broker 分区数 < 2000

八、面试高频问题

Q1: Kafka 为什么快?

  1. 顺序写:磁盘顺序写比随机写快
  2. 零拷贝:sendfile 减少数据拷贝
  3. 页缓存:利用 OS 缓存
  4. 批量处理:消息批量发送和压缩

Q2: 如何保证消息不丢失?

生产者:
- acks=all
- retries=3
- enable.idempotence=true

Broker:
- replication.factor>=3
- min.insync.replicas>=2
- unclean.leader.election.enable=false

消费者:
- enable.auto.commit=false
- 手动提交 offset

Q3: 分区数越多越好吗?

不是。缺点:

  • 文件句柄增多
  • Leader 选举时间增加
  • 可用性降低

Q4: 如何实现消息去重?

生产者幂等

props.put("enable.idempotence", true);  // 开启幂等

消费者去重

  • 唯一 ID + 数据库/Redis
  • 业务幂等设计

Q5: Kafka 如何实现延迟消息?

Kafka 本身不支持延迟消息,常见方案:

  1. 定时任务扫描
  2. 外部调度器
  3. 使用支持延迟的消息队列(如 RocketMQ、Pulsar)

九、最佳实践

1. 生产者配置

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("enable.idempotence", true);
props.put("compression.type", "lz4");
props.put("batch.size", 16384);
props.put("linger.ms", 5);

2. 消费者配置

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-group");
props.put("enable.auto.commit", false);
props.put("auto.offset.reset", "earliest");
props.put("max.poll.records", 500);
props.put("session.timeout.ms", 10000);

3. 监控指标

  • 消息积压(Lag)
  • 分区分布
  • ISR 数量
  • 生产/消费速率

更新时间:2026年3月16日