1.使用场景
在很多系统中,特别是电商系统常常存在需要执行延迟任务。例如一个待支付订单,需要在30分钟后自动关闭。虽然有很多方式可以实现,比如说Job等,这里主要介绍利用Redis的新特性 keyspace notifications来实现。
2.基础知识
重点!!! Redis 2.8.0版本开始支持 keyspace notifications。如果你的Redis版本太低,可以洗洗睡了……
如果你还不了解Redis的Pub/Sub,强烈建议你先阅读该篇文章:Redis发布与订阅
接下来说说我们的主角:keyspace notifications
keyspace notifications默认是关闭状态,开启则需要修改redis.conf文件或通过CONFIG SET来开启或关闭该功能。这里我们使用CONFIG SET来开启:
$ redis-cli config set notify-keyspace-events Ex
这里有人会问了, Ex 是什么意思呢?这是 notify-keyspace-events 的参数,完整的参数列表看看下面的表格:
字符 | 发送的通知 |
---|---|
K |
键空间通知,所有通知以 __keyspace@<db>__ 为前缀 |
E |
键事件通知,所有通知以 __keyevent@<db>__ 为前缀 |
g |
DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知 |
$ |
字符串命令的通知 |
l |
列表命令的通知 |
s |
集合命令的通知 |
h |
哈希命令的通知 |
z |
有序集合命令的通知 |
x |
过期事件:每当有过期键被删除时发送 |
e |
驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送 |
A |
参数 g$lshzxe 的别名 |
可以看出,我们只开启了键事件通知和过期事件。因为我们实现延时任务只需要这两个就足够了。话不多说,直接看代码。
3. 实现方案
一个延迟任务应该具备哪些属性? 我觉得至少有以下属性:
- 任务类型。(例如:关闭订单)
- 任务ID。(例如:订单ID)
- 任务延迟时间。(例如:30分钟)
- 任务额外数据。(例如:订单其他相关数据)
确定好后,我们可以继续往下走。
3.1 注册事件处理器
首先在工程启动后,我们需要根据不同的事件注册不同的处理器:
const _ = require('lodash') // 任务处理器map const handlers = {} // 事件类型map const events = {} const registerEventHandler = (type, handler) => { if (!type) { throw new Error('type不能为空') } if (!_.isFunction(handler)) { throw new Error('handler类型非function') } handlers[type] = handler events[type] = true }
3.2 创建延迟任务
const redis = require('redis') const client = redis.createClient() const eventKeyPrefix = 'custom_event_'// 任务列表 const jobs = {} const addDelayEvent = (type, id, body = {}, delay = 10 * 60) => { const key = `${eventKeyPrefix}${type}_${id}` const jobKey = `${type}_${id}` client.setex(key, delay, 'delay event', (err) => { if (err) { return console.log('添加延迟事件失败:', err); } console.log('添加延迟事件成功'); jobs[jobKey] = body }) }
这里比较关键的点就是client.setex(key, expired, value)这个方法,我们需要给key添加一个过期时间,那么当key过期后redis才会发出一个过期事件。
3.3 订阅过期事件
实现了前两个步骤后,我们已经可以往redis里写入带有过期时间的key了。接下来关键的就是订阅过期事件并处理。
const redis = require('redis') const sub = redis.createClient() sub.on('pmessage', (pattern, channel, message) => { // match key const keyMatcher = new RegExp(`^${eventKeyPrefix}(${_.keys(events).join('|')})_(\\S+)$`) const result = message.match(keyMatcher) if (result) { const type = result[1]; const id = result[2]; const handler = handlers[type] console.log('订阅消息:type=%s, id=%s', type, id); if (_.isFunction(handler)) { const jobKey = `${type}_${id}` if (jobs[jobKey]) { handler(id, jobs[jobKey]) } else { console.log('未找到延迟事件,type=%s,id=%s', type, id); } } else { console.log('未找到事件处理器。type=%s', type) } } }) // 订阅频道 sub.psubscribe('__key*__:expired')
3.4 编写Demo
最后我们写一个Demo来验证下我们的功能。
const eventManager = require('./utils/eventManager') eventManager.registerEventHandler('closeorder', (id, body) => { console.log('关闭订单 id=%s, body=%o', id, body); }) eventManager.addDelayEvent('closeorder', 1111, {name: 'test'}, 5)
Done!
注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。