一.Handler 运行原理图

二.创建
1.主线程中创建
一般在 Activity
中这样创建 Handler
,其实当前的 Handler
获取的 Looper
是在 ActivityThread 中创建的。
1
|
Handler handler = new Handler();//Looper 是在ActivityThread 中创建的
|
ActivityThread
源码,ActivityThread
并不是一个 Thread
,它只是叫 ActivityThread
来表示它是 Activity
的开始类。
1
2
3
4
5
6
7
8
9
10
11
|
//ActivityThread.java
final H mH = new H();//真正来处理 Message 信息
final Looper mLooper = Looper.myLooper();//当ActivityThread创建的时候,就获得了 Looper
public static void main(String[] args) {
//...code
Looper.prepareMainLooper();//开启 Looper 的主循环 sThreadLocal.set(new Looper(quitAllowed));
//...code
ActivityThread thread = new ActivityThread();
//...code
Looper.loop();//开启循环
}
|
2.子线程中创建
如果在其他线程想要开启一个 handler ,下面的代码直接运行的话,就会报错。
1
2
3
4
5
6
7
|
//handler 在子线程中创建
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Handler handler = new Handler();
}
});
|

报错提示:如果要在其他线程创建就必须调用 Looper.prepare()
和 Looper.loop()
Looper.prepare()
在当前线程创建Looper 对象
Looper.loop()
开启循环,保证此线程一直存活
1
2
3
4
5
6
7
8
9
|
//handler 在子线程中创建
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Looper.prepare();
Handler handler = new Handler();
Looper.loop();
}
});
|
3.Looper分析
3.1). Looper.perpare()
1
2
3
4
5
6
7
8
9
|
public static void prepare() {
prepare(true);//创建一个可以退出的 Looper 对象
}
private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));//在此线程创建 Looper 对象
}
|
3.2). Looper.loop()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public static void loop() {
final Looper me = myLooper();//通过 sThreadLocal.get();得到具体的Looper
if (me == null) {//如果 looper 为空 ,那么直接报错
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;
Binder.clearCallingIdentity();//清除Binder中的调用 sid,并获取当前的 pid, 让IPCThread 知道 pid
final long ident = Binder.clearCallingIdentity();//清除Binder中的调用 sid,并获取当前的 pid, 让IPCThread 知道 pid
//...
for (;;) {
Message msg = queue.next(); // 取出消息,可能会中断
if (msg == null) {
//没有消息,则退出循环
return;
}
//...
try {
msg.target.dispatchMessage(msg);//真正执行 Runnable()中的 run() 或者 handleMessage()
//执行 和 msg 相关联 handler 的 dispatchMessage() 方法
//请看下面的代码分析
} catch (Exception exception) {
} finally {
}
//...
final long newIdent = Binder.clearCallingIdentity();//清除Binder中的调用 sid,并获取当前的 pid, 让IPCThread 知道 pid
//...
msg.recycleUnchecked();//消息回收
}
}
|
msg.target.dispatchMessage(msg)
源码分析
我们看Message
类,里面保存了当前绑定的 Handler
、Runnable
、nextMessage
等重要的元素。
1
2
3
4
5
6
7
8
|
//Message.java
public final class Message implements Parcelable {
/*package*/ Handler target;//关联的具体的 Handler 对象
/*package*/ Runnable callback;//如果使用的 post(Runnable r); 方法的都会吧 Runable
/*package*/ android.os.Message next;//单链表,指向下一个消息
}
|
我们从 post()
方法看
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
//Handler.java
//发送消息
public final boolean post(@NonNull Runnable r) {
return sendMessageDelayed(getPostMessage(r), 0);
}
//把具体的 Runnable 绑定到 Message.callback 上
private static Message getPostMessage(Runnable r) {
Message m = Message.obtain();
m.callback = r;
return m;
}
//Handler.java
public void dispatchMessage(@NonNull Message msg) {
if (msg.callback != null) {//
handleCallback(msg);//执行绑定的 Runnable callback
} else {
if (mCallback != null) {// Handler 构造方法传入 public Handler(Callback callback, boolean async);
if (mCallback.handleMessage(msg)) {//
return;
}
}
handleMessage(msg);//执行 Handler 的 handleMessage()方法
}
}
|
3.3).MessageQueue.next()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
Message next() {
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
//...
for (;;) {
//...
nativePollOnce(ptr, nextPollTimeoutMillis);//没有任务就挂起 nextPollTimeoutMillis:-1 表示永久阻塞
synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {//取出异步消息
//被障碍物挡住了。查找队列中的第一个异步消息。
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
//如果为isAsynchronous()=true :while(true&&false)=> false 已经获取到异步消息
//如果为isAsynchronous()=false :while(true&&true) => true 继续循环获取异步消息
//总结:先会执行异步消息,然后才执行正常消息。
}
if (msg != null) {
if (now < msg.when) {//时间点内没有任务,计算下次要执行的之间
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {//从链表中获取元素,并从链表中删除此元素
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;//清除消息的关联的节点
msg.markInUse();//标记次消息已经在执行
return msg;
}
} else {// 没有消息
nextPollTimeoutMillis = -1;//-1 表示永久阻塞
}
//没有消息就会执行下面的方法 1.检测是否要退出 2.执行空闲handler
if (mQuitting) {//如果要退出
dispose();//退出
return null;
}
//空闲任务 IdleHandler msg!=null
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {//等待空闲中的任务 IdleHandler
//
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();//执行空闲任务 IdleHandler 方法
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);//如果 keep 为 false 则移除空闲的handler
}
}
}
pendingIdleHandlerCount = 0;
nextPollTimeoutMillis = 0;//不挂起任务,下次继续执行
}
}
|
3.4).空闲任务 IdleHandler
https://www.jianshu.com/p/1dc73c8ab6a1

ActivityThread 中的空闲任务有,它们是在Handler 空闲的情况下,才会执行任务。
- 更新ActivityTaskManager的状态
- GC 操作
- 清除悬而未决的资源。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
private class Idler implements MessageQueue.IdleHandler {
@Override
public final boolean queueIdle() {
ActivityClientRecord a = mNewActivities;
boolean stopProfiling = false;
if (mBoundApplication != null && mProfiler.profileFd != null
&& mProfiler.autoStopProfiler) {
stopProfiling = true;
}
if (a != null) {
mNewActivities = null;
IActivityTaskManager am = ActivityTaskManager.getService();
ActivityClientRecord prev;
do {
if (localLOGV) Slog.v(
TAG, "Reporting idle of " + a +
" finished=" +
(a.activity != null && a.activity.mFinished));
if (a.activity != null && !a.activity.mFinished) {
try {
am.activityIdle(a.token, a.createdConfig, stopProfiling);
a.createdConfig = null;
} catch (RemoteException ex) {
throw ex.rethrowFromSystemServer();
}
}
prev = a;
a = a.nextIdle;
prev.nextIdle = null;
} while (a != null);
}
if (stopProfiling) {
mProfiler.stopProfiling();//停止剖析
}
applyPendingProcessState();//VMRuntime.getRuntime().updateProcessState(state); 更新 runtime 状态
return false;
}
}
final class GcIdler implements MessageQueue.IdleHandler {
@Override
public final boolean queueIdle() {
doGcIfNeeded();//做 GC 操作,VMRuntime.getRuntime().requestConcurrentGC();
purgePendingResources();//清除悬而未决的资源
return false;
}
}
final class PurgeIdler implements MessageQueue.IdleHandler {
@Override
public boolean queueIdle() {
purgePendingResources();//清除悬而未决的资源
return false;
}
}
|
3.5).postSyncBarrier()
同步障碍消息,发送同步阻塞消息。让当前的 Looper
不要处理普通的 Message
,优先处理异步消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
//MessageQueue.java
public int postSyncBarrier() {//发送同步障碍消息
return postSyncBarrier(SystemClock.uptimeMillis());
}
private int postSyncBarrier(long when) {
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
Message prev = null;
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {//如果之前有消息的话,消息的 when<= 同步障碍消息的 when,找到其消息
prev = p;
p = p.next;
}
}
if (prev != null) { // 如果有前节点,那么 同步障碍消息 放到其后面
msg.next = p;
prev.next = msg;
} else {//把同步障碍消息 放到链表头部
msg.next = p;
mMessages = msg;
}
return token;
}
}
|
当第一个 msg的 target 为空的时候,说明当前 Looper 要停止处理普通消息,优先处理异步消息,如果没有异步消息,则一直阻塞。
1
2
3
4
5
6
7
8
9
10
11
|
Message next() {
//...
// msg.target == null 当第一个 msg的 target 为空的时候,说明当前 Looper 要停止处理消息,优先处理异步消息
if (msg != null && msg.target == null) {//取出第一个异步消息
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
//...
}
|
removeSyncBarrier(int token)
根据 token 移除同步障碍消息,回复当前 Looper 循环。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
//移除同步障碍消息
public void removeSyncBarrier(int token) {
synchronized (this) {
Message prev = null;
Message p = mMessages;
while (p != null && (p.target != null || p.arg1 != token)) {
prev = p;
p = p.next;
}
if (p == null) {
throw new IllegalStateException("The specified message queue synchronization "
+ " barrier token has not been posted or has already been removed.");
}
final boolean needWake;
if (prev != null) {
prev.next = p.next;
needWake = false;
} else {
mMessages = p.next;
needWake = mMessages == null || mMessages.target != null;
}
p.recycleUnchecked();
if (needWake && !mQuitting) {
nativeWake(mPtr);
}
}
}
|
3.6).异步消息
来看看有哪些任务是异步消息,ViewRootImpl
、InputMethodManager
、Choreographer
这些类,这些类都是系统级别的,所以这些类发送的消息都是需要优先执行的。但是这个isAsynchronous
叫法应该起名为优先执行
,这样才更显得正确,不会让人在看源码的时候懵逼。

3.7).nativePollOnce()
MessageQueue
中的方法,用来检测是否挂起整个线程,还是继续执行任务。
1
|
private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
//android_os_MessageQueue.cpp
static const JNINativeMethod gMessageQueueMethods[] = {
//...
{ "nativePollOnce", "(JI)V", (void*)android_os_MessageQueue_nativePollOnce },
//...
};
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
//通过传入的 ptr 找到具体的 NativeMessageQueue 对象,然后执行 pollOnce()
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);//执行 Looper::pollOnce()
mPollObj = NULL;
mPollEnv = NULL;
if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
|
Looper
中的 pollOnce
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
|
//Looper.cpp
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
while (mResponseIndex < mResponses.size()) {//当mResponseIndex 的索引小于mResponses.size()的时候,说明需要处理 Responses ,立马执行
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
if (result != 0) {
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
//获取状态
result = pollInner(timeoutMillis);
}
}
//
int Looper::pollInner(int timeoutMillis) {
// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
}
// Poll. 唤醒状态
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mIdling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
//epoll的 wait ,如果从管道中读取不到消息就挂起,有消息就继续执行
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mIdling = false;
// Acquire lock.
mLock.lock();
// Check for poll error.
if (eventCount < 0) {//如果事件数量小于 0 ,则直接跳转到 Done
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
result = POLL_ERROR;
goto Done;
}
// Check for poll timeout.
if (eventCount == 0) {//如果时间数量等于 0 ,则直接跳转到 Done
result = POLL_TIMEOUT;
goto Done;
}
// Handle all events. 处理所有事件
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) {
awoken();//取出 pipe 中写入的操作符,表示已经处理过管道中的数据了
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));//添加结果
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;//下次消息的唤醒时间
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
// Invoke all response callbacks.调用所有响应回调。
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd);//删除事件
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}
//读取操作符
void Looper::awoken() {
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}
//保存返回状态,用于最后清除事件
void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}
//删除管道的多余操作符
int Looper::removeFd(int fd) {
{ // acquire lock
AutoMutex _l(mLock);
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
return 0;
}
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_DEL, fd, NULL);//删除事件
if (epollResult < 0) {
ALOGE("Error removing epoll events for fd %d, errno=%d", fd, errno);
return -1;
}
mRequests.removeItemsAt(requestIndex);
} // release lock
return 1;
}
|
dispose()
退出事件循环做的操作,让 NativeMessageQueue 对象的引用计数-1
1
2
3
4
5
6
7
8
9
10
11
12
13
|
private void dispose() {
if (mPtr != 0) {
nativeDestroy(mPtr);
mPtr = 0;
}
}
private native static void nativeDestroy(long ptr);
//Android_os_MessageQuenue.cpp
static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->decStrong(env);//引用计数-1
}
|
三.消息
1. 发送消息
在 Handler 中都是通过 post 来发送一个消息。
1
2
3
4
5
6
|
new Handler().postDelayed(new Runnable() {
@Override
public void run() {
}
}, 1000);
|
2.源码分析
2.1).handler.postDelayed()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
//Handler.java
//handler 中的 postDelayed 的代码流程
public final boolean postDelayed(@NonNull Runnable r, long delayMillis) {
return sendMessageDelayed(getPostMessage(r), delayMillis);
}
public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) {
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}
public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
MessageQueue queue = mQueue;
if (queue == null) {
RuntimeException e = new RuntimeException(
this + " sendMessageAtTime() called with no mQueue");
Log.w("Looper", e.getMessage(), e);
return false;
}
return enqueueMessage(queue, msg, uptimeMillis);
}
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
long uptimeMillis) {
msg.target = this;//设置 target
msg.workSourceUid = ThreadLocalWorkSource.getUid();
if (mAsynchronous) {//是否是异步,是否是优先执行
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}
|
关于在 handler 中使用的时间戳,使用的是SystemClock.uptimeMillis()
,这样时间长度会小,计算量会减少
1
2
3
4
|
//https://tool.lu/timestamp/ 时间戳在线工具
//使用这个时间,可以保证 数字不会太大
Log.d("zyh", String.valueOf(SystemClock.uptimeMillis()));//553636828 系统启动到当前时刻经过的时间
Log.d("zyh", String.valueOf(System.currentTimeMillis()));//1578644426852
|
2.2).MessageQueue.enqueueMessage()
发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
//MessageQueue.java
boolean enqueueMessage(Message msg, long when) {
if (msg.target == null) {//target 是否为空
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {//是否已经使用
throw new IllegalStateException(msg + " This message is already in use.");
}
synchronized (this) {
if (mQuitting) {//是否要退出
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();//回收,重置 msg
return false;
}
msg.markInUse();//标记在使用
msg.when = when;//设置时间
Message p = mMessages;//获取消息队列的首元素
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// 添加的元素是首元素,如果事件还在挂起,那么唤醒
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {//如果不是首元素
needWake = mBlocked && p.target == null && msg.isAsynchronous();//是否要唤醒
Message prev;//前继节点
for (;;) {//找到需要插入的节点位置
prev = p;
p = p.next;
if (p == null || when < p.when) {//如果只有一个元素或者当前时间比后面的要早,那么跳出此循环,
break;
}
if (needWake && p.isAsynchronous()) {//如果需要唤醒,但是为异步线程,那么设置needWake=false
needWake = false;
}
}
//插入元素 o(2)->o(5)->this->o(7)->o(7)->o(9) 新元素o(5)
// this 处插入o(5)
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
if (needWake) {// 唤醒
nativeWake(mPtr);
}
}
return true;
}
|
2.3).android_os_MessageQueue
MessageQueue.java 中创建 NativeMessageQueue 对象。
1
2
3
4
5
6
7
8
|
//MessageQueue.java
private long mPtr; // c++层的 NativeMessageQueue 的指针地址
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();//创建初始化对象,得到 c++ 中的指针
}
private native static long nativeInit();//创建初始化对象
private native static void nativeWake(long ptr);//唤醒操作
|
对应的是framework/base/core/jni/android_os_MessageQueue.cpp
中的源码
可以发现,jni 方法是通过动态添加的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
//android_os_MessageQueue.cpp
//NativeMessageQueue对象结构
class NativeMessageQueue : public MessageQueue, public LooperCallback {
public:
NativeMessageQueue();
virtual ~NativeMessageQueue();
virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);
void pollOnce(JNIEnv* env, jobject obj, int timeoutMillis);
void wake();
void setFileDescriptorEvents(int fd, int events);
virtual int handleEvent(int fd, int events, void* data);
private:
JNIEnv* mPollEnv;
jobject mPollObj;
jthrowable mExceptionObj;
};
//动态方法,创建流程
//android_os_MessageQueue.cpp
static const JNINativeMethod gMessageQueueMethods[] = {
{ "nativeInit", "()J", (void*)android_os_MessageQueue_nativeInit },//初始化方法
//...
};
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();//创建NativeMessageQueue对象
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
nativeMessageQueue->incStrong(env);//引用计数+1
return reinterpret_cast<jlong>(nativeMessageQueue);//返回指针,reinterpret_cast 二进制数据拷贝
}
NativeMessageQueue::NativeMessageQueue() ://NativeMessageQueue 构造方法
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);//创建Looper 对象,在 Looper 对象中通过pipe,epoll_create创建监听,epoll_ctl来添加文件描述符
Looper::setForThread(mLooper);
}
}
// http://gityuan.com/2015/12/06/linux_epoll/ epoll介绍
static const int EPOLL_SIZE_HINT = 8;//epoll 初始化大小
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
int wakeFds[2];
int result = pipe(wakeFds);//通过pipe,创建读和写管道
mWakeReadPipeFd = wakeFds[0];
mWakeWritePipeFd = wakeFds[1];
result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);//创建读操作符
result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);//创建写操作符
mIdling = false;
// 创建epoll的句柄
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
struct epoll_event eventItem;
//分配内存
memset(& eventItem, 0, sizeof(epoll_event));
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeReadPipeFd;
// mEpollFd 为 epoll 句柄 ,EPOLL_CTL_ADD 表示添加 mWakeReadPipeFd 读文件描述符
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
}
|
在 system/core/libutils/Looper.cpp
中找到wake()
方法,是通过Linux
管道的操作, CPU 是通过epoll_wait来监听操作符,进行cpu的挂起唤醒操作的,write
只会往操作符里写一个简单的1,read
来读取,也只会去读出内容是否为1。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
//android_os_MessageQueue.cpp
//唤醒流程
//nativeWake 对应 android_os_MessageQueue_nativeWake 方法
//ptr 是在 c++层创建的NativeMessageQueue 对象指针,是在 java 层中进行保存,然后传入到 c++ 层中的
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
//对应的 NativeMessageQueue 的源码如下,发现 wake()是调用mLooper.wake()
void NativeMessageQueue::wake() {
mLooper->wake();
}
//Looper.cpp
#include <fcntl.h> //https://baike.baidu.com/item/fcntl.h
//ssize_t write (int fd,const void * buf,size_t count); 定义在 <fcntl.h> 头文件中
//在 unix 中,所有的都是文件操作。所以这里写入1 就相当于通知系统或者发送一个消息给系统。
void Looper::wake() {
ssize_t nWrite;
do {
nWrite = write(mWakeWritePipeFd, "W", 1);//通过操作符在管道中写入数据,epoll_wait()监听到,就会唤醒
} while (nWrite == -1 && errno == EINTR);
if (nWrite != 1) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
|
2.4).epoll
epoll是在内核2.6中提出的,是select和poll的增强版。相对于select和poll来说,epoll更加灵活,没有描述符数量限制。epoll使用一个文件描述符管理多个描述符,将用户空间的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。epoll机制是Linux最高效的I/O复用机制,在一处等待多个文件句柄的I/O事件。
select/poll都只有一个方法,epoll操作过程有3个方法,分别是epoll_create()
, epoll_ctl()
,epoll_wait()
。
2.4.1) epoll_create
1
|
int epoll_create(int size);
|
功能:用于创建一个epoll的句柄,size是指监听的描述符个数, 现在内核支持动态扩展,该值的意义仅仅是初次分配的fd个数,后面空间不够时会动态扩容。 当创建完epoll句柄后,占用一个fd值.
1
|
ls /proc/<pid>/fd/ //可通过终端执行,看到该fd
|
使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
2.4.2) epoll_ctl
1
|
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
|
功能:用于对需要监听的文件描述符(fd)执行op操作,比如将fd加入到epoll句柄。
-
epfd:是epoll_create()的返回值;
-
op:表示op操作,用三个宏来表示,分别代表添加、删除和修改对fd的监听事件;
- EPOLL_CTL_ADD(添加)
- EPOLL_CTL_DEL(删除)
- EPOLL_CTL_MOD(修改)
-
fd:需要监听的文件描述符;
-
epoll_event:需要监听的事件,struct epoll_event结构如下:
1
2
3
4
|
struct epoll_event {
__uint32_t events; /* Epoll事件 */
epoll_data_t data; /*用户可用数据*/
};
|
events可取值:(表示对应的文件描述符的操作)
- EPOLLIN :可读(包括对端SOCKET正常关闭);
- EPOLLOUT:可写;
- EPOLLERR:错误;
- EPOLLHUP:中断;
- EPOLLPRI:高优先级的可读(这里应该表示有带外数据到来);
- EPOLLET: 将EPOLL设为边缘触发模式,这是相对于水平触发来说的。
- EPOLLONESHOT:只监听一次事件,当监听完这次事件之后就不再监听该事件
2.4.3) epoll_wait
1
|
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
|
功能:等待事件的上报
- epfd:等待epfd上的io事件,最多返回maxevents个事件;
- events:用来从内核得到事件的集合;
- maxevents:events数量,该maxevents值不能大于创建epoll_create()时的size;
- timeout:超时时间(毫秒,0会立即返回)。
该函数返回需要处理的事件数目,如返回0表示已超时。
2.4.4) CPU 处理 epoll时机
cpu 处理中断的时机,当 cpu执行完指令的时候,如果该条指令容许中断,那么就会检查是否有中断,如果有中断,就执行中断,处理指令。

3. clearCallingIdentity
Binder.clearCallingIdentity()
清除Binder中的调用 sid,并获取当前的 pid, 让IPCThread 知道 pid
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
//Looper.java
public static void loop() {
//...
Binder.clearCallingIdentity()
//...
}
//Binder.java
public static final native long clearCallingIdentity();
//core/jni/android_util_Binder.cpp
static const JNINativeMethod gBinderMethods[] = {
// @CriticalNative
{ "clearCallingIdentity", "()J", (void*)android_os_Binder_clearCallingIdentity },
//...
};
static jlong android_os_Binder_clearCallingIdentity()
{
return IPCThreadState::self()->clearCallingIdentity();
}
//binder/IPCThreadState.cpp 通过 self 里面的pthread_getspecific()方法得到 IPCThreadState
IPCThreadState* IPCThreadState::self()
{
if (gHaveTLS.load(std::memory_order_acquire)) {
restart:
const pthread_key_t k = gTLS;
IPCThreadState* st = (IPCThreadState*)pthread_getspecific(k);
if (st) return st;
return new IPCThreadState;
}
// Racey, heuristic test for simultaneous shutdown.
if (gShutdown.load(std::memory_order_relaxed)) {
ALOGW("Calling IPCThreadState::self() during shutdown is dangerous, expect a crash.\n");
return nullptr;
}
pthread_mutex_lock(&gTLSMutex);
if (!gHaveTLS.load(std::memory_order_relaxed)) {
int key_create_value = pthread_key_create(&gTLS, threadDestructor);
if (key_create_value != 0) {
pthread_mutex_unlock(&gTLSMutex);
ALOGW("IPCThreadState::self() unable to create TLS key, expect a crash: %s\n",
strerror(key_create_value));
return nullptr;
}
gHaveTLS.store(true, std::memory_order_release);
}
pthread_mutex_unlock(&gTLSMutex);
goto restart;
}
//clearCallingIdentity
int64_t IPCThreadState::clearCallingIdentity()
{
// ignore mCallingSid for legacy reasons
//UID 用户身份证明(User Identification)的缩写
int64_t token = ((int64_t)mCallingUid<<32) | mCallingPid;
clearCaller();
return token;
}
//http://blog.ifjy.me/android/2016/07/16/Android%E9%80%9A%E4%BF%A1%E8%BF%87%E7%A8%8B%E4%B8%AD%E7%9A%84%E7%BA%BF%E7%A8%8B%E6%B1%A0%E7%AE%A1%E7%90%86.html
//https://blog.csdn.net/qq_33160790/article/details/81346663 关于各种 id
void IPCThreadState::clearCaller()
{
mCallingPid = getpid();//取得进程识别码,PID即process id
//sid:几个进程组可以合并成一个会话组(使用setsid系统调用),可以用于终端程序设计。会话组中所有进程都有相同的SID。
mCallingSid = nullptr; // expensive to lookup
mCallingUid = getuid();//取得进程识别码
}
|
四.其他
1. 子线程创建 Handler
如果想在一个线程中创建一个 handler 的话,那么就必须在创建的时候,调用 Looper.prepare(),在当前线程创建一个 looper,然后打开事件循环 Looper.loop(),那么这个线程就成为了一个不会自动销毁的 handlerThread 线程.
如果想要停止这个handlerThread,那么就得执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public class ThreadTest extends Thread {
public Looper looper;
@Override
public void run() {
Looper.prepare();
looper = Looper.myLooper();
Handler handler = new Handler(new Handler.Callback() {
@Override
public boolean handleMessage(Message msg) {
//todo
return false;
}
});
Looper.loop();//将会阻塞当前线程,当该线程中的 Message 处理完,就可以销毁这个线程
}
}
public class TestActivity...{
onCreate(...){
ThreadTest threadTest = new ThreadTest();
threadTest.start();
mButtonEnd.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
threadTest.looper.quit();//如果一个线程 成为一个异步消息线程之后,那么之后显示调用 quit() 方法才能结束这个线程.
}
});
}
}
|
2. IntentService
在执行 onCreate()
方法的时候,会创建 HandlerThread
对象,然后把 Looper
传递给ServiceHandler
.
1
2
3
4
5
6
7
8
9
10
11
|
//IntentService.java
@Override
public void onCreate() {
super.onCreate();
HandlerThread thread = new HandlerThread("IntentService[" + mName + "]");
thread.start();
mServiceLooper = thread.getLooper();
mServiceHandler = new ServiceHandler(mServiceLooper);
}
protected abstract void onHandleIntent(@Nullable Intent intent);
|
查看 ServiceHandler
源码,发现handleMessage
方法中有onHandleIntent
方法,执行完成之后,执行stopSelf
来关闭当前的IntentService,onHandleIntent
是运行在HandlerThread子线程中.
1
2
3
4
5
6
7
8
9
10
11
|
private final class ServiceHandler extends Handler {
public ServiceHandler(Looper looper) {
super(looper);
}
@Override
public void handleMessage(Message msg) {
onHandleIntent((Intent)msg.obj);
stopSelf(msg.arg1);
}
}
|
当执行stopSelf
的时候,会调用mServiceLooper.quit
.然后把Looper
循环关掉.
1
2
3
4
|
@Override
public void onDestroy() {
mServiceLooper.quit();
}
|
3. 进程间消息
可以通过 Handler
类中的 getIMessenger()
,获得可以绑定当前 Handler
的 IMessenger
对象,然后把 IMessenger
对象发送到其他进程,其他进程就可以通过 IMessenger
进行消息的发送。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
//Handler.java
final IMessenger getIMessenger() {//获得可以绑定当前 Handler 的 IMessenger 对象,IMessenger可以进行消息的发送
synchronized (mQueue) {
if (mMessenger != null) {
return mMessenger;
}
mMessenger = new MessengerImpl();
return mMessenger;
}
}
private final class MessengerImpl extends IMessenger.Stub {
public void send(Message msg) {
msg.sendingUid = Binder.getCallingUid();
Handler.this.sendMessage(msg);
}
}
|
五.相关源码地址
android_os_MessageQueue.cpp
IPCThreadState.cpp
Looper.cpp
如果你喜欢我的文章,可以关注我的掘金、公众号、博客、简书或者Github!
简书: https://www.jianshu.com/u/a2591ab8eed2
GitHub: https://github.com/bugyun
Blog: https://ruoyun.vip
掘金: https://juejin.im/user/56cbef3b816dfa0059e330a8/posts
CSDN: https://blog.csdn.net/zxloveooo
欢迎关注微信公众号
