0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

如何用Redis实现延迟队列呢?

jf_ro2CN3Fa 来源:三友的java日记 2023-03-16 14:28 次阅读

背景

前段时间有个小项目需要使用延迟任务,谈到延迟任务,我脑子第一时间一闪而过的就是使用消息队列来做,比如RabbitMQ的死信队列又或者RocketMQ的延迟队列,但是奈何这是一个小项目,并没有引入MQ,我也不太想因为一个延迟任务就引入MQ,增加系统复杂度,所以这个方案直接就被pass了。

虽然基于MQ这个方式走不通了,但是这个项目中使用到Redis,所以我就想是否能够使用Redis来代替MQ实现延迟队列的功能,于是我就查了一下有没有现成可用的方案,别说,还真给我查到了两种方案,并且我还仔细研究对比了这两个方案,发现要想很好的实现延迟队列,并不简单。

监听过期key

基于监听过期key的方式来实现延迟队列是我查到的第一个方案,为了弄懂这个方案实现的细节,我还特地去扒了扒官网,还真有所收获

1、Redis发布订阅模式

一谈到发布订阅模式,其实一想到的就是MQ,只不过Redis也实现了一套,并且跟MQ贼像,如图:

f3e30a0a-c3b1-11ed-bfe3-dac502259ad0.png

图中的channel的概念跟MQ中的topic的概念差不多,你可以把channel理解成MQ中的topic。

生产者在消息发送时需要到指定发送到哪个channel上,消费者订阅这个channel就能获取到消息。

2、keyspace notifications

在Redis中,有很多默认的channel,只不过向这些channel发送消息的生产者不是我们写的代码,而是Redis本身。当消费者监听这些channel时,就可以感知到Redis中数据的变化。

这个功能Redis官方称为keyspace notifications,字面意思就是键空间通知。

这些默认的channel被分为两类:

以__keyspace@__:为前缀,后面跟的是key的名称,表示监听跟这个key有关的事件。

举个例子,现在有个消费者监听了__keyspace@0__:sanyou这个channel,sanyou就是Redis中的一个普通key,那么当sanyou这个key被删除或者发生了其它事件,那么消费者就会收到sanyou这个key删除或者其它事件的消息

以__keyevent@__:为前缀,后面跟的是消息事件类型,表示监听某个事件

同样举个例子,现在有个消费者监听了__keyevent@0__:expired这个channel,代表了监听key的过期事件。那么当某个Redis的key过期了(expired),那么消费者就能收到这个key过期的消息。如果把expired换成del,那么监听的就是删除事件。具体支持哪些事件,可从官网查。

上述db是指具体的数据库,Redis不是默认分为16个库么,序号从0-15,所以db就是0到15的数字,示例中的0就是指0对应的数据库。

f3fa3f36-c3b1-11ed-bfe3-dac502259ad0.png

3、延迟队列实现原理

通过对上面的两个概念了解之后,应该就对监听过期key的实现原理一目了然了,其实就是当这个key过期之后,Redis会发布一个key过期的事件到__keyevent@__:expired这个channel,只要我们的服务监听这个channel,那么就能知道过期的Key,从而就算实现了延迟队列功能。

所以这种方式实现延迟队列就只需要两步:

发送延迟任务,key是延迟消息本身,过期时间就是延迟时间

监听__keyevent@__:expired这个channel,处理延迟任务

4、demo

好了,基本概念和核心原理都说完了之后,又到了show me the code环节。

好巧不巧,Spring已经实现了监听__keyevent@*__:expired这个channel这个功能,__keyevent@*__:expired中的*代表通配符的意思,监听所有的数据库。

所以demo写起来就很简单了,只需3步即可

引入pom


org.springframework.boot
spring-boot-starter-data-redis
2.2.5.RELEASE



org.springframework.boot
spring-boot-starter-web
2.2.5.RELEASE

配置类

@Configuration
publicclassRedisConfiguration{

@Bean
publicRedisMessageListenerContainerredisMessageListenerContainer(RedisConnectionFactoryconnectionFactory){
RedisMessageListenerContainerredisMessageListenerContainer=newRedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(connectionFactory);
returnredisMessageListenerContainer;
}

@Bean
publicKeyExpirationEventMessageListenerredisKeyExpirationListener(RedisMessageListenerContainerredisMessageListenerContainer){
returnnewKeyExpirationEventMessageListener(redisMessageListenerContainer);
}

}

KeyExpirationEventMessageListener实现了对__keyevent@*__:expiredchannel的监听

f4060884-c3b1-11ed-bfe3-dac502259ad0.png

当KeyExpirationEventMessageListener收到Redis发布的过期Key的消息的时候,会发布RedisKeyExpiredEvent事件

f4560ff0-c3b1-11ed-bfe3-dac502259ad0.png

所以我们只需要监听RedisKeyExpiredEvent事件就可以拿到过期消息的Key,也就是延迟消息。

对RedisKeyExpiredEvent事件的监听实现MyRedisKeyExpiredEventListener

@Component
publicclassMyRedisKeyExpiredEventListenerimplementsApplicationListener{

@Override
publicvoidonApplicationEvent(RedisKeyExpiredEventevent){
byte[]body=event.getSource();
System.out.println("获取到延迟消息:"+newString(body));
}

}

整个工程目录也简单

f4a5f2f4-c3b1-11ed-bfe3-dac502259ad0.png

代码写好,启动应用

之后我直接通过Redis命令设置消息,就没通过代码发送消息了,消息的key为sanyou,值为task,值不重要,过期时间为5s

setsanyoutask

expiresanyou5

如果上面都理论都正确,不出意外的话,5s后MyRedisKeyExpiredEventListener应该可以监听到sanyou这个key过期的消息,也就相当于拿到了延迟任务,控制台会打印出获取到延迟消息:sanyou。

于是我满怀希望,静静地等待了5s。。

5、4、3、2、1,时间一到,我查看控制台,但是控制台并没有按照预期打印出上面那句话。

为什么会没打印出?难道是代码写错了?正当我准备检查代码的时候,官网的一段话道出了真实原因。

f4dd25ee-c3b1-11ed-bfe3-dac502259ad0.png

我给大家翻译一下上面这段话讲的内容。

上面这段话主要讨论的是key过期事件的时效性问题,首先提到了Redis过期key的两种清除策略,就是面试八股文常背的两种:

惰性清除。当这个key过期之后,访问时,这个Key才会被清除

定时清除。后台会定期检查一部分key,如果有key过期了,就会被清除

再后面那段话是核心,意思是说,key的过期事件发布时机并不是当这个key的过期时间到了之后就发布,而是这个key在Redis中被清理之后,也就是真正被删除之后才会发布。

到这我终于明白了,上面的例子中即使我设置了5s的过期时间,但是当5s过去之后,只要两种清除策略都不满足,没人访问sanyou这个key,后台的定时清理的任务也没扫描到sanyou这个key,那么就不会发布key过期的事件,自然而然也就监听不到了。

至于后台的定时清理的任务什么时候能扫到,这个没有固定时间,可能一到过期时间就被扫到,也可能等一定时间才会被扫到,这就可能会造成了客户端从发布到监听到的消息时间差会大于等于过期时间,从而造成一定时间消息的延迟,这就着实有点坑了。。

5、坑

除了上面测试demo的时候遇到的坑之外,在我深入研究之后,还发现了一些更离谱的坑。

丢消息太频繁

Redis的丢消息跟MQ不一样,因为MQ都会有消息的持久化机制,可能只有当机器宕机了,才会丢点消息,但是Redis丢消息就很离谱,比如说你的服务在重启的时候就消息会丢消息。

Redis实现的发布订阅模式,消息是没有持久化机制,当消息发布到某个channel之后,如果没有客户端订阅这个channel,那么这个消息就丢了,并不会像MQ一样进行持久化,等有消费者订阅的时候再给消费者消费。

所以说,假设服务重启期间,某个生产者或者是Redis本身发布了一条消息到某个channel,由于服务重启,没有监听这个channel,那么这个消息自然就丢了。

消息消费只有广播模式

Redis的发布订阅模式消息消费只有广播模式一种。

所谓的广播模式就是多个消费者订阅同一个channel,那么每个消费者都能消费到发布到这个channel的所有消息。

f500e1e6-c3b1-11ed-bfe3-dac502259ad0.png

如图,生产者发布了一条消息,内容为sanyou,那么两个消费者都可以同时收到sanyou这条消息。

所以,如果通过监听channel来获取延迟任务,那么一旦服务实例有多个的话,还得保证消息不能重复处理,额外地增加了代码开发量。

接收到所有key的某个事件

这个不属于Redis发布订阅模式的问题,而是Redis本身事件通知的问题。

当消费者监听了以__keyevent@__:开头的消息,那么会导致所有的key发生了事件都会被通知给消费者。

举个例子,某个消费者监听了__keyevent@*__:expired这个channel,那么只要key过期了,不管这个key是张三还会李四,消费者都能收到。

所以如果你只想消费某一类消息的key,那么还得自行加一些标记,比如消息的key加个前缀,消费的时候判断一下带前缀的key就是需要消费的任务。

所以,综上能够得出一个非常重要的结论,那就是监听Redis过期Key这种方式实现延迟队列,不稳定,坑贼多!

那有没有比较靠谱的延迟队列的实现方案呢?这就不得不提到我研究的第二种方案了。

Redisson实现延迟队列

Redisson他是Redis的儿子(Redis son),基于Redis实现了非常多的功能,其中最常使用的就是Redis分布式锁的实现,但是除了实现Redis分布式锁之外,它还实现了延迟队列的功能。

先来个demo,后面再来说说这种实现的原理。

1、demo

引入pom


org.redisson
redisson
3.13.1

封装了一个RedissonDelayQueue类

@Component
@Slf4j
publicclassRedissonDelayQueue{

privateRedissonClientredissonClient;

privateRDelayedQueuedelayQueue;
privateRBlockingQueueblockingQueue;

@PostConstruct
publicvoidinit(){
initDelayQueue();
startDelayQueueConsumer();
}

privatevoidinitDelayQueue(){
Configconfig=newConfig();
SingleServerConfigserverConfig=config.useSingleServer();
serverConfig.setAddress("redis://localhost:6379");
redissonClient=Redisson.create(config);

blockingQueue=redissonClient.getBlockingQueue("SANYOU");
delayQueue=redissonClient.getDelayedQueue(blockingQueue);
}

privatevoidstartDelayQueueConsumer(){
newThread(()->{
while(true){
try{
Stringtask=blockingQueue.take();
log.info("接收到延迟任务:{}",task);
}catch(Exceptione){
e.printStackTrace();
}
}
},"SANYOU-Consumer").start();
}

publicvoidofferTask(Stringtask,longseconds){
log.info("添加延迟任务:{}延迟时间:{}s",task,seconds);
delayQueue.offer(task,seconds,TimeUnit.SECONDS);
}

}

这个类在创建的时候会去初始化延迟队列,创建一个RedissonClient对象,之后通过RedissonClient对象获取到RDelayedQueue和RBlockingQueue对象,传入的队列名字叫SANYOU,这个名字无所谓。

当延迟队列创建之后,会开启一个延迟任务的消费线程,这个线程会一直从RBlockingQueue中通过take方法阻塞获取延迟任务。

添加任务的时候是通过RDelayedQueue的offer方法添加的。

controller类,通过接口添加任务,延迟时间为5s

@RestController
publicclassRedissonDelayQueueController{

@Resource
privateRedissonDelayQueueredissonDelayQueue;

@GetMapping("/add")
publicvoidaddTask(@RequestParam("task")Stringtask){
redissonDelayQueue.offerTask(task,5);
}

}

启动项目,添加任务

静静等待5s,成功获取到任务。

f510d5e2-c3b1-11ed-bfe3-dac502259ad0.png

2、实现原理

如下图就是上面demo中,一个延迟队列会在Redis内部使用到的channel和数据类型

f51a509a-c3b1-11ed-bfe3-dac502259ad0.png

SANYOU前面的前缀都是固定的,Redisson创建的时候会拼上前缀。

redisson_delay_queue_timeout:SANYOU,sorted set数据类型,存放所有延迟任务,按照延迟任务的到期时间戳(提交任务时的时间戳 + 延迟时间)来排序的,所以列表的最前面的第一个元素就是整个延迟队列中最早要被执行的任务,这个概念很重要

redisson_delay_queue:SANYOU,list数据类型,也是存放所有的任务,但是研究下来发现好像没什么用。。

SANYOU,list数据类型,被称为目标队列,这个里面存放的任务都是已经到了延迟时间的,可以被消费者获取的任务,所以上面demo中的RBlockingQueue的take方法是从这个目标队列中获取到任务的

redisson_delay_queue_channel:SANYOU,是一个channel,用来通知客户端开启一个延迟任务

有了这些概念之后,再来看看整体的运行原理图

f534b340-c3b1-11ed-bfe3-dac502259ad0.png

生产者在提交任务的时候将任务放到redisson_delay_queue_timeout:SANYOU中,分数就是提交任务的时间戳+延迟时间,就是延迟任务的到期时间戳

客户端会有一个延迟任务,为了区分,后面我都说是客户端延迟任务。这个延迟任务会向Redis Server发送一段lua脚本,Redis执行lua脚本中的命令,并且是原子性的

f5718518-c3b1-11ed-bfe3-dac502259ad0.png

这段lua脚本主要干了两件事:

将到了延迟时间的任务从redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU这个目标队列

获取到redisson_delay_queue_timeout:SANYOU中目前最早到过期时间的延迟任务的到期时间戳,然后发布到redisson_delay_queue_channel:SANYOU这个channel中

当客户端监听到redisson_delay_queue_channel:SANYOU这个channel的消息时,会再次提交一个客户端延迟任务,延迟时间就是消息(最早到过期时间的延迟任务的到期时间戳)- 当前时间戳,这个时间其实也就是redisson_delay_queue_channel:SANYOU中最早到过期时间的任务还剩余的延迟时间。

此处可以等待10s,好好想想。。

这样,一旦时间来到了上面说的最早到过期时间任务的到期时间戳,redisson_delay_queue_timeout:SANYOU中上面说的最早到过期时间的任务已经到期了,客户端的延迟任务也同时到期,于是开始执行lua脚本操作,及时将到了延迟时间的任务放到目标队列中。然后再次发布剩余的延迟任务中最早到期的任务到期时间戳到channe中,如此循环往复,一直运行下去,保证redisson_delay_queue_timeout:SANYOU中到期的数据能及时放到目标队列中。

所以,上述说了一大堆的主要的作用就是保证到了延迟时间的任务能够及时被放到目标队列。

这里再补充两个特殊情况,图中没有画出:

第一个就是如果redisson_delay_queue_timeout:SANYOU是新添加的任务(队列之前有或者没有任务)是队列中最早需要被执行的,也会发布消息到channel,之后就按时上面说的流程走了。

添加任务代码如下,也是通过lua脚本来的

f5ad6d94-c3b1-11ed-bfe3-dac502259ad0.png

第二种特殊情况就是项目启动的时候会执行一次客户端延迟任务。项目在重启时,由于没有客户端延迟任务的执行,可能会出现redisson_delay_queue_timeout:SANYOU队列中有到期但是没有被放到目标队列的可能,重启就执行一次就是为了保证到期的数据能被及时放到目标队列中。

3、与第一种方案比较

现在来比较一下第一种方案和Redisson的这种方案,看看有没有第一种方案的那些坑。

第一个任务延迟的问题,Redisson方案理论上是没有延迟的,但是当消息数量增加,消费者消费缓慢这个情况下可能会导致延迟任务消费的延迟。

第二个丢消息的问题,Redisson方案很大程度上减轻了丢消息的可能性,因为所有的任务都是存在list和sorted set两种数据类型中,Redis有持久化机制,就算Redis宕机了,也就可能会丢一点点数据。

第三个广播消费任务的问题,这个是不会出现的,因为每个客户端都是从同一个目标队列中获取任务的。

第四个问题是Redis内部channel发布事件的问题,跟这种方案不沾边,就更不可能存在了。

所以,通过上面的对比可以看出,Redisson这种实现方案就显得更加的靠谱了。






审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 数据库
    +关注

    关注

    7

    文章

    3591

    浏览量

    63371
  • lua脚本
    +关注

    关注

    0

    文章

    21

    浏览量

    7529
  • Redis
    +关注

    关注

    0

    文章

    362

    浏览量

    10496

原文标题:用 Redis 实现延迟队列,我研究了两种方案,发现并不简单

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    延迟队列实现方式

    延迟任务 最近有一个需求,基于消息队列对数据消费,并根据多次消费的结果对数据进行重新组装,如果在指定时间内,需要的数据全部到达,则进行数据组装以及后续逻辑。简单的说,设置一个超时时间,如果在该时间内
    的头像 发表于 09-30 11:17 614次阅读

    MySQL与Redis延迟双删策略

    中,并且如果数据库中的数据发生了改变则需要同步到redis中,同步过程中需要保证 MySQL与redis数据一致性问题,在这个同步过程中出现短暂的数据延迟也是正常现象,但是最终需要保证mysql与缓存中的一致性。 //我们通常使
    的头像 发表于 09-25 14:28 587次阅读
    MySQL与<b class='flag-5'>Redis</b><b class='flag-5'>延迟</b>双删策略

    Redis实战篇-26.Redis消息队列-基于List实现

    Redis
    电子学习
    发布于 :2023年01月07日 17:09:44

    Redis Stream应用案例

    的基本使用介绍和设计理念可以看我之前的一篇文章(Redis Stream简介)。Redis Stream本质上是在Redis内核上(非Redis Module)
    发表于 06-26 17:15

    FreeRtos中消息队列API的调用该怎样去实现

    消息队列是什么?消息队列有何作用?FreeRtos中消息队列API的调用该怎样去实现
    发表于 01-20 07:04

    如何去实现一种队列程序的设计

    队列的原理是什么?队列有何作用?如何去实现一种队列程序的设计
    发表于 02-25 07:50

    环形队列的操作如何去实现

    环形队列结构的定义是什么?环形队列的操作如何去实现
    发表于 02-25 06:35

    redis工作原理

    Redis作为内存数据库,拥有非常高的性能,单个实例的QPS能够达到10W左右。但我们在使用Redis时,经常时不时会出现访问延迟很大的情况,如果你不知道Redis的内部
    的头像 发表于 09-24 15:57 3381次阅读

    Redis 延时队列,一次性搞明白

    所谓延时队列就是延时的消息队列,下面说一下一些业务场景 实践场景 订单支付失败,每隔一段时间提醒用户 用户并发量的情况,可以延时2分钟给用户发短信 先来看看Redis实现普通的消息
    的头像 发表于 10-30 16:34 1931次阅读
    <b class='flag-5'>Redis</b> 延时<b class='flag-5'>队列</b>,一次性搞明白

    Springboot+redis操作多种实现

    操作。Jedis客户端实例不是线程安全的,需要通过连接池来使用Jedis。 1.2、Redisson 优点点:分布式锁,分布式集合,可通过Redis支持延迟队列。 1.3、
    的头像 发表于 09-22 10:48 1601次阅读
    Springboot+<b class='flag-5'>redis</b>操作多种<b class='flag-5'>实现</b>

    实现一个双端队列的步骤简析

    队列是非常基础且重要的数据结构,双端队列属于队列的升级。很多的算法都是基于队列实现,例如搜索中的bfs,图论中的spfa,计算几何中的me
    的头像 发表于 10-27 18:11 1087次阅读

    什么是 Redis

    其他用例中变得可行,包括发布-订阅机制、流(streaming)和队列。 主要来说,Redis 是一个内存数据库,用作另一个“真实”数据
    的头像 发表于 05-22 15:32 645次阅读
    什么是 <b class='flag-5'>Redis</b>

    何用Springboot整合Redis

    本篇文件我们来介绍如何用Springboot整合Redis。 1、Docker 安装 Redis 1.1 下载镜像 docker pull redis: 6 . 2 . 6 1.2 创
    的头像 发表于 10-08 14:56 327次阅读
    如<b class='flag-5'>何用</b>Springboot整合<b class='flag-5'>Redis</b>

    Java redis锁怎么实现

    在Java中实现Redis锁涉及到以下几个方面:Redis的安装配置、Redis连接池的使用、Redis数据结构的选择、
    的头像 发表于 12-04 10:47 347次阅读

    redis数据结构的底层实现

    Redis是一种内存键值数据库,常用于缓存、消息队列、实时数据分析等场景。它的高性能得益于其精心设计的数据结构和底层实现。本文将详细介绍Redis常用的数据结构和它们的底层
    的头像 发表于 12-05 10:14 312次阅读