0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

RabbitMQ中的发布订阅模型

科技绿洲 来源:Java技术指北 作者:Java技术指北 2023-09-25 14:30 次阅读

上一篇文章中,简单的介绍了一下RabbitMQ的work模型。这篇文章来学习一下RabbitMQ中的发布订阅模型。

发布订阅模型(Publish/Subscribe):简单的说就是队列里面的消息会被多个消费者同时接受到,消费者接收到的信息一致。

发布订阅模型适合于做模块之间的异步通信

图片
img

适用场景

  1. 发送并记录日志信息
  2. springcloud的config组件里面通知配置自动更新
  3. 缓存同步
  4. 微信订阅号

演示

生产者

public class Producer {
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 发送消息到交换机
        for (int i = 0; i < 100; i++) {
            channel.basicPublish(EXCHANGE_NAME, "", null, ("发布订阅模型的第 " + i + " 条消息").getBytes());
        }
        // 关闭资源
        channel.close();
        connection.close();
    }
}

消费者

// 消费者1
public class Consumer {
    private static final String QUEUE_NAME = "queue_publish_1";
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 将队列绑定到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("队列1接收到的消息是:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
// 消费者2
public class Consumer2 {
    private static final String QUEUE_NAME = "queue_publish_2";
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 将队列绑定到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("队列2接收到的消息是:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}

测试

先启动2个消费者,再启动生产者

图片

图片

可以看出来消费者1和消费者2接收到的消息是一模一样的 ,每个消费者都收到了生产者发送的消息;

发布订阅模型,用到了一个新的东西-交换机,这里也解释一下相关方法的参数

// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// 该方法的最多参数的重载方法是:
Exchange.DeclareOk exchangeDeclare(String exchange,
                                    BuiltinExchangeType type,
                                    boolean durable,
                                    boolean autoDelete,
                                    boolean internal,
                                    Map< String, Object > arguments) throws IOException;

/**
 *  param1:exchange,交换机名称
 *  param2:type,交换机类型;直接写 string效果一致;内置了4种交换机类型:
 *   direct(路由模式)、fanout(发布订阅模式)、
 *   topic(topic模式-模糊匹配)、headers(标头交换,由Headers的参数分配,不常用)
 *  param3:durable,是否持久化交换机   false:默认值,不持久化
 *  param4:autoDelete,没有消费者使用时,是否自动删除交换机   false:默认值,不删除
 *  param5:internal,是否内置,如果设置 为true,则表示是内置的交换器, 客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器的方式  false:默认值,允许外部直接访问
 *  param6:arguments,交换机的一些其他属性,默认值为 null
 */
// 将队列绑定到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
/**
 *  param1:destination,目的地,队列的名字
 *  param2:source,资源,交换机的名字
 *  param3:routingKey,路由键(目前没有用到routingKey,填 "" 即可)
 */

小结

本文到这里就结束了,介绍了RabbitMQ通信模型中的发布订阅模型,适合于做模块之间的异步通信。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 交换机
    +关注

    关注

    19

    文章

    2438

    浏览量

    95668
  • 缓存
    +关注

    关注

    1

    文章

    220

    浏览量

    26444
  • 模型
    +关注

    关注

    1

    文章

    2704

    浏览量

    47697
  • springcloud
    +关注

    关注

    0

    文章

    17

    浏览量

    1492
  • rabbitmq
    +关注

    关注

    0

    文章

    17

    浏览量

    974
收藏 人收藏

    评论

    相关推荐

    RabbitMQ是什么

    在工作中经常会用到消息队列处理各种问题,今天指北君带领大家来学一个很常用到的技术-RabbitMQ;接下来还会有关于RabbitMQ的系列教程,对你有帮助的话记得关注哦~ RabbitMQ
    的头像 发表于 09-25 14:36 554次阅读
    <b class='flag-5'>RabbitMQ</b>是什么

    RabbitMq入门教程

    RabbitMQ是一个开源的,在AMQP基础上完整的,可复用的企业消息系统。
    的头像 发表于 12-04 11:10 272次阅读
    <b class='flag-5'>RabbitMq</b>入门教程

    MQTT协议介绍之一:发布/订阅

    ,MQTT被正式批准为OASIS标准。 MQTT 3.1.1现在是该协议的最新版本。发布/订阅模式发布/订阅模式(pub / sub)是传统客户端 - 服务器
    发表于 08-25 19:58

    Redis的发布订阅机制

    Redis之发布订阅机制
    发表于 06-11 13:21

    MQTT的通信模型及消息

     MQTT通信模型    MQTT协议是基于客户端-服务器模型,在协议主要有三种身份:发布者(Publisher)、服务器(Broker) 以及
    发表于 01-19 15:57

    NodeMCU实现订阅发布主题

    NodeMCU实现订阅发布主题。1、要点扫盲1.1 MQTT《MQTT协议--MQTT协议简介及原理》《MQTT协议--MQTT协议解析》1.2 OneNET《NodeMCU学习(十)--发送数据
    发表于 11-01 08:37

    请问esp32c3,ble mesh怎么向订阅的分组发布消息?

    发布消息,为什么vnd_models模型不可以.有没有更加简单的api,直接传订阅分组地址就可以发布消息的?
    发表于 02-13 06:47

    请问esp32c3 ble mesh怎么向订阅的分组发布消息?

    发布消息,为什么vnd_models模型不可以.有没有更加简单的api,直接传订阅分组地址就可以发布消息的?
    发表于 03-06 08:36

    基于SOA的发布/订阅系统设计

    企业电子商务的迅猛发展已经改变了分布式系统的规模,传统的基于请求/应答的点对 点、同步通信已不能满足大规模动态分布式应用环境。基于SOA 的发布/订阅系统模型
    发表于 07-08 08:42 21次下载

    RabbitMQ-CN RabbitMQ中文文档

    RabbitMQ_into_Chinese.zip
    发表于 04-19 10:51 0次下载
    <b class='flag-5'>RabbitMQ</b>-CN <b class='flag-5'>RabbitMQ</b>中文文档

    rabbitmq是什么?rabbitmq安装、原理、部署

    rabbitmq是什么? MQ的全称是Messagee Queue,因为消息的队列是队列,所以遵循FIFO 先进先出的原则是上下游传递信息的跨过程通信机制。 RabbitMQ是一套开源(MPL
    的头像 发表于 07-19 13:50 719次阅读

    RocketMQ和RabbitMQ的区别

    RocketMQ和RabbitMQ的区别: 架构设计:RocketMQ是基于主题(Topic)的发布/订阅模式,而RabbitMQ则是基于队列(Queue)的消息代理系统。 语言支持
    的头像 发表于 07-24 13:39 1.1w次阅读

    Topic 模型的使用

    RabbitMQ 是一个流行的开源消息队列软件,它提供了多种通信模型,例如发布/订阅模型、路由模型
    的头像 发表于 09-25 11:30 338次阅读

    RabbitMQ中的路由模型(direct)

    路由模型 RabbitMQ 提供了五种不同的通信模型,上一篇文章中,简单的介绍了一下RabbitMQ发布
    的头像 发表于 09-25 11:32 282次阅读

    redis和rabbitMQ的区别

    Redis和RabbitMQ之间的区别。 架构设计: Redis是一个内存存储系统,它将数据存储在内存中,以提供快速的读写访问。因此,Redis的存储能力受到内存大小的限制。它使用发布/订阅模式来处理消息队列,
    的头像 发表于 12-04 14:48 347次阅读