做安卓的同学对Handler并不陌生,通过它可以方便的实现线程切换及定时任务,是线程通信的利器。安卓应用本身就是事件驱动的消息轮询模型,系统中的各个组件的正常工作也离不开Handler的支持。其背后的Message、Looper、MessageQueue为这套机制提供了充足的保障。相信大家已经看过不少消息循环机制的源码分析了,出于学习目的,这次让我们一起实现一个简单的消息循环机制。
我们希望在javaSE上实现一个跨平台的消息循环机制,可以做到线程间通讯并支持发送延时消息。线程同步方面选用jdk提供的重入锁(非必须,可选其他)配合Condition完成线程的休眠与唤醒。与AndroidSDK类似,通过MessageQueue对象实现消息队列,通过Handler对象实现消息的发送与处理。但Message与Looper都相应做了简化,只实现核心部分。放一张类图镇镇场子:
消息队列组织为链表形式,通过MessageQueue对象维护。Handler创建一条消息,将其插入到MessageQueue维护的链表中。因为有延时消息的存在,队列中的消息并不一定要立即处理。MessageQueue的next方法会检查消息队列第一条消息的处理时间,若符合要求才将Message出队,Looper会将出队的Message派发给这个Message对象内的Handler(也就是创建这个Message的Handler)来处理。由于MessageQueue的next方法一次只会处理一条消息,所以Looper的loop方法会无限调用next,循环不断处理消息,直至手动退出。
所以Message对象中有指向下一个节点的引用。when字段记录了消息希望被处理的时间。Message中的其他字段定义如下:
public class Message { public Object obj; //你懂得 public int what; //你懂得 Handler target; //处理此Message的Handler(也是生成此Message的Handler) Runnable callback;//消息处理回调 long when; //处理时间 Message next; //next节点 }
定义好Message对象后,就可以着手实现消息队列了。学过操作系统的同学都知道,这套消息循环机制其实就是一个生产者消费者模型,也可以认为是它的一个变种。消费者(Looper)不断消费Message,直到缓冲区中没有消息被阻塞。生产者(Handler)不断向缓冲区生产Message,没有容量限制。综上所述,虽然我们并不需要处理生产者线程的唤醒问题,但我们仍需考虑以下几点:
- 缓冲区(MessageQueue)中的数据(Message)必须是按时间排序的,以便在正确的时间被方便的取出。
- 生产者向缓冲区投递消息的行为可能会打扰到正在休眠的消费者,因为每当消息入队时会唤醒阻塞的消费者线程,而此时消费者线程并不急于处理消息(时间没到)。
- 生产者线程在处理消息/休眠时,应安全的退出消息循环
从以上几个问题出发,我们如此编写MessageQueue的next方法:
/** * 从消息队列中取出一个Message 可能会引起线程的阻塞 * @return 需要处理的消息 */ public Message next() { //需要休眠的时间 0代表缓冲区非空时无限休眠 大于零代表实际的休眠时间 小于零代表不休眠 long waitTimeMillis = 0; while (true) { try { lock.lockInterruptibly(); //如果没有需要马上处理的消息,此方法将在这行代码处阻塞 waitMessage(waitTimeMillis); //quiting为布尔值变量作为消息循环退出的标志 if (quitting) return null; long now = System.currentTimeMillis(); Message msg = messages; //如果缓冲区内有数据,则以队首元素中的时间字段为依据 //要么取出消息并返回,要么计算等待时间重新休眠 if (msg != null) { if (now < msg.when) { waitTimeMillis = msg.when - now; } else { //队首元素出队 messages = messages.next; msg.next = null; return msg; } } else { //缓冲区中没数据,但线程被唤醒,说明消息循环需要退出,将等待时间置为-1以便退出循环 waitTimeMillis = -1; } } catch (InterruptedException e) { return null; } finally { lock.unlock(); } } }
多说一句,上文提到的缓冲区、消息队列、MessageQueue均指代这个由Message对象构成的链表,而消费者、消息循环指代MessageQueue所在的线程。上文的代码中,lock对象为重入锁,定义如下:
private ReentrantLock lock = new ReentrantLock();
messages为链表表头引用,初始情况下为null
private Message messages;
我们知道,每次调用next方法都会返回一个Message对象,如果消息队列中没有合适的对象,此方法将阻塞,当消息循环退出时,next方法将直接返回null,MessageQueue的使用者(Looper)循环不断的通过next方法取出一条条消息,根据Message为空与否,决定消息循环的终止与运行。虽然next方法一次只返回一条消息,但其主体是一个循环。因为我们需要处理延时消息,当一条消息入队时,可能正在阻塞着的next方法将被迫调度起来继续执行,但因为此时消息的处理时间还没到,while循环可以帮助我们在下一轮循环中继续休眠。也就是说, waitMessage 方法返回后,虽然会保证缓冲区非空,但不能保证队首的Message可被立即处理,所以我们可以看到这段代码:
if (now < msg.when) { waitTimeMillis = msg.when - now; }
在队首Message不能立即被处理的情况下,重新记录休眠时间,经过while的下一轮循环,被记录的休眠时间将被waitMessage方法处理:
public void waitMessage(long waitTimeMillis) throws InterruptedException { if (waitTimeMillis < 0) return; if (waitTimeMillis == 0) { //缓冲区空则无限休眠,直到新的消息到来唤醒此线程 while (messages == null) notEmpty.await(); } else { //休眠指定时间 notEmpty.await(waitTimeMillis, TimeUnit.MILLISECONDS); } }
else分支中线程在notEmpty上等待waitTimeMillis毫秒。代码中的notEmpty为Condition对象,用于阻塞消费者线程,定义如下:
private Condition notEmpty = lock.newCondition();
回到 waitMessage 方法,当waitTimeMillis为-1时,函数直接返回,无需等待。这是因为-1代表着线程准备退出,此时直接返回意味着不再阻塞当前线程,代码继续向下执行,遇到 if (quitting) return null; 之后next方法便返回了。
这样,quit方法的负责修改quitting字段的值,同时唤醒线程以便完成退出。
public void quit() { lock.lock(); quitting = true; notEmpty.signal(); lock.unlock(); }
剩下的事情就很明确了。enqueueMessage方法用于入队消息,定义如下:
public boolean enqueueMessage(Message message) { try { lock.lockInterruptibly(); if (quitting) return false; //将message插入合适的位置 insertMessage(message); //唤醒消费者线程 notEmpty.signal(); return true; } catch (InterruptedException e) { } finally { lock.unlock(); } return false; }
代码很简单,先将参数Message插入消息队列,然后唤醒消费者线程。我们直接来看一下insertMessage方法:
private void insertMessage(Message msg) { Message now = messages; if (messages == null || msg.when < now.when) { msg.next = messages; messages = msg; return; } Message pre = now; now = now.next; while (now != null && now.when < msg.when) { pre = now; now = now.next; } msg.next = now; pre.next = msg; }
这里首先处理的是缓冲区空或新消息被插入到队首的情况,头插法返回之。其他情况下,找到第一个比msg晚的Message对象,将其插入到这个对象之前。这样就可以保证入队的消息在缓冲区中按处理时间升序排列了。
以上就是MessageQueue的全部代码。现在我们假设有三条消息在同一时间点被依次发送,第一个消息需要延时5s,第二个需要立刻处理,第三个需要延时2.5秒。初始状态下缓冲区为空,消费者线程阻塞。第一个消息的入队使得消费者线程被唤醒,在next方法中检查队首元素发现需要延时5s处理,于是将 waitTimeMillis 置为5000,在下一轮while循环中调用 notEmpty.await(waitTimeMillis, TimeUnit.MILLISECONDS); 进行休眠。第二个消息的到来又导致了消费者线程被唤醒,此时第二个消息因为需要立即执行被插入缓冲区队首。next方法取得队首消息发现需要立即处理,便将此消息返回给Looper处理。Looper处理完后继续调用next方法获取消息,由于此时缓冲区非空,无需阻塞,查看队首消息(延时5s的那条消息)发现时间未到,计算剩余时间并休眠自己。随着第三条消息的到来,消费者线程又被唤醒,依然是检查时间并休眠自己,注意,此时消息队列中存在两条消息,依次为延时2.5s消息、延时5s消息,这次的休眠时间也被重置为约2.5s。等时间一到,取出队首元素返回给Looper,后面的动作与处理完无延时消息后的动作别无二致了。当然上面的描述只是可能会出现的一种情况,具体的入队与唤醒顺序取决于操作系统对线程的调度,相信大家自己也能捋出来了。
有了MessageQueue对象,其他角色的工作就轻松许多。上文中多次提到Looper,他长这个样子:
public class Looper { private static ThreadLocal<Looper> localLooper = ThreadLocal.withInitial(Looper::new); private MessageQueue queue; private Looper() { queue = new MessageQueue(); } public static Looper myLooper() { return localLooper.get(); } public static void loop() { Looper me = myLooper(); while (true) { Message msg = me.queue.next(); if (msg == null) return; msg.target.dispatchMessage(msg); } } MessageQueue getMessageQueue() { return queue; } public void quit() { queue.quit(); } }
为了简化编写,去掉了安卓Looper中的prepare方法,直接在声明时初始化threadLocal。loop方法也非常的简单粗暴,不断调用MessageQueue的next方法,获取消息并分发之,在分发的时候,调用的是 msg.target.dispatchMessage(msg) 方法。这里的target对象是一个Handler对象:
public class Handler { private Looper looper; private Callback callback; public Handler(Looper looper) { this(looper, null); } public Handler() { this(Looper.myLooper(), null); } public Handler(Looper looper, Callback callback) { this.looper = looper; this.callback = callback; } protected void handleMessage(Message message) { } void dispatchMessage(Message message) { if (message.callback != null) { message.callback.run(); return; } else { if (callback != null) { if (callback.handleMessage(message)) { return; } } handleMessage(message); } } public void sendMessage(Message message) { if (looper == null) return; MessageQueue queue = looper.getMessageQueue(); message.target = this; queue.enqueueMessage(message); } public void postDelay(Runnable runnable, long delay) { Message message = new Message(); message.when = System.currentTimeMillis() + delay; message.callback = runnable; sendMessage(message); } public void sendMessageDelay(Message message, long delay) { message.when = System.currentTimeMillis() + delay; sendMessage(message); } public void post(Runnable runnable) { postDelay(runnable, 0); } public interface Callback { boolean handleMessage(Message message); } }
dispatchMessage方法的实现与官方的思路一致,分发顺序也尽量复刻原版代码。Handler会通过构造函数获取Looper,或者通过参数传入,或是直接通过Looper的静态方法获取。有了looper对象,在发送消息的时候我们就可以向looper内的MessageQueue插入消息了。摘出两段有代表性的发送消息的代码:
public void sendMessageDelay(Message message, long delay) { message.when = System.currentTimeMillis() + delay; sendMessage(message); }
public void sendMessage(Message message) { if (looper == null) return; MessageQueue queue = looper.getMessageQueue(); message.target = this; queue.enqueueMessage(message); }
在Handler发送消息之前,将自己绑定到message对象的target字段上,这样looper就可以将取出的message对象重新派发回创建它的handler,完成消息的处理。
好了,到此为止就是这套消息循环机制的全部代码,需要的小伙伴就麻烦自己整理下源码吧,就这样。
以上。
注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。