0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

如何解决回到MQ的消息顺序问题

数据分析与开发 来源:微观技术 作者:微观技术 2021-11-18 16:07 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

为了系统间解耦,我们通常会引入MQ框架,大家各司其职共同完成上下游的业务流程。

大致过程:

生产端,创建一条消息,通过网络发送到MQ Server

MQ将 消息存储在topic 的一个分区里

消费端,从分区中拉取消息,消费处理

但现实往往不一样!MQ 架构设计要满足高并发、高性能、高可用等指标

单分区,达不到我们的吞吐量要求,我们考虑采用多分区架构设计,正所谓 ”三个臭皮匠赛过一个诸葛亮“,多分区可以有效分摊全局压力,提升整体系统性能。

两台 MQ机器,组成一个集群,原先一个分区存储6条消息,现在分摊到两个分区,每个分区各存储3条消息,性能比上面那个提升一倍。

貌似可以满足我们的需求,但任何事情都有两面性!

我们看看下面业务场景:

一个用户在电商网站上下订单到交易完成,中间会经历一系列动作,订单的状态也会随之变化,一个订单会产生多条MQ消息,下单、付款、发货、买家确认收货,消费端需要严格按照业务状态机的顺序处理,否则,就会出现业务问题。

我们发现,消息带上了状态,不再是一个个独立的个体,有了上下文依赖关系!

对于这个问题,突然想到HTTP协议,其本身也是无状态的,也就是说前后两次请求没有关联,但有些业务功能有登录要求,那怎么解决?

引入Cookie机制,每次请求客户端额外传输一些数据,来达到上下文关联。

回到MQ的消息顺序问题,我们要如何解决?

答案:各退一步,保证局部有序。

比如上面的电商例子,只要保证一个订单的多条状态消息在同一个分区,便可以满足业务需求,这个方案可以覆盖大部分的业务场景。

这里面只需要有一个路由策略组件,由它决定消息该放到哪个分区中!

考虑到市面MQ开源框架很多,常见的如:Kafka、Pulsar、RabbitMQ、RocketMQ 等,API方法略有区别,但设计思路是相通的。

接下来,我们以 RocketMQ 为例:

生产端提供了一个接口 MessageQueueSelector

public interface MessageQueueSelector {

MessageQueue select(final List《MessageQueue》 mqs, final Message msg, final Object arg);

}

接口内定义一个select方法,具体参数含义:

mqs:该Topic下所有的队列分片

msg:待发送的消息

arg:发送消息时传递的参数

关于MessageQueueSelector接口,RocketMQ 框架提供了三个默认实现类:

1、SelectMessageQueueByHash:

arg参数的hashcode的绝对值,然后对mqs.size()取余,得到目标队列在mqs的下标

2、SelectMessageQueueByRandom:

对mqs.size()值取随机数作为目标队列在mqs的下标

3、SelectMessageQueueByMachineRoom

返回null

特别注意:

虽然保证了单个分片的消息有序,但每个分片的消费者只能是单线程处理,因为多线程无法控制消费顺序。这个可能会损失一些性能。

这里又引出另一个问题,如何保证一个队列只能有一个消费端呢?

1、

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance

0e1ca392-4837-11ec-b939-dac502259ad0.jpg

遍历一个topic下所有的MessageQueue

isOrder && !this.lock(mq) 尝试对它加锁,确保一个MessageQueue只能被一个消费者处理

2、将PullRequest对象放入PullMessageService的pullRequestQueue队列中

public void dispatchPullRequest(List《PullRequest》 pullRequestList) {

for (PullRequest pullRequest : pullRequestList) {

this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);

log.info(“doRebalance, {}, add a new pull request {}”, consumerGroup, pullRequest);

}

}

3、org.apache.rocketmq.client.impl.consumer.PullMessageService#run

0e55332e-4837-11ec-b939-dac502259ad0.jpg

PullMessageService 是一个Runnable线程任务

无限循环,从队列中拉取、处理消息

另一个问题,如何保证一个队列,只有一个线程在处理消息呢?

1、 DefaultMQPushConsumerImpl#pullMessage

0e9b4c24-4837-11ec-b939-dac502259ad0.jpg

ConsumeMessageService 中有两个实现类,因为我们有消费顺序要求,会选择ConsumeMessageOrderlyService来处理业务

2、 ConsumeMessageOrderlyService.ConsumeRequest

0ed789f0-4837-11ec-b939-dac502259ad0.jpg

从ConcurrentMap中获取messageQueue对应的锁对象

通过 synchronized 关键字,线程来抢占锁,互斥关系,从而保证了一个MessageQueue只能有一个线程并发处理

继续往下看,如果扩容了怎么办?

原来有6个分区,order_id_1的消息在MessageQueue6 中,此时扩容一倍,现在12个分区,order_id_1订单后面产生的消息可能路由到了MessageQueue8 中,同一个订单的消息分布在两个分区中,无法保证顺序。

我们能做的是,先将存量消息处理完,再扩容。如果是在线业务,可以搞个临时topic,先将消息暂时堆积,待扩容后,按新的路由规则重新发送。

顺序消息,如果某条失败了怎么办?会不会一直阻塞?

1、如果失败,不会提交消费位移,系统会自动重试(有重试上限),此时会阻塞后面的消息消费,直到这条消息处理完

2、如果这个消息达到重试上限,依然失败,会进入死信队列,可以继续处理后面的消息

责任编辑:haq

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 数据
    +关注

    关注

    8

    文章

    7315

    浏览量

    93988
  • 框架
    +关注

    关注

    0

    文章

    404

    浏览量

    18319

原文标题:面试官问: 如何保证 MQ 消息是有序的?

文章出处:【微信号:DBDevs,微信公众号:数据分析与开发】欢迎添加关注!文章转载请注明出处。

收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    广和通小尺寸低功耗Cat.M模组MQ771-GL实现送样,专注资产追踪应用

    11月,广和通宣布Cat.M模组MQ771-GL正式进入工程送样阶段。MQ771-GL凭借极致尺寸、超低功耗、全球频段覆盖和稳定网络兼容性四大核心优势,为资产追踪等物联网场景提供高性价比的连接
    的头像 发表于 11-20 10:59 130次阅读
    广和通小尺寸低功耗Cat.M模组<b class='flag-5'>MQ</b>771-GL实现送样,专注资产追踪应用

    MPN12AD160-MQ:替代ADI/TI/TOREX电源芯片

    MPN12AD160-MQ是Cyntec(乾坤)推出的大电流微型POL DC-DC电源模块,专为 AI/GPU 板卡等高功耗、高密度场景打造,集成两颗 DC-DC 芯片,采用双相 60A 并联
    发表于 11-20 10:09

    rt_mq_recv函数中timeout作用是什么?

    请参看附件和图片, rt_mq_recv() 函数while (mq-&gt;entry == 0)代码块中会去检查timeout值,重新计算timeout,但timeout重新计算
    发表于 09-29 06:27

    rt_msgqueue rt_mq_recv()接收卡死的原因?

    在使用消息队列rt_mq_recv时候卡死 static struct rt_messagequeue TX_CanMsg_mq; __attribute__((aligned (4
    发表于 09-10 07:47

    Texas Instruments LMR43606MQ3EVM-2M评估模块数据手册

    Texas Instruments LMR43606MQ3EVM-2M评估模块是一个经过全面组装和测试的电路,用于评估LMR43606-Q1降压型电压转换器。该模块的输入电压范围为3.5V至36V
    的头像 发表于 08-02 09:52 916次阅读
    Texas Instruments LMR43606<b class='flag-5'>MQ</b>3EVM-2M评估模块数据手册

    单片机实例项目:MQ系列模块资料

    单片机实例项目:MQ系列模块资料,推荐下载!
    发表于 06-03 21:11

    设备与电源滤波器连接时,接线顺序有的基本要求

    电源滤波器是现代电子设备的必备组件,连接时应遵循接线顺序,确保输入、输出端的区分和正确连接地线。接线前需确保设备和电源滤波器完全断电、外观完好无损,工具准备充分。接线顺序包括输入端与输出端的区分、输入端接线顺序和输出端接线
    的头像 发表于 04-08 17:44 1256次阅读
    设备与电源滤波器连接时,接线<b class='flag-5'>顺序</b>有的基本要求

    广和通发布小尺寸Cat.M模组MQ780-GL

    近日,在2025世界移动通信大会(MWC Barcelona 2025)期间,广和通发布基于高通E51 4G调制解调器及射频方案的小尺寸Cat.M模组MQ780-GL。MQ780-GL凭借极致尺寸
    的头像 发表于 03-12 09:13 953次阅读

    网线顺序怎么排

    网线的顺序排列主要遵循TIA/EIA-568A和TIA/EIA-568B这两种主流标准,其中TIA/EIA-568B标准更为常用。以下是关于网线顺序排列的详细解释: 一、TIA/EIA-568B标准
    的头像 发表于 03-07 10:36 7936次阅读

    DLP3010投影乱序如何解决?

    是对的,所以相机捕获正常,由于相机捕获的东西是运动的,从运动轨迹可以判定相机采集的顺序是对的。 所以请教一下该如何解决?
    发表于 02-26 07:05

    网线的顺序颜色排位

    网线的顺序颜色排位通常遵循TIA/EIA 568B或TIA/EIA 568A标准,以下是这两种标准的详细颜色排位: TIA/EIA 568B标准 这是最常用的网线颜色排位标准,具体顺序如下: 橙白线
    的头像 发表于 02-20 09:46 6981次阅读

    网线水晶头排位顺序

    网线水晶头的排位遵循国际标准TIA/EIA 568A和TIA/EIA 568B,这两种标准规定了网线中8根线芯的排列顺序。 TIA/EIA 568A标准 从左到右的排列顺序为: 白绿 绿 白橙 蓝
    的头像 发表于 02-14 10:04 2264次阅读

    PSMN2RO-30YLE n沟道30V 2 mQ逻辑电平MOSFET规格书

    电子发烧友网站提供《PSMN2RO-30YLE n沟道30V 2 mQ逻辑电平MOSFET规格书.pdf》资料免费下载
    发表于 01-23 16:33 0次下载
    PSMN2RO-30YLE n沟道30V 2 <b class='flag-5'>mQ</b>逻辑电平MOSFET规格书

    超6类双绞线接线顺序

    超6类双绞线的接线顺序通常采用T568B标准,具体顺序如下: 白橙(橙白) 橙 白绿(绿白) 蓝 白蓝(蓝白) 绿 白棕(棕白) 棕 在接线时,需要注意以下几点: 剥线长度:使用剥线钳轻轻剥开
    的头像 发表于 01-17 09:53 3882次阅读

    【CW32模块使用】MQ-4甲烷检测传感器

    MQ-4气体传感器所使用的气敏材料是在清洁空气中电导率较低的二氧化锡(SnO2)。当传感器所处环境中存在燃料气体时,传感器的电导率随空气中可燃气体浓度的增加而增大。使用简单的电路即可将电导率的变化
    的头像 发表于 12-30 11:36 1528次阅读
    【CW32模块使用】<b class='flag-5'>MQ</b>-4甲烷检测传感器