EventBus源码简析

摘要

基于3.0版本EventBus分析。

1.1 创建EventBus

我们一般通过EventBus.getDefault()获取到EventBus对象,这里使用单例模式,保证getDefault()得到的都是同一个实例,如果不存在实例,就调用EventBus的构造方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
public EventBus() {
this(DEFAULT_BUILDER);
}
EventBus(EventBusBuilder builder) {
// Map<Class<?>, CopyOnWriteArrayList<Subscription>>
// key:订阅的事件类型(class),value:订阅这个事件的所有订阅者集合(class集合)
subscriptionsByEventType = new HashMap<>();
// Map<Object, List<Class<?>>>
// key:订阅者对象,value:订阅者订阅的事件类型集合
typesBySubscriber = new HashMap<>();
// Map<Class<?>, Object> 粘性事件
// key:粘性事件类型(class),value:事件对象
stickyEvents = new ConcurrentHashMap<>();
// 事件主线程处理
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
// 事件Background处理
backgroundPoster = new BackgroundPoster(this);
// 事件异步线程处理
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
// 订阅者响应函数信息存储和查找类
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
// 是否支持事件继承
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}

通过初始化EventBusBuilder()对象来初始化EventBus的一些配置,将配置解耦,使代码结构更清晰。

1.2 注册过程源码简析

注册过程源码简析

SubscriberMethodFinder的实现

  1. 在3.0版本,EventBus提供EventBusAnnotationProcessor注解处理器在编译期通过读取@Subscribe()注解并解析,处理其中所包含的信息,然后生成java类来保存所有订阅者关于订阅的信息,这样就比在运行时使用反射来获得这些订阅者的信息速度要快。

    我们可以参考EventBus项目里的EventBusPerformance这个例子,编译后我们可以在build文件夹里找到这个类MyEventBusIndex类(当然类名是可以自定义的),我们大致看一下生成的MyEventBusIndex类是什么样的:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    public class MyEventBusIndex implements SubscriberInfoIndex {
    private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;
    static {
    SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();
    putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.PerfTestEventBus.SubscriberClassEventBusAsync.class,
    true, new SubscriberMethodInfo[] {
    new SubscriberMethodInfo("onEventAsync", TestEvent.class, ThreadMode.ASYNC),
    }));
    putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.PerfTestEventBus.SubscribeClassEventBusMain.class,
    true, new SubscriberMethodInfo[] {
    new SubscriberMethodInfo("onEventMainThread", TestEvent.class, ThreadMode.MAIN),
    }));
    putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.SubscribeClassEventBusDefault.class,
    true, new SubscriberMethodInfo[] {
    new SubscriberMethodInfo("onEvent", TestEvent.class),
    }));
    putIndex(new SimpleSubscriberInfo(org.greenrobot.eventbusperf.testsubject.PerfTestEventBus.SubscribeClassEventBusBackground.class,
    true, new SubscriberMethodInfo[] {
    new SubscriberMethodInfo("onEventBackgroundThread", TestEvent.class, ThreadMode.BACKGROUND),
    }));
    putIndex(new SimpleSubscriberInfo(TestRunnerActivity.class, true, new SubscriberMethodInfo[] {
    new SubscriberMethodInfo("onEventMainThread", TestFinishedEvent.class, ThreadMode.MAIN),
    }));
    }
    private static void putIndex(SubscriberInfo info) {
    SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);
    }
    @Override
    public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {
    SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);
    if (info != null) {
    return info;
    } else {
    return null;
    }
    }
    }

可以看到是使用一个静态的HashMapSUBSCRIBER_INDEX来保存订阅类的消息;其中包含订阅类的class类型、是否需要检查父类、以及订阅方法信息的SubscriberMethodInfo数组;SubscriberMethodInfo中保存了订阅方法的方法名、订阅的事件类型、触发线程、是否接收sticky事件、以及优先级priority。

我们可以通过EventBus.builder().addIndex(new MyEventBusIndex()).build()MyEventBusIndex配置进EventBus,这样就能在SubscriberMethodFinder类中直接查找出订阅类的信息,不需要再利用注解判断了,当然这种方法是作为EventBus的可选配置。

  1. SubscriberMethodFinder同样提供了通过反射来获得订阅类信息的方法,即通过findUsingReflection方法:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
    // FindState 用来做订阅方法的校验和保存
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
    // 通过反射来获得订阅方法信息
    findUsingReflectionInSingleClass(findState);
    // 查找父类的订阅方法
    findState.moveToSuperclass();
    }
    // 获取findState中的SubscriberMethod(也就是订阅方法List)并返回
    return getMethodsAndRelease(findState);
    }

通过FindState类来做订阅方法的校验和保存,并通过FIND_STATE_POOL静态数组来保存FindState对象,可以使FindState复用,避免重复创建过多的对象。

通过findUsingReflectionInSingleClass()来具体获取订阅的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private void findUsingReflectionInSingleClass(FindState findState) {
// 通过反射得到方法数组
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
// 找到所有方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
// 找到 public方法,包含继承和接口的方法
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
// 遍历Method
for (Method method : methods) {
int modifiers = method.getModifiers();
// 判断修饰符, 是public , 并且不包含 static, abstract
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
// 获取方法的参数类型
Class<?>[] parameterTypes = method.getParameterTypes();
// 保证必须只有一个事件参数
if (parameterTypes.length == 1) {
// 得到注解
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
// 校验是否添加该方法
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
// 实例化SubscriberMethod对象并添加
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}

  1. 最后再通过getMethodsAndRelease()返回List<SubscriberMethod>

1.3 解除注册源码简析

解除注册只要调用unregister()方法即可,分别从typesBySubscribersubscriptionsByEventType移除订阅者相关信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public synchronized void unregister(Object subscriber) {
// 根据订阅者对象subscriber取出它订阅的事件类型集合
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
// 遍历事件类型,匹配移除subscriptionsByEventType中存储的订阅者对象subscriber
unsubscribeByEventType(subscriber, eventType);
}
// 移除typesBySubscriber中存储的订阅者对象subscriber
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
// 根据事件类型eventType取出订阅该事件类型的所有订阅者对象
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}

1.4 事件分发过程源码简析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
public void post(Object event) {
// 得到当前线程的Posting状态.
PostingThreadState postingState = currentPostingThreadState.get();
// 获取当前线程的事件队列
List<Object> eventQueue = postingState.eventQueue;
// 把消息添加到队列
eventQueue.add(event);
// 判断状态
if (!postingState.isPosting) {
// 判断是否主线程
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
// 修改状态
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 一直发送
while (!eventQueue.isEmpty()) {
// 发送单个事件
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
// 是否触发订阅了该事件(eventClass)的父类,以及接口的类的响应方法.
if (eventInheritance) {
// 查找eventClass类所有的父类以及接口
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
// 循环postSingleEventForEventType
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
// 只要右边有一个为true,subscriptionFound就为true
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
// post单个
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
// 如果没发现有事件分发
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
// 发送一个NoSubscriberEvent事件,如果我们需要处理这种状态,接收这个事件就可以了
post(new NoSubscriberEvent(this, event));
}
}
}
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
// 获取订阅了这个事件的Subscription列表
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
// 是否被中断
boolean aborted = false;
try {
// 分发给订阅者
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
// 默认的threadMode选项。哪个线程发就哪个线程收。
invokeSubscriber(subscription, event);
break;
case MAIN:
// 切换至主线程收。
if (isMainThread) {
// 如果就在主线程,直接调用订阅者的事件响应方法
invokeSubscriber(subscription, event);
} else {
// 否则通过handler发送消息到主线程处理
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
// 切换至子线程收。
// 由于后台线程是唯一的,当事件超过一个的时候,它们会被放在队列中依次执行,
// 最好不要有重度耗时的操作或太频繁的轻度耗时操作,以造成其他操作等待。
if (isMainThread) {
// 如果在主线程,切换到子线程收
backgroundPoster.enqueue(subscription, event);
} else {
// 否则直接调用订阅者的事件响应方法
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
// 不论发布线程是否为主线程,都使用一个空闲线程来处理。
// 区别于BACKGROUND,ASYNC所有线程是相互独立的,因此不会出现卡线程的问题。
// 适用场景:长耗时操作,例如网络访问。
asyncPoster.enqueue(subscription, event);
break;
// v3.1.1版本新增MAIN_ORDERED,切换至主线程收,但区别于MAIN,它不会区分当前线程,
// 而是通通使用 mainThreadPoster 来处理,也就是必然会走一遍 Handler 的消息分发。
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

1.5 事件分发线程处理过程源码简析

1.5.1 mainThreadPoster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
final class HandlerPoster extends Handler {
...
void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
// 当下在处理消息,不sendMessage()
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
// 避免频繁的向主线程sendMessage(),是一个消息内尽可能多的处理消息事件,
// 所以使用while循环处理消息事件,持续从消息队列queue中获取消息事件
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
// 为避免长期占用主线程,间隔10ms重新sendMessage(),用于让出主线程执行权,避免造成UI卡顿和ANR
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}

1.5.2 backgroundPoster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
final class BackgroundPoster implements Runnable {
...
// volatile确保线程运行标识在多线程下的可见性
private volatile boolean executorRunning;
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
// 当下有执行线程时,不重复execute
if (!executorRunning) {
executorRunning = true;
// 使用线程池执行
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
// 在一个线程中循环处理消息事件,避免反复使用线程
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}

1.5.3 asyncPoster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class AsyncPoster implements Runnable {
...
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
// 每一个消息事件都抛给线程池执行
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}

默认eventBus.getExecutorService()的配置是Executors.newCachedThreadPool()使用无界队列SynchronousQueue当事件过多时,会出现OOM

参考

  1. EventBus 3.0 源码分析
  2. 入职接手旧项目,所有网络请求数据通过 EventBus 分发,吓得我想离职…