前言
MessageBus
类似于 EventBus
,是一个消息总线框架,常用于处理跨模块通信问题。它基于订阅者模式实现,主要角色如下:
- 订阅者:在
MessageBus
中的订阅者是一个个被 Subscriber
注解标记的方法,被 Subscriber
标记的方法只能有一个参数:event
。
- 订阅 class:含有被
Subscriber
注解标记方法的类,一个订阅者 class 可能有多个订阅者。订阅 class 需要通过 register
方法告诉调度中心它有哪些订阅者,否则订阅者就无法收到对应的 event
。
- 调度中心:
- 接受订阅 class 的注册,将其中所有的订阅者存储起来
- 提供事件发送能力,并将事件通知给对应的订阅者
- 发布者:一般位于业务中,通过
MessageBus
发送指定的 event
,使得订阅者可以执行

MessageBus
初始化
MessageBus
的作用决定了它的初始化方式,由于 MessageBus
需要在全局获得,所以它采用了双检锁单例模式。
1 2 3 4 5 6 7 8 9 10
| public static MessageBus getInstance() { if (sBus == null) { synchronized (MessageBus.class) { if (sBus == null) { sBus = new MessageBus(); } } } return sBus; }
|
MessageBus 的构造方法如下,涉及到的类职责:
- mSubscriberMap:实际存储订阅者的数据结构
- SubscriptionRouter:管理订阅者的类
- MessageDispatcher:处理事件分发的类
- mLocalMessages:个人觉得没啥作用
- mStickyMessages:存储粘性事件的数据结构
- mBusWorkThreadPool:用于异步 register 的线程池

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public MessageBus() {
mSubscriberMap = new ConcurrentHashMap<MessageType, CopyOnWriteArrayList<Subscription>>();
mRouter = new SubscriptionRouter(mSubscriberMap);
mDispatcher = new MessageDispatcher();
mLocalMessages = new ThreadLocal<Queue<MessageType>>() { protected Queue<MessageType> initialValue() { return new ConcurrentLinkedQueue<MessageType>(); } };
mStickyMessages = Collections.synchronizedList(new LinkedList<MessageType>());
mBusWorkThreadPool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setPriority(Thread.NORM_PRIORITY); thread.setName("MessageBus"); return thread; } }); }
|
接下来就是事件的订阅和发布了,这里需要对事件类型做一下区分:
- 普通事件:当事件发送时,此时已经注册的订阅者可以收到,之后注册的订阅者不会收到
- 粘性事件:当事件发送时,此时的的订阅者可以还没注册,当注册的时候就会收到事件
普通事件
订阅过程
普通事件通过 MessageBus#register
注册,但是实际的注册能力交由 SubscriptionRouter
实现的。
1 2 3 4 5 6 7 8 9 10 11 12
| public void register(Object subscriber) { if (subscriber == null) { return; } synchronized (MessageBus.this) { try { mRouter.addSubscriber(subscriber); } catch (Throwable e) { } } }
|
SubscriptionRouter
SubscriptionRouter
只进行订阅者添加和移除,并不实际存储订阅者。订阅者实际存储在 SubscriptionRouter
构造方法中传入的 mSubcriptionMap
,它实际存储在 MessageBus
中。
添加注册者的过程:
Object subscriber:订阅 class
Subscription:订阅者
MessageType:事件
- 判断当前 “订阅 class” 是否有缓存,如果有缓存就直接通过缓存进行注册,否则进行下面的操作
- 反射 “订阅 class 及其父类” :遍历其所有方法,判断该方法是否有
Subscriber
注解,如果该方法有则:
- 获取 Method 的参数类型 和 注解的 tag,构建
MessageType
- 根据 Method,MessageType 和 注解的 mode,构建
TargetMethod
- 根据 Subscriber 和 TargetMethod 构建
Subscription
- 缓存 “订阅 class”:以 MessageType -> TargetMethod 键值对的形式,放入
mCacheMap
中,避免重复反射调用
- 存储 “订阅者”:以 MessageType -> Subscription 键值对的形式,放入
mSubcriptionMap
中存储

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| public void addSubscriber(Object subscriber) { if (mSubcriptionMap == null) { return; } Class<?> clazz = subscriber.getClass(); if (mCacheMap.containsKey(clazz)) { List<MethodInfo> infos = mCacheMap.get(clazz); for (MethodInfo info : infos) { doAddSubscriber(info.messageType, info.targetMethod, subscriber); } return; } List<MethodInfo> infos = new LinkedList<>(); while (clazz != null && !isSystemClass(clazz.getName())) { final Method[] allMethods = clazz.getDeclaredMethods(); for (int i = 0; i < allMethods.length; i++) { Method method = allMethods[i]; Subscriber annotation = method.getAnnotation(Subscriber.class); if (annotation == null) { continue; } Class<?>[] paramsTypeClass = method.getParameterTypes(); if (paramsTypeClass == null || paramsTypeClass.length != 1) { continue; } Class<?> paramType = convertType(paramsTypeClass[0]); MessageType messageType = new MessageType(paramType, annotation.tag()); TargetMethod subscribeMethod = new TargetMethod(method, messageType, annotation.mode()); infos.add(new MethodInfo(messageType, subscribeMethod)); doAddSubscriber(messageType, subscribeMethod, subscriber); } clazz = clazz.getSuperclass(); } mCacheMap.put(subscriber.getClass(), infos); }
private void doAddSubscriber(MessageType msgType, TargetMethod method, Object subscriber) { CopyOnWriteArrayList<Subscription> subscriptions = mSubcriptionMap.get(msgType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<Subscription>(); } Subscription newSubscription = new Subscription(subscriber, method); if (subscriptions.contains(newSubscription)) { return; } subscriptions.add(newSubscription); mSubcriptionMap.put(msgType, subscriptions); }
|
发布过程
普通事件通过 MessageBus#post
发布,实际上真正执行发布的是 MessageDispatcher
1 2 3 4 5 6 7 8 9 10 11 12
| public void post(Object msg) { post(msg, MessageType.DEFAULT_TAG); }
public void post(Object msg, String tag) { if (msg == null) { return; } mLocalMessages.get().offer(new MessageType(msg.getClass(), tag)); mDispatcher.dispatchMessages(msg); }
|
MessageDispatcher
对于 MessageDispatcher
来说,想要处理发布,需要解决两个问题:
- 哪些 event 需要发布 ?
- 这些 event 怎么发布 ?
这两个问题被 MessageBus
拆分出两个抽象接口处理:MatchPolicy
和 BaseMessageHandler

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| void dispatchMessages(Object msg) { Queue<MessageType> msgQueue = mLocalMessages.get(); while (msgQueue.size() > 0) { deliveryMessage(msgQueue.poll(), msg); } }
private void deliveryMessage(MessageType type, Object msg) { List<MessageType> msgTypes = getMatchedMessageTypes(type, msg); if (msgTypes == null) { return; } for (MessageType msgType : msgTypes) { handleMessage(msgType, msg); } }
|
MatchPolicy
MatchPolicy
的 默认实现是 DefaultMatchPolicy
,不仅订阅了 event
的订阅者需要被通知,订阅了 event
对应的所有父 event
也应该收到通知。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class DefaultMatchPolicy implements MatchPolicy {
@Override public List<MessageType> findMatchMessageTypes(MessageType messageType, Object msg) { Class<?> messageClass = msg.getClass(); List<MessageType> result = new LinkedList<MessageType>(); while (messageClass != null) { result.add(new MessageType(messageClass, messageType.tag)); addInterfaces(result, messageClass, messageType.tag); messageClass = messageClass.getSuperclass(); } return result; }
private void addInterfaces(List<MessageType> messageTypes, Class<?> messageClass, String tag) { if (messageClass == null) { return; } Class<?>[] interfacesClasses = messageClass.getInterfaces(); for (Class<?> interfaceClass : interfacesClasses) { if (!messageTypes.contains(interfaceClass)) { messageTypes.add(new MessageType(interfaceClass, tag)); addInterfaces(messageTypes, interfaceClass, tag); } } } }
|
由于 DefaultMatchPolicy
在调用的时候存在反射的调用,所以 MessageDispatcher
在使用的工程中会做一层缓存。
1 2 3 4 5 6 7 8 9 10
| private List<MessageType> getMatchedMessageTypes(MessageType type, Object msg) { List<MessageType> msgTypes; if (mMessageTypeCache.containsKey(type)) { msgTypes = mMessageTypeCache.get(type); } else { msgTypes = mMatchPolicy.findMatchMessageTypes(type, msg); mMessageTypeCache.put(type, msgTypes); } return msgTypes; }
|
BaseMessageHandler
BaseMessageHandler
最常用到的实现是 UIMessageHandler
,不过它其实只是将处理的过程 post 到 UI 线程,本质上还是调用 DefaultMessageHandler
实现。
1 2 3 4 5 6 7 8 9 10
| private void handleMessage(MessageType msgType, Object msg) { List<Subscription> subscriptions = mSubscriberMap.get(msgType); if (subscriptions == null) { return; } for (Subscription subscription : subscriptions) { BaseMessageHandler messageHandler = getMessageHandler(subscription.threadMode); messageHandler.handleMessage(subscription, msg); } }
|
DefaultMessageHandler
根据订阅者(Subscription
)存储的信息,通过反射的方式调用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public class UIMessageHandler implements BaseMessageHandler {
private Handler mUIHandler = new Handler(Looper.getMainLooper());
DefaultMessageHandler mMessageHandler = new DefaultMessageHandler();
@Override public void handleMessage(final Subscription subscription, final Object msg) { mUIHandler.post(new Runnable() { @Override public void run() { mMessageHandler.handleMessage(subscription, msg); } }); }
}
public class DefaultMessageHandler implements BaseMessageHandler {
@Override public void handleMessage(Subscription subscription, Object msg) { if (subscription == null) { return; } Object subscriber = subscription.subscriber.get(); if (subscriber == null) { return; } try { subscription.targetMethod.invoke(subscriber, msg); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } }
|
粘性事件
订阅过程
由于粘性事件是滞后的,所以它的注册是晚于发布的,因此它是在发布的时候完成注册。粘性事件通过 MessageBus#postSticky
发布,处理很简单,就是将 event
包装成 MessageType
,然后放入 MessageBus
的 mStickyMessages
数据结构中存储。
1 2 3 4 5 6 7 8 9 10 11 12 13
| public void postSticky(Object msg) { postSticky(msg, MessageType.DEFAULT_TAG); }
public void postSticky(Object msg, String tag) { if (msg == null) { return; } MessageType messageType = new MessageType(msg.getClass(), tag); messageType.message = msg; mStickyMessages.add(messageType); }
|
发布过程
对于粘性事件来说,真正的发布过程是在注册的时候。粘性事件通过 MessageBus#registerSticky
注册,处理如下:
- 和普通事件一样完成注册过程
- 注册后就会调用
MessageBus#dispatchStickyMessages
处理粘性事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public void registerSticky(Object subscriber) { this.register(subscriber); mDispatcher.dispatchStickyMessages(subscriber); }
void dispatchStickyMessages(Object subscriber) { for (MessageType messageType : mStickyMessages) { handleStickyMessage(messageType, subscriber); } }
private void handleStickyMessage(MessageType messageType, Object subscriber) { List<MessageType> messageTypes = getMatchedMessageTypes(messageType, messageType.message); if (messageTypes == null) { return; } Object msg = messageType.message; for (MessageType type : messageTypes) { final List<Subscription> subscriptions = mSubscriberMap.get(type); if (subscriptions == null) { continue; } for (Subscription subItem : subscriptions) { final ThreadMode mode = subItem.threadMode; BaseMessageHandler messageHandler = getMessageHandler(mode); if (isTarget(subItem, subscriber) && (subItem.messageType.equals(type) || subItem.messageType.paramClass.isAssignableFrom(type.paramClass))) { messageHandler.handleMessage(subItem, msg); } } } }
private boolean isTarget(Subscription item, Object subscriber) { Object cacheObj = item.subscriber != null ? item.subscriber.get() : null; return subscriber == null || (subscriber != null && cacheObj != null && cacheObj.equals(subscriber)); }
|
参考