0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

效率加倍,高并发场景下的接口请求合并方案

jf_ro2CN3Fa 来源:芋道源码 2023-01-13 10:09 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群


前言

请求合并到底有什么意义呢?我们来看下图。

281af012-92e3-11ed-bfe3-dac502259ad0.png

假设我们3个用户(用户id分别是1、2、3),现在他们都要查询自己的基本信息,请求到服务器,服务器端请求数据库,发出3次请求。我们都知道数据库连接资源是相当宝贵的,那么我们怎么尽可能节省连接资源呢?

这里把数据库换成被调用的远程服务,也是同样的道理。

我们改变下思路,如下图所示。

28311838-92e3-11ed-bfe3-dac502259ad0.png

我们在服务器端把请求合并,只发出一条SQL查询数据库,数据库返回后,服务器端处理返回数据,根据一个唯一请求ID,把数据分组,返回给对应用户。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

技术手段

  • LinkedBlockQueue 阻塞队列
  • ScheduledThreadPoolExecutor 定时任务线程池
  • CompleteableFuture future 阻塞机制(Java 8 的 CompletableFuture 并没有 timeout 机制,后面优化,使用了队列替代)

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

代码实现

查询用户的代码

publicinterfaceUserService{

MapqueryUserByIdBatch(ListuserReqs);
}
@Service
publicclassUserServiceImplimplementsUserService{

@Resource
privateUsersMapperusersMapper;

@Override
publicMapqueryUserByIdBatch(ListuserReqs){
//全部参数
ListuserIds=userReqs.stream().map(UserWrapBatchService.Request::getUserId).collect(Collectors.toList());
QueryWrapperqueryWrapper=newQueryWrapper<>();
//用in语句合并成一条SQL,避免多次请求数据库的IO
queryWrapper.in("id",userIds);
Listusers=usersMapper.selectList(queryWrapper);
Map>userGroup=users.stream().collect(Collectors.groupingBy(Users::getId));
HashMapresult=newHashMap<>();
userReqs.forEach(val->{
ListusersList=userGroup.get(val.getUserId());
if(!CollectionUtils.isEmpty(usersList)){
result.put(val.getRequestId(),usersList.get(0));
}else{
//表示没数据
result.put(val.getRequestId(),null);
}
});
returnresult;
}
}

合并请求的实现

packagecom.springboot.sample.service.impl;

importcom.springboot.sample.bean.Users;
importcom.springboot.sample.service.UserService;
importorg.springframework.stereotype.Service;

importjavax.annotation.PostConstruct;
importjavax.annotation.Resource;
importjava.util.*;
importjava.util.concurrent.*;

/***
*zzq
*包装成批量执行的地方
**/
@Service
publicclassUserWrapBatchService{
@Resource
privateUserServiceuserService;

/**
*最大任务数
**/
publicstaticintMAX_TASK_NUM=100;


/**
*请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
*CompletableFuture将处理结果返回
*/
publicclassRequest{
//请求id唯一
StringrequestId;
//参数
LonguserId;
//TODOJava8的CompletableFuture并没有timeout机制
CompletableFuturecompletableFuture;

publicStringgetRequestId(){
returnrequestId;
}

publicvoidsetRequestId(StringrequestId){
this.requestId=requestId;
}

publicLonggetUserId(){
returnuserId;
}

publicvoidsetUserId(LonguserId){
this.userId=userId;
}

publicCompletableFuturegetCompletableFuture(){
returncompletableFuture;
}

publicvoidsetCompletableFuture(CompletableFuturecompletableFuture){
this.completableFuture=completableFuture;
}
}

/*
LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
LinkedBlockingQueue与ArrayBlockingQueue的区别
ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
*/
privatefinalQueuequeue=newLinkedBlockingQueue();

@PostConstruct
publicvoidinit(){
//定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(1);

scheduledExecutorService.scheduleAtFixedRate(()->{
intsize=queue.size();
//如果队列没数据,表示这段时间没有请求,直接返回
if(size==0){
return;
}
Listlist=newArrayList<>();
System.out.println("合并了["+size+"]个请求");
//将队列的请求消费到一个集合保存
for(inti=0;i< size; i++) {
                //后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
if(i< MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
ListuserReqs=newArrayList<>();
for(Requestrequest:list){
userReqs.add(request);
}
//将参数传入service处理,这里是本地服务,也可以把userService看成RPC之类的远程调用
Mapresponse=userService.queryUserByIdBatch(userReqs);
//将处理结果返回各自的请求
for(Requestrequest:list){
Usersresult=response.get(request.requestId);
request.completableFuture.complete(result);//completableFuture.complete方法完成赋值,这一步执行完毕,下面future.get()阻塞的请求可以继续执行了
}
},100,10,TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性执行schedule是延迟执行initialDelay是初始延迟period是周期间隔后面是单位
//这里我写的是初始化后100毫秒后执行,周期性执行10毫秒执行一次
}

publicUsersqueryUser(LonguserId){
Requestrequest=newRequest();
//这里用UUID做请求id
request.requestId=UUID.randomUUID().toString().replace("-","");
request.userId=userId;
CompletableFuturefuture=newCompletableFuture<>();
request.completableFuture=future;
//将对象传入队列
queue.offer(request);
//如果这时候没完成赋值,那么就会阻塞,直到能够拿到值
try{
returnfuture.get();
}catch(InterruptedExceptione){
e.printStackTrace();
}catch(ExecutionExceptione){
e.printStackTrace();
}
returnnull;
}
}

控制层调用

/***
*请求合并
**/
@RequestMapping("/merge")
publicCallablemerge(LonguserId){
returnnewCallable(){
@Override
publicUserscall()throwsException{
returnuserBatchService.queryUser(userId);
}
};
}

Callable是什么可以参考:

  • https://blog.csdn.net/baidu_19473529/article/details/123596792

模拟高并发查询的代码

packagecom.springboot.sample;

importorg.springframework.web.client.RestTemplate;

importjava.util.Random;
importjava.util.concurrent.CountDownLatch;

publicclassTestBatch{
privatestaticintthreadCount=30;

privatefinalstaticCountDownLatchCOUNT_DOWN_LATCH=newCountDownLatch(threadCount);//为保证30个线程同时并发运行

privatestaticfinalRestTemplaterestTemplate=newRestTemplate();

publicstaticvoidmain(String[]args){


for(inti=0;i< threadCount; i++) {//循环开30个线程
newThread(newRunnable(){
publicvoidrun(){
COUNT_DOWN_LATCH.countDown();//每次减一
try{
COUNT_DOWN_LATCH.await();//此处等待状态,为了让30个线程同时进行
}catch(InterruptedExceptione){
e.printStackTrace();
}

for(intj=1;j<= 3;j++){
intparam=newRandom().nextInt(4);
if(param<=0){
param++;
}
StringresponseBody=restTemplate.getForObject("http://localhost:8080/asyncAndMerge/merge?userId="+param,String.class);
System.out.println(Thread.currentThread().getName()+"参数"+param+"返回值"+responseBody);
}
}
}).start();

}
}
}

测试效果

283d32f8-92e3-11ed-bfe3-dac502259ad0.png284d3d4c-92e3-11ed-bfe3-dac502259ad0.png

要注意的问题

  • Java 8 的 CompletableFuture 并没有 timeout 机制
  • 后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行(本例中加了MAX_TASK_NUM判断)

使用队列的超时解决Java 8 的 CompletableFuture 并没有 timeout 机制

核心代码

packagecom.springboot.sample.service.impl;

importcom.springboot.sample.bean.Users;
importcom.springboot.sample.service.UserService;
importorg.springframework.stereotype.Service;

importjavax.annotation.PostConstruct;
importjavax.annotation.Resource;
importjava.util.*;
importjava.util.concurrent.*;

/***
*zzq
*包装成批量执行的地方,使用queue解决超时问题
**/
@Service
publicclassUserWrapBatchQueueService{
@Resource
privateUserServiceuserService;

/**
*最大任务数
**/
publicstaticintMAX_TASK_NUM=100;


/**
*请求类,code为查询的共同特征,例如查询商品,通过不同id的来区分
*CompletableFuture将处理结果返回
*/
publicclassRequest{
//请求id
StringrequestId;

//参数
LonguserId;
//队列,这个有超时机制
LinkedBlockingQueueusersQueue;


publicStringgetRequestId(){
returnrequestId;
}

publicvoidsetRequestId(StringrequestId){
this.requestId=requestId;
}

publicLonggetUserId(){
returnuserId;
}

publicvoidsetUserId(LonguserId){
this.userId=userId;
}

publicLinkedBlockingQueuegetUsersQueue(){
returnusersQueue;
}

publicvoidsetUsersQueue(LinkedBlockingQueueusersQueue){
this.usersQueue=usersQueue;
}
}

/*
LinkedBlockingQueue是一个阻塞的队列,内部采用链表的结果,通过两个ReenTrantLock来保证线程安全
LinkedBlockingQueue与ArrayBlockingQueue的区别
ArrayBlockingQueue默认指定了长度,而LinkedBlockingQueue的默认长度是Integer.MAX_VALUE,也就是无界队列,在移除的速度小于添加的速度时,容易造成OOM。
ArrayBlockingQueue的存储容器是数组,而LinkedBlockingQueue是存储容器是链表
两者的实现队列添加或移除的锁不一样,ArrayBlockingQueue实现的队列中的锁是没有分离的,即添加操作和移除操作采用的同一个ReenterLock锁,
而LinkedBlockingQueue实现的队列中的锁是分离的,其添加采用的是putLock,移除采用的则是takeLock,这样能大大提高队列的吞吐量,
也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
*/
privatefinalQueuequeue=newLinkedBlockingQueue();

@PostConstruct
publicvoidinit(){
//定时任务线程池,创建一个支持定时、周期性或延时任务的限定线程数目(这里传入的是1)的线程池
ScheduledExecutorServicescheduledExecutorService=Executors.newScheduledThreadPool(1);

scheduledExecutorService.scheduleAtFixedRate(()->{
intsize=queue.size();
//如果队列没数据,表示这段时间没有请求,直接返回
if(size==0){
return;
}
Listlist=newArrayList<>();
System.out.println("合并了["+size+"]个请求");
//将队列的请求消费到一个集合保存
for(inti=0;i< size; i++) {
                //后面的SQL语句是有长度限制的,所以还要做限制每次批量的数量,超过最大任务数,等下次执行
if(i< MAX_TASK_NUM) {
                    list.add(queue.poll());
                }
            }
            //拿到我们需要去数据库查询的特征,保存为集合
ListuserReqs=newArrayList<>();
for(Requestrequest:list){
userReqs.add(request);
}
//将参数传入service处理,这里是本地服务,也可以把userService看成RPC之类的远程调用
Mapresponse=userService.queryUserByIdBatchQueue(userReqs);
for(RequestuserReq:userReqs){
//这里再把结果放到队列里
Usersusers=response.get(userReq.getRequestId());
userReq.usersQueue.offer(users);
}

},100,10,TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性执行schedule是延迟执行initialDelay是初始延迟period是周期间隔后面是单位
//这里我写的是初始化后100毫秒后执行,周期性执行10毫秒执行一次
}

publicUsersqueryUser(LonguserId){
Requestrequest=newRequest();
//这里用UUID做请求id
request.requestId=UUID.randomUUID().toString().replace("-","");
request.userId=userId;
LinkedBlockingQueueusersQueue=newLinkedBlockingQueue<>();
request.usersQueue=usersQueue;
//将对象传入队列
queue.offer(request);
//取出元素时,如果队列为空,给定阻塞多少毫秒再队列取值,这里是3秒
try{
returnusersQueue.poll(3000,TimeUnit.MILLISECONDS);
}catch(InterruptedExceptione){
e.printStackTrace();
}
returnnull;
}
}
...省略..

@Override
publicMapqueryUserByIdBatchQueue(ListuserReqs){
//全部参数
ListuserIds=userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
QueryWrapperqueryWrapper=newQueryWrapper<>();
//用in语句合并成一条SQL,避免多次请求数据库的IO
queryWrapper.in("id",userIds);
Listusers=usersMapper.selectList(queryWrapper);
Map>userGroup=users.stream().collect(Collectors.groupingBy(Users::getId));
HashMapresult=newHashMap<>();
//数据分组
userReqs.forEach(val->{
ListusersList=userGroup.get(val.getUserId());
if(!CollectionUtils.isEmpty(usersList)){
result.put(val.getRequestId(),usersList.get(0));
}else{
//表示没数据,这里要new,不然加入队列会空指针
result.put(val.getRequestId(),newUsers());
}
});
returnresult;
}

...省略...

小结

请求合并,批量的办法能大幅节省被调用系统的连接资源,本例是以数据库为例,其他RPC调用也是类似的道理。缺点就是请求的时间在执行实际的逻辑之前增加了等待时间,不适合低并发的场景。

代码地址

  • https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5

参考

  • https://www.cnblogs.com/oyjg/p/13099998.html


审核编辑 :李倩


声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 接口
    +关注

    关注

    33

    文章

    9443

    浏览量

    156134
  • 服务器
    +关注

    关注

    13

    文章

    10094

    浏览量

    90876
  • 数据库
    +关注

    关注

    7

    文章

    3993

    浏览量

    67732

原文标题:效率加倍,高并发场景下的接口请求合并方案

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    Hi8000大电流高效率升压恒压驱动芯片智芯一级代理聚能芯半导体原厂技术支持赋能多场景电源设计

    模式切换功能,可根据负载大小自动切换 PWM、PFM 和 BURST 模式,确保轻、中、重负载全场景均保持电源系统效率,避免传统芯片在轻载时能效下滑的痛点。低待机关机功能同样亮眼,
    发表于 12-04 17:09

    Temu跨境电商按关键字搜索Temu商品API的应用及接口请求示例

    Temu跨境电商按关键字搜索Temu商品API的应用及接口请求示例 Temu跨境电商按关键字搜索Temu商品API的应用场景 Temu跨境电商平台按关键字搜索Temu商品API的主要应用场景包括但不
    的头像 发表于 11-29 15:08 90次阅读

    工业物联网数据中台的并发性有什么作用

    工业物联网数据中台的并发性是保障其在复杂工业场景稳定运行的核心能力之一。它的核心作用是确保大量设备同时接入和数据传输时,系统依然能高效处理、不卡顿、不丢失数据,能够在单位时间内高效
    的头像 发表于 10-28 11:28 182次阅读
    工业物联网数据中台的<b class='flag-5'>高</b><b class='flag-5'>并发</b>性有什么作用

    订单拆单合并处理接口设计与实现

    处理接口能显著提升系统性能,降低运营开销。本文将逐步介绍该接口的核心设计、实现细节和使用场景,帮助开发者快速上手。 1. 接口核心功能 该接口
    的头像 发表于 10-16 14:47 323次阅读
    订单拆单<b class='flag-5'>合并</b>处理<b class='flag-5'>接口</b>设计与实现

    CS、IU系列单声道音频功放 —— 覆盖从低功耗到功率的全场景解决方案

    CS、IU系列单声道音频功放 —— 覆盖从低功耗到功率的全场景解决方案 在音频应用中,不同场景对功放的要求差异巨大:从便携式设备的低功耗,到家庭影院、专业音响的
    发表于 09-16 09:07

    有没有针对特定行业或场景的装置数据验证效率提升方案

    )、工业制造(汽车 / 化工)、数据中心、医疗行业四大典型场景,提供定制化的验证效率提升方案,兼顾 “效率” 与 “行业合规性”。 一、新能源场站(光伏 / 风电):解决 “地理分散、
    的头像 发表于 09-04 17:47 457次阅读
    有没有针对特定行业或<b class='flag-5'>场景</b>的装置数据验证<b class='flag-5'>效率</b>提升<b class='flag-5'>方案</b>?

    Nginx并发优化方案

    作为一名在生产环境中摸爬滚打多年的运维工程师,我见过太多因为Nginx配置不当导致的性能瓶颈。今天分享一套完整的Nginx并发优化方案,帮助你的系统从10万QPS突破到百万级别。
    的头像 发表于 08-13 15:51 667次阅读

    NVMe高速传输之摆脱XDMA设计19:PCIe请求模块设计(

    组装写请求的TLP报头,并将报头通过axis_rq接口发送。当axis_rq接口握手时跳转到WR_DATA状态。WR_DATA:请求写TLP数据发送状态。该状态
    发表于 08-11 15:24

    NVMe高速传输之摆脱XDMA设计13:PCIe请求模块设计(

    避免异常情况的出现。 WR_HEAD:请求写TLP头发送状态。该状态根据请求类型、请求地址组装写请求的TLP报头,并将报头通过axis
    发表于 08-04 16:39

    NVMe高速传输之摆脱XDMA设计13:PCIe请求模块设计(

    在接收到请求总线接口请求事务后,当请求类型的值为0时,表示通过PCIE硬核的配置管理接口发送请求
    的头像 发表于 08-04 16:35 371次阅读
    NVMe高速传输之摆脱XDMA设计13:PCIe<b class='flag-5'>请求</b>模块设计(<b class='flag-5'>下</b>)

    鸿蒙5开发宝藏案例分享---应用并发设计

    ?** 鸿蒙并发编程实战指南:解锁ArkTS多线程黑科技** 嘿,开发者朋友们! 今天给大家扒一扒鸿蒙官方文档里藏着的并发编程宝藏—— 100+实战场景解决方案 !从金融理财到游戏开发
    发表于 06-12 16:19

    Ingress网关并发请求的解决方案

    当 Ingress 网关面临高并发请求(如 QPS 超过 10万+)时,可能导致服务崩溃、响应延迟激增或资源耗尽。
    的头像 发表于 05-14 11:52 656次阅读

    TurMass™ 如何帮助解决 UWB 定位系统大规模终端标签并发通信冲突问题?

    在大容量定位终端数据并发场景中,现有通信技术因信号冲突、系统容量受限等问题,难以满足需求。TurMass™ 通信技术通过多信道设计、时隙划分、定位与通信一体化等创新方案,有效解决了
    的头像 发表于 03-17 14:38 784次阅读
    TurMass™ 如何帮助解决 UWB 定位系统大规模终端标签<b class='flag-5'>高</b><b class='flag-5'>并发</b>通信冲突问题?

    华为支付-平台类商户合单支付场景准备

    PayMercAuth对象内的入参排序拼接进行签名。请参考排序拼接和签名示例代码。 2. 3.构建合单订单信息参数orderStr并返回给客户端。业务接口请求示例代码可参考业务接口请求。 (二)拉起
    发表于 02-11 10:40

    华为支付-免密支付接入支付并签约场景

    参考排序拼接和签名示例代码。 2. 3.构建订单信息参数orderStr返回给客户端,业务接口请求示例代码可参考业务接口请求。 (二)拉起华为支付收银台(端侧开发) 使用orderStr调用
    发表于 02-10 09:55