Technology, November 14, 2022

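Following on from the previous article, "An Incomplete Analysis of the Android Message Mechanism (Part 1)", this part looks at the C++ side of the implementation.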

First, let's look at how the four native functions of the android.os.MessageQueue class are implemented in /frameworks/base/core/jni/android_os_MessageQueue.cpp:

static void android_os_MessageQueue_nativeInit(JNIEnv* env, jobject obj) {
    // Construct the NativeMessageQueue instance.
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return;
    }

    nativeMessageQueue->incStrong(env); // strong reference count +1
    android_os_MessageQueue_setNativeMessageQueue(env, obj, nativeMessageQueue);
}

static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jobject obj) {
    NativeMessageQueue* nativeMessageQueue =
            android_os_MessageQueue_getNativeMessageQueue(env, obj);
    if (nativeMessageQueue) {
        android_os_MessageQueue_setNativeMessageQueue(env, obj, NULL);
        // Strong reference count -1; in practice this frees the NativeMessageQueue instance.
        nativeMessageQueue->decStrong(env);
    }
}

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jint ptr, jint timeoutMillis) {
    // Cast the integer back to a pointer.
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, timeoutMillis); // call NativeMessageQueue::pollOnce
}

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jobject obj, jint ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    return nativeMessageQueue->wake(); // call NativeMessageQueue::wake
}

The code shows that all four functions rely on the NativeMessageQueue class. Before digging into NativeMessageQueue, though, let's look at one more interesting piece of code:

static void android_os_MessageQueue_setNativeMessageQueue(JNIEnv* env, jobject messageQueueObj,
        NativeMessageQueue* nativeMessageQueue) {
    // Cast the address of the NativeMessageQueue instance to a Java int and store it
    // in the field of messageQueueObj identified by gMessageQueueClassInfo.mPtr.
    env->SetIntField(messageQueueObj, gMessageQueueClassInfo.mPtr,
            reinterpret_cast<jint>(nativeMessageQueue));
}

So what is gMessageQueueClassInfo.mPtr?

static JNINativeMethod gMessageQueueMethods[] = {
    /* name, signature, funcPtr */
    { "nativeInit", "()V", (void*)android_os_MessageQueue_nativeInit },
    { "nativeDestroy", "()V", (void*)android_os_MessageQueue_nativeDestroy },
    { "nativePollOnce", "(II)V", (void*)android_os_MessageQueue_nativePollOnce },
    { "nativeWake", "(I)V", (void*)android_os_MessageQueue_nativeWake }
};

#define FIND_CLASS(var, className) \
        var = env->FindClass(className); \
        LOG_FATAL_IF(! var, "Unable to find class " className);

#define GET_FIELD_ID(var, clazz, fieldName, fieldDescriptor) \
        var = env->GetFieldID(clazz, fieldName, fieldDescriptor); \
        LOG_FATAL_IF(! var, "Unable to find field " fieldName);

// This function is called by the system while Android starts up.
int register_android_os_MessageQueue(JNIEnv* env) {
    // Bind the native functions of MessageQueue.
    int res = jniRegisterNativeMethods(env, "android/os/MessageQueue",
            gMessageQueueMethods, NELEM(gMessageQueueMethods));
    LOG_FATAL_IF(res < 0, "Unable to register native methods.");

    jclass clazz;
    FIND_CLASS(clazz, "android/os/MessageQueue"); // look up the MessageQueue class

    GET_FIELD_ID(gMessageQueueClassInfo.mPtr, clazz,
            "mPtr", "I"); // look up the ID of the mPtr field of the MessageQueue class

    return 0;
}

Doesn't the code above look a lot like Java reflection?

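    Class cls = Class.forName("android.os.MessageQueue");
    // mPtr is a private field, so getDeclaredField is needed; getField only finds public fields.
    Field field = cls.getDeclaredField("mPtr");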

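At this point it is clear that android_os_MessageQueue_setNativeMessageQueue simply sets the mPtr field of the android.os.MessageQueue instance to the address of the NativeMessageQueue instance. Java has no notion of pointers, yet mPtr here really is used as one. That also explains why mPtr can be cast straight back to a NativeMessageQueue*. Note that storing the address in a Java int only works where a pointer fits in 32 bits.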
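The pointer-in-an-integer trick can be tried out without any JNI. The following is a minimal sketch, not the framework code: FakeNativeQueue, Handle, toHandle and fromHandle are hypothetical names, and the handle is declared as std::intptr_t (an assumption of this sketch) so that it is wide enough on any platform.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the native object; not the real NativeMessageQueue.
struct FakeNativeQueue {
    int wakeCount = 0;
    void wake() { ++wakeCount; }
};

// Plays the role of the Java object's mPtr field.
using Handle = std::intptr_t;

// Store the address of the native object as a plain integer.
Handle toHandle(FakeNativeQueue* queue) {
    return reinterpret_cast<Handle>(queue);
}

// Recover the native object from the integer, as nativePollOnce/nativeWake do.
FakeNativeQueue* fromHandle(Handle handle) {
    return reinterpret_cast<FakeNativeQueue*>(handle);
}

// Round trip: save the address, recover it, and call through the recovered pointer.
int roundTripWakeCount() {
    FakeNativeQueue queue;
    Handle handle = toHandle(&queue);
    fromHandle(handle)->wake();
    return queue.wakeCount;
}
```

The integer carries no ownership or type information, which is why the native side pairs it with explicit reference counting.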

Summary:

  1. What android_os_MessageQueue_nativeInit and android_os_MessageQueue_nativeDestroy do:
    • android_os_MessageQueue_nativeInit: constructs the NativeMessageQueue instance
    • android_os_MessageQueue_nativeDestroy: destroys the NativeMessageQueue instance
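The construct/destroy pairing works through the strong reference count: nativeInit raises it, nativeDestroy lowers it, and the object goes away when the count reaches zero. Below is a rough sketch of that idea only. CountedQueue is a hypothetical class, it is not android::RefBase, and unlike the real thing it is not thread-safe.

```cpp
#include <cassert>

// Hypothetical minimal intrusive reference count, loosely modeled on the
// incStrong/decStrong calls above.
class CountedQueue {
public:
    explicit CountedQueue(bool* destroyedFlag) : mDestroyed(destroyedFlag) {}

    void incStrong() { ++mStrong; }

    void decStrong() {
        if (--mStrong == 0) {
            delete this;  // last strong reference gone: free the object
        }
    }

private:
    // Private destructor: the object can only die through decStrong().
    ~CountedQueue() { *mDestroyed = true; }

    int mStrong = 0;
    bool* mDestroyed;
};

// Mimics nativeInit followed by nativeDestroy and reports whether the object
// was alive in between and freed afterwards.
bool initThenDestroyFreesObject() {
    bool destroyed = false;
    CountedQueue* queue = new CountedQueue(&destroyed);
    queue->incStrong();               // what nativeInit does
    bool aliveAfterInit = !destroyed;
    queue->decStrong();               // what nativeDestroy does
    return aliveAfterInit && destroyed;
}
```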

NativeMessageQueue

    First, the declaration of NativeMessageQueue:

class NativeMessageQueue : public MessageQueue {
public:
    NativeMessageQueue();
    virtual ~NativeMessageQueue();

    virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);

    void pollOnce(JNIEnv* env, int timeoutMillis);

    void wake();

private:
    bool mInCallback;
    jthrowable mExceptionObj;
};

NativeMessageQueue inherits from MessageQueue (not the Java class android.os.MessageQueue). All we need to know about MessageQueue is that it holds a member named mLooper (interested readers can look at /frameworks/base/core/jni/android_os_MessageQueue.h):

class MessageQueue {
    ......
protected:
    sp<Looper> mLooper;
};

Let's continue with the NativeMessageQueue code:

NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false); // instantiate mLooper
        Looper::setForThread(mLooper);
    }
}

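When a NativeMessageQueue is constructed, it reuses the Looper already bound to the current thread, or creates one and binds it to the thread if none exists yet.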
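The get-or-create-per-thread pattern in that constructor can be sketched as follows. MiniLooper and the helper functions are hypothetical, and the sketch uses C++11 thread_local with std::shared_ptr as a stand-in; the real Looper::getForThread and setForThread may use a different thread-local storage mechanism.

```cpp
#include <cassert>
#include <memory>
#include <thread>

// Hypothetical placeholder for the per-thread looper object.
struct MiniLooper {};

// One slot per thread; plays the role of Looper::getForThread/setForThread.
std::shared_ptr<MiniLooper>& threadLooperSlot() {
    thread_local std::shared_ptr<MiniLooper> slot;
    return slot;
}

// Same shape as the constructor above: reuse the thread's looper, else create it.
std::shared_ptr<MiniLooper> getOrCreateForThread() {
    std::shared_ptr<MiniLooper>& slot = threadLooperSlot();
    if (!slot) {
        slot = std::make_shared<MiniLooper>();
    }
    return slot;
}

// Two calls on the same thread yield the same looper.
bool sameThreadSharesLooper() {
    return getOrCreateForThread() == getOrCreateForThread();
}

// A different thread gets its own looper.
bool otherThreadGetsOwnLooper() {
    std::shared_ptr<MiniLooper> mine = getOrCreateForThread();
    std::shared_ptr<MiniLooper> theirs;
    std::thread worker([&theirs] { theirs = getOrCreateForThread(); });
    worker.join();
    return theirs != nullptr && mine != theirs;
}
```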

void NativeMessageQueue::pollOnce(JNIEnv* env, int timeoutMillis) {
    mInCallback = true;
    mLooper->pollOnce(timeoutMillis);
    mInCallback = false;
    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}

void NativeMessageQueue::wake() {
    mLooper->wake();
}

Summary:

  1. NativeMessageQueue's pollOnce and wake are quite simple: they hand the work over to the functions of the same name on mLooper.

Looper

First, the declaration of Looper in /frameworks/native/include/utils/Looper.h:

class Looper : public ALooper, public RefBase {
protected:
    virtual ~Looper();

public:
    Looper(bool allowNonCallbacks);

    bool getAllowNonCallbacks() const;

    int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
    inline int pollOnce(int timeoutMillis) {
        return pollOnce(timeoutMillis, NULL, NULL, NULL);
    }

    void wake();

private:
    const bool mAllowNonCallbacks; // immutable

    int mWakeReadPipeFd;  // immutable
    int mWakeWritePipeFd; // immutable
    Mutex mLock;

    int mEpollFd; // immutable

    int pollInner(int timeoutMillis);
    void awoken();
};

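There is quite a lot of code, so I have trimmed most of the declaration above. For now we only focus on the two functions we care about: pollOnce and wake.

As usual, we start with the constructor (frameworks/native/utils/Looper.cpp):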

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    int wakeFds[2];
    int result = pipe(wakeFds); // create an (anonymous) pipe
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);

    // Save both ends of the pipe.
    mWakeReadPipeFd = wakeFds[0];
    mWakeWritePipeFd = wakeFds[1];

    result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK); // switch to non-blocking mode
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking.  errno=%d",
            errno);

    result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK); // switch to non-blocking mode
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",
            errno);

    // From here on the epoll API is used to implement polling.
    // Allocate the epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT); // create the epoll file descriptor
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);

    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeReadPipeFd;
    // Add the read end of the pipe created above to the set of fds that epoll watches.
    result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",
            errno);
}

Looper::~Looper() {
    close(mWakeReadPipeFd); // release the pipe
    close(mWakeWritePipeFd);
    close(mEpollFd); // release the epoll file descriptor
}
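The pipe-plus-epoll setup performed by the constructor can be reproduced in isolation on Linux. This is a minimal sketch under stated assumptions: WakeChannel, openWakeChannel, closeWakeChannel and readyCount are hypothetical names, and epoll_create1(0) is used here instead of epoll_create with a size hint.

```cpp
#include <cassert>
#include <fcntl.h>
#include <sys/epoll.h>
#include <unistd.h>

// Hypothetical bundle of the three descriptors the constructor creates.
struct WakeChannel {
    int readFd = -1;
    int writeFd = -1;
    int epollFd = -1;
};

// Closes whatever was opened and resets the struct.
void closeWakeChannel(WakeChannel* channel) {
    if (channel->readFd >= 0) close(channel->readFd);
    if (channel->writeFd >= 0) close(channel->writeFd);
    if (channel->epollFd >= 0) close(channel->epollFd);
    *channel = WakeChannel{};
}

// Creates a non-blocking pipe and registers its read end with a new epoll instance.
bool openWakeChannel(WakeChannel* channel) {
    int fds[2];
    if (pipe(fds) != 0) return false;  // fds[0] is the read end, fds[1] the write end
    channel->readFd = fds[0];
    channel->writeFd = fds[1];

    bool ok = fcntl(channel->readFd, F_SETFL, O_NONBLOCK) == 0
            && fcntl(channel->writeFd, F_SETFL, O_NONBLOCK) == 0;
    if (ok) {
        channel->epollFd = epoll_create1(0);
        ok = channel->epollFd >= 0;
    }
    if (ok) {
        epoll_event item{};  // zero-initialized, like the memset in the original
        item.events = EPOLLIN;
        item.data.fd = channel->readFd;
        ok = epoll_ctl(channel->epollFd, EPOLL_CTL_ADD, channel->readFd, &item) == 0;
    }
    if (!ok) closeWakeChannel(channel);
    return ok;
}

// Non-blocking check (timeout 0): how many registered fds are ready right now.
int readyCount(const WakeChannel& channel) {
    epoll_event events[1];
    return epoll_wait(channel.epollFd, events, 1, 0);
}
```

Nothing is ready until a byte is written into the write end; after that, the read end is reported as readable.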

Pipe- and epoll-related code shows up in Looper's constructor. Why is that? Don't worry, read on and it will become clear:

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // Ignore this part for now.
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++);
            int ident = response.request.ident;
            // ident >= 0 means this response is a non-callback one, so the fd, events
            // and data have to be returned to the caller for handling.
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p",
                        this, ident, fd, events, data);
#endif
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }

        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        result = pollInner(timeoutMillis); // this line is the important one!
    }
}

Read on. The code is a bit long, but please go through it carefully:

int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

    // Set timeoutMillis to the smaller of timeoutMillis and the delay until mNextMessageUptime.
    // Adjust the timeout based on when the next message is due.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
                this, mNextMessageUptime - now, timeoutMillis);
#endif
    }

    // Poll.
    int result = ALOOPER_POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // Start polling.
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // Acquire lock.
    mLock.lock();

    // Check for poll error.
    if (eventCount < 0) { // handle the error
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error, errno=%d", errno);
        result = ALOOPER_POLL_ERROR;
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) { // no event arrived in time, so this is a timeout
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = ALOOPER_POLL_TIMEOUT;
        goto Done;
    }

    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

    for (int i = 0; i < eventCount; i++) { // there are events, so handle them
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeReadPipeFd) {
            if (epollEvents & EPOLLIN) {
                awoken(); // read out the data in the pipe
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                // Add the event to mResponses, to be handled later.
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;

    // Handle the C++-layer Messages.
    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                        this, handler.get(), message.what);
#endif
                handler->handleMessage(message); // call handler->handleMessage
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = ALOOPER_POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime; // update mNextMessageUptime
            break;
        }
    }

    // Release lock.
    mLock.unlock();

    // Handle mResponses.
    // Invoke all response callbacks.
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == ALOOPER_POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                    this, response.request.callback.get(), fd, events, data);
#endif
            // Call callback->handleEvent.
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd);
            }
            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = ALOOPER_POLL_CALLBACK;
        }
    }
    return result;
}

The code is fairly long, so let's analyze it piece by piece:

    // Set timeoutMillis to the smaller of timeoutMillis and the delay until mNextMessageUptime.
    // Adjust the timeout based on when the next message is due.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
                this, mNextMessageUptime - now, timeoutMillis);
#endif
    }

To make sense of this piece, we first need some of the C++-layer Message code:

struct Message {
    Message() : what(0) { }
    Message(int what) : what(what) { }

    /* The message type. (interpretation is left up to the handler) */
    int what;
};

class MessageHandler : public virtual RefBase {
protected:
    virtual ~MessageHandler() { }

public:
    /**
     * Handles a message.
     */
    virtual void handleMessage(const Message& message) = 0;
};

struct MessageEnvelope {
    MessageEnvelope() : uptime(0) { }

    MessageEnvelope(nsecs_t uptime, const sp<MessageHandler> handler,
            const Message& message) : uptime(uptime), handler(handler), message(message) {
    }

    nsecs_t uptime;
    sp<MessageHandler> handler;
    Message message;
};

void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    sendMessageAtTime(now, handler, message);
}

void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
        const Message& message) {
    nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
    sendMessageAtTime(now + uptimeDelay, handler, message);
}

void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
        const Message& message) {
#if DEBUG_CALLBACKS
    ALOGD("%p ~ sendMessageAtTime - uptime=%lld, handler=%p, what=%d",
            this, uptime, handler.get(), message.what);
#endif

    size_t i = 0;
    { // acquire lock
        AutoMutex _l(mLock);

        size_t messageCount = mMessageEnvelopes.size();
        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
            i += 1;
        }

        MessageEnvelope messageEnvelope(uptime, handler, message);
        mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

        // Optimization: If the Looper is currently sending a message, then we can skip
        // the call to wake() because the next thing the Looper will do after processing
        // messages is to decide when the next wakeup time should be.  In fact, it does
        // not even matter whether this code is running on the Looper thread.
        if (mSendingMessage) {
            return;
        }
    } // release lock

    // Wake the poll loop only when we enqueue a new message at the head.
    if (i == 0) {
        wake();
    }
}

void Looper::removeMessages(const sp<MessageHandler>& handler) {
#if DEBUG_CALLBACKS
    ALOGD("%p ~ removeMessages - handler=%p", this, handler.get());
#endif

    { // acquire lock
        AutoMutex _l(mLock);

        for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
            const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
            if (messageEnvelope.handler == handler) {
                mMessageEnvelopes.removeAt(i);
            }
        }
    } // release lock
}

void Looper::removeMessages(const sp<MessageHandler>& handler, int what) {
#if DEBUG_CALLBACKS
    ALOGD("%p ~ removeMessages - handler=%p, what=%d", this, handler.get(), what);
#endif

    { // acquire lock
        AutoMutex _l(mLock);

        for (size_t i = mMessageEnvelopes.size(); i != 0; ) {
            const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(--i);
            if (messageEnvelope.handler == handler
                    && messageEnvelope.message.what == what) {
                mMessageEnvelopes.removeAt(i);
            }
        }
    } // release lock
}

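Does the code above look familiar? It closely resembles the Java-layer MessageQueue, doesn't it? Just like the Java layer, the C++ layer has its own message queue and message handling mechanism: messages are stored in the member mMessageEnvelopes and processed in pollInner (by calling MessageHandler's handleMessage function).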
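The ordering rule used by sendMessageAtTime can be sketched on its own: a message is inserted after every entry whose uptime is less than or equal to its own, and the poll loop only needs waking when the new message lands at the head. Envelope and enqueue below are hypothetical names, and std::vector stands in for the framework's container.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical, trimmed-down envelope: just a due time and a message type.
struct Envelope {
    std::int64_t uptime;  // when the message becomes due
    int what;
};

// Inserts in uptime order, after any entries with the same uptime, so that
// messages with equal due times keep their posting order.
// Returns true when the message was inserted at the head, which is the only
// case where a sleeping poll loop would need to be woken.
bool enqueue(std::vector<Envelope>& queue, std::int64_t uptime, int what) {
    std::size_t i = 0;
    while (i < queue.size() && uptime >= queue[i].uptime) {
        ++i;
    }
    queue.insert(queue.begin() + static_cast<std::ptrdiff_t>(i), Envelope{uptime, what});
    return i == 0;
}
```

The linear scan mirrors the loop in the original; it is simple rather than fast, which is reasonable when the queue is expected to stay short.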

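Coming back to Looper::pollOnce, we can now see that its timeoutMillis parameter only represents the delay until the next Java-layer Message is due. The delay until the next C++-layer Message has to be taken into account as well, so the code sets timeoutMillis to the smaller of timeoutMillis and the time remaining until mNextMessageUptime.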
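That adjustment is easy to check with concrete numbers. The sketch below is written under assumptions: effectiveTimeout and millisUntil are hypothetical names, kNoPendingMessage plays the role of the LLONG_MAX sentinel, and the rounding (up to the next millisecond, clamped to a non-negative int) is this sketch's guess at what a helper like toMillisecondTimeoutDelay needs to do, not a copy of it.

```cpp
#include <cassert>
#include <climits>
#include <cstdint>

using Nanos = std::int64_t;

// Sentinel meaning "no native message is queued".
constexpr Nanos kNoPendingMessage = INT64_MAX;

// Milliseconds from `now` until `deadline`, rounded up so the poll does not
// return just before the message is due; clamped to [0, INT_MAX].
int millisUntil(Nanos now, Nanos deadline) {
    if (deadline <= now) return 0;
    Nanos delta = deadline - now;
    Nanos millis = delta / 1000000 + (delta % 1000000 != 0 ? 1 : 0);
    return millis > INT_MAX ? INT_MAX : static_cast<int>(millis);
}

// timeoutMillis < 0 means "wait indefinitely", 0 means "do not wait".
// The result is the smaller of the caller's timeout and the delay until the
// next native message, with "indefinitely" treated as larger than any delay.
int effectiveTimeout(int timeoutMillis, Nanos now, Nanos nextMessageUptime) {
    if (timeoutMillis != 0 && nextMessageUptime != kNoPendingMessage) {
        int messageTimeout = millisUntil(now, nextMessageUptime);
        if (timeoutMillis < 0 || messageTimeout < timeoutMillis) {
            timeoutMillis = messageTimeout;
        }
    }
    return timeoutMillis;
}
```

For example, a Java-side timeout of 100 ms with a native message due in 5 ms yields 5, while a timeout of 3 ms stays 3.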

On to the next piece of code:

    int result = ALOOPER_POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;

    // Start polling.
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // Acquire lock.
    mLock.lock();

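This calls epoll_wait and waits for events to occur. The return value of epoll_wait falls into one of three cases: failure, no event, or one or more events.

1. Handling failure: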

    // Check for poll error.
    if (eventCount < 0) { // handle the error
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error, errno=%d", errno);
        result = ALOOPER_POLL_ERROR;
        goto Done;
    }

2. No event occurred:

    // Check for poll timeout.
    if (eventCount == 0) { // no event arrived in time, so this is a timeout
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = ALOOPER_POLL_TIMEOUT;
        goto Done;
    }

3. Events occurred:

    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif

    for (int i = 0; i < eventCount; i++) { // there are events, so handle them
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeReadPipeFd) { // the Java layer or the C++ layer has a new Message
            if (epollEvents & EPOLLIN) {
                awoken(); // read out the data in the pipe
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                // Add the event to mResponses, to be handled later.
                pushResponse(events, mRequests.valueAt(requestIndex));
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }

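When handling events, there are two cases to distinguish: fd==mWakeReadPipeFd and fd!=mWakeReadPipeFd.

fd==mWakeReadPipeFd: a new Message has appeared in either the C++ layer or the Java layer and needs to be handled. In this case, all that has to be done is to read out the data in mWakeReadPipeFd: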

void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ awoken", this);
#endif

    char buffer[16];
    ssize_t nRead;
    do {
        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}

fd!=mWakeReadPipeFd: first we need to figure out where the fd comes from:

    struct Request {
        int fd;
        int ident;
        sp<LooperCallback> callback;
        void* data;
    };

    struct Response {
        int events;
        Request request;
    };

/**
 * A looper callback.
 */
class LooperCallback : public virtual RefBase {
protected:
    virtual ~LooperCallback() { }

public:
    /**
     * Handles a poll event for the given file descriptor.
     * It is given the file descriptor it is associated with,
     * a bitmask of the poll events that were triggered (typically ALOOPER_EVENT_INPUT),
     * and the data pointer that was originally supplied.
     *
     * Implementations should return 1 to continue receiving callbacks, or 0
     * to have this file descriptor and callback unregistered from the looper.
     */
    virtual int handleEvent(int fd, int events, void* data) = 0;
};

int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
#if DEBUG_CALLBACKS
    ALOGD("%p ~ addFd - fd=%d, ident=%d, events=0x%x, callback=%p, data=%p", this, fd, ident,
            events, callback.get(), data);
#endif

    if (!callback.get()) {
        if (! mAllowNonCallbacks) {
            ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
            return -1;
        }

        // An Fd with a null callback may only be added when the Looper allows
        // non-callbacks and ident is >= 0.
        if (ident < 0) {
            ALOGE("Invalid attempt to set NULL callback with ident < 0.");
            return -1;
        }
    } else {
        ident = ALOOPER_POLL_CALLBACK;
    }

    int epollEvents = 0;
    if (events & ALOOPER_EVENT_INPUT) epollEvents |= EPOLLIN;
    if (events & ALOOPER_EVENT_OUTPUT) epollEvents |= EPOLLOUT;

    { // acquire lock
        AutoMutex _l(mLock);

        Request request;
        request.fd = fd;
        request.ident = ident;
        request.callback = callback;
        request.data = data;

        struct epoll_event eventItem;
        memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
        eventItem.events = epollEvents;
        eventItem.data.fd = fd;

        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);
            if (epollResult < 0) {
                ALOGE("Error adding epoll events for fd %d, errno=%d", fd, errno);
                return -1;
            }
            mRequests.add(fd, request);
        } else { // already registered, so replace it
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
            if (epollResult < 0) {
                ALOGE("Error modifying epoll events for fd %d, errno=%d", fd, errno);
                return -1;
            }
            mRequests.replaceValueAt(requestIndex, request);
        }
    } // release lock
    return 1;
}

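From the code above we can see that Looper also supports adding an Fd together with a custom callback, similar to Message.callback in the Java layer.

Through addFd, a given Fd can be added to the Looper's mEpollFd. When that Fd triggers one of the requested events, i.e. EPOLLIN or EPOLLOUT, the corresponding custom callback gets executed.

In addition, when the Looper allows non-callbacks, an Fd whose callback is null can be added as well. Since there is no callback, whoever added the Fd has to call int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) and fetch the data through the outFd, outEvents and outData parameters in order to handle it.

So whenever an Fd triggers an event, a corresponding response has to be created and added to mResponses, where it waits to be handled later.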

void Looper::pushResponse(int events, const Request& request) {
    Response response;
    response.events = events;
    response.request = request;
    mResponses.push(response);
}
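The split between callback Fds and non-callback Fds can be sketched without any real file descriptors. In the sketch below, readiness is simulated by calling signal() by hand. MiniDispatcher, FdCallback and kPollCallback are hypothetical names, std::function replaces sp<LooperCallback>, and dispatch() collapses what pollInner and pollOnce do in separate steps into one call.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <utility>
#include <vector>

// Stand-in for ALOOPER_POLL_CALLBACK: marks a request that has a callback.
constexpr int kPollCallback = -2;

// Returns 1 to keep receiving events, 0 to be unregistered.
using FdCallback = std::function<int(int fd, int events)>;

struct Request { int fd; int ident; FdCallback callback; };
struct Response { int events; Request request; };

class MiniDispatcher {
public:
    // Same rule as addFd: with a callback the ident is forced to kPollCallback;
    // without one the caller must supply an ident >= 0.
    bool addFd(int fd, int ident, FdCallback callback) {
        if (!callback) {
            if (ident < 0) return false;
        } else {
            ident = kPollCallback;
        }
        mRequests[fd] = Request{fd, ident, std::move(callback)};
        return true;
    }

    // Simulates "epoll reported events on fd": queue a response for later.
    void signal(int fd, int events) {
        auto it = mRequests.find(fd);
        if (it != mRequests.end()) {
            mResponses.push_back(Response{events, it->second});
        }
    }

    // Runs callbacks for callback requests and returns the idents of the
    // non-callback ones, which the caller has to handle itself.
    std::vector<int> dispatch() {
        std::vector<Response> pending;
        pending.swap(mResponses);
        std::vector<int> idents;
        for (Response& response : pending) {
            const Request& request = response.request;
            if (request.ident == kPollCallback) {
                if (request.callback(request.fd, response.events) == 0) {
                    mRequests.erase(request.fd);  // callback asked to be unregistered
                }
            } else {
                idents.push_back(request.ident);
            }
        }
        return idents;
    }

private:
    std::map<int, Request> mRequests;
    std::vector<Response> mResponses;
};
```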

That covers how each of the three possible results of epoll_wait is handled. From here on, all three paths continue into the shared Done section.
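The three outcomes can be observed directly with a throwaway pipe on Linux. This is a sketch, not the Looper code: PollOutcome, classify and pollPipeOnce are hypothetical names, and EINTR is reported as its own outcome here purely for illustration, whereas pollInner above jumps straight to Done in that case.

```cpp
#include <cassert>
#include <cerrno>
#include <sys/epoll.h>
#include <unistd.h>

enum class PollOutcome { Error, Interrupted, Timeout, Events };

// Maps the return value of epoll_wait (plus errno) onto the cases discussed above.
PollOutcome classify(int eventCount, int savedErrno) {
    if (eventCount < 0) {
        return savedErrno == EINTR ? PollOutcome::Interrupted : PollOutcome::Error;
    }
    if (eventCount == 0) return PollOutcome::Timeout;
    return PollOutcome::Events;
}

// Creates a pipe, optionally writes one byte into it, then waits up to 10 ms
// for the read end to become readable and reports what epoll_wait said.
PollOutcome pollPipeOnce(bool writeFirst) {
    int fds[2];
    if (pipe(fds) != 0) return PollOutcome::Error;

    PollOutcome outcome = PollOutcome::Error;
    int epollFd = epoll_create1(0);
    if (epollFd >= 0) {
        epoll_event item{};
        item.events = EPOLLIN;
        item.data.fd = fds[0];
        bool ready = epoll_ctl(epollFd, EPOLL_CTL_ADD, fds[0], &item) == 0;
        if (ready && writeFirst) {
            ready = write(fds[1], "W", 1) == 1;
        }
        if (ready) {
            epoll_event events[4];
            int count = epoll_wait(epollFd, events, 4, 10);
            outcome = classify(count, errno);
        }
        close(epollFd);
    }
    close(fds[0]);
    close(fds[1]);
    return outcome;
}
```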

Handling the C++-layer Messages:

Done: ;

    // Handle the C++-layer Messages.
    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                        this, handler.get(), message.what);
#endif
                handler->handleMessage(message); // call handler->handleMessage
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = ALOOPER_POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime; // update mNextMessageUptime
            break;
        }
    }
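One detail in the loop above is easy to miss: the lock is released while the handler runs and re-acquired afterwards, so a handler is free to post further messages without deadlocking. The following sketch shows just that pattern. MiniMessageQueue, post, runDue and kNoPending are hypothetical names, and std::mutex and std::function stand in for Mutex and sp<MessageHandler>.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Returned by runDue when nothing is left in the queue.
constexpr std::int64_t kNoPending = INT64_MAX;

class MiniMessageQueue {
public:
    // Inserts the handler in uptime order (after entries with the same uptime).
    void post(std::int64_t uptime, std::function<void()> handler) {
        std::lock_guard<std::mutex> guard(mLock);
        std::size_t i = 0;
        while (i < mQueue.size() && uptime >= mQueue[i].uptime) ++i;
        mQueue.insert(mQueue.begin() + static_cast<std::ptrdiff_t>(i),
                      Entry{uptime, std::move(handler)});
    }

    // Runs every message that is due at `now`. Returns the uptime of the first
    // message that is not yet due, or kNoPending if the queue is empty.
    std::int64_t runDue(std::int64_t now) {
        std::unique_lock<std::mutex> lock(mLock);
        while (!mQueue.empty()) {
            if (mQueue.front().uptime > now) {
                return mQueue.front().uptime;  // head decides the next wakeup
            }
            std::function<void()> handler = std::move(mQueue.front().handler);
            mQueue.erase(mQueue.begin());
            lock.unlock();  // the handler may call post() on this queue
            handler();
            lock.lock();
        }
        return kNoPending;
    }

private:
    struct Entry {
        std::int64_t uptime;
        std::function<void()> handler;
    };

    std::mutex mLock;
    std::vector<Entry> mQueue;
};
```

A message posted from inside a handler is picked up in the same runDue call if it is already due, because the loop re-reads the head of the queue after re-acquiring the lock.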

Handling the Responses:

    // Handle mResponses.
    // Invoke all response callbacks.
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == ALOOPER_POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                    this, response.request.callback.get(), fd, events, data);
#endif
            // Call callback->handleEvent.
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd);
            }
            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = ALOOPER_POLL_CALLBACK;
        }
    }

With the code above understood, it is easy to see what the wake function does:

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif

    ssize_t nWrite;
    do {
        // Write one character into mWakeWritePipeFd, which makes mWakeReadPipeFd
        // trigger an EPOLLIN event.
        nWrite = write(mWakeWritePipeFd, "W", 1);
    } while (nWrite == -1 && errno == EINTR);

    if (nWrite != 1) {
        if (errno != EAGAIN) {
            ALOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}
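Putting wake and awoken together gives a small, self-contained "sleep until woken" primitive. The sketch below assumes Linux; Sleeper, waitForWake and wakeFromOtherThread are hypothetical names. Unlike pollInner, it retries epoll_wait on EINTR with the same timeout, which is a simplification of this sketch.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <sys/epoll.h>
#include <thread>
#include <unistd.h>

class Sleeper {
public:
    Sleeper() {
        int fds[2];
        if (pipe(fds) != 0) return;
        mReadFd = fds[0];
        mWriteFd = fds[1];
        fcntl(mReadFd, F_SETFL, O_NONBLOCK);
        fcntl(mWriteFd, F_SETFL, O_NONBLOCK);
        mEpollFd = epoll_create1(0);
        if (mEpollFd < 0) return;
        epoll_event item{};
        item.events = EPOLLIN;
        item.data.fd = mReadFd;
        mValid = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mReadFd, &item) == 0;
    }

    ~Sleeper() {
        if (mReadFd >= 0) close(mReadFd);
        if (mWriteFd >= 0) close(mWriteFd);
        if (mEpollFd >= 0) close(mEpollFd);
    }

    Sleeper(const Sleeper&) = delete;
    Sleeper& operator=(const Sleeper&) = delete;

    bool valid() const { return mValid; }

    // Blocks for up to timeoutMillis (negative: indefinitely).
    // Returns true if woken through the pipe, false on timeout or error.
    bool waitForWake(int timeoutMillis) {
        epoll_event events[1];
        int count;
        do {
            count = epoll_wait(mEpollFd, events, 1, timeoutMillis);
        } while (count < 0 && errno == EINTR);
        if (count <= 0) return false;
        drain();
        return true;
    }

    // Safe to call from any thread: writes one byte into the pipe.
    void wake() {
        ssize_t written;
        do {
            written = write(mWriteFd, "W", 1);
        } while (written == -1 && errno == EINTR);
        // EAGAIN would mean the pipe is full, so a wake-up is already pending.
    }

private:
    // Same idea as awoken(): read until the pipe is empty.
    void drain() {
        char buffer[16];
        ssize_t count;
        do {
            count = read(mReadFd, buffer, sizeof(buffer));
        } while ((count == -1 && errno == EINTR)
                || count == static_cast<ssize_t>(sizeof(buffer)));
    }

    int mReadFd = -1;
    int mWriteFd = -1;
    int mEpollFd = -1;
    bool mValid = false;
};

// One thread sleeps in waitForWake while another one wakes it.
bool wakeFromOtherThread() {
    Sleeper sleeper;
    if (!sleeper.valid()) return false;
    std::thread waker([&sleeper] { sleeper.wake(); });
    bool woken = sleeper.waitForWake(5000);  // generous upper bound
    waker.join();
    return woken;
}
```

Because the waiter drains the pipe, several wake calls made before it runs collapse into a single wake-up.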

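Summary:

  1. Looper uses the epoll family of functions to implement a blocking mechanism that can be woken up at any time.
  2. Looper supports two different ways of handling messages: Message + MessageHandler, and LooperCallback.
  3. A blocked Looper is woken up under the following four conditions:
    • an error occurs
    • the wait times out
    • a new Message that needs handling appears (in either the C++ layer or the Java layer)
    • an Fd added through addFd triggers an event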

Conclusion:

[Figure: sequence diagram of the Looper.loop function]

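  1. Whichever thread calls the Java-layer Looper.loop() is the thread on which Messages and callbacks (from both the Java layer and the C++ layer) are handled. The figure above is the sequence diagram of the Looper.loop function.
  2. The C++-layer NativeMessageQueue should not be seen as the internal implementation of the Java-layer MessageQueue; the relationship is closer to that of twin brothers. MessageQueue handles the messages of the Java layer, and NativeMessageQueue handles the messages of the C++ layer. The Java layer handles them in android.os.Looper.loop by calling android.os.Handler.dispatchMessage, while the C++ layer handles them in android::Looper::pollInner by calling android::MessageHandler::handleMessage and android::LooperCallback::handleEvent.
  3. NativeMessageQueue uses the Looper class to implement a wakeable blocking mechanism based on the epoll functions and file descriptors (Fd).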