做安卓的同学对Handler并不陌生,通过它可以方便的实现线程切换及定时任务,是线程通信的利器。安卓应用本身就是事件驱动的消息轮询模型,系统中的各个组件的正常工作也离不开Handler的支持。其背后的Message、Looper、MessageQueue为这套机制提供了充足的保障。相信大家已经看过不少消息循环机制的源码分析了,出于学习目的,这次让我们一起实现一个简单的消息循环机制。
我们希望在javaSE上实现一个跨平台的消息循环机制,可以做到线程间通讯并支持发送延时消息。线程同步方面选用jdk提供的重入锁(非必须,可选其他)配合Condition完成线程的休眠与唤醒。与AndroidSDK类似,通过MessageQueue对象实现消息队列,通过Handler对象实现消息的发送与处理。但Message与Looper都相应做了简化,只实现核心部分。放一张类图镇镇场子:
消息队列组织为链表形式,通过MessageQueue对象维护。Handler创建一条消息,将其插入到MessageQueue维护的链表中。因为有延时消息的存在,队列中的消息并不一定要立即处理。MessageQueue的next方法会检查消息队列第一条消息的处理时间,若符合要求才将Message出队,Looper会将出队的Message派发给这个Message对象内的Handler(也就是创建这个Message的Handler)来处理。由于MessageQueue的next方法一次只会处理一条消息,所以Looper的loop方法会无限调用next,循环不断处理消息,直至手动退出。
所以Message对象中有指向下一个节点的引用。when字段记录了消息希望被处理的时间。Message中的其他字段定义如下:
public class Message {
public Object obj; //你懂得
public int what; //你懂得
Handler target; //处理此Message的Handler(也是生成此Message的Handler)
Runnable callback;//消息处理回调
long when; //处理时间
Message next; //next节点
}
定义好Message对象后,就可以着手实现消息队列了。学过操作系统的同学都知道,这套消息循环机制其实就是一个生产者消费者模型,也可以认为是它的一个变种。消费者(Looper)不断消费Message,直到缓冲区中没有消息被阻塞。生产者(Handler)不断向缓冲区生产Message,没有容量限制。综上所述,虽然我们并不需要处理生产者线程的唤醒问题,但我们仍需考虑以下几点:
- 缓冲区(MessageQueue)中的数据(Message)必须是按时间排序的,以便在正确的时间被方便的取出。
- 生产者向缓冲区投递消息的行为可能会打扰到正在休眠的消费者,因为每当消息入队时会唤醒阻塞的消费者线程,而此时消费者线程并不急于处理消息(时间没到)。
- 生产者线程在处理消息/休眠时,应安全的退出消息循环
从以上几个问题出发,我们如此编写MessageQueue的next方法:
/**
* 从消息队列中取出一个Message 可能会引起线程的阻塞
* @return 需要处理的消息
*/
public Message next() {
//需要休眠的时间 0代表缓冲区非空时无限休眠 大于零代表实际的休眠时间 小于零代表不休眠
long waitTimeMillis = 0;
while (true) {
try {
lock.lockInterruptibly();
//如果没有需要马上处理的消息,此方法将在这行代码处阻塞
waitMessage(waitTimeMillis);
//quiting为布尔值变量作为消息循环退出的标志
if (quitting) return null;
long now = System.currentTimeMillis();
Message msg = messages;
//如果缓冲区内有数据,则以队首元素中的时间字段为依据
//要么取出消息并返回,要么计算等待时间重新休眠
if (msg != null) {
if (now < msg.when) {
waitTimeMillis = msg.when - now;
} else {
//队首元素出队
messages = messages.next;
msg.next = null;
return msg;
}
} else {
//缓冲区中没数据,但线程被唤醒,说明消息循环需要退出,将等待时间置为-1以便退出循环
waitTimeMillis = -1;
}
} catch (InterruptedException e) {
return null;
} finally {
lock.unlock();
}
}
}
多说一句,上文提到的缓冲区、消息队列、MessageQueue均指代这个由Message对象构成的链表,而消费者、消息循环指代MessageQueue所在的线程。上文的代码中,lock对象为重入锁,定义如下:
private ReentrantLock lock = new ReentrantLock();
messages为链表表头引用,初始情况下为null
private Message messages;
我们知道,每次调用next方法都会返回一个Message对象,如果消息队列中没有合适的对象,此方法将阻塞,当消息循环退出时,next方法将直接返回null,MessageQueue的使用者(Looper)循环不断的通过next方法取出一条条消息,根据Message为空与否,决定消息循环的终止与运行。虽然next方法一次只返回一条消息,但其主体是一个循环。因为我们需要处理延时消息,当一条消息入队时,可能正在阻塞着的next方法将被迫调度起来继续执行,但因为此时消息的处理时间还没到,while循环可以帮助我们在下一轮循环中继续休眠。也就是说, waitMessage 方法返回后,虽然会保证缓冲区非空,但不能保证队首的Message可被立即处理,所以我们可以看到这段代码:
if (now < msg.when) {
waitTimeMillis = msg.when - now;
}
在队首Message不能立即被处理的情况下,重新记录休眠时间,经过while的下一轮循环,被记录的休眠时间将被waitMessage方法处理:
public void waitMessage(long waitTimeMillis) throws InterruptedException {
if (waitTimeMillis < 0) return;
if (waitTimeMillis == 0) {
//缓冲区空则无限休眠,直到新的消息到来唤醒此线程
while (messages == null) notEmpty.await();
} else {
//休眠指定时间
notEmpty.await(waitTimeMillis, TimeUnit.MILLISECONDS);
}
}
else分支中线程在notEmpty上等待waitTimeMillis毫秒。代码中的notEmpty为Condition对象,用于阻塞消费者线程,定义如下:
private Condition notEmpty = lock.newCondition();
回到 waitMessage 方法,当waitTimeMillis为-1时,函数直接返回,无需等待。这是因为-1代表着线程准备退出,此时直接返回意味着不再阻塞当前线程,代码继续向下执行,遇到 if (quitting) return null; 之后next方法便返回了。
这样,quit方法的负责修改quitting字段的值,同时唤醒线程以便完成退出。
public void quit() {
lock.lock();
quitting = true;
notEmpty.signal();
lock.unlock();
}
剩下的事情就很明确了。enqueueMessage方法用于入队消息,定义如下:
public boolean enqueueMessage(Message message) {
try {
lock.lockInterruptibly();
if (quitting) return false;
//将message插入合适的位置
insertMessage(message);
//唤醒消费者线程
notEmpty.signal();
return true;
} catch (InterruptedException e) {
} finally {
lock.unlock();
}
return false;
}
代码很简单,先将参数Message插入消息队列,然后唤醒消费者线程。我们直接来看一下insertMessage方法:
private void insertMessage(Message msg) {
Message now = messages;
if (messages == null || msg.when < now.when) {
msg.next = messages;
messages = msg;
return;
}
Message pre = now;
now = now.next;
while (now != null && now.when < msg.when) {
pre = now;
now = now.next;
}
msg.next = now;
pre.next = msg;
}
这里首先处理的是缓冲区空或新消息被插入到队首的情况,头插法返回之。其他情况下,找到第一个比msg晚的Message对象,将其插入到这个对象之前。这样就可以保证入队的消息在缓冲区中按处理时间升序排列了。
以上就是MessageQueue的全部代码。现在我们假设有三条消息在同一时间点被依次发送,第一个消息需要延时5s,第二个需要立刻处理,第三个需要延时2.5秒。初始状态下缓冲区为空,消费者线程阻塞。第一个消息的入队使得消费者线程被唤醒,在next方法中检查队首元素发现需要延时5s处理,于是将 waitTimeMillis 置为5000,在下一轮while循环中调用 notEmpty.await(waitTimeMillis, TimeUnit.MILLISECONDS); 进行休眠。第二个消息的到来又导致了消费者线程被唤醒,此时第二个消息因为需要立即执行被插入缓冲区队首。next方法取得队首消息发现需要立即处理,便将此消息返回给Looper处理。Looper处理完后继续调用next方法获取消息,由于此时缓冲区非空,无需阻塞,查看队首消息(延时5s的那条消息)发现时间未到,计算剩余时间并休眠自己。随着第三条消息的到来,消费者线程又被唤醒,依然是检查时间并休眠自己,注意,此时消息队列中存在两条消息,依次为延时2.5s消息、延时5s消息,这次的休眠时间也被重置为约2.5s。等时间一到,取出队首元素返回给Looper,后面的动作与处理完无延时消息后的动作别无二致了。当然上面的描述只是可能会出现的一种情况,具体的入队与唤醒顺序取决于操作系统对线程的调度,相信大家自己也能捋出来了。
有了MessageQueue对象,其他角色的工作就轻松许多。上文中多次提到Looper,他长这个样子:
public class Looper {
private static ThreadLocal<Looper> localLooper = ThreadLocal.withInitial(Looper::new);
private MessageQueue queue;
private Looper() {
queue = new MessageQueue();
}
public static Looper myLooper() {
return localLooper.get();
}
public static void loop() {
Looper me = myLooper();
while (true) {
Message msg = me.queue.next();
if (msg == null) return;
msg.target.dispatchMessage(msg);
}
}
MessageQueue getMessageQueue() {
return queue;
}
public void quit() {
queue.quit();
}
}
为了简化编写,去掉了安卓Looper中的prepare方法,直接在声明时初始化threadLocal。loop方法也非常的简单粗暴,不断调用MessageQueue的next方法,获取消息并分发之,在分发的时候,调用的是 msg.target.dispatchMessage(msg) 方法。这里的target对象是一个Handler对象:
public class Handler {
private Looper looper;
private Callback callback;
public Handler(Looper looper) {
this(looper, null);
}
public Handler() {
this(Looper.myLooper(), null);
}
public Handler(Looper looper, Callback callback) {
this.looper = looper;
this.callback = callback;
}
protected void handleMessage(Message message) {
}
void dispatchMessage(Message message) {
if (message.callback != null) {
message.callback.run();
return;
} else {
if (callback != null) {
if (callback.handleMessage(message)) {
return;
}
}
handleMessage(message);
}
}
public void sendMessage(Message message) {
if (looper == null) return;
MessageQueue queue = looper.getMessageQueue();
message.target = this;
queue.enqueueMessage(message);
}
public void postDelay(Runnable runnable, long delay) {
Message message = new Message();
message.when = System.currentTimeMillis() + delay;
message.callback = runnable;
sendMessage(message);
}
public void sendMessageDelay(Message message, long delay) {
message.when = System.currentTimeMillis() + delay;
sendMessage(message);
}
public void post(Runnable runnable) {
postDelay(runnable, 0);
}
public interface Callback {
boolean handleMessage(Message message);
}
}
dispatchMessage方法的实现与官方的思路一致,分发顺序也尽量复刻原版代码。Handler会通过构造函数获取Looper,或者通过参数传入,或是直接通过Looper的静态方法获取。有了looper对象,在发送消息的时候我们就可以向looper内的MessageQueue插入消息了。摘出两段有代表性的发送消息的代码:
public void sendMessageDelay(Message message, long delay) {
message.when = System.currentTimeMillis() + delay;
sendMessage(message);
}
public void sendMessage(Message message) {
if (looper == null) return;
MessageQueue queue = looper.getMessageQueue();
message.target = this;
queue.enqueueMessage(message);
}
在Handler发送消息之前,将自己绑定到message对象的target字段上,这样looper就可以将取出的message对象重新派发回创建它的handler,完成消息的处理。
好了,到此为止就是这套消息循环机制的全部代码,需要的小伙伴就麻烦自己整理下源码吧,就这样。
以上。
注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。