前言

Native 中遇到的 RefBase,sp,wp 是 Android 提供的智能指针,详情可以参考:Android智能指针RefBase、sp、wp解析

Java 层通过 MessageQueue 和 Native 层的 NativeMessageQueue 进行通信,NativeMessageQueue 的相关能力又是通过 Native 层的 Looper 实现。

Native 层的 Looper 不仅实现了 Java 层 MessageQueue 需要的功能,还包括一些其他处理:

  1. Native 层 Message 的发送和处理
  2. 支持 fd 监听和事件回调

Linux 相关

Native 层同时用到了一些 Linux 相关的能力,因此需要先了解一下 Linux 相关的基础能力。

I/O 多路复用

“I/O 多路复用”,又被称作 “事件驱动”,它本质上是将多个 I/O 操作共用少数几个甚至是一个线程,简单来说:

  1. I/O 多路复用是一种同步 I/O 模型,实现一个线程可以监视多个 fd (文件描述符/文件句柄)
  2. 一旦某个 fd 就绪,就能够通知应用程序进行相应的读写操作
  3. 没有 fd 就绪就会阻塞应用程序,交出 CPU

selectpollepoll 都是 Linux 下的 I/O 多路复用机制

epoll

epoll 的使用十分简单,只有下面这 3 个 api:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//头文件
#include <sys/epoll>

int epoll_create(int _size)
int epoll_ctl(int __epoll_fd, int __op, int __fd, struct epoll_event* __event)
int epoll_wait(int __epoll_fd, struct epoll_event* __events, int __event_count, int __timeout_ms)

struct epoll_event {
__uint32_t events;
epoll_data_t data;
};

typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t

epoll_create 用于创建一个 epoll fd,size 用来告诉内核监听的数目有多少

epoll_ctl 用于向 epoll 注册事件监听

  1. epoll_fd 是通过 epoll_create 方法创建的 epoll fd
  2. op 是 fd 的操作类型,有以下几种值:
    • EPOLL_CTL_ADD:注册 fd 到 epoll 中
    • EPOLL_CTL_DEL:移除 epoll 中的 fd
    • EPOLL_CTL_MOD:修改已注册的 fd
  3. fd 是需要监听的文件描述符/文件句柄
  4. epoll_event 是需要监听的事件,它是一个结构体,其中 events 的取值如下
    • EPOLLIN:表示监听的 fd 可读
    • EPOLLOUT:表示监听的 fd 可写
    • EPOLLPRI:表示监听的 fd 有紧急的数据可读
    • EPOLLERR:表示监听的 fd 发生错误
    • EPOLLHUP:表示监听的 fd 被挂断
    • EPOLLET:将 epoll 设置为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说
    • EPOLLONESHOT:只监听一次事件

epoll_wait 用于等待监听的事件回调。当调用了该方法后,就会进入阻塞状态,等待 epoll 上的 I/O 事件。若 epoll 监听的某个 fd 发生了上述的 event,epoll_wait 就会被唤醒并返回需要处理的 event 数量。若超过了等待时间,同样也会唤醒 epoll_wait 并返回 0。

  1. epoll_fd 是通过 epoll_create 方法创建的 epoll fd
  2. epoll_event 是通过内核得到的事件集合
  3. event_count 是 events 的数量
  4. timeout_ms 是超时时间,-1 表示一直超时

eventfd

epoll 需要监听一个 fd,在旧版本监听的 fd 是 pipe fd,而在新版本监听的是 eventfd
eventfd 是一个计数相关的 fd。计数不为零是有可读事件发生,read 之后计数会清零,write 则会递增计数器。
eventfd 实现了 read/write 的调用,在调用里面实现了一套计数器的逻辑。write 仅仅是加计数,read 是读计数,并且清零。

1
2
3
4
5
6
7
8
// 创建方法,initval 表示初始值
int eventfd(unsigned int initval, int flags);

// 写
ssize_t write (int fd, const void * buf, size_t count)

// 读
ssize_t read(int fd, const void * buf, size_t count);

JNI 注册

Java 层的方法和 Native 层的方法想要建立联系有两种方式,一种是自动绑定,一种是手动绑定。framworks 层因为效率问题使用的是手动绑定的方式,在启动的时候 就会调用 register_jni_procs 方法进行绑定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// AndroidRuntime.cpp 
static const RegJNIRec gRegJNI[] = {
//...
REG_JNI(register_android_os_MessageQueue),
//...
}

int AndroidRuntime::startReg(JNIEnv* env) {
//...
if (register_jni_procs(gRegJNI, NELEM(gRegJNI), env) < 0) {
env->PopLocalFrame(NULL);
return -1;
}
//...
return 0;
}

Java 层的 MessageQueue 在注册的时候:

  1. 确定 Java 层的方法:nativeInitnativeDestroynativePollOncenativeWakenativeIsPollingnativeSetFileDescriptorEvents 对应调用的 Native 层方法
  2. Native 层保留对 Java 层 mPtr 字段和 dispatchEvents 方法的引用,便于后续 Native 层调用 Java 层代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static const JNINativeMethod gMessageQueueMethods[] = {
{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
{ "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
{ "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
{ "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },
{ "nativeSetFileDescriptorEvents", "(JII)V", (void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },
};

int register_android_os_MessageQueue(JNIEnv* env) {
int res = RegisterMethodsOrDie(env, "android/os/MessageQueue", gMessageQueueMethods, NELEM(gMessageQueueMethods));
jclass clazz = FindClassOrDie(env, "android/os/MessageQueue");
gMessageQueueClassInfo.mPtr = GetFieldIDOrDie(env, clazz, "mPtr", "J");
gMessageQueueClassInfo.dispatchEvents = GetMethodIDOrDie(env, clazz, "dispatchEvents", "(II)I");
return res;
}

Native 消息

MessageEnvelope

MessageEnvelope 类似于 Java 层的 Message

  1. uptime:消息执行的时间
  2. handler:消息执行者,类似于 Java 层的 Handler
  3. message:消息标志,类似于 Java 层的 Message#what 字段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
struct MessageEnvelope {
MessageEnvelope() : uptime(0) { }

MessageEnvelope(nsecs_t u, const sp<MessageHandler> h,
const Message& m) : uptime(u), handler(h), message(m) {
}

nsecs_t uptime;
sp<MessageHandler> handler;
Message message;
};

struct Message {
Message() : what(0) { }
Message(int w) : what(w) { }

int what;
};

class MessageHandler : public virtual RefBase {
protected:
virtual ~MessageHandler();

public:
virtual void handleMessage(const Message& message) = 0;
};

class WeakMessageHandler : public MessageHandler {
protected:
virtual ~WeakMessageHandler();

public:
WeakMessageHandler(const wp<MessageHandler>& handler);
virtual void handleMessage(const Message& message);

private:
wp<MessageHandler> mHandler;
};

发送

Native 发送消息的方法有很多,但是最终都会调用到 Looper#sendMessageAtTime 方法。在该方法中会根据 uptime 插入到 Looper#mMessageEnvelopes 集合中。
如果插入的消息是在头部,会和 Java 层一样调用 Looper#wake 方法唤醒队列。Native 消息处理的时机和 Java 层消息处理的时机一致,一起放到后续分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler, const Message& message) {
//...
size_t i = 0;
{
//...
size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}

MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

if (mSendingMessage) {
return;
}
}

if (i == 0) {
wake();
}
}

fd 监听回调

fd 监听

关于 fd 的监听,又需要回到 Java 层的 MessageQueue 中,它提供了 addOnFileDescriptorEventListener/removeOnFileDescriptorEventListener 方法用于添加/移除对指定 fd 上操作的监听。
这两个方法最终会调用到 Native 方法 nativeSetFileDescriptorEvents,它会调用 NativeMessageQueue#setFileDescriptorEvents 方法。
首先将 Java 层传入的 events 转化成 Native 层的 events,之后就会调用 Looper#addFd 方法,传入的 LooperCallback 就是当前对象 NativeMessageQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz, jlong ptr, jint fd, jint events) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->setFileDescriptorEvents(fd, events);
}

void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {
if (events) {
int looperEvents = 0;
if (events & CALLBACK_EVENT_INPUT) {
looperEvents |= Looper::EVENT_INPUT;
}
if (events & CALLBACK_EVENT_OUTPUT) {
looperEvents |= Looper::EVENT_OUTPUT;
}
mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this, reinterpret_cast<void*>(events));
} else {
mLooper->removeFd(fd);
}
}

// Looper.cpp ident
enum {
// 唤醒
POLL_WAKE = -1,
// 回调
POLL_CALLBACK = -2,
// 超时
POLL_TIMEOUT = -3,
// 出错
POLL_ERROR = -4,
}

// Looper.cpp events
enum {
// 可读
EVENT_INPUT = 1 << 0,
// 可写
EVENT_OUTPUT = 1 << 1,
// 发生错误
EVENT_ERROR = 1 << 2,
// 中断
EVENT_HANGUP = 1 << 3,
// 不合法
EVENT_INVALID = 1 << 4,
}

Looper#addFd 的实现如下,这里会将传入的信息封装成 Request,然后会通过 epoll 监听传入的 fd 指定的事件,这个 Request 会被存储在 Looper#mRequests 集合中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Looper.cpp
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
//...

{
//...
Request request;
request.fd = fd;
request.ident = ident;
request.events = events;
request.seq = mNextRequestSeq++;
request.callback = callback;
request.data = data;
if (mNextRequestSeq == -1) mNextRequestSeq = 0;

struct epoll_event eventItem;
request.initEventItem(&eventItem);

ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
// ...
mRequests.add(fd, request);
} else {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
// ...
}
}
return 1;
}

int Looper::removeFd(int fd, int seq) {
//...
{
//...
mRequests.removeItemsAt(requestIndex);
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_DEL, fd, nullptr);
//...
}
return 1;
}

这里需要对涉及到的三个数据结构:

  1. Request:注册监听 fd 上事件时,会将相关参数包装成 Request
  2. Response:因为可能有多个 Request,所以在处理的时候会先包装成 Response 缓存到一个集合中,到最后统一处理。Request#events 指的是想要监听的 fd 事件,Response#events 指的是fd 上发生的事件。
  3. LooperCallback:fd 上发生事件的回调
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct Request {
int fd;
int ident;
int events;
int seq;
sp<LooperCallback> callback;
void* data;

void initEventItem(struct epoll_event* eventItem) const;
};

struct Response {
int events;
Request request;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class LooperCallback : public virtual RefBase {
protected:
virtual ~LooperCallback();

public:
virtual int handleEvent(int fd, int events, void* data) = 0;
};

class SimpleLooperCallback : public LooperCallback {
protected:
virtual ~SimpleLooperCallback();

public:
SimpleLooperCallback(Looper_callbackFunc callback);
virtual int handleEvent(int fd, int events, void* data);

private:
Looper_callbackFunc mCallback;
};

fd 回调

由于 Looper#addFd 传入的 LooperCallback 就是 NativeMessageQueue,所以具体执行回调的代码就在 NativeMessageQueu#handleEvent 方法中。
在其中首先会将 events 转化成 Java 层的 events,然后通过 gMessageQueueClassInfo.dispatchEvents 保存的 Java 方法引用,调用 Java 层的 MessageQueue#dispatchEvents 方法。
Fd 回调实际和 Java 层消息处理的时机一致,一起放到后续分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) {
int events = 0;
if (looperEvents & Looper::EVENT_INPUT) {
events |= CALLBACK_EVENT_INPUT;
}
if (looperEvents & Looper::EVENT_OUTPUT) {
events |= CALLBACK_EVENT_OUTPUT;
}
if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) {
events |= CALLBACK_EVENT_ERROR;
}
int oldWatchedEvents = reinterpret_cast<intptr_t>(data);
int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj, gMessageQueueClassInfo.dispatchEvents, fd, events);
if (!newWatchedEvents) {
return 0;
}
if (newWatchedEvents != oldWatchedEvents) {
setFileDescriptorEvents(fd, newWatchedEvents);
}
return 1;
}

Native 处理

Native 的核心部分就是被 Java 层调用的四个方法:

  • nativeInit:初始化
  • nativeWake:唤醒
  • nativePollOnce:阻塞
  • nativeDestroy:销毁

初始化

MessageQueue 初始化的时候调用了 nativeInit 方法,它会调用到 Native 层,其中会初始化 NativeMessageQueue 对象,并将其指针或者说地址返回,保存在 Java 层的 MessageQueue#mPtr 字段中,用于后续 Java 层对 Native 层的调用。

1
2
3
4
5
6
7
// android_os_MessageQueue.cpp
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
//...
nativeMessageQueue->incStrong(env);
return reinterpret_cast<jlong>(nativeMessageQueue);
}

NativeMessageQueue

NativeMessageQueue 继承于 MessageQueue,这个 MessageQueue 是 Native 层的,和 Java 层的没有关系,只是同名而已。
NativeMessageQueue 构造方法中会初始化 Looper,这个 Looper 也是 Native 层的,和 Java 层的只是同名而已。
Looper 创建方式和 Java 层类似,Looper::getForThreadLooper::setForThread 中通过使用 pthread_getspecificpthread_setspecific 达到了和 Java 层的 ThreadLocal 一样的作用。

1
2
3
4
5
6
7
8
// android_os_MessageQueue.cpp
NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Looper.cpp
void Looper::setForThread(const sp<Looper>& looper) {
sp<Looper> old = getForThread();

if (looper != nullptr) {
looper->incStrong((void*)threadDestructor);
}

pthread_setspecific(gTLSKey, looper.get());

if (old != nullptr) {
old->decStrong((void*)threadDestructor);
}
}

sp<Looper> Looper::getForThread() {
int result = pthread_once(& gTLSOnce, initTLSKey);
LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");

return (Looper*)pthread_getspecific(gTLSKey);
}

Looper

Looper 构造方法中会创建 eventfd,并将其保存在 mWakeEventFd 字段中,同时会调用 rebuildEpollLocked 方法构建 epoll 来监听这个 eventfd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Looper.cpp
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {

mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
//...
rebuildEpollLocked();
}

rebuildEpollLocked 方法中通过 epoll_create1 创建 epoll 实例,并将其保存在 mEpollFd 字段中。
通过 epoll_ctl 方法将 mWakeEventFd 注册进 mEpollFd 中,监听其可读事件 EPOLLIN,同时会将已经存储在 mRequests 中的 fd 注册进 mEpollFd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Looper.cpp
void Looper::rebuildEpollLocked() {
//...
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
//...
struct epoll_event eventItem;
//...
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd.get();
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
//...

for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);

int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
//...
}
}

唤醒

这里的唤醒指的是通过向 mWakeEventFd 写入事件实现,Java 层调用 nativeWake 会导致系统向 mWakeEventFd 中写入 1,如果此时被阻塞,对应的 epoll_wait 调用处就会被唤醒来处理。

1
2
3
4
5
6
7
8
9
// android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}

void NativeMessageQueue::wake() {
mLooper->wake();
}

wake

1
2
3
4
5
6
7
// Looper.cpp
void Looper::wake() {
//...
uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
//...
}

阻塞

Java 层调用 MessageQueue#next 获取 Message 时,会调用到MessageQueue#nativePollOnce 方法,它会调用到 Native 层的 NativeMessageQueue#pollOnce 方法, 最终会调用到 Looper#pollOnce 方法。

1
2
3
4
5
6
7
8
9
10
11
12
// android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
//...
mLooper->pollOnce(timeoutMillis);
//...
}

pollOnce

  1. pollOnce 首先处理没有设置 callback 的 Response(ident > 0,POLL_CALLBACK = -2),第一次进入 mResponses 集合其实空的,具体的 Response 会在 Looper#pollInner 方法中插入。
  2. 调用 Looper#pollInner 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Looper.cpp
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
//...
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
return ident;
}
}

if (result != 0) {
//
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
return result;
}

result = pollInner(timeoutMillis);
}
}

pollInner

  1. 确定 epoll_wait 的超时时间,timeoutMillis 是 Java 层最新消息的间隔时间,messageTimeoutMillis 是 Native 层最新消息的间隔时间,取其中最短的值作为超时时间。
  2. 调用 epoll_wait 方法阻塞当前调用,等待手动唤醒或者时间超时。
  3. 如果 eventCount < 0,说明发生错误,直接跳转到 Done 。
  4. 如果 eventCount = 0,说明发生超时,直接跳转到 Done。
  5. 如果 eventCount > 0,说明 epoll 监听的 fd 指定事件发生了,遍历 epoll_event 数组:
    • 如果事件发生的 fd 是 mWakeEventFd,并且是 EPOLLIN 事件,那么就会调用 awoken 读取 mWakeEventFd 中的所有数据。
    • 如果事件发生的 fd 不是 mWakeEventFd,就会将发生的事件和对应的 Request 包装成 Response,存储到 mResponses 集合中。
  6. 执行 Done 模块:
    • 遍历 mMessageEnvelopes 集合,处理所有满足条件的 Native 消息,通过 MessageHandler#handleMessage 方法执行消息。
    • 遍历 mResponses 集合,处理所有含有 callback 的 fd 监听,通过 LooperCallback#handleEvent 方法回调 fd 上发生的事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
// Looper.cpp
int Looper::pollInner(int timeoutMillis) {
//...
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
//...
}
//...
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
//...
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
//...
result = POLL_ERROR;
goto Done;
}
//...
if (eventCount == 0) {
//...
result = POLL_TIMEOUT;
goto Done;
}
//...
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd.get()) {
if (epollEvents & EPOLLIN) {
awoken();
}
//...
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
}
//...
}
}
Done: ;
//...
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
//...
{
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
//...
handler->handleMessage(message);
} // release handler
//...
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}

//...
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
//...
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}

awoken

1
2
3
4
5
6
// Looper.cpp
void Looper::awoken() {
//...
uint64_t counter;
TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
}

pushResponse

1
2
3
4
5
6
void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}

销毁

释放掉 NativeMessageQueue 的引用

1
2
3
4
static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->decStrong(env);
}

参考

管道

eventfd

epoll

I/O 多路复用