0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

RocketMQ生产者为什么需要负载均衡?

马哥Linux运维 来源:稀土掘金 2023-11-13 11:04 次阅读

RocketMQ生产者为什么需要负载均衡?

在RocketMQ中,队列是消息发送的基本单位。每个Topic下可能存在多个队列,因此一个生产者实例可以向不同的队列发送消息。当生产者发送消息时,如果不能均衡的将消息发送到不同的队列,那么会导致队列里的消息分布不均衡,这样最终会导致消息性能下降,因此生产者负载均衡机制也是非常重要的。

RocketMQ生产者原理分析

既然生产者负载均衡如此重要,我们看下是如何实现的。

我们通常使用如下方法发送消息:

构建消息
Message msg = new Message("TopicTest",
    "TagA",
    "OrderID188",
    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送消息    
SendResult sendResult = producer.send(msg);

RocketMQ发送消息的核心逻辑在DefaultMQProducerImpl类sendDefaultImpl。

9ad25470-81c0-11ee-939d-92fbcf53809c.jpg

在发送消息流程利里面有一行非常关键的逻辑,selectOneMessageQueue,看方法名称就可以知道其含义,选择一个消息队列。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {

        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

里面是通过策略类来实现的。

9aee8ece-81c0-11ee-939d-92fbcf53809c.jpg

策略类最终通过org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String) 实现。

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        //生产者第一次发消息
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            //非第一次,重试发消息的情况,
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                //重试的情况,不取上一个broker的队列
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
第一次发消息选择队列核心逻辑在 org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue()


//线程安全的index
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();


public MessageQueue selectOneMessageQueue() {
        //获取一个基础索引,每次自增1 这个全局存在TopicPublishInfo 每一个topic
        int index = this.sendWhichQueue.getAndIncrement();
        // 基础索引和 消息写队列大小 进行取模 用来实现轮训的算法
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
            
        return this.messageQueueList.get(pos);
    }

哈哈,这里就是生产者负载均衡轮询机制的核心逻辑了,使用到了ThreadLocal技术,sendWhichQueue为每个生产者线程维护一个自己的下标索引。

基础索引计算器,使用ThreadLocal技术针对不同的生产者线程第一次随机,后面递增,可以更加负载均衡。

public class ThreadLocalIndex {
    //关键技术
    private final ThreadLocal threadLocalIndex = new ThreadLocal();
    private final Random random = new Random();


    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            //第一次随机
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }
        //第二次索引位置开始自增1
        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;


        this.threadLocalIndex.set(index);
        return index;
    }
}

哈哈,有没有觉得这个实现非常巧妙了。不同的生产者线程都拥有自己的索引因子,分配队列更加均衡。

总结

本文分析了RocketMQ生产者底层的实现,设计地方有巧妙之处,值得我们学习,上面是发送非顺序消息的场景, 如果是顺序消息,我们作为使用者可以指定负载均衡策略。

编辑:黄飞

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 负载均衡
    +关注

    关注

    0

    文章

    97

    浏览量

    12186
  • 线程
    +关注

    关注

    0

    文章

    489

    浏览量

    19495
  • 消息队列
    +关注

    关注

    0

    文章

    31

    浏览量

    2920

原文标题:RocketMQ生产者负载均衡(轮询机制)核心原理

文章出处:【微信号:magedu-Linux,微信公众号:马哥Linux运维】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    每日一教labview视频教程【12.12】生产者和消费循环中

    内容提要:融合事件结构的生产者/消费循环多消费结构生产者和消费循环:[hide] [/hide]
    发表于 12-12 09:52

    labviEW一个生产者,多个消费问题

    大家好,我的程序的出发点是希望实现一个生产者,十六个消费模块的形式。即生产者循环中的事件结构有十六个处理分支,对应每一个分支,它产生一个“开始”元素入队列,相应的消费模块中元素出队
    发表于 04-05 16:42

    生产者与消费循环相关问题

    我是labview初学者,想请问一下各位大神,如果采集卡有缓存那还需要生产者与消费循环吗?
    发表于 10-21 14:05

    生产者与消费注册时间的应用

    生产者与消费注册时间的应用
    发表于 03-29 15:02

    生产者消费模式(事件结构)

    现小弟学习生产者消费的事件结构模式(用队列传递消息),在生产者中用事件结构,但是当我点击其中一个按钮响应事件后就无法再点击其它的按钮了,这是怎么搞的,请大侠贴出图片让小弟看看是什么情况。
    发表于 12-23 14:14

    生产者与消费循环结构当生产者停止发送数据为什么消费还要循环两次?

    各位大神: 今天用生产者与消费结构做一个程序,需要消费循环每执行一次计数+1。但是发现当生产者停止发送数据后,消费
    发表于 09-17 23:08

    生产者是怎么把要发送的信息传送到生产者模式里面的?

    谁有关于生产者与消费模式的讲解,就是生产者是怎么把要发送的信息传送到生产者模式里面的,就是谁可以讲解下,或是哪里有历程的视频讲解。先行谢过。
    发表于 10-28 20:57

    生产者消费的事件结构模式(用队列传递消息)

    现小弟学习生产者消费的事件结构模式(用队列传递消息),在生产者中用事件结构,但是当我点击其中一个按钮响应事件后,再点击其它的按钮了需要点两次,这是怎么搞的,请大侠贴出图片让小弟看看
    发表于 01-17 14:53

    生产者消费循环

    有木有大神知道生产者消费循环中队列的大小,默认值一般为多少?此外这个大小能否改变?
    发表于 11-28 19:59

    生产者与消费循环程序

    生产者与消费循环程序
    发表于 12-02 19:57

    生产者与消费

    生产者与消费
    发表于 12-22 20:46

    labview的生产者/消费模式

    生产者/消费模式以前在没有学习队列这块,看到生产者/消费模式的时候总认为很困难。今天仔细学习了队列后,回头再看着块时就不是多么难理解。这个编程模式使用到了队列的函数。首先,字面理解
    发表于 05-05 09:36

    生产者消费循环的问题

    如果将生产者消费循环中的一个生产者同时对应两个消费的时候,会有一些问题。如图所示,生产者循环将一个数据入列,然后下面是两个消费
    发表于 03-25 10:02

    基于生产者消费完整测试程序

    [hide][url=]基于生产者消费完整测试 ...[/url] [/hide]
    发表于 11-01 17:13

    RocketMQ协议是什么?RocketMQ协议特点

    分布式消息系统中生产者和消费者之间的高效可靠通信。它支持同步和异步消息传递模式,可以实现灵活和响应迅速的通信方式。 RocketMQ协议基于发布-订阅消息模式,生产者将消息发布到特定的主题,消费者订阅这些主题以接收消息。该协议通
    的头像 发表于 01-03 16:11 474次阅读