0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Redis工具集的实现和使用

jf_ro2CN3Fa 来源:稀土掘金技术社区 2023-12-03 17:32 次阅读

1 前言

Redis 基本上是互联网公司必备的工具了,Redis的应用场景实在太多了,但是有很多相似的功能如果每个项目都要实现一遍就显得太麻烦了,所以为了方便,我打算开发一个基于 Redis 的工具集,尽量做到开箱即用。

2 目前实现功能

这个工具集并没有开发完成,实现了部分功能,如下图

8577182e-91bd-11ee-939d-92fbcf53809c.png

简单介绍下已经实现的模块:

common : 整个项目公共模块,比如AOP工具等;

delay: Redis实现的延迟队列;

lock: Redis实现的分布式锁;

mq: Redis实现消息队列;

query: Redis实现分页模糊查询;

web: Redis实现web相关的功能;

duplicate :防止重复提交;

以上的这些模块都是已经实现的了,还有 社交、限流、幂等相关功能后面会陆续实现。

3 如何使用

1.引入 Maven 依赖

目前可以下载代码上传到自己的私服或者本地仓库,后面会推到 Maven 中央仓库


cn.org.wangchangjiu
redis-util-spring-boot-starter
1.0.0-SNAPSHOT

2.配置文件(application.yaml)开启各模块功能开关

redis:
util:
mq:
enable:true
delay:
enable:true

3.实现消息发送者

MQ消息发送:

858caa4a-91bd-11ee-939d-92fbcf53809c.png

延迟消息发送:

85a23932-91bd-11ee-939d-92fbcf53809c.png

4.实现消息监听器

MQ消息监听器:

85c4f9fe-91bd-11ee-939d-92fbcf53809c.png

延迟消息监听器:

85d4b9d4-91bd-11ee-939d-92fbcf53809c.png

4 MQ和delay实现细节

MQ实现细节

容器启动时,简单来说就是通过springboot自动装配,创建一些Bean,如下图:

85f415c2-91bd-11ee-939d-92fbcf53809c.png

值得注意的是,springboot3.X 自动装配方式有点变化,需要创建文件 META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports 文件,文件内容就直接写 自动配置类

8611a2cc-91bd-11ee-939d-92fbcf53809c.png

RedisUtilAutoConfiguration 主自动装配类会 import 各个模块的自动装配类:

8621012c-91bd-11ee-939d-92fbcf53809c.png

我们以 RedisStreamAutoConfiguration 为例:

864536d2-91bd-11ee-939d-92fbcf53809c.png

该装配类生效需要显示打开,然后就是创建各种Bean。

最主要的Bean有:

RedisMessageConsumerManager:

865c99da-91bd-11ee-939d-92fbcf53809c.png

该Bean实现了 BeanPostProcessor 接口,主要作用是,获取被注解 RedisMessageListener 修饰的方法,把信息封装在 RedisMessageConsumerContainer 对象里,方便后面反射调用。

8668f98c-91bd-11ee-939d-92fbcf53809c.png

StreamMessageListenerContainer:

这个Bean主要是做 redis MQ 的配置,比如配置:一次最多获取多少条消息、没有消息时阻塞时间、执行任务的executor、错误处理器、以及消费组、是否自动ACK等配置,具体代码如下:

@Bean(initMethod="start",destroyMethod="stop")
@DependsOn("redisMessageConsumerManager")
@ConditionalOnMissingBean
publicStreamMessageListenerContainer>streamMessageListenerContainer(@AutowiredRedisMessageConsumerManagerredisMessageConsumerManager,
@AutowiredRedisConnectionFactoryredisConnectionFactory,
@AutowiredErrorHandlererrorHandler){
MyRedisStreamProperties.Optionsoptions=myRedisStreamProperties.getOptions();
StreamMessageListenerContainer.StreamMessageListenerContainerOptions>containerOptions=
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
//一次最多获取多少条消息
.batchSize(options.getBatchSize())
//运行Stream的polltask
.executor(getStreamMessageListenerExecutor())
//Stream中没有消息时,阻塞多长时间,需要比`spring.redis.timeout`的时间小
.pollTimeout(options.getPollTimeout())
//获取消息的过程或获取到消息给具体的消息者处理的过程中,发生了异常的处理
.errorHandler(errorHandler)
.build();

StreamMessageListenerContainer>streamMessageListenerContainer=
StreamMessageListenerContainer.create(redisConnectionFactory,containerOptions);

//获取被RedisMessageListener注解修饰的bean
MapconsumerContainerGroups=
redisMessageConsumerManager.getConsumerContainerGroups();

//循环遍历,创建消费组
consumerContainerGroups.forEach((groupQueue,redisMessageConsumerContainer)->{
String[]groupQueues=groupQueue.split("#");

//创建消费组
createGroups(groupQueues);

RedisMessageListenerredisMessageListener=redisMessageConsumerContainer.getRedisMessageListener();
if(!redisMessageListener.useGroup()){
//独立消费不使用组
streamMessageListenerContainer.receive(StreamOffset.fromStart(groupQueues[1]),newDefaultGroupStreamListener(redisMessageConsumerContainer));
}else{
//消费组消费
if(redisMessageListener.autoAck()){
//自动ACK
streamMessageListenerContainer.receiveAutoAck(Consumer.from(groupQueues[0],"consumer:"+UUID.randomUUID()),
StreamOffset.create(groupQueues[1],ReadOffset.lastConsumed()),newDefaultGroupStreamListener(redisMessageConsumerContainer));
}else{
//手动ACK
streamMessageListenerContainer.receive(Consumer.from(groupQueues[0],"consumer:"+UUID.randomUUID()),
StreamOffset.create(groupQueues[1],ReadOffset.lastConsumed()),newDefaultGroupStreamListener(redisMessageConsumerContainer));
}
}
});
returnstreamMessageListenerContainer;
}

/**
*创建消费组
*@paramgroupQueues
*/
privatevoidcreateGroups(String[]groupQueues){
//判断是否存在队列Key
if(stringRedisTemplate.hasKey(groupQueues[1])){
//获取消费组没有则创建
StreamInfo.XInfoGroupsgroups=stringRedisTemplate.opsForStream().groups(groupQueues[1]);
if(groups.isEmpty()){
stringRedisTemplate.opsForStream().createGroup(groupQueues[1],groupQueues[0]);
}else{
AtomicBooleanexists=newAtomicBoolean(false);
groups.forEach(xInfoGroup->{
if(xInfoGroup.groupName().equals(groupQueues[0])){
exists.set(true);
}
});
if(!exists.get()){
stringRedisTemplate.opsForStream().createGroup(groupQueues[1],groupQueues[0]);
}
}
}else{
stringRedisTemplate.opsForStream().createGroup(groupQueues[1],groupQueues[0]);
}
}

//todo后面这个线程池也可以交由用户配置
privateExecutorgetStreamMessageListenerExecutor(){
AtomicIntegerindex=newAtomicInteger(1);
intprocessors=Runtime.getRuntime().availableProcessors();
ThreadPoolExecutorexecutor=newThreadPoolExecutor(processors,processors,0,TimeUnit.SECONDS,
newLinkedBlockingDeque<>(),r->{
Threadthread=newThread(r);
thread.setName("async-stream-consumer-"+index.getAndIncrement());
thread.setDaemon(true);
returnthread;
});
returnexecutor;
}

发送消息流程:

8680490c-91bd-11ee-939d-92fbcf53809c.png

redis 延迟队列的实现原理和这个差不多,主要是 redission延迟队列 + 自定义注解 + 反射,代码都差不多。

审核编辑:汤梓红
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 代码
    +关注

    关注

    30

    文章

    4555

    浏览量

    66771
  • 队列
    +关注

    关注

    1

    文章

    46

    浏览量

    10849
  • Redis
    +关注

    关注

    0

    文章

    362

    浏览量

    10496

原文标题:为了方便开发,我打算实现一个Redis 工具集

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    redis分布式锁场景实现

    redis的键值对: set ticket 10 准备 postman、JMeter 等模拟高并发请求的工具 核心代码 @Service public class TicketServiceImpl
    的头像 发表于 09-25 17:09 454次阅读

    Redis Stream应用案例

    的基本使用介绍和设计理念可以看我之前的一篇文章(Redis Stream简介)。Redis Stream本质上是在Redis内核上(非Redis Module)
    发表于 06-26 17:15

    centos7 redis的安装

    centos7 redis 使用,查看Redis工具(安装、添加权限验证、添加开机自启)
    发表于 05-14 17:13

    Redis Cluster的基本原理及实现细节

    Redis Cluster的基本原理和架构 Redis Cluster是分布式Redis实现。随着Redis版本的更替,以及各种已知bug
    发表于 09-28 19:09 0次下载
    <b class='flag-5'>Redis</b> Cluster的基本原理及<b class='flag-5'>实现</b>细节

    Java 使用Redis缓存工具的详细解说

    本文是关于Java 使用Redis缓存工具的详细解说。详细步骤请看下文
    的头像 发表于 02-09 14:10 7636次阅读
    Java 使用<b class='flag-5'>Redis</b>缓存<b class='flag-5'>工具</b>的详细解说

    Windows环境下使用Redis缓存工具的图文详细方法

    Windows环境下使用Redis缓存工具的图文详细方法。Redis 是一个高性能的key-value数据库。redis的出现,很大程度补偿了memcached这类key/value存
    的头像 发表于 02-09 14:25 4542次阅读
    Windows环境下使用<b class='flag-5'>Redis</b>缓存<b class='flag-5'>工具</b>的图文详细方法

    redis设计与实现

    redis
    发表于 06-20 14:44 0次下载

    谈谈Redis怎样配置实现主从复制?

    之前总结过redis的持久化机制:深度剖析Redis持久化机制,持久化机制主要解决redis数据单机备份问题;redis的高可用需要考虑数据的多机备份,多机备份通过主从复制来
    发表于 01-31 11:31 467次阅读

    Redis实现限流的三种方式分享

    当然,限流有许多种实现的方式,Redis具有很强大的功能,我用Redis实践了三种的实现方式,可以较为简单的实现其方式。
    的头像 发表于 02-22 09:52 634次阅读

    Redis官方可视化工具功能强大

    RedisInsight 是一个高颜值,直观高效的 Redis GUI 管理工具,它可以对 Redis 的内存、连接数、命中率以及正常运行时间进行监控
    的头像 发表于 04-23 09:55 624次阅读
    <b class='flag-5'>Redis</b>官方可视化<b class='flag-5'>工具</b>功能强大

    redis分布式锁如何实现

    Redis分布式锁是一种基于Redis实现的机制,可以用于多个进程或多台服务器之间对共享资源的并发访问控制。在分布式系统中,由于多个进程或多台服务器同时访问共享资源,可能会发生数据竞争和资源冲突
    的头像 发表于 11-16 11:29 300次阅读

    Java redis锁怎么实现

    在Java中实现Redis锁涉及到以下几个方面:Redis的安装配置、Redis连接池的使用、Redis数据结构的选择、
    的头像 发表于 12-04 10:47 347次阅读

    redis集群性能测试工具有哪些

    Redis是一种高性能的内存键值存储系统,它被广泛应用于各种互联网应用和大规模的数据存储中。为了评估Redis在不同场景下的性能,我们需要使用一些性能测试工具来对Redis集群进行基准
    的头像 发表于 12-04 11:36 291次阅读

    redis hash底层实现原理

    数据结构是如何实现的呢?本文将详细介绍Redis哈希底层的实现原理。 在Redis中,每个哈希都是由一个类似于字典(Dictionary)的结构实现
    的头像 发表于 12-04 16:27 252次阅读

    redis数据结构的底层实现

    Redis是一种内存键值数据库,常用于缓存、消息队列、实时数据分析等场景。它的高性能得益于其精心设计的数据结构和底层实现。本文将详细介绍Redis常用的数据结构和它们的底层实现
    的头像 发表于 12-05 10:14 310次阅读