0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

通过paho-mqtt软件包入门rt-thread的sal

冬至子 来源:happycode999 作者:happycode999 2023-08-09 15:37 次阅读

一、paho-mqtt软件包程序流程

1.1 paho_mqtt_start
在rt_wlan_register_event_handler函数注册好RT_WLAN_EVT_READY的回调函数paho_mqtt_start,当wifi准备好后调用mq_start启动mqtt。在mq_start中,初始化MQTTClient结构体,设置mqtt连接的参数:mqtt的uri、mqtt的用户名(username)和密码(password)、mqtt发布和订阅的主题Topic、消息质量等级QoS,最后调用paho_mqtt_start创建处理mqtt的线程paho_mqtt_thread。

static void mq_start(void)
{
/* init condata param by using MQTTPacket_connectData_initializer /
MQTTPacket_connectData condata = MQTTPacket_connectData_initializer;
static char cid[20] = { 0 };
static int is_started = 0;
if (is_started)
{
return;
}
/
config MQTT context param /
{
client.isconnected = 0;
client.uri = MQTT_URI;
/
generate the random client ID /
rt_snprintf(cid, sizeof(cid), "rtthread%d", rt_tick_get());
/
config connect param /
memcpy(&client.condata, &condata, sizeof(condata));
client.condata.clientID.cstring = cid;
client.condata.keepAliveInterval = 60;
client.condata.cleansession = 1;
client.condata.username.cstring = MQTT_USERNAME;
client.condata.password.cstring = MQTT_PASSWORD;
/
config MQTT will param. /
client.condata.willFlag = 1;
client.condata.will.qos = 1;
client.condata.will.retained = 0;
client.condata.will.topicName.cstring = MQTT_PUBTOPIC;
client.condata.will.message.cstring = MQTT_WILLMSG;
/
malloc buffer. /
client.buf_size = client.readbuf_size = 1024;
client.buf = malloc(client.buf_size);
client.readbuf = malloc(client.readbuf_size);
if (!(client.buf && client.readbuf))
{
LOG_E("no memory for MQTT client buffer!");
goto _exit;
}
/
set event callback function /
client.connect_callback = mqtt_connect_callback;
client.online_callback = mqtt_online_callback;
client.offline_callback = mqtt_offline_callback;
/
set subscribe table and event callback /
client.messageHandlers[0].topicFilter = MQTT_SUBTOPIC;
client.messageHandlers[0].callback = mqtt_sub_callback;
client.messageHandlers[0].qos = QOS1;
/
set default subscribe event callback /
client.defaultMessageHandler = mqtt_sub_default_callback;
}
/
run mqtt client /
paho_mqtt_start(&client);
is_started = 1;
_exit:
return;
}
rt_wlan_register_event_handler(RT_WLAN_EVT_READY, (void (
)(int, struct rt_wlan_buff *, void *))mq_start, RT_NULL);

1.2 paho_mqtt_thread
在paho_mqtt_thread中调用paho-mqtt提供的接口rt-thread的sal的接口完成与mqtt服务器的交互,包括以下几个方面:与服务器的连接、订阅主题、向服务器发送心跳包、处理服务器发送下来的消息(CONNACK、PUBACK、SUBACK、PUBLISH、PUBREC、PUBCOMP、PINGRESP)、回环服务器通过topic发送下来的消息。

static void paho_mqtt_thread(void *param)
{
MQTTClient *c = (MQTTClient )param;
int i, rc, len;
int rc_t = 0;
c->pub_sock = socket(AF_INET, SOCK_DGRAM, 0);
if (c->pub_sock == -1)
{
debug_printf("create pub_sock error!n");
goto _mqtt_exit;
}
/
bind publish socket. */
{
struct sockaddr_in pub_server_addr;
c->pub_port = pub_port;
pub_port ++;
pub_server_addr.sin_family = AF_INET;
pub_server_addr.sin_port = htons((c->pub_port));
pub_server_addr.sin_addr.s_addr = INADDR_ANY;
memset(&(pub_server_addr.sin_zero), 0, sizeof(pub_server_addr.sin_zero));
rc = bind(c->pub_sock, (struct sockaddr *)&pub_server_addr, sizeof(struct sockaddr));
if (rc == -1)
{
debug_printf("pub_sock bind error!n");
goto _mqtt_exit;
}
}
_mqtt_start:
if (c->connect_callback)
{
c->connect_callback(c);
}
rc = net_connect(c);
if (rc != 0)
{
goto _mqtt_restart;
}
rc = MQTTConnect(c);
if (rc != 0)
{
goto _mqtt_restart;
}
for (i = 0; i < MAX_MESSAGE_HANDLERS; i++)
{
const char topic = c->messageHandlers[i].topicFilter;
if(topic == RT_NULL)
continue;
rc = MQTTSubscribe(c, topic, QOS2);
debug_printf("Subscribe #%d %s %s!n", i, topic, (rc < 0) ? ("fail") : ("OK"));
if (rc != 0)
{
goto _mqtt_disconnect;
}
}
if (c->online_callback)
{
c->online_callback(c);
}
c->tick_ping = rt_tick_get();
while (1)
{
int res;
rt_tick_t tick_now;
fd_set readset;
struct timeval timeout;
tick_now = rt_tick_get();
if (((tick_now - c->tick_ping) / RT_TICK_PER_SECOND) > (c->keepAliveInterval - 5))
{
timeout.tv_sec = 1;
//debug_printf("tick close to ping.n");
}
else
{
timeout.tv_sec = c->keepAliveInterval - 10 - (tick_now - c->tick_ping) / RT_TICK_PER_SECOND;
//debug_printf("timeount for ping: %dn", timeout.tv_sec);
}
timeout.tv_usec = 0;
FD_ZERO(&readset);
FD_SET(c->sock, &readset);
FD_SET(c->pub_sock, &readset);
/
int select(maxfdp1, readset, writeset, exceptset, timeout); /
res = select(((c->pub_sock > c->sock) ? c->pub_sock : c->sock) + 1,
&readset, RT_NULL, RT_NULL, &timeout);
if (res == 0)
{
len = MQTTSerialize_pingreq(c->buf, c->buf_size);
rc = sendPacket(c, len);
if (rc != 0)
{
debug_printf("[%d] send ping rc: %d n", rt_tick_get(), rc);
goto _mqtt_disconnect;
}
/
wait Ping Response. /
timeout.tv_sec = 5;
timeout.tv_usec = 0;
FD_ZERO(&readset);
FD_SET(c->sock, &readset);
res = select(c->sock + 1, &readset, RT_NULL, RT_NULL, &timeout);
if (res <= 0)
{
debug_printf("[%d] wait Ping Response res: %dn", rt_tick_get(), res);
goto _mqtt_disconnect;
}
} /
res == 0: timeount for ping. */
if (res < 0)
{
debug_printf("select res: %dn", res);
goto _mqtt_disconnect;
}
if (FD_ISSET(c->sock, &readset))
{
//debug_printf("sock FD_ISSETn");
rc_t = MQTT_cycle(c);
//debug_printf("sock FD_ISSET rc_t : %dn", rc_t);
if (rc_t < 0) goto _mqtt_disconnect;
continue;
}
if (FD_ISSET(c->pub_sock, &readset))
{
struct sockaddr_in pub_client_addr;
uint32_t addr_len = sizeof(struct sockaddr);
MQTTMessage *message;
MQTTString topic = MQTTString_initializer;
//debug_printf("pub_sock FD_ISSETn");
len = recvfrom(c->pub_sock, c->readbuf, c->readbuf_size, MSG_DONTWAIT,
(struct sockaddr *)&pub_client_addr, &addr_len);
if (pub_client_addr.sin_addr.s_addr != *((uint32_t )(&netif_default->ip_addr)))
{
#if 1
char client_ip_str[16]; /
###.###.###.### */
strcpy(client_ip_str,
inet_ntoa(*((struct in_addr *) & (pub_client_addr.sin_addr))));
debug_printf("pub_sock recvfrom len: %s, skip!n", client_ip_str);
#endif
continue;
}
if (len < sizeof(MQTTMessage))
{
c->readbuf[len] = '�';
debug_printf("pub_sock recv %d byte: %sn", len, c->readbuf);
if (strcmp((const char *)c->readbuf, "DISCONNECT") == 0)
{
debug_printf("DISCONNECTn");
goto _mqtt_disconnect_exit;
}
continue;
}
message = (MQTTMessage *)c->readbuf;
message->payload = c->readbuf + sizeof(MQTTMessage);
topic.cstring = (char *)c->readbuf + sizeof(MQTTMessage) + message->payloadlen;
//debug_printf("pub_sock topic:%s, payloadlen:%dn", topic.cstring, message->payloadlen);
len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
topic, (unsigned char *)message->payload, message->payloadlen);
if (len <= 0)
{
debug_printf("MQTTSerialize_publish len: %dn", len);
goto _mqtt_disconnect;
}
if ((rc = sendPacket(c, len)) != PAHO_SUCCESS) // send the subscribe packet
{
debug_printf("MQTTSerialize_publish sendPacket rc: %dn", rc);
goto _mqtt_disconnect;
}
} /* pbulish sock handler. */
} /* while (1) */
_mqtt_disconnect:
MQTTDisconnect(c);
_mqtt_restart:
if (c->offline_callback)
{
c->offline_callback(c);
}
net_disconnect(c);
rt_thread_delay(RT_TICK_PER_SECOND * 5);
debug_printf("restart!n");
goto _mqtt_start;
_mqtt_disconnect_exit:
MQTTDisconnect(c);
net_disconnect(c);
_mqtt_exit:
debug_printf("thread exitn");
return;
}

二、与mqtt broker的交互
paho-mqtt软件包提供了两种发布消息到mqtt broker的方式:udp和管道。在MQTTClient结构体中有三个成员与通信有关:sock、pub_sock、pub_pipe,其中sock是与mqtt broker通信的套接字,pub_sock和pub_pipe是两种不同的发布方式:pub_sock是通过udp的方式发布消息;pub_pipe是通过管道,最终由sock发布消息。如下面的代码所示,使用哪种方式可以通过宏来配置。下面展开描述这两种方式如何与mqtt broker交互的。

/* publish interface */

#if defined(RT_USING_POSIX) && (defined(RT_USING_DFS_NET) || defined(SAL_USING_POSIX))
int pub_pipe[2];
#else
int pub_sock;
int pub_port;
#endif

2.1 管道(pipe)方式
在paho_mqtt_pipe.c中的paho_mqtt_thread,下面的代码完成了发布消息、接收订阅消息、处理心跳包的工作。下面以三个点细说。

当需要发布消息时,应用层需要调用MQTTPublish,这个函数会调用write向管道的写端pub_pipe[1]写入待发送的数据。而管道的读端pub_pipe[0]在select中被监听,当MQTTPublish被调用时,select可以往下执行,首先调用read从管道中读取数据,接着调用MQTTSerialize_publish将数据封包,最后调用sendPacket将数据发送出去。

当接收到订阅的消息时,select会往下执行,接着调用MQTT_cycle读取并解析出数据。
select的超时时间是50s,如果50s没有消息处理,则向broker发送心跳包。

FD_ZERO(&readset);
FD_SET(c->sock, &readset);
FD_SET(c->pub_pipe[0], &readset);
/* int select(maxfdp1, readset, writeset, exceptset, timeout); /
res = select(((c->pub_pipe[0] > c->sock) ? c->pub_pipe[0] : c->sock) + 1,
&readset, RT_NULL, RT_NULL, &timeout);
if (res == 0)
{
len = MQTTSerialize_pingreq(c->buf, c->buf_size);
rc = sendPacket(c, len);
if (rc != 0)
{
LOG_E("[%d] send ping rc: %d ", rt_tick_get(), rc);
goto _mqtt_disconnect;
}
/
wait Ping Response. /
timeout.tv_sec = 5;
timeout.tv_usec = 0;
FD_ZERO(&readset);
FD_SET(c->sock, &readset);
res = select(c->sock + 1, &readset, RT_NULL, RT_NULL, &timeout);
if (res <= 0)
{
LOG_E("[%d] wait Ping Response res: %d", rt_tick_get(), res);
goto _mqtt_disconnect;
}
} /
res == 0: timeount for ping. */
if (res < 0)
{
LOG_E("select res: %d", res);
goto _mqtt_disconnect;
}
if (FD_ISSET(c->sock, &readset))
{
//LOG_D("sock FD_ISSET");
rc_t = MQTT_cycle(c);
//LOG_D("sock FD_ISSET rc_t : %d", rc_t);
if (rc_t < 0) goto _mqtt_disconnect;
continue;
}
if (FD_ISSET(c->pub_pipe[0], &readset))
{
MQTTMessage *message;
MQTTString topic = MQTTString_initializer;
//LOG_D("pub_sock FD_ISSET");
len = read(c->pub_pipe[0], c->readbuf, c->readbuf_size);
if (len < sizeof(MQTTMessage))
{
c->readbuf[len] = '�';
LOG_D("pub_sock recv %d byte: %s", len, c->readbuf);
if (strcmp((const char *)c->readbuf, "DISCONNECT") == 0)
{
LOG_D("DISCONNECT");
goto _mqtt_disconnect_exit;
}
continue;
}
message = (MQTTMessage *)c->readbuf;
message->payload = c->readbuf + sizeof(MQTTMessage);
topic.cstring = (char *)c->readbuf + sizeof(MQTTMessage) + message->payloadlen;
//LOG_D("pub_sock topic:%s, payloadlen:%d", topic.cstring, message->payloadlen);
len = MQTTSerialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id,
topic, (unsigned char *)message->payload, message->payloadlen);
if (len <= 0)
{
LOG_D("MQTTSerialize_publish len: %d", len);
goto _mqtt_disconnect;
}
if ((rc = sendPacket(c, len)) != PAHO_SUCCESS) // send the subscribe packet
{
LOG_D("MQTTSerialize_publish sendPacket rc: %d", rc);
goto _mqtt_disconnect;
}
}

2.2 udp方式
udp方式中,处理流程与管道方式基本相似。下面说明一下这种方式两个套接字的工作流程。
MQTTClient结构体中有两个socket,一个是基于tcp的负责控制与服务器连接的sock,另一个是基于udp协议的负责消息发布的pub_sock。

2.2.1 sock
连接:在net_connect调用socket、connet函数建立与服务器的tcp连接。
处理:sock接收到服务器的数据后,在MQTT_cycle中处理来自服务器的CONNACK、PUBACK、SUBACK、PUBLISH、PUBREC、PUBCOMP、PINGRESP消息。
断开连接:在net_disconnect函数中调用closesocket关闭与服务器的tcp连接。

2.2.2 pub_sock
连接:分为pub_sock的绑定和mqtt连接的建立
1、调用socket创建pub_sock,之后调用bind绑定pub_sock到udp端口
2、在MQTTConnect函数中,通过sock发送connect消息给服务器,建立mqtt连接。
处理:先recvfrom将接受的数据拷贝到MQTTClient的readbuf,再将数据回环发布到服务器。
断开连接:通过sock向服务器发送DISCONNECT消息,断开mqtt连接。

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 处理器
    +关注

    关注

    68

    文章

    18290

    浏览量

    222194
  • WLAN技术
    +关注

    关注

    0

    文章

    23

    浏览量

    9221
  • RT-Thread
    +关注

    关注

    31

    文章

    1149

    浏览量

    38906
  • MQTT
    +关注

    关注

    5

    文章

    538

    浏览量

    21981
  • MQTT协议
    +关注

    关注

    0

    文章

    90

    浏览量

    5224
  • TCP通信
    +关注

    关注

    0

    文章

    145

    浏览量

    4138
收藏 人收藏

    评论

    相关推荐

    paho-mqtt模块实战

    的编程语言,它在系统编程和网络编程方面有着很好的表现。在本教程中,我们将介绍如何使用Rust语言和paho-mqtt模块实现MQTT协议的应用。 前面写过3篇介绍 rumqttc 的教程,本篇开始使用
    的头像 发表于 09-20 11:33 796次阅读

    RT-Thread Studio添加软件包报错怎么解决?

    RT-Thread Studio添加软件包报错ImportError: No module named psutil
    发表于 03-01 08:41

    如何利用RT-Thread开发的PahoMQTT软件包MQTT服务器进行通信?

    本教程就是介绍如何利用 RT-Thread 开发的 Paho MQTT 软件包MQTT 服务器进行通信的。
    发表于 03-30 08:09

    RT-Thread OneNET软件包的功能特点是什么?

    有哪位大神能否介绍一下OneNET 平台 。 RT-Thread OneNET 软件包功能特点是什么?
    发表于 04-02 06:39

    介绍RT-Thread软件包

    学习要点介绍 RT-Thread软件包;简介 nRF24L01 软件包的使用,讲解如何使用此软件包将数据正确发送和接收;学习线程间的通信,IPC 的使用,即获取温度的线程 A 与无
    发表于 07-27 06:07

    WIZnet软件包对接RT-Thread SAL套接字抽象层实现对BSD Socket APIs的支持

    RT-Thread SAL 套接字抽象层,实现对标准 BSD Socket APIs 的支持,完美的兼容多种软件包和网络功能实现,提高 WIZnet 设备兼容性。1.1 目录结构WIZnet
    发表于 05-17 17:00

    使用menuconfig配置基于RT-Thread的NimBLE软件包

    最近在学习 RT-Thread 中的 NimBLE 软件包,使用 menuconfig 配置选中 NimBLE 软件包,设置各种选项后,成功通过编译并且运行起来。不过这仅仅只是按照文档
    发表于 06-27 11:18

    基于Eclipse paho-mqtt源码MQTT客户端设计(上)

    ├───tests // mqtt 功能测试程序│ LICENSE // 软件包许可证│ README.md // 软件包使用说明└───SConscript // RT-Thread
    发表于 08-04 16:28

    基于Eclipse paho-mqtt源码MQTT客户端设计(下)

    MQTT 使用说明准备工作首先需要下载 MQTT 软件包,并将软件包加入到项目中。在 BSP 目录下使用 menuconfig 命令打开 env 配置界面,在
    发表于 08-04 16:33

    新手求助MQTT选择哪个软件包比较合适呢?

    大概看了下当前有 umqtt、umqtt 和 paho-mqtt 几个软件包,除了貌似 paho 的那个比较拉跨,其他的两个有什么优劣么?另外,内网环境下面的 MQTT 的传输延迟一般
    发表于 11-17 10:21

    RT-Thread中的mymqtt软件包添加步骤与使用方法

    文章 RT-Thread中Lan8720和lwip协议栈的使用的工程基础上添加mymqtt软件包。使能mqtt example和mqtt test,保存,等待下载更新
    发表于 02-13 14:58

    使用RT-Thread Studio开发CH32V307实现按键软件包使用

    使用上篇帖子创建的工程,使用RT-Thread Studio软件包添加工具,快速实现MultiButton的使用在RT-Thread Studio工程内打开RT-Thread Sett
    发表于 04-15 21:16

    UIoT RT-Thread软件包介绍

    UIoT RT-Thread 软件包实现了 IoT 设备与 UCloud UIoT Core 物联网通信云平台连接,包含设备注册、MQTT、设备影子、物模型、OTA、文件上传等功能,开发者进行灵活裁剪。
    发表于 09-26 07:22

    RT-Thread软件包定义和使用

    RT-Thread软件包是运行于RT-Thread物联网操作系统平台上,面向不同应用领域的通用软件组件 。RT-Thread 同时提供了开放
    的头像 发表于 05-21 11:29 9674次阅读
    <b class='flag-5'>RT-Thread</b><b class='flag-5'>软件包</b>定义和使用

    RT-Thread中mymqtt软件包的使用方法

    在上一篇文章 RT-Thread中Lan8720和lwip协议栈的使用的工程基础上添加mymqtt软件包。 使能mqtt example和mqtt test,保存,等待下载更新
    的头像 发表于 10-13 10:44 528次阅读
    <b class='flag-5'>RT-Thread</b>中mymqtt<b class='flag-5'>软件包</b>的使用方法