0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

RabbitMQ通信模型中的work模型

科技绿洲 来源:Java技术指北 作者:Java技术指北 2023-09-25 14:34 次阅读

上一篇文章中,简单的介绍了一下RabbitMQ,以及安装和hello world。

有的小伙伴留言说看不懂其中的方法参数,这里先解释一下几个基本的方法参数。

// 声明队列方法
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
 * param1:queue 队列的名字
 * param2:durable 是否持久化;比如现在发送到队列里面的消息,如果没有持久化,重启这个队列后数 据会丢失(false) true:重启之后数据依然在
 * param3:exclusive 是否排外(是否是当前连接的专属队列),排外的意思是:
 *            1:连接关闭之后 这个队列是否自动删除(false:不自动删除)
 *            2:是否允许其他通道来进行访问这个数据(false:不允许) 
 * param4:autoDelete 是否自动删除
 *            就是当最后一个连接断开的时候,是否自动删除这个队列(false:不删除)
 * param5:arguments(map) 声明队列的时候,附带的一些参数
 */
// 发送数据到队列
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "第一个队列消息...".getBytes());
/**
 * param1:exchange  交换机  没有就设置为 "" 值就可以了
 * param2:routingKey 路由的key  现在没有设置key,直接使用队列的名字
 * param3:BasicProperties 发送数据到队列的时候,是否要带一些参数。
 *      MessageProperties.PERSISTENT_TEXT_PLAIN表示没有带任何参数
 * param4:body 向队列中发送的消息数据
 */

Work模型

work模型称为工作队列或者竞争消费者模式,多个消费者消费的数据之和才是原来队列中的所有数据,适用于流量的削峰。

图片
img

演示

写个简单的测试:

  1. 生产者

    public class Producer {
        private static final String QUEUE_NAME = "queue_work_1";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 100; i++) {
                channel.basicPublish("", QUEUE_NAME, null, ("work模型:" + i).getBytes());
            }
            channel.close();
            connection.close();
        }
    
    }
    
  2. 消费者

    // 消费者1
    public class Consumer {
        private static final String QUEUE_NAME = "queue_work_1";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // channel.basicQos(0, 1, false);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(System.currentTimeMillis() + "消费者1接收到信息:" + new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
        }
    
    }
    
    // 消费者2
    public class Consumer2 {
        private static final String QUEUE_NAME = "queue_work_1";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // channel.basicQos(0, 1, false);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println(System.currentTimeMillis() + "消费者2接收到信息:" + new String(body));
                    channel.basicAck(envelope.getDeliveryTag(), false);
                    // 这里加了个延迟,表示处理业务时间
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
        }
    }
    
  3. 结果
    图片
    image-20221229210012145

图片
image-20221229210046184

可以看出来:100条消息,消费者之间是平分的,消费者1 几乎是瞬间完成,消费者2 则是慢慢吞吞的运行完毕,消费者1大量时间处于空闲状态,消费者2则一直忙碌。这显然是不适用于实际开发中。

我们需要遵从一个原则,就是 能者多劳 ,消费越快的人,消费的越多;

现在我们把消费者1和2的代码中 // channel.basicQos(0, 1, false); 这行代码取消注释,再次运行;

图片
image-20221229211317632

图片
image-20221229211335782

现在的结果就比较符合能者多劳,虽然你干的多,但是工资是一样的呀~

work模型的一个主要的方法是basicQos();这里也解释一下其参数:

// 设置限流机制
channel.basicQos(0, 1, false);
/**  
 *  param1: prefetchSize,消息本身的大小 如果设置为0  那么表示对消息本身的大小不限制
 *  param2: prefetchCount,告诉rabbitmq不要一次性给消费者推送大于N个消息
 *  param3:global,是否将上面的设置应用于整个通道,false表示只应用于当前消费者
 */

小结

本文到这里就结束了,主要介绍了RabbitMQ通信模型中的work模型,适用于限流、削峰等应用场景。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 数据
    +关注

    关注

    8

    文章

    6514

    浏览量

    87610
  • 通信
    +关注

    关注

    18

    文章

    5706

    浏览量

    134415
  • 模型
    +关注

    关注

    1

    文章

    2707

    浏览量

    47706
  • Work
    +关注

    关注

    0

    文章

    9

    浏览量

    8896
  • rabbitmq
    +关注

    关注

    0

    文章

    17

    浏览量

    974
收藏 人收藏

    评论

    相关推荐

    RabbitMQ中的发布订阅模型

    上一篇文章中,简单的介绍了一下RabbitMQwork模型。这篇文章来学习一下RabbitMQ中的发布订阅模型。 发布订阅
    的头像 发表于 09-25 14:30 324次阅读
    <b class='flag-5'>RabbitMQ</b>中的发布订阅<b class='flag-5'>模型</b>

    CAN总线通信协议模型概述 CAN总线通信模型作用

    参照 ISO/OSI 标准模型,CAN 总线的通信参考模型如图 9-1 所示。这 4 层结构的功能如下:• 物理层规定了节点的全部电气特性,在一个网络里,要实现不同节点间的数据传输,所有节点的物理层
    发表于 12-14 14:17

    MQTT的通信模型及消息

     MQTT通信模型    MQTT协议是基于客户端-服务器模型,在协议主要有三种身份:发布者(Publisher)、服务器(Broker) 以及订阅者(Subscriber)。 并且消息发布者可以
    发表于 01-19 15:57

    基于VxWorks实时操作系统的通信模型该怎样去设计?

    多任务实时操作系统VxWorks是什么?与传统通信机制相比,模块间通信模型有什么优势?基于VxWorks实时操作系统的通信模型该怎样去设计?
    发表于 04-26 06:25

    移动Agent位置透明通信模型的设计

    提出一种高效可靠的移动Agent通信模型――D-C通信模型,结合域名字解析器和移动Agent系统中的Communicator实现移动Agent之间的通信。通过引入一种基于全局的、与位置无关的命名方法
    发表于 04-16 08:53 26次下载

    数据网格中基于优化机制的通信模型

    针对基于多计算机机群构成的网格的大规模并行计算的需要,对多级分组通信模型的单一机群分组通信进行了研究。探讨了在单一机群内的主动节点、被动节点个数和各个计算节点
    发表于 06-25 13:52 12次下载

    基于VxWorks的通信模型设计

    本文提出了一种任务间的通信模型,将用于网络通信的UDP方式引进到任务间的通信中,使通信更加灵活和便于管理,改善了整个系统的性能。
    发表于 06-01 10:07 925次阅读
    基于VxWorks的<b class='flag-5'>通信模型</b>设计

    网络通信模型

    网络通信模型,在基础讲解的前提下,建立数学模型来分析。
    发表于 03-15 13:56 9次下载

    一种基于Kademlia的P2P语音通信模型

    一种基于Kademlia的P2P语音通信模型_陈立全
    发表于 01-07 16:52 3次下载

    基于Zigbee的无线智能输液通信模型设计杨艳

    基于Zigbee的无线智能输液通信模型设计_杨艳
    发表于 03-16 08:00 3次下载

    Topic 模型的使用

    RabbitMQ 是一个流行的开源消息队列软件,它提供了多种通信模型,例如发布/订阅模型、路由模型work
    的头像 发表于 09-25 11:30 339次阅读

    RabbitMQ中的路由模型(direct)

    路由模型 RabbitMQ 提供了五种不同的通信模型,上一篇文章中,简单的介绍了一下RabbitMQ的发布订阅模型模型。这篇文章来学习一下
    的头像 发表于 09-25 11:32 282次阅读

    什么是通信模型DDS

    来完成的,它相当于是ROS机器人系统中的神经网络。 通信模型 DDS的核心是通信,能够实现通信模型和软件框架非常多,这里我们列出常用的四种模型
    的头像 发表于 11-24 17:50 618次阅读

    机器人通信模型有哪些

    用到——那就是动作。从这个名字上就可以很好理解这个概念的含义,这种通信机制的目的就是便于对机器人某一完整行为的流程进行管理。 通信模型 举个例子,比如我们想让机器人转个圈,这肯定不是一下就可以完成的,机器人得一点一点旋
    的头像 发表于 11-27 17:05 236次阅读
    机器人<b class='flag-5'>通信模型</b>有哪些

    通信模型的多对多通信、异步通信和消息接口是什么

    者的数量并不是唯一的,可以称之为是多对多的通信模型。 因为话题是多对多的模型,发布控制指令的摇杆可以有一个,也可以有2个、3个,订阅控制指令的机器人可以有1个,也可以有2个、3个。 大家可以想象一下这个画面,似乎还是挺魔性的
    的头像 发表于 11-27 17:42 245次阅读
    <b class='flag-5'>通信模型</b>的多对多<b class='flag-5'>通信</b>、异步<b class='flag-5'>通信</b>和消息接口是什么