0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

如何在Python中使用MQTT

瑞科慧联(RAK) 2022-12-22 10:41 次阅读

Python 是一种跨平台的计算机程序设计语言,是ABC 语言的替代品,属于面向对象的动态类型语言。它最初被设计用于编写自动化脚本,随着版本的不断更新和语言新功能的添加,越来越多被用于独立的、大型项目的开发。

MQTT 是一个物联网传输协议,用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。其轻量、简单、开放和易于实现等特点,使得它适用范围更加广泛。

本文主要介绍如何在 Python 项目中使用paho-mqtt客户端库 ,实现客户端与MQTT服务器的连接、订阅、取消订阅、收发消息等功能。

一、项目准备

本项目使用 Python 3.10进行开发测试。

用户可用以下命令来确认 Python的版本:

python3 --version

Python 3.10.9

测试设备:

瑞科慧联(RAK)网关RAK7268 V2、带温湿度传感器的数据采集器Sensor Hub

二、选择 MQTT 客户端库

paho-mqtt是目前 Python 中使用较多的 MQTT 客户端库。它为 Python 2.7 或 3.x 版本以上的客户端类提供了对 MQTT v3.1 和 v3.1.1 的支持,还提供了一些帮助程序功能。这使得消息发布到 MQTT 服务器变得更简单。

三、Pip 安装 Paho MQTT 客户端

Pip 是 Python 包管理工具。该工具提供了对 Python 包的查找、下载、安装、卸载的功能。

pip3install paho.mqtt

四、Python MQTT 使用

1、连接 MQTT 服务器

本文将使用瑞科慧联LoRaWAN®网关提供的内置 MQTT服务,该服务基于 Mosquitto的开源消息代理。服务器接入信息如下:

  • Broker:192.168.230.1
  • TCP Port:1883

2、导入 Paho MQTT客户端

from paho.mqtt import client as mqtt

3、设置 MQTT Broker 连接参数

设置 MQTT Broker 连接地址,端口以及 topic,同时调用 Pythonrandom.randint函数随机生成 MQTT 客户端 id。

MQTT_SERVER_IP ="192.168.230.1"

MQTT_PORT =1883

4、编写 MQTT 连接函数

编写连接回调函数 on_connect,该函数将在客户端连接后会被调用。在该函数中可以依据rc来判断客户端是否连接成功。同时可创建一个 MQTT 客户端连接到broker.emqx.io

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """连接MQTT服务器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回连接状态的回调函数

    mqttClient.on_message=on_message # 返回订阅消息回调函数

    MQTT_HOST=MQTT_SERVER_IP # MQTT服务器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服务器账号密码

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 启用线程连接

    returnmqttClient

5、发布消息

定义一个 while 循环语句,在循环中设置每秒调用 MQTT 客户端publish函数向/python/mqtt主题发送消息。

ddefon_publish():

    # 发布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/1/device/0000000000000444/tx'# 发布的主题,订阅时需要使用这个主题才能订阅此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}条消息发送成功'.format(msg_count))

        else:

            print('第{}条消息发送失败'.format(msg_count))

        msg_count+=1

6、订阅消息

编写消息回调函数 on_message,函数将在客户端从 MQTT Broker 收到消息后被调用,并打印出订阅的 topic 名称以及接收到的消息内容。

defon_subscribe():

    """订阅主题:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        time.sleep(1)

7、完整代码

消息订阅代码

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

网关通过mqtt发出数据

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦连接成功, 回调此方法"""

    rc_status= ["连接成功","协议版本错误","无效的客户端标识","服务器无法使用","用户名或密码错误","无授权"]

    print("connect:",rc_status[rc])

defon_message(client,userdata,msg):

    """一旦订阅到消息, 回调此方法"""

    print("主题"+msg.topic +" 消息"+str(msg.payload.decode('gbk')))

    print("主题"+msg.topic +" 消息"+str(msg.payload.decode()))

    try:

        temp=json.loads(msg.payload.decode())

        # client.disconnect()

        deveui=temp['devEUI']

        print("devEUI: ",deveui)

        data=temp['data']

        print("解码前的data为: ",data)

        data_decode=base64.b64decode(data).hex()

        print("解码后的data为: ",data_decode)

        str1=data_decode[4:]

        ifstr1[0:4]=="0167":

            a=int(str1[4:8],16)*0.1 

            print("温度:",a,"℃")

            ifstr1[8:12]=="0268":

               b=int(str1[12:16],16)

            print("湿度:",b,"%RH")

        elifstr1[0:4]=="0268":

            c=int(str1[4:8],16)

            print("湿度:",c,"%RH")                       

    exceptExceptionase:

        print(e)

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """连接MQTT服务器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回连接状态的回调函数

    mqttClient.on_message=on_message # 返回订阅消息回调函数

    MQTT_HOST=MQTT_SERVER_IP # MQTT服务器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服务器账号密码

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 启用线程连接

    returnmqttClient

defon_subscribe():

    """订阅主题:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        # allure.attach("gateway/" + GATEWAY_EUI + "/event/up", name="topic")

        # mqttClient.subscribe("gateway/ac1f09fffe08f099/event/up", 2)

        time.sleep(1)

if__name__=='__main__':

    on_subscribe()

消息发布代码

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

网关通过mqtt发出数据

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦连接成功, 回调此方法"""

    rc_status= ["连接成功","协议版本错误","无效的客户端标识","服务器无法使用","用户名或密码错误","无授权"]

    print("connect:",rc_status[rc])

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """连接MQTT服务器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回连接状态的回调函数

    MQTT_HOST=MQTT_SERVER_IP # MQTT服务器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服务器账号密码

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 启用线程连接

    returnmqttClient

defon_publish():

    # 发布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/x/device/x/tx'# 发布的主题,订阅时需要使用这个主题才能订阅此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'#需要发布的消息内容

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}条消息发送成功'.format(msg_count))

        else:

            print('第{}条消息发送失败'.format(msg_count))

        msg_count+=1

if__name__=='__main__':

    on_publish()

测试

消息发布

运行 MQTT消息发布代码,将看到客户端连接成功,并且成功将消息发布。

pYYBAGOjwVmAR1KUAAApM_Y0F48108.png

消息订阅

通过瑞科慧联带温湿度传感器的 Sensor hub进行数据传输,订阅并解析数据结果如下:

poYBAGOjwVmAdS2hAABgCqVnG0E194.png

五、总结

至此,我们完成了使用paho-mqtt客户端连接到LoRaWAN®网关内置 MQTT服务器,并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅并解析。

与 C ++ 或 Java 之类的高级语言不同,Python 比较适合设备侧的业务逻辑实现。使用 Python 可以减少代码上的逻辑复杂度,降低与设备的交互成本。未来,我们相信在物联网领域 Python 将会有更广泛的应用!

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 物联网
    +关注

    关注

    2870

    文章

    41639

    浏览量

    358410
  • python
    +关注

    关注

    51

    文章

    4677

    浏览量

    83473
  • MQTT
    +关注

    关注

    5

    文章

    538

    浏览量

    21978
收藏 人收藏

    评论

    相关推荐

    Raspberry Pi树莓派使用Python实现MQTT通信设计

    这次的例子,主要讲述如何基于PYTHONMQTT 客户端的使用方法
    的头像 发表于 03-14 11:45 333次阅读
    Raspberry Pi树莓派使用<b class='flag-5'>Python</b>实现<b class='flag-5'>MQTT</b>通信设计

    何在测试中使用ChatGPT

    Dimitar Panayotov 在 2023 年 QA Challenge Accepted 大会 上分享了他如何在测试中使用 ChatGPT。
    的头像 发表于 02-20 13:57 377次阅读

    何在DAVE IDE中使用XMC7200?

    能否在 DAVE IDE 中为 XMC 7200 EVK KIT 构建应用程序。我尝试打开一个项目但它最多只能显示 XMC48000。如何在 DAVE IDE 中使用 XMC7200 请帮忙。
    发表于 01-26 06:32

    何在Linux中使用htop命令

    本文介绍如何在 Linux 中使用 htop 命令。
    的头像 发表于 12-04 14:45 554次阅读
    如<b class='flag-5'>何在</b>Linux<b class='flag-5'>中使</b>用htop命令

    何在Python中使用Scapy进行抓包操作

    文章将介绍如何使用 Python 来进行简单的抓包操作。 2. Python 中的抓包库 在 Python 中,有很多优秀的抓包库,例如 Scapy、dpkt、pcapy 等等。在本文中,我们将以
    的头像 发表于 11-01 14:47 2069次阅读

    何在Windows下使用 Supervisor 重新拉起崩溃的Python程序

    我们用Python定时跑一些自动化程序的时候会出现程序崩溃的情况。此时如果你本人不在电脑面前,或者没有留意到程序的崩溃,没有及时重新拉起程序,会造成或大或小的损失。 本文将教你如何在 Windows
    的头像 发表于 10-21 11:23 1173次阅读
    如<b class='flag-5'>何在</b>Windows下使用 Supervisor 重新拉起崩溃的<b class='flag-5'>Python</b>程序

    请问如何在单片机里实现MQTT协议?

    何在单片机里实现MQTT协议?
    发表于 10-19 07:30

    SE5如何在Python中使用SAIL?

    SE5已经预装在/system/lib下,只需要设置好环境变量,然后就可以在python中使用SAIL了: # SE5设置环境变量export PATH=$PATH:/system
    发表于 09-18 06:22

    何在biquads 1.0版中使用CMSIS-DSP的Python包装

    本指南提供了一个简单的示例,说明如何使用CMSIS-DSP Python包装器以及如何用Python语言表示CMSIS-DSPAPI。 嵌入式系统上的信号处理算法通常通过使用在科学计算环境中开发
    发表于 08-28 06:30

    何在Vitis HLS GUI中使用库函数?

    Vitis™ HLS 2023.1 支持新的 L1 库向导,本文将讲解如何下载 L1 库、查看所有可用功能以及如何在 Vitis HLS GUI 中使用库函数。
    的头像 发表于 08-16 10:26 623次阅读
    如<b class='flag-5'>何在</b>Vitis HLS GUI<b class='flag-5'>中使</b>用库函数?

    何在Arduino中使用APDS9960手势传感器

    电子发烧友网站提供《如何在Arduino中使用APDS9960手势传感器.zip》资料免费下载
    发表于 06-28 16:01 0次下载
    如<b class='flag-5'>何在</b>Arduino<b class='flag-5'>中使</b>用APDS9960手势传感器

    何在Arduino中使用20x4 I2C字符LCD显示器

    电子发烧友网站提供《如何在Arduino中使用20x4 I2C字符LCD显示器.zip》资料免费下载
    发表于 06-28 15:57 0次下载
    如<b class='flag-5'>何在</b>Arduino<b class='flag-5'>中使</b>用20x4 I2C字符LCD显示器

    何在 Python 中安装和使用顶级聚类算法

    有许多聚类算法可供选择,对于所有情况,没有单一的最佳聚类算法。相反,最好探索一系列聚类算法以及每种算法的不同配置。在本教程中,你将发现如何在 python 中安装和使用顶级聚类算法。
    的头像 发表于 05-22 09:13 350次阅读
    如<b class='flag-5'>何在</b> <b class='flag-5'>Python</b> 中安装和使用顶级聚类算法

    何在OpenCV中使用基于深度学习的边缘检测?

    在这篇文章中,我们将学习如何在OpenCV中使用基于深度学习的边缘检测,它比目前流行的canny边缘检测器更精确。
    的头像 发表于 05-19 09:52 1676次阅读
    如<b class='flag-5'>何在</b>OpenCV<b class='flag-5'>中使</b>用基于深度学习的边缘检测?

    何在Linux命令行中运行Python脚本

    Python 脚本。 在本文中,我们将详细介绍如何在 Linux 命令行中运行 Python 脚本。我们将讨论以下主题:
    的头像 发表于 05-12 14:49 1373次阅读