知识模块
☕ Java 知识模块
十一、设计模式
观察者模式与发布订阅

观察者模式与发布订阅

观察者模式和发布订阅模式都是用于对象间的通信,实现松耦合的事件驱动架构。

一、观察者模式

1. 核心概念

观察者模式定义对象间的一对多依赖,当一个对象状态改变时,所有依赖它的对象都会收到通知。

┌─────────────────────────────────────┐
│          Subject (主题)              │
│  - observers: List<Observer>        │
│  + attach(Observer)                 │
│  + detach(Observer)                 │
│  + notify()                         │
└──────────────┬──────────────────────┘
               │ 通知

┌─────────────────────────────────────┐
│          <<interface>>              │
│           Observer                  │
│  + update(message): void            │
└──────────────┬──────────────────────┘
               │ 实现
     ┌─────────┼─────────┐
     ▼         ▼         ▼
┌────────┐ ┌────────┐ ┌────────┐
│Observer│ │Observer│ │Observer│
│   A    │ │   B    │ │   C    │
└────────┘ └────────┘ └────────┘

2. 实现

// 观察者接口
public interface Observer {
    void update(String message);
}
 
// 主题接口
public interface Subject {
    void attach(Observer observer);
    void detach(Observer observer);
    void notifyObservers(String message);
}
 
// 具体主题
public class NewsAgency implements Subject {
    private List<Observer> observers = new ArrayList<>();
    private String news;
    
    @Override
    public void attach(Observer observer) {
        observers.add(observer);
    }
    
    @Override
    public void detach(Observer observer) {
        observers.remove(observer);
    }
    
    @Override
    public void notifyObservers(String message) {
        for (Observer observer : observers) {
            observer.update(message);
        }
    }
    
    public void publishNews(String news) {
        this.news = news;
        notifyObservers(news);
    }
}
 
// 具体观察者
public class NewsChannel implements Observer {
    private String name;
    
    public NewsChannel(String name) {
        this.name = name;
    }
    
    @Override
    public void update(String message) {
        System.out.println(name + " 收到新闻: " + message);
    }
}
 
// 使用
NewsAgency agency = new NewsAgency();
agency.attach(new NewsChannel("CCTV"));
agency.attach(new NewsChannel("BBC"));
agency.publishNews("重大新闻发布!");

3. Java 内置实现

// 被观察者
public class WeatherStation extends Observable {
    private float temperature;
    
    public void setTemperature(float temperature) {
        this.temperature = temperature;
        setChanged();      // 标记状态已改变
        notifyObservers(temperature);
    }
}
 
// 观察者
public class WeatherDisplay implements Observer {
    @Override
    public void update(Observable o, Object arg) {
        System.out.println("温度更新: " + arg);
    }
}
 
// 使用
WeatherStation station = new WeatherStation();
station.addObserver(new WeatherDisplay());
station.setTemperature(25.5f);

二、发布订阅模式

1. 核心概念

发布订阅模式引入消息代理(Broker),发布者和订阅者完全解耦。

┌──────────┐     ┌──────────┐     ┌──────────┐
│Publisher │     │Publisher │     │Publisher │
│   (发布者) │     │   (发布者) │     │   (发布者) │
└────┬─────┘     └────┬─────┘     └────┬─────┘
     │                │                │
     └────────────────┼────────────────┘

         ┌────────────────────────┐
         │   Message Broker       │
         │     (消息代理)          │
         │  ┌────────────────┐    │
         │  │ topic: news    │    │
         │  │ topic: sports  │    │
         │  │ topic: weather │    │
         │  └────────────────┘    │
         └────────┬───────────────┘

     ┌────────────┼────────────┐
     ▼            ▼            ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│Subscriber│ │Subscriber│ │Subscriber│
│  (订阅者)  │ │  (订阅者)  │ │  (订阅者)  │
└──────────┘ └──────────┘ └──────────┘

2. 实现

// 消息
public class Message {
    private String topic;
    private String content;
    
    // getter/setter
}
 
// 消息代理
public class MessageBroker {
    private Map<String, List<Consumer<Message>>> subscribers = new ConcurrentHashMap<>();
    
    // 订阅
    public void subscribe(String topic, Consumer<Message> subscriber) {
        subscribers.computeIfAbsent(topic, k -> new ArrayList<>()).add(subscriber);
    }
    
    // 取消订阅
    public void unsubscribe(String topic, Consumer<Message> subscriber) {
        List<Consumer<Message>> list = subscribers.get(topic);
        if (list != null) {
            list.remove(subscriber);
        }
    }
    
    // 发布
    public void publish(String topic, Message message) {
        List<Consumer<Message>> list = subscribers.get(topic);
        if (list != null) {
            for (Consumer<Message> subscriber : list) {
                subscriber.accept(message);
            }
        }
    }
}
 
// 使用
MessageBroker broker = new MessageBroker();
 
// 订阅
broker.subscribe("news", msg -> System.out.println("订阅者1: " + msg.getContent()));
broker.subscribe("news", msg -> System.out.println("订阅者2: " + msg.getContent()));
broker.subscribe("sports", msg -> System.out.println("体育订阅者: " + msg.getContent()));
 
// 发布
Message msg = new Message("news", "今日头条");
broker.publish("news", msg);

3. Spring 事件机制

// 自定义事件
public class UserRegisteredEvent extends ApplicationEvent {
    private String username;
    
    public UserRegisteredEvent(Object source, String username) {
        super(source);
        this.username = username;
    }
    
    public String getUsername() {
        return username;
    }
}
 
// 事件发布者
@Service
public class UserService {
    @Autowired
    private ApplicationEventPublisher eventPublisher;
    
    public void register(String username) {
        // 注册逻辑
        System.out.println("用户注册: " + username);
        
        // 发布事件
        eventPublisher.publishEvent(new UserRegisteredEvent(this, username));
    }
}
 
// 事件监听者
@Component
public class UserEventListener {
    
    @EventListener
    public void onUserRegistered(UserRegisteredEvent event) {
        System.out.println("发送欢迎邮件: " + event.getUsername());
    }
    
    @EventListener
    @Async
    public void onUserRegisteredAsync(UserRegisteredEvent event) {
        System.out.println("异步处理: " + event.getUsername());
    }
}
 
// 使用
userService.register("张三");

三、两种模式对比

对比项观察者模式发布订阅模式
耦合度松耦合(知道彼此)完全解耦(通过 Broker)
通信方式直接通信通过消息代理
适用场景单应用内部分布式系统
扩展性一般
复杂度
观察者模式:Subject ──→ Observer(直接)
发布订阅模式:Publisher ──→ Broker ──→ Subscriber(间接)

四、实际应用

1. 观察者模式应用

GUI 事件处理

button.addActionListener(e -> System.out.println("按钮被点击"));

RxJava/Reactor

Observable.just("Hello")
    .subscribe(s -> System.out.println(s));

2. 发布订阅应用

消息队列

  • RabbitMQ
  • Kafka
  • Redis Pub/Sub

Spring Cloud Stream

// 发布者
@Bean
public Supplier<Message<String>> source() {
    return () -> MessageBuilder.withPayload("message").build();
}
 
// 订阅者
@Bean
public Consumer<Message<String>> sink() {
    return msg -> System.out.println("收到消息: " + msg);
}

3. Vue 响应式原理

// 简化版 Vue 响应式
class Dep {
    constructor() {
        this.subs = [];
    }
    
    addSub(sub) {
        this.subs.push(sub);
    }
    
    notify() {
        this.subs.forEach(sub => sub.update());
    }
}
 
class Watcher {
    constructor(vm, expr, cb) {
        this.cb = cb;
        Dep.target = this;
        this.value = vm[expr];  // 触发 getter,添加依赖
        Dep.target = null;
    }
    
    update() {
        this.cb();
    }
}

五、面试高频问题

Q1: 观察者模式的优缺点?

优点

  • 观察者和被观察者抽象耦合
  • 支持广播通信
  • 符合开闭原则

缺点

  • 可能导致性能问题(大量观察者)
  • 循环依赖问题
  • 不知道具体发生了什么变化

Q2: 发布订阅模式的优势?

  • 完全解耦发布者和订阅者
  • 支持异步处理
  • 易于扩展
  • 支持持久化和重试

Q3: Spring Event 是什么模式?

Spring Event 是观察者模式的变体:

  • ApplicationEventPublisher 相当于 Subject
  • @EventListener 相当于 Observer
  • 支持 @Async 异步处理

Q4: Vue 双向绑定原理?

数据变化 ──→ 触发 setter ──→ Dep.notify() ──→ Watcher.update() ──→ 视图更新
视图更新 ──→ 触发事件 ──→ 修改数据 ──→ 循环开始

Q5: 如何选择两种模式?

单应用内部、简单场景 → 观察者模式
分布式系统、需要解耦 → 发布订阅模式

六、最佳实践

1. 观察者模式注意内存泄漏

// 及时取消订阅
public class View implements Observer {
    private Subject subject;
    
    public View(Subject subject) {
        this.subject = subject;
        subject.attach(this);
    }
    
    public void destroy() {
        subject.detach(this);  // 防止内存泄漏
    }
}

2. 发布订阅使用异步

@EventListener
@Async
public void handleEvent(MyEvent event) {
    // 异步处理,不阻塞主线程
}

3. 合理设计事件粒度

// 不推荐:事件过于宽泛
public class DataChangedEvent { }
 
// 推荐:事件具体明确
public class UserPasswordChangedEvent {
    private Long userId;
    private String newPassword;
}

更新时间:2026年3月16日