- 一、MessageQueue 的 Java 层机制
- 二、MessageQueue 的 native 层阻塞机制
- 三、MessageQueue 的 native 层解除阻塞机制
- 三、MessageQueue 的 native 层 JNI 方法动态注册
- 三、MessageQueue 的 native 层完整代码 android_os_MessageQueue.cpp
之前在 【Android 异步操作】手写 Handler ( 消息队列 MessageQueue | 消息保存到链表 | 从链表中获取消息 ) 中 , 模仿 Android 的 MessageQueue 手写的 MessageQueue , 使用了如下同步机制 ,
从 消息队列 MessageQueue 中取出 消息 Message ,
如果当前链表为空 , 此时会 调用 wait 方法阻塞 , 直到消息入队时 , 链表中有了元素 , 会调用 notify 解除该阻塞 ;
在实际的 Android 中的 消息队列 MessageQueue 的同步机制 是在 native 层实现 的 ;
在创建 消息队列 MessageQueue 时 , 调用了 nativeInit() 方法 , 销毁 MessageQueue 时调用 nativeDestroy 方法 ;
如果调用 next 获取下一个消息时 , 如果当前消息队列 MessageQueue 中没有消息 , 此时需要阻塞 , 调用 nativePollOnce 即可实现在 native 阻塞线程 ;
// 初始化 MessageQueue 时调用的方法
private native static long nativeInit();
// 销毁 MessageQueue 时调用的方法
private native static void nativeDestroy(long ptr);
// 线程阻塞方法
@UnsupportedAppUsage
private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
// 线程唤醒方法
private native static void nativeWake(long ptr);
private native static boolean nativeIsPolling(long ptr);
private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);
// 此处初始化 MessageQueue , 调用了 nativeInit 方法
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}
// 获取消息队列中的下一个消息
@UnsupportedAppUsage
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
int pendingIdleHandlerCount = -1; // -1 only during first iteration
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
// 此处阻塞线程
nativePollOnce(ptr, nextPollTimeoutMillis);
}
}
二、MessageQueue 的 native 层阻塞机制
线程阻塞方法 private native void nativePollOnce(long ptr, int timeoutMillis) , 是 native 方法 , 该方法在 frameworks/base/core/jni/android_os_MessageQueue.cpp 中实现 ,
从 Java 层传入 long 类型 , 然后转为 NativeMessageQueue* 类型指针 ,
该 Java 层传入的 long 类型是初始化消息队列时 , 由 nativeInit 方法返回 , 是 消息队列在 Native 层的指针 ,
之后 NativeMessageQueue 指针调用了其本身的 pollOnce 函数 , 该函数中 , 主要调用了 Looper 的 pollOnce 函数 , mLooper->pollOnce(timeoutMillis) ;
frameworks/base/core/jni/android_os_MessageQueue.cpp 中阻塞相关源码 :
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
// 这是主要操作
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp
继续查看 Native 层 Looper.cpp 的 pollOnce 方法 ,
Looper.cpp 源码路径是 system/core/libutils/Looper.cpp ,
在该方法中 , 最终调用 Looper.cpp 的 pollInner 方法 ,
在 pollInner 方法中 , 调用了 epoll_wait 方法 , 该方法就是等待方法 , 在该方法中会监听 mEpollFd 文件句柄 ,
#include
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events,
int maxevents, int timeout,
const sigset_t *sigmask);
参考 : epoll_wait
system/core/libutils/Looper.cpp 中阻塞相关源码 :
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex = 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
result = pollInner(timeoutMillis);
}
}
int Looper::pollInner(int timeoutMillis) {
// ...
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
}
参考 : system/core/libutils/Looper.cpp
三、MessageQueue 的 native 层解除阻塞机制在 MessageQueue 消息队列的 Java 层 , 将 Message 消息插入到链表表头后 , 调用了 nativeWake 方法 , 唤醒了线程 , 即解除了阻塞 ;
public final class MessageQueue {
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when wake();
}
参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp
查看 system/core/libutils/Looper.cpp 中的 wake 方法 , 在该方法中 ,
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t))) 代码说明 ,
向 mWakeEventFd 文件句柄写入了数据 ;
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d: %s",
mWakeEventFd, strerror(errno));
}
}
}
参考 : system/core/libutils/Looper.cpp
阻塞的时候使用的是 mEpollFd 文件句柄 ,
唤醒的时候使用的是 mWakeEventFd 文件句柄 ,
下面分析这两个文件句柄之间的联系 ;
Looper 构造函数 , 调用了 rebuildEpollLocked() 方法 ,
在 rebuildEpollLocked 方法 中调用 mEpollFd = epoll_create(EPOLL_SIZE_HINT) , 创建了一个句柄 ,
调用 int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem) 注册事件监听 ,
注册 mEpollFd 句柄 , 监听 mWakeEventFd 句柄的 eventItem 事件 ,
监听的事件是 eventItem.events = EPOLLIN 事件 ,
该事件代表 , 向 mWakeEventFd 文件句柄写入数据 , 此时就对应解除 epoll_wait 阻塞 ;
system/core/libutils/Looper.cpp 中 Looper 构造函数 , rebuildEpollLocked 方法 :
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
LOG_ALWAYS_FATAL_IF(mWakeEventFd = 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
close(mEpollFd);
}
// Allocate the new epoll instance and register the wake pipe.
// 创建句柄
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
void NativeMessageQueue::wake() {
mLooper->wake();
}
void NativeMessageQueue::setFileDescriptorEvents(int fd, int events) {
if (events) {
int looperEvents = 0;
if (events & CALLBACK_EVENT_INPUT) {
looperEvents |= Looper::EVENT_INPUT;
}
if (events & CALLBACK_EVENT_OUTPUT) {
looperEvents |= Looper::EVENT_OUTPUT;
}
mLooper->addFd(fd, Looper::POLL_CALLBACK, looperEvents, this,
reinterpret_cast(events));
} else {
mLooper->removeFd(fd);
}
}
int NativeMessageQueue::handleEvent(int fd, int looperEvents, void* data) {
int events = 0;
if (looperEvents & Looper::EVENT_INPUT) {
events |= CALLBACK_EVENT_INPUT;
}
if (looperEvents & Looper::EVENT_OUTPUT) {
events |= CALLBACK_EVENT_OUTPUT;
}
if (looperEvents & (Looper::EVENT_ERROR | Looper::EVENT_HANGUP | Looper::EVENT_INVALID)) {
events |= CALLBACK_EVENT_ERROR;
}
int oldWatchedEvents = reinterpret_cast(data);
int newWatchedEvents = mPollEnv->CallIntMethod(mPollObj,
gMessageQueueClassInfo.dispatchEvents, fd, events);
if (!newWatchedEvents) {
return 0; // unregister the fd
}
if (newWatchedEvents != oldWatchedEvents) {
setFileDescriptorEvents(fd, newWatchedEvents);
}
return 1;
}
// ----------------------------------------------------------------------------
sp android_os_MessageQueue_getMessageQueue(JNIEnv* env, jobject messageQueueObj) {
jlong ptr = env->GetLongField(messageQueueObj, gMessageQueueClassInfo.mPtr);
return reinterpret_cast(ptr);
}
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);
return reinterpret_cast(nativeMessageQueue);
}
static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);
nativeMessageQueue->decStrong(env);
}
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);
nativeMessageQueue->wake();
}
static jboolean android_os_MessageQueue_nativeIsPolling(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);
return nativeMessageQueue->getLooper()->isPolling();
}
static void android_os_MessageQueue_nativeSetFileDescriptorEvents(JNIEnv* env, jclass clazz,
jlong ptr, jint fd, jint events) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);
nativeMessageQueue->setFileDescriptorEvents(fd, events);
}
// ----------------------------------------------------------------------------
// 动态注册 JNI 函数的结构体
// 每个结构体中的元素是 Java 方法名称 , 方法签名 , C++ 中的方法指针
static const JNINativeMethod gMessageQueueMethods[] = {
/* name, signature, funcPtr */
{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },
{ "nativeDestroy", "(J)V", (void*)android_os_MessageQueue_nativeDestroy },
{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
{ "nativeWake", "(J)V", (void*)android_os_MessageQueue_nativeWake },
{ "nativeIsPolling", "(J)Z", (void*)android_os_MessageQueue_nativeIsPolling },
{ "nativeSetFileDescriptorEvents", "(JII)V",
(void*)android_os_MessageQueue_nativeSetFileDescriptorEvents },
};
// 动态注册 JNI 函数
int register_android_os_MessageQueue(JNIEnv* env) {
int res = RegisterMethodsOrDie(env, "android/os/MessageQueue", gMessageQueueMethods,
NELEM(gMessageQueueMethods));
jclass clazz = FindClassOrDie(env, "android/os/MessageQueue");
gMessageQueueClassInfo.mPtr = GetFieldIDOrDie(env, clazz, "mPtr", "J");
gMessageQueueClassInfo.dispatchEvents = GetMethodIDOrDie(env, clazz,
"dispatchEvents", "(II)I");
return res;
}
} // namespace android
参考 : frameworks/base/core/jni/android_os_MessageQueue.cpp