0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

高并发场景下请求合并

jf_ro2CN3Fa 来源:CSDN 2023-10-09 16:05 次阅读

前言

请求合并到底有什么意义呢?我们来看下图。

4c4d5f44-666c-11ee-939d-92fbcf53809c.png

假设我们3个用户(用户id分别是1、2、3),现在他们都要查询自己的基本信息,请求到服务器,服务器端请求数据库,发出3次请求。我们都知道数据库连接资源是相当宝贵的,那么我们怎么尽可能节省连接资源呢?

这里把数据库换成被调用的远程服务,也是同样的道理。

我们改变下思路,如下图所示。

4c5f409c-666c-11ee-939d-92fbcf53809c.png

我们在服务器端把请求合并,只发出一条SQL查询数据库,数据库返回后,服务器端处理返回数据,根据一个唯一请求ID,把数据分组,返回给对应用户。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

技术手段

  • LinkedBlockQueue 阻塞队列
  • ScheduledThreadPoolExecutor 定时任务线程池
  • CompleteableFuture future 阻塞机制(Java 8 的 CompletableFuture 并没有 timeout 机制,后面优化,使用了队列替代)

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

代码实现

查询用户的代码

publicinterfaceUserService{

MapqueryUserByIdBatch(ListuserReqs);
}
@Service
publicclassUserServiceImplimplementsUserService{

@Resource
privateUsersMapperusersMapper;

@Override
publicMapqueryUserByIdBatch(ListuserReqs){
//全部参数
ListuserIds=userReqs.stream().map(UserWrapBatchService.Request::getUserId).collect(Collectors.toList());
QueryWrapperqueryWrapper=newQueryWrapper<>();
//用in语句合并成一条SQL,避免多次请求数据库的IO
queryWrapper.in("id",userIds);
Listusers=usersMapper.selectList(queryWrapper);
Map>userGroup=users.stream().collect(Collectors.groupingBy(Users::getId));
HashMapresult=newHashMap<>();
userReqs.forEach(val->{
ListusersList=userGroup.get(val.getUserId());
if(!CollectionUtils.isEmpty(usersList)){
result.put(val.getRequestId(),usersList.get(0));
}else{
//表示没数据
result.put(val.getRequestId(),null);
}
});
returnresult;
}
}

合并请求的实现

packagecom.springboot.sample.service.impl;

importcom.springboot.sample.bean.Users;
importcom.springboot.sample.service.UserService;
importorg.springframework.stereotype.Service;

importjavax.annotation.PostConstruct;
importjavax.annotation.Resource;
importjava.util.*;
importjava.util.concurrent.*;

/***
*zzq
*包装成批量执行的地方
**/
@Service
publicclassUserWrapBatchService{
@Resource
privateUserServiceuserService;

/**
*最大任务数
**/
publicstaticintMAX_TASK_NUM=100;


/**
*请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
*CompletableFuture将处理结果返回
*/
publicclassRequest{
//请求id唯一
StringrequestId;
//参数
LonguserId;
//TODOJava8的CompletableFuture并没有timeout机制
CompletableFuturecompletableFuture;

publicStringgetRequestId(){
returnrequestId;
}

publicvoidsetRequestId(StringrequestId){
this.requestId=requestId;
}

publicLonggetUserId(){
returnuserId;
}

publicvoidsetUserId(LonguserId){
this.userId=userId;
}

publicCompletableFuturegetCompletableFuture(){
returncompletableFuture;
}

publicvoidsetCompletableFuture(CompletableFuturecompletableFuture){
this.completableFuture=completableFuture;
}
}

/*
LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
LinkedBlockingQueue与ArrayBlockingQueue的区别
ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
*/
privatefinalQueuequeue=newLinkedBlockingQueue();

@PostConstruct
publicvoidinit(){
//定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(1);

scheduledExecutorService.scheduleAtFixedRate(()->{
intsize=queue.size();
//如果队列没数据,表示这段时间没有请求,直接返回
if(size==0){
return;
}
Listlist=newArrayList<>();
System.out.println("合并了["+size+"]个请求");
//将队列的请求消费到一个集合保存
for(inti=0;i< size; i++) {
                //后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
if(i< MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
ListuserReqs=newArrayList<>();
for(Requestrequest:list){
userReqs.add(request);
}
//将参数传入service处理,这里是本地服务,也可以把userService看成RPC之类的远程调用
Mapresponse=userService.queryUserByIdBatch(userReqs);
//将处理结果返回各自的请求
for(Requestrequest:list){
Usersresult=response.get(request.requestId);
request.completableFuture.complete(result);//completableFuture.complete方法完成赋值,这一步执行完毕,下面future.get()阻塞的请求可以继续执行了
}
},100,10,TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性执行schedule是延迟执行initialDelay是初始延迟period是周期间隔后面是单位
//这里我写的是初始化后100毫秒后执行,周期性执行10毫秒执行一次
}

publicUsersqueryUser(LonguserId){
Requestrequest=newRequest();
//这里用UUID做请求id
request.requestId=UUID.randomUUID().toString().replace("-","");
request.userId=userId;
CompletableFuturefuture=newCompletableFuture<>();
request.completableFuture=future;
//将对象传入队列
queue.offer(request);
//如果这时候没完成赋值,那么就会阻塞,直到能够拿到值
try{
returnfuture.get();
}catch(InterruptedExceptione){
e.printStackTrace();
}catch(ExecutionExceptione){
e.printStackTrace();
}
returnnull;
}
}

控制层调用

/***
*请求合并
**/
@RequestMapping("/merge")
publicCallablemerge(LonguserId){
returnnewCallable(){
@Override
publicUserscall()throwsException{
returnuserBatchService.queryUser(userId);
}
};
}

Callable是什么可以参考:

  • https://blog.csdn.net/baidu_19473529/article/details/123596792

模拟高并发查询的代码

packagecom.springboot.sample;

importorg.springframework.web.client.RestTemplate;

importjava.util.Random;
importjava.util.concurrent.CountDownLatch;

publicclassTestBatch{
privatestaticintthreadCount=30;

privatefinalstaticCountDownLatchCOUNT_DOWN_LATCH=newCountDownLatch(threadCount);//为保证30个线程同时并发运行

privatestaticfinalRestTemplaterestTemplate=newRestTemplate();

publicstaticvoidmain(String[]args){


for(inti=0;i< threadCount; i++) {//循环开30个线程
newThread(newRunnable(){
publicvoidrun(){
COUNT_DOWN_LATCH.countDown();//每次减一
try{
COUNT_DOWN_LATCH.await();//此处等待状态,为了让30个线程同时进行
}catch(InterruptedExceptione){
e.printStackTrace();
}

for(intj=1;j<= 3;j++){
intparam=newRandom().nextInt(4);
if(param<=0){
param++;
}
StringresponseBody=restTemplate.getForObject("http://localhost:8080/asyncAndMerge/merge?userId="+param,String.class);
System.out.println(Thread.currentThread().getName()+"参数"+param+"返回值"+responseBody);
}
}
}).start();

}
}
}

测试效果

4c6bfaa8-666c-11ee-939d-92fbcf53809c.png4c7c9dfe-666c-11ee-939d-92fbcf53809c.png

要注意的问题

  • Java 8 的 CompletableFuture 并没有 timeout 机制
  • 后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行(本例中加了MAX_TASK_NUM判断)

使用队列的超时解决Java 8 的 CompletableFuture 并没有 timeout 机制

核心代码

packagecom.springboot.sample.service.impl;

importcom.springboot.sample.bean.Users;
importcom.springboot.sample.service.UserService;
importorg.springframework.stereotype.Service;

importjavax.annotation.PostConstruct;
importjavax.annotation.Resource;
importjava.util.*;
importjava.util.concurrent.*;

/***
*zzq
*包装成批量执行的地方,使用queue解决超时问题
**/
@Service
publicclassUserWrapBatchQueueService{
@Resource
privateUserServiceuserService;

/**
*最大任务数
**/
publicstaticintMAX_TASK_NUM=100;


/**
*请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
*CompletableFuture将处理结果返回
*/
publicclassRequest{
//请求id
StringrequestId;

//参数
LonguserId;
//队列,这个有超时机制
LinkedBlockingQueueusersQueue;


publicStringgetRequestId(){
returnrequestId;
}

publicvoidsetRequestId(StringrequestId){
this.requestId=requestId;
}

publicLonggetUserId(){
returnuserId;
}

publicvoidsetUserId(LonguserId){
this.userId=userId;
}

publicLinkedBlockingQueuegetUsersQueue(){
returnusersQueue;
}

publicvoidsetUsersQueue(LinkedBlockingQueueusersQueue){
this.usersQueue=usersQueue;
}
}

/*
LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
LinkedBlockingQueue与ArrayBlockingQueue的区别
ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
*/
privatefinalQueuequeue=newLinkedBlockingQueue();

@PostConstruct
publicvoidinit(){
//定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(1);

scheduledExecutorService.scheduleAtFixedRate(()->{
intsize=queue.size();
//如果队列没数据,表示这段时间没有请求,直接返回
if(size==0){
return;
}
Listlist=newArrayList<>();
System.out.println("合并了["+size+"]个请求");
//将队列的请求消费到一个集合保存
for(inti=0;i< size; i++) {
                //后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
if(i< MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
ListuserReqs=newArrayList<>();
for(Requestrequest:list){
userReqs.add(request);
}
//将参数传入service处理,这里是本地服务,也可以把userService看成RPC之类的远程调用
Mapresponse=userService.queryUserByIdBatchQueue(userReqs);
for(RequestuserReq:userReqs){
//这里再把结果放到队列里
Usersusers=response.get(userReq.getRequestId());
userReq.usersQueue.offer(users);
}

},100,10,TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性执行schedule是延迟执行initialDelay是初始延迟period是周期间隔后面是单位
//这里我写的是初始化后100毫秒后执行,周期性执行10毫秒执行一次
}

publicUsersqueryUser(LonguserId){
Requestrequest=newRequest();
//这里用UUID做请求id
request.requestId=UUID.randomUUID().toString().replace("-","");
request.userId=userId;
LinkedBlockingQueueusersQueue=newLinkedBlockingQueue<>();
request.usersQueue=usersQueue;
//将对象传入队列
queue.offer(request);
//取出元素时,如果队列为空,给定阻塞多少毫秒再队列取值,这里是3秒
try{
returnusersQueue.poll(3000,TimeUnit.MILLISECONDS);
}catch(InterruptedExceptione){
e.printStackTrace();
}
returnnull;
}
}
...省略..

@Override
publicMapqueryUserByIdBatchQueue(ListuserReqs){
//全部参数
ListuserIds=userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
QueryWrapperqueryWrapper=newQueryWrapper<>();
//用in语句合并成一条SQL,避免多次请求数据库的IO
queryWrapper.in("id",userIds);
Listusers=usersMapper.selectList(queryWrapper);
Map>userGroup=users.stream().collect(Collectors.groupingBy(Users::getId));
HashMapresult=newHashMap<>();
//数据分组
userReqs.forEach(val->{
ListusersList=userGroup.get(val.getUserId());
if(!CollectionUtils.isEmpty(usersList)){
result.put(val.getRequestId(),usersList.get(0));
}else{
//表示没数据,这里要new,不然加入队列会空指针
result.put(val.getRequestId(),newUsers());
}
});
returnresult;
}

...省略...

小结

请求合并,批量的办法能大幅节省被调用系统的连接资源,本例是以数据库为例,其他RPC调用也是类似的道理。缺点就是请求的时间在执行实际的逻辑之前增加了等待时间,不适合低并发的场景。

代码地址

  • https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5

参考

  • https://www.cnblogs.com/oyjg/p/13099998.html


声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 服务器
    +关注

    关注

    12

    文章

    8116

    浏览量

    82509
  • JAVA
    +关注

    关注

    19

    文章

    2904

    浏览量

    102994
  • 数据库
    +关注

    关注

    7

    文章

    3591

    浏览量

    63369

原文标题:高并发场景下请求合并

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    基于阿里云Serverless架构函数计算的最新应用场景详解(一)

    分配资源。3、具备真正意义上的高度扩容和弹性。4、按需使用,按需计费。根据Serverless的这些通用特点,归纳出下面几种典型使用场景,供大家参考。事件请求场景定制图片网店中的商品图片维护,根据商品
    发表于 01-25 11:06

    从服务端视角看并发难题

    `所谓服务器大流量并发指的是:在同时或极短时间内,有大量的请求到达服务端,每个请求都需要服务端耗费资源进行处理,并做出相应的反馈。 从服务端视角看
    发表于 11-02 15:11

    消息队列的应用场景

    CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)。  小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢
    发表于 06-23 10:19

    如何去实现一种基于SpringMVC的电商并发秒杀系统设计

    参考博客Java并发秒杀系统API目录业务场景要解决的问题Redis的使用业务场景首页倒计时秒杀活动,抢购商品要解决的问题
    发表于 01-03 07:50

    Lite Actor:方舟Actor并发模型的轻量级优化

    并发模型是用来实现不同应用场景并发任务的编程模型,通过合理地使用多线程,可以缩减应用程序的开发和维护成本,同时还能更好地提升应用程序在多核设备中的运行性能。随着IoT时代应用
    发表于 07-18 12:00

    ATC'22顶会论文RunD:高密并发的轻量级 Serverless 安全容器运行时 | 龙蜥技术

    和 FireCracker 都提供了实现这种安全容器的实践经验。(图1/目前主流的安全容器模型,以及对应的软件栈架构)在 Serverless 场景,函数的轻量级和短期运行的特性使得高密度容器部署和
    发表于 09-05 15:18

    HarmonyOS如何使用异步并发能力进行开发

    并发是指异步代码在执行到一定程度后会被暂停,以便在未来某个时间点继续执行,这种情况,同一时间只有一段代码在执行。 ● 多线程并发允许在同一时间段内同时执行多段代码。在主线程继续响应用户操作和更新
    发表于 09-22 17:35

    P2P流媒体系统中并发请求的数据分发算法

    大规模并发请求是流媒体直播系统面临的一个挑战,也是视频点播系统中亟待解决的一个问题。本文针对相同数据的并发请求问题,提出了一种高效,低带宽消耗、低延迟的数据
    发表于 12-30 14:21 14次下载

    轻松上手 无线上网使用之双剑合并发短信

    轻松上手 无线上网使用之双剑合并发短信 由于手机本身的体积限制而没有专用键盘,目前来说,许多短信一族都只好去练就一身“一
    发表于 01-26 11:04 634次阅读

    IO请求在block layer的来龙去脉

    所谓请求合并就是将进程内或者进程间产生的在物理地址上连续的多个IO请求合并成单个IO请求一并处理,从而提升IO
    的头像 发表于 06-06 15:39 8104次阅读
    IO<b class='flag-5'>请求</b>在block layer的来龙去脉

    缓存一致性问题及缓存并发问题

    在高并发场景下,如果某一个key被高并发访问,没有被命中,出于对容错性考虑,会尝试去从后端数据库中获取,从而导致了大量请求达到数据库,而当该key对应的数据本身就是空的情况下,这就导致
    的头像 发表于 08-09 15:52 5072次阅读
    缓存一致性问题及缓存<b class='flag-5'>并发</b>问题

    一文汇总并发http请求最快的几种实现方式用

    假如有一个文件,里面有 10 万个 url,需要对每个 url 发送 http 请求,并打印请求结果的状态码,如何编写代码尽可能快的完成这些任务呢? Python 并发编程有很多方法,多线程的标准
    的头像 发表于 10-20 14:36 3910次阅读
    一文汇总<b class='flag-5'>并发</b>http<b class='flag-5'>请求</b>最快的几种实现方式用

    效率加倍,高并发场景下的接口请求合并方案

    假设我们3个用户(用户id分别是1、2、3),现在他们都要查询自己的基本信息,请求到服务器,服务器端请求数据库,发出3次请求。我们都知道数据库连接资源是相当宝贵的,那么我们怎么尽可能节省连接资源呢?
    的头像 发表于 01-13 10:09 590次阅读

    Linux 6.3内核的合并窗口已开启

    Linus 认为,如果你不能解释清楚一个合并请求,那么就不要提交,这是很简单的道理。如果不解释提交合并请求的原因,那就是在生产垃圾。在这种情况下,Linus 觉得这种合并请求根本就不应该存在。
    的头像 发表于 02-27 10:04 413次阅读

    工业物联网平台如何应对高并发应用场景

    面对的巨大挑战。对此,数之能提供高并发、官翻机接入的工业物联网平台,可以适应高并发场景应用需求。 高并发是一种在“同一时间点或极短时间内出现大量的操作
    的头像 发表于 09-06 14:21 296次阅读