0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

如何在Python中使用MQTT

瑞科慧联(RAK) 2022-12-22 10:41 次阅读
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

Python 是一种跨平台的计算机程序设计语言,是ABC 语言的替代品,属于面向对象的动态类型语言。它最初被设计用于编写自动化脚本,随着版本的不断更新和语言新功能的添加,越来越多被用于独立的、大型项目的开发。

MQTT 是一个物联网传输协议,用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。其轻量、简单、开放和易于实现等特点,使得它适用范围更加广泛。

本文主要介绍如何在 Python 项目中使用paho-mqtt客户端库 ,实现客户端与MQTT服务器的连接、订阅、取消订阅、收发消息等功能。

一、项目准备

本项目使用 Python 3.10进行开发测试。

用户可用以下命令来确认 Python的版本:

python3 --version

Python 3.10.9

测试设备:

瑞科慧联(RAK)网关RAK7268 V2、带温湿度传感器的数据采集器Sensor Hub

二、选择 MQTT 客户端库

paho-mqtt是目前 Python 中使用较多的 MQTT 客户端库。它为 Python 2.7 或 3.x 版本以上的客户端类提供了对 MQTT v3.1 和 v3.1.1 的支持,还提供了一些帮助程序功能。这使得消息发布到 MQTT 服务器变得更简单。

三、Pip 安装 Paho MQTT 客户端

Pip 是 Python 包管理工具。该工具提供了对 Python 包的查找、下载、安装、卸载的功能。

pip3install paho.mqtt

四、Python MQTT 使用

1、连接 MQTT 服务器

本文将使用瑞科慧联LoRaWAN®网关提供的内置 MQTT服务,该服务基于 Mosquitto的开源消息代理。服务器接入信息如下:

  • Broker:192.168.230.1
  • TCP Port:1883

2、导入 Paho MQTT客户端

from paho.mqtt import client as mqtt

3、设置 MQTT Broker 连接参数

设置 MQTT Broker 连接地址,端口以及 topic,同时调用 Pythonrandom.randint函数随机生成 MQTT 客户端 id。

MQTT_SERVER_IP ="192.168.230.1"

MQTT_PORT =1883

4、编写 MQTT 连接函数

编写连接回调函数 on_connect,该函数将在客户端连接后会被调用。在该函数中可以依据rc来判断客户端是否连接成功。同时可创建一个 MQTT 客户端连接到broker.emqx.io

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """连接MQTT服务器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回连接状态的回调函数

    mqttClient.on_message=on_message # 返回订阅消息回调函数

    MQTT_HOST=MQTT_SERVER_IP # MQTT服务器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服务器账号密码

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 启用线程连接

    returnmqttClient

5、发布消息

定义一个 while 循环语句,在循环中设置每秒调用 MQTT 客户端publish函数向/python/mqtt主题发送消息。

ddefon_publish():

    # 发布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/1/device/0000000000000444/tx'# 发布的主题,订阅时需要使用这个主题才能订阅此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}条消息发送成功'.format(msg_count))

        else:

            print('第{}条消息发送失败'.format(msg_count))

        msg_count+=1

6、订阅消息

编写消息回调函数 on_message,函数将在客户端从 MQTT Broker 收到消息后被调用,并打印出订阅的 topic 名称以及接收到的消息内容。

defon_subscribe():

    """订阅主题:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        time.sleep(1)

7、完整代码

消息订阅代码

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

网关通过mqtt发出数据

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦连接成功, 回调此方法"""

    rc_status= ["连接成功","协议版本错误","无效的客户端标识","服务器无法使用","用户名或密码错误","无授权"]

    print("connect:",rc_status[rc])

defon_message(client,userdata,msg):

    """一旦订阅到消息, 回调此方法"""

    print("主题"+msg.topic +" 消息"+str(msg.payload.decode('gbk')))

    print("主题"+msg.topic +" 消息"+str(msg.payload.decode()))

    try:

        temp=json.loads(msg.payload.decode())

        # client.disconnect()

        deveui=temp['devEUI']

        print("devEUI: ",deveui)

        data=temp['data']

        print("解码前的data为: ",data)

        data_decode=base64.b64decode(data).hex()

        print("解码后的data为: ",data_decode)

        str1=data_decode[4:]

        ifstr1[0:4]=="0167":

            a=int(str1[4:8],16)*0.1 

            print("温度:",a,"℃")

            ifstr1[8:12]=="0268":

               b=int(str1[12:16],16)

            print("湿度:",b,"%RH")

        elifstr1[0:4]=="0268":

            c=int(str1[4:8],16)

            print("湿度:",c,"%RH")                       

    exceptExceptionase:

        print(e)

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """连接MQTT服务器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回连接状态的回调函数

    mqttClient.on_message=on_message # 返回订阅消息回调函数

    MQTT_HOST=MQTT_SERVER_IP # MQTT服务器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服务器账号密码

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 启用线程连接

    returnmqttClient

defon_subscribe():

    """订阅主题:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        # allure.attach("gateway/" + GATEWAY_EUI + "/event/up", name="topic")

        # mqttClient.subscribe("gateway/ac1f09fffe08f099/event/up", 2)

        time.sleep(1)

if__name__=='__main__':

    on_subscribe()

消息发布代码

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

网关通过mqtt发出数据

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦连接成功, 回调此方法"""

    rc_status= ["连接成功","协议版本错误","无效的客户端标识","服务器无法使用","用户名或密码错误","无授权"]

    print("connect:",rc_status[rc])

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """连接MQTT服务器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回连接状态的回调函数

    MQTT_HOST=MQTT_SERVER_IP # MQTT服务器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服务器账号密码

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 启用线程连接

    returnmqttClient

defon_publish():

    # 发布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/x/device/x/tx'# 发布的主题,订阅时需要使用这个主题才能订阅此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'#需要发布的消息内容

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}条消息发送成功'.format(msg_count))

        else:

            print('第{}条消息发送失败'.format(msg_count))

        msg_count+=1

if__name__=='__main__':

    on_publish()

测试

消息发布

运行 MQTT消息发布代码,将看到客户端连接成功,并且成功将消息发布。

pYYBAGOjwVmAR1KUAAApM_Y0F48108.png

消息订阅

通过瑞科慧联带温湿度传感器的 Sensor hub进行数据传输,订阅并解析数据结果如下:

poYBAGOjwVmAdS2hAABgCqVnG0E194.png

五、总结

至此,我们完成了使用paho-mqtt客户端连接到LoRaWAN®网关内置 MQTT服务器,并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅并解析。

与 C ++ 或 Java 之类的高级语言不同,Python 比较适合设备侧的业务逻辑实现。使用 Python 可以减少代码上的逻辑复杂度,降低与设备的交互成本。未来,我们相信在物联网领域 Python 将会有更广泛的应用!

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 物联网
    +关注

    关注

    2939

    文章

    47317

    浏览量

    407815
  • python
    +关注

    关注

    57

    文章

    4857

    浏览量

    89577
  • MQTT
    +关注

    关注

    5

    文章

    720

    浏览量

    24783
收藏 人收藏
加入交流群
微信小助手二维码

扫码添加小助手

加入工程师交流群

    评论

    相关推荐
    热点推荐

    何在AMD Vitis Unified IDE中使用系统设备树

    您将在这篇博客中了解系统设备树 (SDT) 以及如何在 AMD Vitis Unified IDE 中使用 SDT 维护来自 XSA 的硬件元数据。本文还讲述了如何对 SDT 进行操作,以便在 Vitis Unified IDE 中实现更灵活的使用场景。
    的头像 发表于 11-18 11:13 2809次阅读
    如<b class='flag-5'>何在</b>AMD Vitis Unified IDE<b class='flag-5'>中使</b>用系统设备树

    物联网MQTT网关是什么

    物联网MQTT网关是一种采用MQTT物联网协议的智能设备或软件组件,其核心功能是连接不同通信协议的物联网设备与消息代理服务器,实现设备间的数据交换与集中管理,同时支持边缘计算、安全防护和协议转换
    的头像 发表于 08-29 15:24 649次阅读

    请问如何在 Keil μVision 或 IAR EWARM 中使用观察点进行调试?

    何在 Keil μVision 或 IAR EWARM 中使用观察点进行调试?
    发表于 08-20 06:29

    第二十三章 W55MH32 MQTT_OneNET示例

    本文讲解了如何在 W55MH32 芯片上实现 MQTT 协议并连接 OneNET 平台,通过实战例程展示了从准备工作、连接配置到消息订阅、发布及接收处理的完整过程。文章详细介绍了 MQTT 协议
    的头像 发表于 07-24 14:59 695次阅读
    第二十三章 W55MH32 <b class='flag-5'>MQTT</b>_OneNET示例

    精通 MQTT:消息队列遥测传输指南!

    ,解释了其关键组件,并演示了如何使用Python实现MQTT客户端。MQTT代理MQTT系统的核心是代理,它负责管理客户端之间的消息交换。MQTT
    的头像 发表于 06-16 16:56 828次阅读
    精通 <b class='flag-5'>MQTT</b>:消息队列遥测传输指南!

    何在MQTT中发布和订阅实体

    MQTT中发布和订阅实体(主题)是MQTT通信的核心操作,下面将详细介绍其原理、步骤以及示例代码,帮助你全面理解这一过程。 一、MQTT发布与订阅的基本概念 发布(Publish):客户端将
    的头像 发表于 05-20 17:21 1014次阅读

    ​如何在虚拟环境中使Python,提升你的开发体验~

    RaspberryPiOS预装了Python,你需要使用其虚拟环境来安装包。今天出版的最新一期《TheMagPi》杂志刊登了我们文档负责人NateContino撰写的一篇实用教程,帮助你入门
    的头像 发表于 03-25 09:34 628次阅读
    ​如<b class='flag-5'>何在</b>虚拟环境<b class='flag-5'>中使</b>用 <b class='flag-5'>Python</b>,提升你的开发体验~

    零基础入门:如何在树莓派上编写和运行Python程序?

    在这篇文章中,我将为你简要介绍Python程序是什么、Python程序可以用来做什么,以及如何在RaspberryPi上编写和运行一个简单的Python程序。什么是
    的头像 发表于 03-25 09:27 1523次阅读
    零基础入门:如<b class='flag-5'>何在</b>树莓派上编写和运行<b class='flag-5'>Python</b>程序?

    MQTT物联网平台有哪些?有哪些功能?

    MQTT(Message Queuing Telemetry Transport)是一种基于客户端-服务器架构的发布/订阅模式的消息传输协议,它广泛应用于机器与机器的通信(M2M)以及物联网环境
    的头像 发表于 03-15 14:23 1218次阅读
    <b class='flag-5'>MQTT</b>物联网平台有哪些?有哪些功能?

    创建了用于OpenVINO™推理的自定义C++和Python代码,从C++代码中获得的结果与Python代码不同是为什么?

    创建了用于OpenVINO™推理的自定义 C++ 和 Python* 代码。 在两个推理过程中使用相同的图像和模型。 从 C++ 代码中获得的结果与 Python* 代码不同。
    发表于 03-06 06:22

    何在USB视频类(UVC)框架中使用EZ-USB™FX3实现图像传感器接口USB视频类(UVC)

    电子发烧友网站提供《如何在USB视频类(UVC)框架中使用EZ-USB™FX3实现图像传感器接口USB视频类(UVC).pdf》资料免费下载
    发表于 02-28 17:36 2次下载

    何在MATLAB中使用DeepSeek模型

    在 DeepSeek-R1(https://github.com/deepseek-ai/DeepSeek-R1) AI 模型横空出世后,人们几乎就立马开始询问如何在 MATLAB 中使用这些模型
    的头像 发表于 02-13 09:20 4061次阅读
    如<b class='flag-5'>何在</b>MATLAB<b class='flag-5'>中使</b>用DeepSeek模型

    MQTT测试程序上机实验

    mqtt_test、paho.mqtt.c.tar.bz2放到Ubuntu上同一个目录下。
    的头像 发表于 02-11 13:35 1085次阅读
    <b class='flag-5'>MQTT</b>测试程序上机实验

    使用Python实现xgboost教程

    使用Python实现XGBoost模型通常涉及以下几个步骤:数据准备、模型训练、模型评估和模型预测。以下是一个详细的教程,指导你如何在Python中使用XGBoost。 1. 安装XG
    的头像 发表于 01-19 11:21 2213次阅读

    何在Windows中使用MTP协议

    、图片等)的通信协议,它被广泛用于Android设备。以下是如何在Windows中使用MTP协议的详细步骤: 1. 确保设备支持MTP 首先,你需要确认你的设备支持MTP协议。大多数现代Android
    的头像 发表于 01-03 10:26 4409次阅读