https://github.com/jiejieTop/MQTTclient

mqttclient

一个高性能、高稳定性的跨平台MQTT客户端

一个高性能、高稳定性的跨平台MQTT客户端,基于socket API之上开拓,可以在嵌入式设备(FreeRTOS/LiteOS/RT-Thread/TencentOS tiny)、Linux、Windows、Mac上利用,拥有非常简洁的API接口,以极少的资源实现QOS2的做事质量,并且无缝衔接了mbedtls加密库。

phpmqtt开源一个高机能高稳固性的跨平台MQTT客户端 jQuery

上风:基于标准BSD socket之上开拓,只假如兼容BSD socket的系统均可利用。
稳定:无论是掉线重连,丢包重发,都是严格遵照MQTT协议标准实行,除此之外对大数据量的测试无论是收是发,都是非常稳定(一次发送135K数据,3秒一次),高频测试也是非常稳定(7个主题同时收发,每秒一次,也便是1秒14个mqtt报文,做事质量QoS0、QoS1、QoS2都有)。
由于作者以极少的资源设计了记录机制,对采取QoS1做事质量的报文必须担保到达一次,当发布的主题(qos1、qos2都适用)没有被做事器收到时会自动重发,而对QoS2做事质量的报文担保有且只有处理一次(如果不相信它稳定性的同学可以自己去修正源码,专门为QoS2做事质量去做测试,故意不回答PUBREC包,让做事看重发QoS2报文,且看看客户端是否有且只有处理一次),而对付掉线重连的稳定性,这种则是基本操作了,没啥好说的,在自动重连后还会自动重新订阅主题,担保主题不会丢失,因此在测试中稳定性极好。
轻量级:全体代码工程极其大略,不该用mbedtls情形下,占用资源极少,作者曾利用esp8266模组与云端通信,全体工程代码花费的RAM不敷15k(包括系统占用的开销,对数据的处理开销,而这次还是未优化的情形下,还依旧完美保留了掉线重连的稳定性,但是对应qos1、qos2做事质量的报文则未做测试,由于STM32F103C8T6芯片资源实在是太少了,折腾不起)。
无缝衔接mbedtls加密传输,让网络传输更加安全,而且接口层完备不须要用户理会,无论是否加密,mqttclient对用户供应的API接口是没有变革的,这就很好的兼容了一套代运用层的码可以加密传输也可以不加密传输。
拥有极简的API接口,总的来说,mqttclient的配置都有默认值,基本无需配置都能利用的,也可以随意配置,对配置都有健壮性检测,这样子设计的API接口也是非常大略。
有非常好的代码风格与思想:全体代码采取分层式设计,代码实现采取异步处理的思想,降落耦合,提高性能,详细表示在什么地方呢?很大略,目前市情上很多MQTT客户端发布主题都是要壅塞等待ack,这是非常暴力的行为,壅塞当前哨程等待做事器的应答,那如果我想要发送数据怎么办,或者我要重复检测数据怎么办,你可能会说,指定壅塞韶光等待,那如果网络延迟,ack迟迟不来,我就白等了吗,对付qos1、qos2的做事质量怎么办,以是说这种还是要异步处理的思想,我发布主题,那我发布出去就好了,不须要等待,对付qos1、qos2做事质量的MQTT报文,如果做事器没收到,那我重发就可以,这种重发也是异步的处理,完备不会壅塞当前哨程。
MQTT协议支持主题通配符“#”、“+”。
订阅的主题与处理完备分离,让编程逻辑更加大略易用,用户无需理会错综繁芜的逻辑关系。
mqttclient内部已实现保活处理机制,无需用户过多关心理会,用户只需专心处理运用功能即可。
无缝衔接salof:它是一个同步异步日志输出框架,在空闲时候输出对应的日志信息,也可以将信息写入flash中保存,方便调试。
不对外产生依赖。
利用 paho mqtt 库整体框架

拥有非常明确的分层框架。

整体框架

整体框架

目前已实现了Linux、TencentOS tiny、FreeRTOS、RT-Thread平台(已做成软件包,名字为kawaii-mqtt),除此之外TencentOS tiny的AT框架亦可以利用(RAM花费不敷15K),并且稳定性极好!

版本

问题

欢迎以 GitHub Issues 的形式提交问题和bug报告

版权和容许

mqttclient 遵照 Apache License v2.0 开源协议。
鼓励代码共享和尊重原作者的著作权,可以自由的利用、修正源代码,也可以将修正后的代码作为开源或闭源软件发布,但必须保留原作者版权声明。

linux平台下测试利用安装cmake:

sudoapt-getinstallcmake配置

在mqttclient/test/test.c文件中修正以下内容:

init_params.connect_params.network_params.network_ssl_params.ca_crt=test_ca_get();/CA证书/init_params.connect_params.network_params.addr="xxxxxxx";/做事器域名/init_params.connect_params.network_params.port="8883";/做事器端口号/init_params.connect_params.user_name="xxxxxxx";/用户名/init_params.connect_params.password="xxxxxxx";/密码/init_params.connect_params.client_id="xxxxxxx";/客户端id/mbedtls

默认打开mbedtls。

salof 全称是:Synchronous Asynchronous Log Output Framework(同步异步日志输出框架),它是一个同步异步日志输出框架,在空闲时候输出对应的日志信息,并且该库与mqttclient无缝衔接。

配置对应的日志输出级别:

#defineBASE_LEVEL(0)#defineASSERT_LEVEL(BASE_LEVEL+1)/日志输出级别:断言级别(非常高优先级)/#defineERR_LEVEL(ASSERT_LEVEL+1)/日志输出级别:缺点级别(高优先级)/#defineWARN_LEVEL(ERR_LEVEL+1)/日志输出级别:警告级别(中优先级)/#defineINFO_LEVEL(WARN_LEVEL+1)/日志输出级别:信息级别(低优先级)/#defineDEBUG_LEVEL(INFO_LEVEL+1)/日志输出级别:调试级别(更低优先级)/#defineLOG_LEVELWARN_LEVEL/日志输出级别/

日志其他选项:

终端带颜色韶光戳标签mqttclient的配置

配置mqtt等待应答列表的最大值,对付qos1 qos2做事质量有哀求的可以将其设置大一点,当然也必须资源跟得上,它紧张是担保qos1 qos2的mqtt报文能准确到达做事器。

#defineMQTT_ACK_HANDLER_NUM_MAX64

选择MQTT协议的版本,默认为4,表示利用MQTT 3.1.1版本,而3则表示为MQTT 3.1版本。

#defineMQTT_VERSION4//4ismqtt3.1.1

设置默认的保活韶光,它紧张是担保MQTT客户端与做事器的保持活性连接,单位为 秒 ,比如MQTT客户端与做事器100S没有发送数据了,有没有吸收到数据,此时MQTT客户端会发送一个ping包,确认一下这个会话是否存在,如果收到做事器的应答,那么解释这个会话还是存在的,可以随时收发数据,而如果不存在了,就打消会话。

#defineMQTT_KEEP_ALIVE_INTERVAL100//unit:second

默认的命令超时,它紧张是用于socket读写超时,在MQTT初始化时可以指定:

#defineMQTT_DEFAULT_CMD_TIMEOUT4000

默认主题的长度,主题是支持通配符的,如果主题太长则会被截断:

#defineMQTT_TOPIC_LEN_MAX64

默认的算法数据缓冲区的大小,如果要发送大量数据则修正大一些,在MQTT初始化时可以指定:

#defineMQTT_DEFAULT_BUF_SIZE1024

线程干系的配置,如线程栈,线程优先级,线程韶光片等:在linux环境下可以是不须要理会这些参数的,而在RTOS平台则须要配置,如果不该用mbedtls,线程栈2048字节已足够,而利用mbedtls加密后,须要配置4096字节以上。

#defineMQTT_THREAD_STACK_SIZE2048//线程栈#defineMQTT_THREAD_PRIO5//线程优先级#defineMQTT_THREAD_TICK50//线程韶光片

默认的重连韶光间隔,当发生掉线时,会以这个韶光间隔考试测验重连:

#defineMQTT_RECONNECT_DEFAULT_DURATION1000

其他不须要怎么配置的东西:

#defineMQTT_MAX_PACKET_ID(0xFFFF-1)//mqtt报文id#defineMQTT_MAX_CMD_TIMEOUT20000//最大的命令超时参数#defineMQTT_MIN_CMD_TIMEOUT1000//最小的命令超时参数

ps:以上参数基本不须要怎么配置的,直接用即可~

编译 & 运行

./build.sh

运行build.sh脚本后会在 ./build/bin/目录下天生可实行文件mqtt-client,直接运行即可。

编译成动态库libmqttclient.so

./make-libmqttclient.sh

运行make-libmqttclient.sh脚本后会在 ./libmqttclient/lib目录下天生一个动态库文件libmqttclient.so,并安装到系统的/usr/lib目录下,干系头文件已经拷贝到./libmqttclient/include目录下,编译运用程序的时候只须要链接动态库即可-lmqttclient,动态库的配置文件根据./test/mqtt_config.h配置的。

设计思想整体采取分层式设计,代码实现采取异步设计办法,降落耦合。
的处理利用回调的办法处理:用户指定[订阅的主题]与指定[的处理函数]不对外产生依赖API

mqttclient拥有非常简洁的api接口

intmqtt_keep_alive(mqtt_client_tc);intmqtt_init(mqtt_client_tc,client_init_params_tinit);intmqtt_release(mqtt_client_tc);intmqtt_connect(mqtt_client_tc);intmqtt_disconnect(mqtt_client_tc);intmqtt_subscribe(mqtt_client_tc,constchartopic_filter,mqtt_qos_tqos,message_handler_tmsg_handler);intmqtt_unsubscribe(mqtt_client_tc,constchartopic_filter);intmqtt_publish(mqtt_client_tc,constchartopic_filter,mqtt_message_tmsg);intmqtt_list_subscribe_topic(mqtt_client_tc);intmqtt_set_interceptor_handler(mqtt_client_tc,interceptor_handler_thandler);核心

mqtt_client_t 构造

typedefstructmqtt_client{unsignedshortpacket_id;unsignedcharping_outstanding;unsignedcharack_handler_number;unsignedcharread_buf;unsignedcharwrite_buf;unsignedintcmd_timeout;unsignedintread_buf_size;unsignedintwrite_buf_size;unsignedintreconnect_try_duration;voidreconnect_date;reconnect_handler_treconnect_handler;client_state_tclient_state;platform_mutex_twrite_lock;platform_mutex_tglobal_lock;mqtt_list_tmsg_handler_list;mqtt_list_tack_handler_list;network_tnetwork;platform_thread_tthread;platform_timer_treconnect_timer;platform_timer_tlast_sent;platform_timer_tlast_received;connect_params_tconnect_params;interceptor_handler_tinterceptor_handler;}mqtt_client_t;

该构造紧张掩护以下内容:

读写数据缓冲区read_buf、write_buf命令超时时间cmd_timeout(紧张是读写壅塞韶光、等待相应的韶光、重连等待韶光)掩护ack链表ack_handler_list,这是异步实现的核心,所有等待相应的报文都会被挂载到这个链表上掩护处理列表msg_handler_list,这是mqtt协议必须实现的内容,所有来自做事器的publish报文都会被处理(条件是订阅了对应的)掩护一个网卡接口network掩护一个内部线程thread,所有来自做事器的mqtt包都会在这里被处理!
两个定时器,分别是掉线重连定时器与保活定时器reconnect_timer、last_sent、last_received一些连接的参数connect_paramsmqttclient实现

以下是全体框架的实现办法,方便大家更随意马虎理解mqttclient的代码与设计思想,让大家能够修正源码与利用,还可以提交pr或者issues,开源的天下期待各位大神的参与,感谢!

除此之外以下代码的记录机制与其超时处理机制是非常好的编程思想,大家有兴趣一定要看源代码!

初始化

intmqtt_init(mqtt_client_tc,client_init_params_tinit)

紧张是配置mqtt_client_t构造的干系信息,如果没有指定初始化参数,则系统会供应默认的参数。
但连接部分的参数则必须指定:

init_params.connect_params.network_params.addr="[你的mqtt做事器IP地址或者是域名]";init_params.connect_params.network_params.port=1883;//端口号init_params.connect_params.user_name="jiejietop";init_params.connect_params.password="123456";init_params.connect_params.client_id="clientid";mqtt_init(&client,&init_params);连接做事器

intmqtt_connect(mqtt_client_tc);

参数只有 mqtt_client_t 类型的指针,字符串类型的主题(支持通配符"#" "+"),主题的做事质量,以及收到报文的处理函数,如不指定则有默认处理函数。
连接做事器则是利用非异步的办法设计,由于必须等待连接上做事器才能进行下一步操作。

过程如下:

调用底层的连接函数连接上做事器:

c->network->connect(c->network);序列化mqtt的CONNECT报文并且发送

MQTTSerialize_connect(c->write_buf,c->write_buf_size,&connect_data)mqtt_send_packet(c,len,&connect_timer)等待来自做事器的CONNACK报文

mqtt_wait_packet(c,CONNACK,&connect_timer)连接成功后创建一个内部线程mqtt_yield_thread,并在得当的时候启动它:

platform_thread_init("mqtt_yield_thread",mqtt_yield_thread,c,MQTT_THREAD_STACK_SIZE,MQTT_THREAD_PRIO,MQTT_THREAD_TICK)if(NULL!=c->thread){mqtt_set_client_state(c,CLIENT_STATE_CONNECTED);platform_thread_startup(c->thread);platform_thread_start(c->thread);/startrunmqttthread/}而对付重连来说则不会重新创建线程,直接改变客户端状态为连接状态即可:

mqtt_set_client_state(c,CLIENT_STATE_CONNECTED);订阅报文

intmqtt_subscribe(mqtt_client_tc,constchartopic_filter,mqtt_qos_tqos,message_handler_thandler)

订阅报文利用异步设计来实现的:过程如下:

序列化订阅报文并且发送给做事器

MQTTSerialize_subscribe(c->write_buf,c->write_buf_size,0,mqtt_get_next_packet_id(c),1,&topic,(int)&qos)mqtt_send_packet(c,len,&timer)创建对应的处理节点,这个节点在收到做事器的SUBACK订阅应答报文后会挂载随处置列表msg_handler_list上

mqtt_msg_handler_create(topic_filter,qos,handler)在发送了报文给做事器那就要等待做事器的相应了,先记录这个等待SUBACK

mqtt_ack_list_record(c,SUBACK,mqtt_get_next_packet_id(c),len,msg_handler)取消订阅

与订阅报文的逻辑基本差不多的~

序列化订阅报文并且发送给做事器

MQTTSerialize_unsubscribe(c->write_buf,c->write_buf_size,0,packet_id,1,&topic)mqtt_send_packet(c,len,&timer)创建对应的处理节点,这个节点在收到做事器的UNSUBACK取消订阅应答报文后将处理列表msg_handler_list上的已经订阅的主题节点销毁

mqtt_msg_handler_create((constchar)topic_filter,QOS0,NULL)在发送了报文给做事器那就要等待做事器的相应了,先记录这个等待UNSUBACK

mqtt_ack_list_record(c,UNSUBACK,packet_id,len,msg_handler)发布报文

intmqtt_publish(mqtt_client_tc,constchartopic_filter,mqtt_message_tmsg)

参数只有 mqtt_client_t 类型的指针,字符串类型的主题(支持通配符),要发布的(包括做事质量、主体)。

mqtt_message_tmsg;msg.qos=2;msg.payload=(void)buf;mqtt_publish(&client,"testtopic1",&msg);

核心思想都差不多,过程如下:

先序列化发布报文,然后发送到做事器

MQTTSerialize_publish(c->write_buf,c->write_buf_size,0,msg->qos,msg->retained,msg->id,topic,(unsignedchar)msg->payload,msg->payloadlen);mqtt_send_packet(c,len,&timer)对付QOS0的逻辑,不做任何处理,对付QOS1和QOS2的报文则须要记录下来,在没收到做事器应答的时候进行重发

if(QOS1==msg->qos){rc=mqtt_ack_list_record(c,PUBACK,mqtt_get_next_packet_id(c),len,NULL);}elseif(QOS2==msg->qos){rc=mqtt_ack_list_record(c,PUBREC,mqtt_get_next_packet_id(c),len,NULL);}还有非常主要的一点,重发报文的MQTT报文头部须要设置DUP标志位,这是MQTT协议的标准,因此,在重发的时候作者直接操作了报文的DUP标志位,由于修正DUP标志位的函数我没有从MQTT库中找到,以是我封装了一个函数,这与LwIP中的交叉存取思想是一个道理,它假设我知道MQTT报文的所有操作,以是我可以操作它,这样子可以提高很多效率:

mqtt_set_publish_dup(c,1);/mayresendthisdata,settheudpflaginadvance/内部线程

staticvoidmqtt_yield_thread(voidarg)

紧张是对mqtt_yield函数的返回值做处理,比如在disconnect的时候销毁这个线程。

核心的处理函数数据包的处理mqtt_packet_handle

staticintmqtt_packet_handle(mqtt_client_tc,platform_timer_ttimer)

对不同的包利用不一样的处理:

switch(packet_type){case0:/timedoutreadingpacket/break;caseCONNACK:break;casePUBACK:casePUBCOMP:rc=mqtt_puback_and_pubcomp_packet_handle(c,timer);break;caseSUBACK:rc=mqtt_suback_packet_handle(c,timer);break;caseUNSUBACK:rc=mqtt_unsuback_packet_handle(c,timer);break;casePUBLISH:rc=mqtt_publish_packet_handle(c,timer);break;casePUBREC:casePUBREL:rc=mqtt_pubrec_and_pubrel_packet_handle(c,timer);break;casePINGRESP:c->ping_outstanding=0;break;default:gotoexit;}

并且做保活的处理:

mqtt_keep_alive(c)

当发生超时后

if(platform_timer_is_expired(&c->last_sent)||platform_timer_is_expired(&c->last_received))

序列号一个心跳包并且发送给做事器

MQTTSerialize_pingreq(c->write_buf,c->write_buf_size);mqtt_send_packet(c,len,&timer);

当再次发生超时后,表示与做事器的连接已断开,须要重连的操作,设置客户端状态为断开连接

mqtt_set_client_state(c,CLIENT_STATE_DISCONNECTED);ack链表的扫描,当收到做事器的报文时,对ack列表进行扫描操作

mqtt_ack_list_scan(c);

当超时后就销毁ack链表节点:

mqtt_ack_handler_destroy(ack_handler);

当然下面这几种报文则须要重发操作:(PUBACK 、PUBREC、 PUBREL 、PUBCOMP,担保QOS1 QOS2的做事质量)

if((ack_handler->type==PUBACK)||(ack_handler->type==PUBREC)||(ack_handler->type==PUBREL)||(ack_handler->type==PUBCOMP))mqtt_ack_handler_resend(c,ack_handler);保持活性的韶光过去了,可能掉线了,须要重连操作

mqtt_try_reconnect(c);

重连成功后考试测验重新订阅报文,担保恢复原始状态~

mqtt_try_resubscribe(c)发布应答与发布完成报文的处理

staticintmqtt_puback_and_pubcomp_packet_handle(mqtt_client_tc,platform_timer_ttimer)反序列化报文

MQTTDeserialize_ack(&packet_type,&dup,&packet_id,c->read_buf,c->read_buf_size)取消对应的ack记录

mqtt_ack_list_unrecord(c,packet_type,packet_id,NULL);订阅应答报文的处理

staticintmqtt_suback_packet_handle(mqtt_client_tc,platform_timer_ttimer)反序列化报文

MQTTDeserialize_suback(&packet_id,1,&count,(int)&granted_qos,c->read_buf,c->read_buf_size)取消对应的ack记录

mqtt_ack_list_unrecord(c,packet_type,packet_id,NULL);安装对应的订阅处理函数,如果是已存在的则不会安装

mqtt_msg_handlers_install(c,msg_handler);取消订阅应答报文的处理

staticintmqtt_unsuback_packet_handle(mqtt_client_tc,platform_timer_ttimer)反序列化报文

MQTTDeserialize_unsuback(&packet_id,c->read_buf,c->read_buf_size)取消对应的ack记录,并且获取到已经订阅的处理节点

mqtt_ack_list_unrecord(c,UNSUBACK,packet_id,&msg_handler)销毁对应的订阅处理函数

mqtt_msg_handler_destory(msg_handler);来自做事器的发布报文的处理

staticintmqtt_publish_packet_handle(mqtt_client_tc,platform_timer_ttimer)反序列化报文

MQTTDeserialize_publish(&msg.dup,&qos,&msg.retained,&msg.id,&topic_name,(unsignedchar)&msg.payload,(int)&msg.payloadlen,c->read_buf,c->read_buf_size)对付QOS0、QOS1的报文,直接去处理

mqtt_deliver_message(c,&topic_name,&msg);对付QOS1的报文,还须要发送一个PUBACK应答报文给做事器

MQTTSerialize_ack(c->write_buf,c->write_buf_size,PUBACK,0,msg.id);而对付QOS2的报文则须要发送PUBREC报文给做事器,除此之外还须要记录PUBREL到ack链表上,等待做事器的发布开释报文,末了再去处理这个

MQTTSerialize_ack(c->write_buf,c->write_buf_size,PUBREC,0,msg.id);mqtt_ack_list_record(c,PUBREL,msg.id+1,len,NULL)mqtt_deliver_message(c,&topic_name,&msg);

解释:一旦注册到ack列表上的报文,当具有重复的报文是不会重新被注册的,它会通过mqtt_ack_list_node_is_exist函数判断这个节点是否存在,紧张是依赖等待相应的类型与msgid。

发布收到与发布开释报文的处理

staticintmqtt_pubrec_and_pubrel_packet_handle(mqtt_client_tc,platform_timer_ttimer)反序列化报文

MQTTDeserialize_ack(&packet_type,&dup,&packet_id,c->read_buf,c->read_buf_size)产生一个对应的应答报文

mqtt_publish_ack_packet(c,packet_id,packet_type);取消对应的ack记录

mqtt_ack_list_unrecord(c,UNSUBACK,packet_id,&msg_handler)开源地址

https://github.com/jiejieTop/mqttclient