前言

MessageBus 类似于 EventBus,是一个消息总线框架,常用于处理跨模块通信问题。它基于订阅者模式实现,主要角色如下:

  • 订阅者:在 MessageBus 中的订阅者是一个个被 Subscriber 注解标记的方法,被 Subscriber 标记的方法只能有一个参数:event
  • 订阅 class:含有被 Subscriber 注解标记方法的类,一个订阅者 class 可能有多个订阅者。订阅 class 需要通过 register 方法告诉调度中心它有哪些订阅者,否则订阅者就无法收到对应的 event
  • 调度中心:
    • 接受订阅 class 的注册,将其中所有的订阅者存储起来
    • 提供事件发送能力,并将事件通知给对应的订阅者
  • 发布者:一般位于业务中,通过 MessageBus 发送指定的 event,使得订阅者可以执行

MessageBus

初始化

MessageBus 的作用决定了它的初始化方式,由于 MessageBus 需要在全局获得,所以它采用了双检锁单例模式。

1
2
3
4
5
6
7
8
9
10
public static MessageBus getInstance() {
if (sBus == null) {
synchronized (MessageBus.class) {
if (sBus == null) {
sBus = new MessageBus();
}
}
}
return sBus;
}

MessageBus 的构造方法如下,涉及到的类职责:

  • mSubscriberMap:实际存储订阅者的数据结构
  • SubscriptionRouter:管理订阅者的类
  • MessageDispatcher:处理事件分发的类
  • mLocalMessages:个人觉得没啥作用
  • mStickyMessages:存储粘性事件的数据结构
  • mBusWorkThreadPool:用于异步 register 的线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public MessageBus() {

mSubscriberMap = new ConcurrentHashMap<MessageType, CopyOnWriteArrayList<Subscription>>();

mRouter = new SubscriptionRouter(mSubscriberMap);

mDispatcher = new MessageDispatcher();

mLocalMessages = new ThreadLocal<Queue<MessageType>>() {
protected Queue<MessageType> initialValue() {
return new ConcurrentLinkedQueue<MessageType>();
}
};

mStickyMessages = Collections.synchronizedList(new LinkedList<MessageType>());

mBusWorkThreadPool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setPriority(Thread.NORM_PRIORITY);
thread.setName("MessageBus");
return thread;
}
});
}

接下来就是事件的订阅和发布了,这里需要对事件类型做一下区分:

  • 普通事件:当事件发送时,此时已经注册的订阅者可以收到,之后注册的订阅者不会收到
  • 粘性事件:当事件发送时,此时的的订阅者可以还没注册,当注册的时候就会收到事件

普通事件

订阅过程

普通事件通过 MessageBus#register 注册,但是实际的注册能力交由 SubscriptionRouter 实现的。

1
2
3
4
5
6
7
8
9
10
11
12
public void register(Object subscriber) {
if (subscriber == null) {
return;
}
synchronized (MessageBus.this) {
try {
mRouter.addSubscriber(subscriber);
} catch (Throwable e) {
// ignore
}
}
}

SubscriptionRouter

SubscriptionRouter 只进行订阅者添加和移除,并不实际存储订阅者。订阅者实际存储在 SubscriptionRouter 构造方法中传入的 mSubcriptionMap,它实际存储在 MessageBus 中。

添加注册者的过程:

Object subscriber:订阅 class
Subscription:订阅者
MessageType:事件

  1. 判断当前 “订阅 class” 是否有缓存,如果有缓存就直接通过缓存进行注册,否则进行下面的操作
  2. 反射 “订阅 class 及其父类” :遍历其所有方法,判断该方法是否有 Subscriber 注解,如果该方法有则:
    • 获取 Method 的参数类型注解的 tag,构建 MessageType
    • 根据 MethodMessageType注解的 mode,构建 TargetMethod
    • 根据 SubscriberTargetMethod 构建 Subscription
  3. 缓存 “订阅 class”:以 MessageType -> TargetMethod 键值对的形式,放入 mCacheMap 中,避免重复反射调用
  4. 存储 “订阅者”:以 MessageType -> Subscription 键值对的形式,放入 mSubcriptionMap 中存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public void addSubscriber(Object subscriber) {
if (mSubcriptionMap == null) {
// Log.d(TAG, "the mSubcriptionMap is null.");
return;
}
Class<?> clazz = subscriber.getClass();
// 缓存中有则直接用缓存
if (mCacheMap.containsKey(clazz)) {
List<MethodInfo> infos = mCacheMap.get(clazz);
for (MethodInfo info : infos) {
doAddSubscriber(info.messageType, info.targetMethod, subscriber);
}
return;
}
// 没有缓存就反射解析一下
List<MethodInfo> infos = new LinkedList<>();
while (clazz != null && !isSystemClass(clazz.getName())) {
final Method[] allMethods = clazz.getDeclaredMethods();
for (int i = 0; i < allMethods.length; i++) {
Method method = allMethods[i];
Subscriber annotation = method.getAnnotation(Subscriber.class);
if (annotation == null) {
continue;
}
Class<?>[] paramsTypeClass = method.getParameterTypes();
if (paramsTypeClass == null || paramsTypeClass.length != 1) {
continue;
}
Class<?> paramType = convertType(paramsTypeClass[0]);
MessageType messageType = new MessageType(paramType, annotation.tag());
TargetMethod subscribeMethod = new TargetMethod(method, messageType, annotation.mode());
infos.add(new MethodInfo(messageType, subscribeMethod));
doAddSubscriber(messageType, subscribeMethod, subscriber);
}
clazz = clazz.getSuperclass();
}
mCacheMap.put(subscriber.getClass(), infos);
}

private void doAddSubscriber(MessageType msgType, TargetMethod method, Object subscriber) {
CopyOnWriteArrayList<Subscription> subscriptions = mSubcriptionMap.get(msgType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<Subscription>();
}
Subscription newSubscription = new Subscription(subscriber, method);
if (subscriptions.contains(newSubscription)) {
return;
}
subscriptions.add(newSubscription);
mSubcriptionMap.put(msgType, subscriptions);
}

发布过程

普通事件通过 MessageBus#post 发布,实际上真正执行发布的是 MessageDispatcher

1
2
3
4
5
6
7
8
9
10
11
12
public void post(Object msg) {
post(msg, MessageType.DEFAULT_TAG);
}

public void post(Object msg, String tag) {
if (msg == null) {
// Log.d(TAG, "The message object is null");
return;
}
mLocalMessages.get().offer(new MessageType(msg.getClass(), tag));
mDispatcher.dispatchMessages(msg);
}

MessageDispatcher

对于 MessageDispatcher 来说,想要处理发布,需要解决两个问题:

  1. 哪些 event 需要发布 ?
  2. 这些 event 怎么发布 ?

这两个问题被 MessageBus 拆分出两个抽象接口处理:MatchPolicyBaseMessageHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void dispatchMessages(Object msg) {
Queue<MessageType> msgQueue = mLocalMessages.get();
while (msgQueue.size() > 0) {
deliveryMessage(msgQueue.poll(), msg);
}
}

private void deliveryMessage(MessageType type, Object msg) {
List<MessageType> msgTypes = getMatchedMessageTypes(type, msg);
if (msgTypes == null) {
return;
}
for (MessageType msgType : msgTypes) {
handleMessage(msgType, msg);
}
}
MatchPolicy

MatchPolicy 的 默认实现是 DefaultMatchPolicy,不仅订阅了 event 的订阅者需要被通知,订阅了 event 对应的所有父 event 也应该收到通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class DefaultMatchPolicy implements MatchPolicy {

@Override
public List<MessageType> findMatchMessageTypes(MessageType messageType, Object msg) {
Class<?> messageClass = msg.getClass();
List<MessageType> result = new LinkedList<MessageType>();
while (messageClass != null) {
result.add(new MessageType(messageClass, messageType.tag));
addInterfaces(result, messageClass, messageType.tag);
messageClass = messageClass.getSuperclass();
}
return result;
}

private void addInterfaces(List<MessageType> messageTypes, Class<?> messageClass, String tag) {
if (messageClass == null) {
return;
}
Class<?>[] interfacesClasses = messageClass.getInterfaces();
for (Class<?> interfaceClass : interfacesClasses) {
if (!messageTypes.contains(interfaceClass)) {
messageTypes.add(new MessageType(interfaceClass, tag));
addInterfaces(messageTypes, interfaceClass, tag);
}
}
}
}

由于 DefaultMatchPolicy 在调用的时候存在反射的调用,所以 MessageDispatcher 在使用的工程中会做一层缓存。

1
2
3
4
5
6
7
8
9
10
private List<MessageType> getMatchedMessageTypes(MessageType type, Object msg) {
List<MessageType> msgTypes;
if (mMessageTypeCache.containsKey(type)) {
msgTypes = mMessageTypeCache.get(type);
} else {
msgTypes = mMatchPolicy.findMatchMessageTypes(type, msg);
mMessageTypeCache.put(type, msgTypes);
}
return msgTypes;
}
BaseMessageHandler

BaseMessageHandler 最常用到的实现是 UIMessageHandler,不过它其实只是将处理的过程 post 到 UI 线程,本质上还是调用 DefaultMessageHandler 实现。

1
2
3
4
5
6
7
8
9
10
private void handleMessage(MessageType msgType, Object msg) {
List<Subscription> subscriptions = mSubscriberMap.get(msgType);
if (subscriptions == null) {
return;
}
for (Subscription subscription : subscriptions) {
BaseMessageHandler messageHandler = getMessageHandler(subscription.threadMode);
messageHandler.handleMessage(subscription, msg);
}
}

DefaultMessageHandler 根据订阅者(Subscription)存储的信息,通过反射的方式调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class UIMessageHandler implements BaseMessageHandler {

private Handler mUIHandler = new Handler(Looper.getMainLooper());

DefaultMessageHandler mMessageHandler = new DefaultMessageHandler();

@Override
public void handleMessage(final Subscription subscription, final Object msg) {
mUIHandler.post(new Runnable() {
@Override
public void run() {
mMessageHandler.handleMessage(subscription, msg);
}
});
}

}

public class DefaultMessageHandler implements BaseMessageHandler {

@Override
public void handleMessage(Subscription subscription, Object msg) {
if (subscription == null) {
return;
}
Object subscriber = subscription.subscriber.get();
if (subscriber == null) {
return;
}
try {
subscription.targetMethod.invoke(subscriber, msg);
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
}

粘性事件

订阅过程

由于粘性事件是滞后的,所以它的注册是晚于发布的,因此它是在发布的时候完成注册。粘性事件通过 MessageBus#postSticky 发布,处理很简单,就是将 event 包装成 MessageType,然后放入 MessageBusmStickyMessages 数据结构中存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void postSticky(Object msg) {
postSticky(msg, MessageType.DEFAULT_TAG);
}

public void postSticky(Object msg, String tag) {
if (msg == null) {
// Log.d(TAG, "The message object is null");
return;
}
MessageType messageType = new MessageType(msg.getClass(), tag);
messageType.message = msg;
mStickyMessages.add(messageType);
}

发布过程

对于粘性事件来说,真正的发布过程是在注册的时候。粘性事件通过 MessageBus#registerSticky 注册,处理如下:

  1. 和普通事件一样完成注册过程
  2. 注册后就会调用 MessageBus#dispatchStickyMessages 处理粘性事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// MessageBus
public void registerSticky(Object subscriber) {
this.register(subscriber);
// 处理sticky事件
mDispatcher.dispatchStickyMessages(subscriber);
}

// MessageDispatcher
void dispatchStickyMessages(Object subscriber) {
for (MessageType messageType : mStickyMessages) {
handleStickyMessage(messageType, subscriber);
}
}

private void handleStickyMessage(MessageType messageType, Object subscriber) {
List<MessageType> messageTypes = getMatchedMessageTypes(messageType, messageType.message);
if (messageTypes == null) {
return;
}
Object msg = messageType.message;
for (MessageType type : messageTypes) {
// Log.d(TAG, "Found message type : " + type.paramClass.getSimpleName()
// + ", message class : " + msg.getClass().getSimpleName());
final List<Subscription> subscriptions = mSubscriberMap.get(type);
if (subscriptions == null) {
continue;
}
for (Subscription subItem : subscriptions) {
final ThreadMode mode = subItem.threadMode;
BaseMessageHandler messageHandler = getMessageHandler(mode);
// 如果订阅者为空,那么该sticky事件分发给所有订阅者.否则只分发给该订阅者
if (isTarget(subItem, subscriber)
&& (subItem.messageType.equals(type)
|| subItem.messageType.paramClass.isAssignableFrom(type.paramClass))) {
messageHandler.handleMessage(subItem, msg);
}
}
}
}

private boolean isTarget(Subscription item, Object subscriber) {
Object cacheObj = item.subscriber != null ? item.subscriber.get() : null;
return subscriber == null
|| (subscriber != null && cacheObj != null && cacheObj.equals(subscriber));
}

参考