0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

Topic 模型的使用

科技绿洲 来源:了不起 作者:了不起 2023-09-25 11:30 次阅读

RabbitMQ 是一个流行的开源消息队列软件,它提供了多种通信模型,例如发布/订阅模型、路由模型、work模型等。在前面的文章中我们已经介绍了前四种模型,本文将会学习 RabbitMQ 中的 Topic 模型;接下来还会有关于RabbitMQ的系列教程,对你有帮助的话记得关注哦~

Topic 模型

Topic 模型是 RabbitMQ 的高级模型之一,Topic 模型使用了通配符的概念,可以匹配更灵活的路由规则。topic模式相当于是对路由模式的一个升级,topic模式主要就是在匹配的规则上可以实现模糊匹配。

在 Topic 模型中,生产者将消息发送到交换机,交换机根据消息的 routing key 将消息转发到对应的队列中。与 Direct 模型不同的是,Topic 模型中 routing key 支持通配符匹配,其中 ' ' 可以匹配一个单词,'#' 可以匹配多个单词。例如,"order. " 可以匹配 "order.create","order.delete" 等消息,而 "order.#" 可以匹配 "order.create.one","order.delete.two" 等消息。

适用场景

Topic 模型适用于需要灵活的消息路由规则的场景,例如:

  1. 新闻网站订阅分类消息;
  2. 电商网站订阅商品分类消息;
  3. 金融机构订阅股票市场消息等。

演示

  1. 生产者

    // 生产者
    public class Producer {
        private static final String EXCHANGE_NAME = "exchange_topic_1";
        private static final String EXCHANGE_ROUTING_KEY1 = "topic.km";
        private static final String EXCHANGE_ROUTING_KEY2 = "topic.km.001";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            for (int i = 0; i < 100; i++) {
                // topic在路由模型的基础上,只有路由的key发生改变,其余的都不变
                if (i % 2 == 0) {
                    channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1, MessageProperties.PERSISTENT_TEXT_PLAIN, ("topic模型发送的第 " + i + " 条信息").getBytes());
                } else {
                    channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2, MessageProperties.PERSISTENT_TEXT_PLAIN, ("topic模型发送的第 " + i + " 条信息").getBytes());
                }
            }
            channel.close();
            connection.close();
        }
    }
    
  2. 消费者

    // 消费者1
    public class Consumer1 {
        private static final String QUEUE_NAME = "queue_topic_1";
        private static final String EXCHANGE_NAME = "exchange_topic_1";
        private static final String EXCHANGE_ROUTING_KEY = "topic.*";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者1接收到的消息是:" + new String(body));
                }
            };
            channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
        }
    }
    
    // 消费者2
    public class Consumer2 {
        private static final String QUEUE_NAME = "queue_topic_2";
        private static final String EXCHANGE_NAME = "exchange_topic_1";
        private static final String EXCHANGE_ROUTING_KEY = "topic.#";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtils.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消费者2接收到的消息是:" + new String(body));
                }
            };
            channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
        }
    }
    

测试

先启动2个消费者,再启动生产者

消费者1订阅的是 "order.*" 的消息,消费者2订阅的是 "order.#" 的消息,可以得到以下结果:

消费者1接收到的消息是:"Topic 模型发送的偶数条消息"

消费者2接收到的消息是:"Topic 模型发送的全部消息"

小结

本文介绍了 RabbitMQ 通信模型中的 Topic 模型的使用,通过交换机和 routing key 实现更灵活的消息路由。在实际使用过程中,需要注意以下几点:

  1. 路由键的格式应该是多个单词组成,用 '.' 分隔;
  2. '#' 匹配多个单词,'*' 匹配一个单词;
  3. 一个队列可以绑定多个 routing key;
  4. 如果交换机没有匹配到任何一个队列,则会抛弃该消息。
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 交换机
    +关注

    关注

    19

    文章

    2438

    浏览量

    95668
  • 模型
    +关注

    关注

    1

    文章

    2704

    浏览量

    47697
  • rabbitmq
    +关注

    关注

    0

    文章

    17

    浏览量

    974
收藏 人收藏

    评论

    相关推荐

    LabVIEW编写的简单3D模型预览工具

    ` 本帖最后由 ewb_topic 于 2018-8-7 16:29 编辑 最低兼容LabVIEW 8.6版本,支持ase、stl、wrl三种3D模型文件。小工具一个Pass:tlatlatla`
    发表于 08-06 10:55

    IoT物联网平台通信用Topic梳理

    简介: 基于设备实践梳理出来的Topic总结1.前言IoT物联网平台基于MQTT协议的Pub/Sub通信,那么topic和payload设计就很重要。我们可以定义出不同topic来处理不同业务场景
    发表于 11-07 14:08

    人机交互Topic推荐-AMiner 精选资料分享

    数据和实验平台。必读论文:https://www.aminer.cn/topic人机交互、人机互动(英文:...
    发表于 09-10 07:16

    Topic通讯小结

    ROS实操入门系列(三)1、Topic通讯小结2、编写ros驱动实现电机控制和编码器读取Topic通讯小结控制板驱动代码实现及工具调试完整代码Topic通讯小结Topic通讯特点通讯
    发表于 09-16 09:11

    想问一下有沒有MQTT SUBSCRIBE topic之後接收MESSAGE的例子?

    想問一下有沒有MQTT SUBSCRIBE topic 之後接收MESSAGE的例子?我只找到Code: Select all case MQTT_EVENT_DATA: {ESP_LOGI(TAG
    发表于 02-17 09:04

    请问一下MQTT_topic_t 结构在哪里呢?

    ;MQTT_EVENT_CONNECTED"); mqtt_topic_t *mqtt_topic = get_mqtt_topic(); msg_id = esp_mqtt_client_publish(client
    发表于 03-02 06:30

    ESP32将数据发到物联网平台topic后怎么获取数据?

    ESP32将数据发到物联网平台topic后怎么获取数据?还是说可以发到物联网平台的其他地方?谢谢
    发表于 03-13 08:13

    集成电路工艺专题实验 Topic Experiments o

    集成电路工艺专题实验 Topic Experiments of Integrate Circuit Techniques 实验一 硅单晶(或多晶)薄膜的沉积………………………………………………………3实验二 硅单晶外延层的质
    发表于 03-06 14:06 5次下载

    TEP(Topic Embedded Products) 产品携手基于Zynq的开发工具和系统模块加速嵌入式设计

    Dyplo 开发系统可以让设计工程师很容易使FPGA进行硬件加速 作者:Steve Leibson, 赛灵思战略营销与业务规划总监 荷兰的Topic 公司帮助客户开发了不同应用领域的嵌入式产品,包括
    发表于 02-08 08:13 256次阅读

    基于ESCM的动态主题情感混合模型

    针对现有模型无法进行微博主题情感演化分析的问题,提出一种基于主题情感混合模型( TSCM)和情感周期性理论的主题情感演化模型动态主题情感混合模型( DTSCM)。DTSCM通过捕获不同
    发表于 01-02 10:38 0次下载

    Point Topic宣布:全球固网宽带用户已经突破10亿

    近日,市场研究公司Point Topic宣布,全球固网宽带用户已经突破10亿,其中80%是光纤连接用户。基于ADSL的宽带用户数同比去年同期减少了8%。
    发表于 10-31 09:01 1241次阅读

    Topic医疗开发平台的最新进展

    Xilinx医疗产品营销经理Kamran Khan和Topic Embedded Products首席执行官Rieny Rijnen分享了由Xilinx Zynq All Programmable SoC实现的Topic医疗开发平台的最新进展,共同解决了这一问题。
    的头像 发表于 11-22 06:51 3118次阅读

    如何在LoRaWAN网关的网页上设置MQTT的订阅的Topic

    当我们将LoRaWAN网关设置为NS模式时可参见文章《将LoRaWAN网关设置为NS模式的操作方法》,我们就可以在LoRaWAN网关的网页上进行操作,以设置该网关的MQTT订阅的topic,从而
    发表于 06-12 17:29 1339次阅读

    面向微博热点话题的改进BBTM模型

    针对目前基于主题模型的微博短文本热点话题发现存在特征稀疏、高维度以及需要人工指定主题数目等问题,提出一种基于改进突发词对主题模型( bursty biterm topic model,BBTM
    发表于 06-09 14:20 3次下载

    如何在LoRaWAN网关上设置MQTT的Topic

    当我们将LoRaWAN网关设置为NS模式时,可参见文章如何将LoRaWAN网关设置为NS模式,我们就可以在LoRaWAN网关的网页上进行操作,以设置该网关的MQTT订阅的topic,从而为接下来采用MQTT订阅获取到LoRa节点数据建立基础。
    发表于 08-20 10:10 1044次阅读