0%

基于 Redisson 实现 DelayQueue

1. 监听过期 key

基于监听过期key的方式来实现延迟队列是我想到的第一个方案,为了弄懂这个方案实现的细节,我还特地去扒了扒官网,还真有所收获

1.1 Redis发布订阅模式

一谈到发布订阅模式,其实一想到的就是MQ,只不过Redis也实现了一套,并且跟MQ贼像,如图:

图中的channel的概念跟MQ中的topic的概念差不多,你可以把channel理解成MQ中的topic。

生产者在消息发送时需要到指定发送到哪个channel上,消费者订阅这个channel就能获取到消息。

1.2 keyspace notifications

在Redis中,有很多默认的channel,只不过向这些channel发送消息的生产者不是我们写的代码,而是Redis本身。当消费者监听这些channel时,就可以感知到Redis中数据的变化。

这个功能Redis官方称为keyspace notifications,字面意思就是键空间通知。

这些默认的channel被分为两类:

  • __keyspace@<db>__:为前缀,后面跟的是key的名称,表示监听跟这个key有关的事件。
    举个例子,现在有个消费者监听了__keyspace@0__:sanyou这个channel,sanyou就是Redis中的一个普通key,那么当sanyou这个key被删除或者发生了其它事件,那么消费者就会收到sanyou这个key删除或者其它事件的消息
  • __keyevent@<db>__:为前缀,后面跟的是消息事件类型,表示监听某个事件
    同样举个例子,现在有个消费者监听了__keyevent@0__:expired这个channel,代表了监听key的过期事件。那么当某个Redis的key过期了(expired),那么消费者就能收到这个key过期的消息。如果把expired换成del,那么监听的就是删除事件。具体支持哪些事件,可从官网查。

上述db是指具体的数据库,Redis不是默认分为16个库么,序号从0-15,所以db就是0到15的数字,示例中的0就是指0对应的数据库。

1.3 问题

第一,由于 Redis 采用惰性清除和定时清除,所以key的过期事件发布时机并不是当这个key的过期时间到了之后就发布,而是这个key在Redis中被清理之后,也就是真正被删除之后才会发布。

第二,Redis的发布订阅模式消息消费只有广播模式一种。所谓的广播模式就是多个消费者订阅同一个channel,那么每个消费者都能消费到发布到这个channel的所有消息。

如图,生产者发布了一条消息,内容为sanyou,那么两个消费者都可以同时收到sanyou这条消息。

所以,如果通过监听channel来获取延迟任务,那么一旦服务实例有多个的话,还得保证消息不能重复处理,额外地增加了代码开发量。

所以,综上能够得出一个非常重要的结论,那就是监听Redis过期Key这种方式实现延迟队列,不稳定,坑贼多!那有没有比较靠谱的延迟队列的实现方案呢?这就不得不提到我研究的第二种方案了。

2. Redisson实现延迟队列

Redisson他是Redis的儿子(Redis son),基于Redis实现了非常多的功能,其中最常使用的就是Redis分布式锁的实现,但是除了实现Redis分布式锁之外,它还实现了延迟队列的功能。先来个demo,后面再来说说这种实现的原理。

2.1 Demo

引入pom

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.1</version>
</dependency>

封装了一个RedissonDelayQueue类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Component
@Slf4j
public class RedissonDelayQueue {

private RedissonClient redissonClient;
private RDelayedQueue<String> delayQueue;
private RBlockingQueue<String> blockingQueue;

@PostConstruct
public void init() {
initDelayQueue();
startDelayQueueConsumer();
}

private void initDelayQueue() {
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer();
serverConfig.setAddress("redis://localhost:6379");
redissonClient = Redisson.create(config);

blockingQueue = redissonClient.getBlockingQueue("SANYOU");
delayQueue = redissonClient.getDelayedQueue(blockingQueue);
}

private void startDelayQueueConsumer() {
new Thread(() -> {
while (true) {
try {
String task = blockingQueue.take();
log.info("接收到延迟任务:{}", task);
} catch (Exception e) {
e.printStackTrace();
}
}
}, "SANYOU-Consumer").start();
}

public void offerTask(String task, long seconds) {
log.info("添加延迟任务:{} 延迟时间:{}s", task, seconds);
delayQueue.offer(task, seconds, TimeUnit.SECONDS);
}

}

这个类在创建的时候会去初始化延迟队列,创建一个RedissonClient对象,之后通过RedissonClient对象获取到RDelayedQueueRBlockingQueue对象,传入的队列名字叫SANYOU,这个名字无所谓。

当延迟队列创建之后,会开启一个延迟任务的消费线程,这个线程会一直从RBlockingQueue中通过take方法阻塞获取延迟任务。

添加任务的时候是通过RDelayedQueue的offer方法添加的。

controller类,通过接口添加任务,延迟时间为5s

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@RestController
@RequestMapping("redis/delayqueue")
public class RedissonDelayQueueController {

@Resource
private RedissonDelayQueue redissonDelayQueue;

@PostMapping("/offerTask")
public void offerTask(@RequestBody OfferDelayTaskRequest request) {
redissonDelayQueue.offerTask(request.getTaskBody(), request.getDelaySeconds());
}

}

@Data
public class OfferDelayTaskRequest implements Serializable {
private String taskBody;
private Long delaySeconds;
}

静静等待5s,成功获取到任务。

2.2 实现原理

如下图就是上面demo中,一个延迟队列会在Redis内部使用到的channel和数据类型

SANYOU前面的前缀都是固定的,Redisson创建的时候会拼上前缀。

  • redisson_delay_queue_timeout:SANYOU,sorted set数据类型,存放所有延迟任务,按照延迟任务的到期时间戳(提交任务时的时间戳 + 延迟时间)来排序的,所以列表的最前面的第一个元素就是整个延迟队列中最早要被执行的任务,这个概念很重要
  • redisson_delay_queue:SANYOU,list数据类型,也是存放所有的任务,但是研究下来发现好像没什么用。。
  • SANYOU,list数据类型,被称为目标队列,这个里面存放的任务都是已经到了延迟时间的,可以被消费者获取的任务,所以上面demo中的RBlockingQueue的take方法是从这个目标队列中获取到任务的
  • redisson_delay_queue_channel:SANYOU,是一个channel,用来通知客户端开启一个延迟任务

有了这些概念之后,再来看看整体的运行原理图

  • 生产者在提交任务的时候将任务放到redisson_delay_queue_timeout:SANYOU中,分数就是提交任务的时间戳+延迟时间,就是延迟任务的到期时间戳

  • 客户端会有一个延迟任务,为了区分,后面我都说是客户端延迟任务。这个延迟任务会向Redis Server发送一段lua脚本,Redis执行lua脚本中的命令,并且是原子性的

这段lua脚本主要干了两件事:

  • 将到了延迟时间的任务从redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU这个目标队列
  • 获取到redisson_delay_queue_timeout:SANYOU中目前最早到过期时间的延迟任务的到期时间戳,然后发布到redisson_delay_queue_channel:SANYOU这个channel中

当客户端监听到redisson_delay_queue_channel:SANYOU这个channel的消息时,会再次提交一个客户端延迟任务,延迟时间就是消息(最早到过期时间的延迟任务的到期时间戳)- 当前时间戳,这个时间其实也就是redisson_delay_queue_channel:SANYOU中最早到过期时间的任务还剩余的延迟时间。

此处可以等待10s,好好想想。。

这样,一旦时间来到了上面说的最早到过期时间任务的到期时间戳,redisson_delay_queue_timeout:SANYOU中上面说的最早到过期时间的任务已经到期了,客户端的延迟任务也同时到期,于是开始执行lua脚本操作,及时将到了延迟时间的任务放到目标队列中。然后再次发布剩余的延迟任务中最早到期的任务到期时间戳到channe中,如此循环往复,一直运行下去,保证redisson_delay_queue_timeout:SANYOU中到期的数据能及时放到目标队列中。

所以,上述说了一大堆的主要的作用就是保证到了延迟时间的任务能够及时被放到目标队列。

这里再补充两个特殊情况,图中没有画出:

第一个就是如果redisson_delay_queue_timeout:SANYOU是新添加的任务(队列之前有或者没有任务)是队列中最早需要被执行的,也会发布消息到channel,之后就按时上面说的流程走了。

添加任务代码如下,也是通过lua脚本来的

img

第二种特殊情况就是项目启动的时候会执行一次客户端延迟任务。项目在重启时,由于没有客户端延迟任务的执行,可能会出现redisson_delay_queue_timeout:SANYOU队列中有到期但是没有被放到目标队列的可能,重启就执行一次就是为了保证到期的数据能被及时放到目标队列中。

3. 总结

第一个任务延迟的问题,Redisson方案理论上是没有延迟的,但是当消息数量增加,消费者消费缓慢这个情况下可能会导致延迟任务消费的延迟。

第二个丢消息的问题,Redisson方案很大程度上减轻了丢消息的可能性,因为所有的任务都是存在list和sorted set两种数据类型中,Redis有持久化机制,就算Redis宕机了,也就可能会丢一点点数据。

第三个广播消费任务的问题,这个是不会出现的,因为每个客户端都是从同一个目标队列中获取任务的。

第四个问题是Redis内部channel发布事件的问题,跟这种方案不沾边,就更不可能存在了。

所以,通过上面的对比可以看出,Redisson这种实现方案就显得更加的靠谱了。

文章来源:

------ 本文结束------