0%

观察者设计模式

我们在实际中会有这种需求:当某些事件发生的时候,就调用某些方法。这个时候就需要用到观察者这一设计模式。

一个例子

找的网上的一个例子,现在有一个天气预报中心,它能够向外部广播出自己的信息,如果有对象需要天气信息,那么就自己去订阅。

所以从上面的这些信息,不难首先定义出天气预报中心作为一个接口:

1
2
3
public interface WeatherCenter {
void publishWeatherInfo();
}

然后把所有的订阅者作为Observer,抽象出对应的接口:

1
2
3
public interface Observer {
void sendWeatherWarning();
}

然后可以有多个对象,它们实现这个Observer接口。

最后我只需要去创建一个对象,实现上面的天气预报中心,并且它持有一个集合,然后把这些所有的订阅者都放到里面,这样在 publishWeatherInfo 方法里面,去挨个调用它们的 sendWeatherWarning 方法即可。

观察者模式的核心在于事件的发布者,其实是它通知了所有的这些观察者(由它主动调用了它们的方法),并不是监听这些事件的人执行了这段代码,它们仅仅是提前定义好了如果发生这件事怎么做,真正调用这段代码的是被观察者

所以,观察者其实是一个被动的角色,而是由被观察者来主动通知观察者,观察者只需要在自己的代码里实现 一旦发生了,就干什么 的逻辑就好了,而不用关心是什么时候发生的,事件是怎么到自己这里来的。

定义

有了上面的例子,我们不难抽象出观察者设计模式中的角色:

  • Observer:观察者。反应到程序中就是一个接口。
  • ConcreteObserver:具体的观察者。反应到程序中就是实现了 Observer 接口的类。
  • Subject:被观察的对象。反映到程序里就是一个接口。它提供了一个方法用来通知观察者,当然也可以有方法来新增观察者和删除观察者(上面的例子里只有通知观察者一个方法)。
  • ConcreteSubject:具体的观察对象。上面的 Subject 的实现类,在里面实现对应的方法。

JDK中设计模式

纯正观察者

从上面的设计模式中我们可以发现,其实就是两个接口,然后实现就可以了。JDK自然为我们实现了对应的观察者模式,分别抽象出了 Observer 接口 和 Observable 类。

由于它们俩是在是有点太简单了,这里就直接跳过了,直接进入到第二步:事件模型。

事件驱动

上述的观察者模式还是很好理解的,于是就可以在此基础上更抽象一步, 抽象出事件驱动模型。其中观察者对应监听器(Listener),而被观察者对应的是事件源source,其中事件源被封存在事件event中。JDK为事件抽象出了两个接口:EventObjectEventListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52

/**
* <p>
* The root class from which all event state objects shall be derived.
* <p>
* All Events are constructed with a reference to the object, the "source",
* that is logically deemed to be the object upon which the Event in question
* initially occurred upon.
*
* @since JDK1.1
*/

public class EventObject implements java.io.Serializable {

private static final long serialVersionUID = 5516075349620653480L;

/**
* The object on which the Event initially occurred.
*/
protected transient Object source;

/**
* Constructs a prototypical Event.
*
* @param source The object on which the Event initially occurred.
* @exception IllegalArgumentException if source is null.
*/
public EventObject(Object source) {
if (source == null)
throw new IllegalArgumentException("null source");

this.source = source;
}

/**
* The object on which the Event initially occurred.
*
* @return The object on which the Event initially occurred.
*/
public Object getSource() {
return source;
}

/**
* Returns a String representation of this EventObject.
*
* @return A a String representation of this EventObject.
*/
public String toString() {
return getClass().getName() + "[source=" + source + "]";
}
}

这里值得注意的是,EventObject 是一个普通的类,所以事件都需要继承自它。而 EventListener 则是一个空接口,我们一般是自己定义一个自己的Listener接口继承这个接口,然后再去实现它。

除此之外我们可以发现,最为重要的是 source 这个对象,所有事件都是通过引用它来进行构造的,也就是这个source其实就是上面观察者模式中的 Observable 类,也就是事件的源。

比如A通知B一件事情,那么这个事情本身就是一个EventObject,然后A就是其中的source,而B则是对应的listener。本质上和观察者完全一致,只不过它又抽象出事件这个概念,并且让事件持有事件源这个对象,这样当listener获得事件的时候,也可以顺手获取到事件源。

一个具体的例子

例子来源:https://zhuanlan.zhihu.com/p/27273286

假设老师需要向学生发布做作业的event,那么毫无疑问,做作业这个事件就是event,而老师是source,学生是listener。

首先先抽象出这个event:

1
2
3
4
5
public class HomeworkEventObject extends EventObject {
public HomeworkEventObject(Object source) {
super(source);
}
}

一般来说,事件中的source就是监听器(学生)能够获取到的对象(老师),所以可以好好封装(比如封装老师需要传递给学生的信息)。当然你也可以在event中持有别的对象引用,这样也行。

接着抽象出Listener(学生),这里推荐的是自己首先抽象出接口去继承JDK提供的,然后具体的监听器再去实现它:

1
2
3
public interface HomeworkListener extends EventListener {
void handleEvent(HomeworkEventObject o, Object arg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
public class Student implements HomeworkListener {
private final String name;

public Student(String name) {
this.name = name;
}

@Override
public void handleEvent(HomeworkEventObject o, Object arg) {
Teacher teacher = (Teacher) o.getSource();
System.out.printf("学生%s观察到%s布置了作业《%s》 \n", this.name, teacher.getName(), arg);
}
}

最后才去实现整个事件的源:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Teacher {
private String name;
private List<String> homeworks;

private Set<HomeworkListener> homeworkListenerList;

public String getName() {
return this.name;
}

public Teacher(String name) {
this.name = name;
this.homeworks = new ArrayList<>();
this.homeworkListenerList = new HashSet<>();
}

public void setHomework(String homework) {
System.out.printf("%s布置了作业%s \n", this.name, homework);
homeworks.add(homework);
HomeworkEventObject event = new HomeworkEventObject(this);

for (HomeworkListener listener : homeworkListenerList) {
listener.handleEvent(event, homework);
}
}

public void addObserver(HomeworkListener homeworkListener) {
homeworkListenerList.add(homeworkListener);
}
}

可以看到比较难的就是事件源的编写,因为它需要自己维护所有的监听器,并且还需要调用对应的方法。

总结

我们在自己使用JDK的事件机制的时候(虽然基本也没机会用),可以遵循先抽象出事件、然后定义对应监听器,最后自己去实现事件源的顺序去做。

Spring中的事件机制

Spring 同样抽象出了两个接口,分别是 ApplicationEventApplicationListener ,分别对应JDK的两个接口。

当然在spring中做了简化,只需要bean实现了Listener接口并且注册到了容器中,那么每次有ApplicationEvent发生,这个bean就可以获得通知,也就是事件源不再需要自己去维护对应的集合了,配合注解来说就更加简单了。

还是上面的例子

还是上面的例子,首先第一步还是先抽象出对应的事件:

1
2
3
4
5
public class HomeworkEvent extends ApplicationEvent {
public HomeworkEvent(Object source) {
super(source);
}
}

然后是对应的listener:

1
2
3
4
5
6
7
@Component
public class Student implements ApplicationListener<HomeworkEvent> {
@Override
public void onApplicationEvent(HomeworkEvent event) {
// 对应的处理逻辑
}
}

对应的event的source这里就省略了,我自己定义的就是一个空对象,然后自己定义了一个Service进行测试。这里其实event的source也可以是一个publisher,这样可能更加符合常识。

发送事件在spring中特别简单,只需要在容器中自动注入一个ApplicationEventPublisher的对象(ApplicationContext实现了这个接口,所以其实可以直接使用ApplicationContext进行事件的发布) ,然后利用这个对象进行事件的发布即可。

所以我们在使用spring中只需要定义好事件,写好监听器的处理逻辑,最后只需要在你需要发送该事件的时候,注入ApplicationEventPublisher并进行事件的发送即可。但是需要注意的是,发送事件和处理事件是在同一个线程中进行处理的,而且默认情况下事件发送之后,需要等待事件处理完成之后,才会接下去处理。

角色定义

spring中的事件机制一共有四个角色:

  • 事件,对应的类是 ApplicationEvent,它本身是一个抽象类,继承自JDK的EventObject。除了原有的source之外,只是额外新增了一个时间戳而已。
  • 事件监听器,对应的类是 ApplicationListener,它是一个接口继承自JDK的EventListener,作为一个函数型接口,只有一个方法,也就是当事件发生的时候会进行的逻辑,所以我们可以使用lambda表达式。
  • 事件发布者,对应的类是ApplicationEventPublisher,这也是和JDK最与众不同的地方,spring抽象出了这个类,用这个类来进行事件的发布。
  • 事件广播器,对应的类是ApplicationEventMulticaster,从名字上看,它是把事件发送给所有的监听器,但是其实它是用来对上面的三者进行管理的对象。所有的事件监听器都注册到广播器中,这样广播器就能从全局视角知道所有事件和所有监听器之间的关系了。

同步与异步

spring的事件机制默认就是同步的,即事件发布之后,假设有三个监听器在监听,那么需要等监听器1处理完其中的逻辑之后,然后监听器2的逻辑才会进行处理,然后等到监听器2的处理逻辑处理完,监听器3的逻辑才开始处理… 等到所有的监听器的逻辑都走完之后,才回到发送事件之后的代码之后去继续执行。总体流程可参考下图:

image-20210809142500117

如果我们不希望它按照默认的逻辑来做,那么可以在对应的监听器的处理逻辑中,新开一个线程进行逻辑的处理,或者直接交给线程池进行处理。

执行顺序

在spring默认的状态下,事件监听器默认是同步的,那么必然有个顺序,spring默认的是bean注册到容器中的顺序进行的。当然是可以通过实现Ordered接口来进行人为的操控的,只要实现Ordered接口,然后自己实现getOrder()就可以了。order的实现机制是,数字越小优先级越高,所以优先级最高的是Integer.MIN_VALUE。当然如果两个bean的order是一样的,那么就继续按照bean的加载顺序来。

除此之外spring本身还有另外一个接口PriorityOrdered,它继承了Ordered接口,在spring中的机制是:任何实现了PriorityOrdered接口的类都会优先于Ordered接口,最后就是那些没有设置过Order的类。

实现原理

所有的这一切,肯定需要首先深入spring 的 ApplicationContext 中才可以解密。在spring的refresh()方法中,可以发现:

1
2
3
4
5
6
7
8
9
10
// 其它流程
// Initialize event multicaster for this context.
initApplicationEventMulticaster();

// Initialize other special beans in specific context subclasses.
onRefresh();

// Check for listener beans and register them.
registerListeners();
// 其它流程

很明显,第一行方法是初始化了事件广播器,最后一行注册了所有的监听器,那我们就把目光放在这两个函数中。

初始化广播器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected void initApplicationEventMulticaster() {
// 获取beanFactory
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
// 如果已经有用户自定义的广播器
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {
logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
// 如果用户没有自定义广播器【一般都不会自己去定义】
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isTraceEnabled()) {
logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}

可以发现,如果用户没有自定义广播器,那么就会使用SimpleApplicationEventMulticaster作为默认的广播器。

注册监听器并发布早期事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected void registerListeners() {
// Register statically specified listeners first.
for (ApplicationListener<?> listener : getApplicationListeners()) {
// 把所有statically specified【静态】的监听器放到广播器的管理中
getApplicationEventMulticaster().addApplicationListener(listener);
}

// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let post-processors apply to them!
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
// 从beanFactory中获取所有实现了对应接口的bean,并注册
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}

// Publish early application events now that we finally have a multicaster... 推送那些需要早期推送的事件
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (!CollectionUtils.isEmpty(earlyEventsToProcess)) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}

简单来说就是找到所有的监听器,然后把它们都加入到广播器中就行了。

事件发布逻辑

因为ApplicationContext接口本身就是一个publisher,所以我们可以通过直接使用它进行事件的发布。实际的实现在AbstractApplicationContext中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");

// Decorate event as an ApplicationEvent if necessary 如果事件可以是ApplicationEvent,则强转
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
// 这里的eventType平常使用的话是null
eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
}
}

// Multicast right now if possible - or lazily once the multicaster is initialized
// 如果可能就立即广播,或者等待初始化
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
// 可以看到就是获取了广播器,进行了事件的广播
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}

// Publish event via parent context as well... 如果有父容器,那么也需要调用
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}

广播器广播逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
// 获取executor,所以如果容器中有executor,那么事件默认下其实是异步的
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}

最后invokeListener就是调用了listener的对应的方法。还有我自己特意往容器中注入了一个executor,但是发现spring还是用的同步的方式。