知识模块
☕ Java 知识模块
十、中间件
ZooKeeper 应用场景

ZooKeeper 应用场景

ZooKeeper 是一个分布式协调服务,为分布式应用提供一致性服务。

一、ZooKeeper 基础

1. 数据模型

/
├── app
│   ├── servers
│   │   ├── server1
│   │   ├── server2
│   │   └── server3
│   └── config
│       └── db_url
└── lock
    └── order_123
  • ZNode:数据节点,类似文件系统的目录
  • 路径:每个 ZNode 有唯一路径
  • 数据:每个 ZNode 可存储数据(默认 1MB)

2. ZNode 类型

类型说明应用场景
持久节点永久存在,直到删除配置、服务注册
临时节点会话结束自动删除服务发现、锁
持久顺序持久 + 自动编号分布式队列
临时顺序临时 + 自动编号分布式锁、选举

3. Watcher 机制

// 监听节点变化
zk.exists("/path", new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDataChanged) {
            // 节点数据变化
        }
    }
});

特点

  • 一次性触发(触发后需重新注册)
  • 异步通知
  • 可以监听数据变化、子节点变化

二、核心应用场景

1. 配置中心

                    ┌──────────────┐
                    │   ZK 集群     │
                    │  /config     │
                    │  ├── db_url  │
                    │  ├── timeout │
                    │  └── threads │
                    └──────┬───────┘
                           │ Watch
          ┌────────────────┼────────────────┐
          ↓                ↓                ↓
    ┌──────────┐     ┌──────────┐     ┌──────────┐
    │ Server A │     │ Server B │     │ Server C │
    └──────────┘     └──────────┘     └──────────┘

实现步骤

// 1. 初始化:读取配置
byte[] data = zk.getData("/config/db_url", true, null);
String dbUrl = new String(data);
 
// 2. 监听变化
zk.exists("/config/db_url", new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDataChanged) {
            // 重新读取配置
            byte[] newData = zk.getData("/config/db_url", true, null);
            updateConfig(new String(newData));
            // 重新注册 Watcher
            zk.exists("/config/db_url", this);
        }
    }
});

2. 服务注册与发现

服务提供者:
    启动 → 创建临时节点 /services/order-service/server1
    停止 → 临时节点自动删除

服务消费者:
    订阅 /services/order-service 子节点变化
    获取可用服务列表

服务提供者

// 注册服务(临时节点)
String path = zk.create(
    "/services/order-service/server",  // 路径
    "192.168.1.100:8080".getBytes(),   // 数据:服务地址
    ZooDefs.Ids.OPEN_ACL_UNSAFE,
    CreateMode.EPHEMERAL_SEQUENTIAL    // 临时顺序节点
);
// 结果: /services/order-service/server0000000001

服务消费者

// 获取服务列表
List<String> servers = zk.getChildren("/services/order-service", true);
 
// 监听服务变化
zk.getChildren("/services/order-service", new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeChildrenChanged) {
            // 更新本地服务列表
            updateServerList();
        }
    }
});

3. 分布式锁

方案一:临时节点

public boolean tryLock(String lockPath) {
    try {
        zk.create(lockPath, new byte[0], 
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                  CreateMode.EPHEMERAL);
        return true;  // 创建成功,获取锁
    } catch (KeeperException.NodeExistsException e) {
        return false;  // 节点已存在,锁被占用
    }
}
 
public void unlock(String lockPath) {
    zk.delete(lockPath, -1);
}

问题:惊群效应 - 锁释放时所有等待者同时竞争

方案二:临时顺序节点(推荐)

public void lock(String lockPath) throws Exception {
    // 1. 创建临时顺序节点
    String myNode = zk.create(lockPath + "/lock-", 
                              new byte[0],
                              ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                              CreateMode.EPHEMERAL_SEQUENTIAL);
    
    // 2. 获取所有子节点
    List<String> children = zk.getChildren(lockPath, false);
    Collections.sort(children);
    
    // 3. 判断自己是否是最小节点
    String myName = myNode.substring(lockPath.length() + 1);
    int myIndex = children.indexOf(myName);
    
    if (myIndex == 0) {
        // 是最小节点,获取锁成功
        return;
    }
    
    // 4. 监听前一个节点
    String prevNode = lockPath + "/" + children.get(myIndex - 1);
    final CountDownLatch latch = new CountDownLatch(1);
    
    zk.exists(prevNode, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeDeleted) {
                latch.countDown();  // 前节点删除,尝试获取锁
            }
        }
    });
    
    latch.await();  // 等待前节点删除
}

4. 分布式选举

场景:集群中选举一个 Leader

方案:临时顺序节点

/leader-election
    ├── node-0000000001  ← 最小节点成为 Leader
    ├── node-0000000002
    ├── node-0000000003
    └── node-0000000004
public void electLeader(String electionPath) throws Exception {
    // 1. 创建临时顺序节点
    String myNode = zk.create(electionPath + "/node-", 
                              "server-info".getBytes(),
                              ZooDefs.Ids.OPEN_ACL_UNSAFE, 
                              CreateMode.EPHEMERAL_SEQUENTIAL);
    
    // 2. 获取所有候选者
    List<String> candidates = zk.getChildren(electionPath, false);
    Collections.sort(candidates);
    
    // 3. 最小节点为 Leader
    String smallest = candidates.get(0);
    if (myNode.equals(electionPath + "/" + smallest)) {
        // 成为 Leader
        becomeLeader();
    } else {
        // 成为 Follower,监听前一个节点
        int myIndex = candidates.indexOf(myNode.substring(electionPath.length() + 1));
        String prevNode = electionPath + "/" + candidates.get(myIndex - 1);
        
        zk.exists(prevNode, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeDeleted) {
                    // 前节点宕机,重新选举
                    electLeader(electionPath);
                }
            }
        });
    }
}

5. 分布式队列

FIFO 队列

// 入队
public void enqueue(String queuePath, byte[] data) throws Exception {
    zk.create(queuePath + "/element-", data,
              ZooDefs.Ids.OPEN_ACL_UNSAFE, 
              CreateMode.PERSISTENT_SEQUENTIAL);
}
 
// 出队
public byte[] dequeue(String queuePath) throws Exception {
    while (true) {
        List<String> elements = zk.getChildren(queuePath, false);
        if (elements.isEmpty()) {
            Thread.sleep(100);
            continue;
        }
        
        Collections.sort(elements);
        String firstElement = queuePath + "/" + elements.get(0);
        
        try {
            byte[] data = zk.getData(firstElement, false, null);
            zk.delete(firstElement, -1);
            return data;
        } catch (KeeperException.NodeExistsException e) {
            // 节点已被消费,重试
        }
    }
}

6. 集群管理

/cluster
    ├── servers
    │   ├── server-192.168.1.100:8080  (临时节点)
    │   ├── server-192.168.1.101:8080
    │   └── server-192.168.1.102:8080
    └── master
        └── server-192.168.1.100:8080  (当前 Master)

功能

  • 实时感知集群节点状态
  • 自动故障转移
  • Master 选举

三、ZooKeeper 集群

1. 集群架构

┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│   Leader    │  │  Follower   │  │  Follower   │
│   (写请求)   │←→│  (读请求)   │←→│  (读请求)   │
└─────────────┘  └─────────────┘  └─────────────┘
       │                │                │
       └────────────────┼────────────────┘

                   数据同步(ZAB 协议)

2. ZAB 协议

崩溃恢复

  1. Leader 宕机 → 进入选举阶段
  2. 选举新的 Leader(ZXID 最大)
  3. 数据同步

消息广播

  1. Leader 收到写请求
  2. 生成提案(Proposal)
  3. 发送给所有 Follower
  4. 多数 Follower 确认后提交

3. 选举机制

选举规则:
1. ZXID(事务ID)最大者优先
2. ZXID 相同,myid 大者优先
3. 获得超过半数投票者当选

四、面试高频问题

Q1: ZooKeeper 如何保证一致性?

  • ZAB 协议:保证主从数据一致性
  • 原子广播:写操作同步到多数节点
  • 顺序一致性:全局有序的事务 ID

Q2: 临时节点和持久节点的区别?

类型生命周期应用场景
持久节点手动删除配置、元数据
临时节点会话结束服务注册、锁

Q3: 分布式锁的实现方式对比?

方案优点缺点
ZK 临时顺序节点避免惊群效应性能较低
Redis SETNX性能高需要处理锁超时
数据库唯一索引简单性能最低

Q4: ZooKeeper vs Eureka?

特性ZooKeeperEureka
CAPCPAP
一致性强一致最终一致
可用性选举期间不可用高可用
适用场景配置、锁、选举服务发现

Q5: Watcher 为什么是一次性的?

设计原因:

  • 减轻服务端压力
  • 避免重复通知
  • 客户端可按需重新注册

五、最佳实践

1. 节点设计

  • 持久节点:配置、元数据
  • 临时节点:服务实例、锁
  • 顺序节点:选举、队列

2. 监听设计

  • 及时重新注册 Watcher
  • 异步处理事件
  • 避免阻塞 Watcher 回调

3. 异常处理

  • 连接断开重试
  • Session 超时处理
  • 节点不存在处理

4. 性能优化

  • 合理设置 Watcher
  • 避免频繁创建节点
  • 使用连接池

更新时间:2026年3月16日