概述
在文章Paho - MQTT C Cient的实现中,我介绍了如何使用Paho开源项目创建MQTTClient_pulish客户端。但只是简单的介绍了使用方法,而且客户端的结果与之前介绍的并不吻合,今天我就结合新的例子,给大家讲解一下Paho使用MQTT客户端的主要过程。
如同前面介绍的,MQTT客户端分为同步客户端和异步客户端。今天主要讲解的是同步客户端,结构还是如同步客户端中介绍的:
1.创建一个客户端对象;
2.设置连接MQTT服务器的选项;
3.如果多线程(异步模式)操作被使用则设置回调函数(详见 Asynchronous >vs synchronous client applications);
4.订阅客户端需要接收的任意话题;
5.重复以下操作直到结束:
a.发布客户端需要的任意信息;
b.处理所有接收到的信息;
6.断开客户端连接;
7.释放客户端使用的所有内存。
实现
好,直接上代码,MQTT简单的同步客户端。
#include <pthread.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include "MQTTClient.h"#if !defined(WIN32)#include <unistd.h>#else#include <windows.h>#endif#define NUM_THREADS 2#define ADDRESS "tcp://localhost:1883" //更改此处地址#define CLIENTID "aaabbbccc_pub" //更改此处客户端ID#define SUB_CLIENTID "aaabbbccc_sub" //更改此处客户端ID#define TOPIC "topic01" //更改发送的话题#define PAYLOAD "Hello Man, Can you see me ?!" //#define QOS 1#define TIMEOUT 10000L#define USERNAME "test_user"#define PASSWORD "jim777"#define DISCONNECT "out"int CONNECT = 1;volatile MQTTClient_deliveryToken deliveredtoken;void delivered(void *context, MQTTClient_deliveryToken dt){ printf("Message with token value %d delivery confirmed\n", dt); deliveredtoken = dt;}int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message){ int i; char* payloadptr; printf("Message arrived\n"); printf(" topic: %s\n", topicName); printf(" message: "); payloadptr = message->payload; if(strcmp(payloadptr, DISCONNECT) == 0){ printf(" \n out!!"); CONNECT = 0; } for(i=0; i<message->payloadlen; i++) { putchar(*payloadptr++); } printf("\n"); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1;}void connlost(void *context, char *cause){ printf("\nConnection lost\n"); printf(" cause: %s\n", cause);}void *subClient(void *threadid){ long tid; tid = (long)threadid; printf("Hello World! It's me, thread #%ld!\n", tid); MQTTClient client; MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; int rc; int ch; MQTTClient_create(&client, ADDRESS, SUB_CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.username = USERNAME; conn_opts.password = PASSWORD; MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered); if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); } printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n" "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS); MQTTClient_subscribe(client, TOPIC, QOS); do { ch = getchar(); } while(ch!='Q' && ch != 'q'); MQTTClient_unsubscribe(client, TOPIC); MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client); pthread_exit(NULL);}void *pubClient(void *threadid){ long tid; tid = (long)threadid; int count = 0; printf("Hello World! It's me, thread #%ld!\n", tid);//声明一个MQTTClient MQTTClient client; //初始化MQTT Client选项 MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; //#define MQTTClient_message_initializer { {'M', 'Q', 'T', 'M'}, 0, 0, NULL, 0, 0, 0, 0 } MQTTClient_message pubmsg = MQTTClient_message_initializer; //声明消息token MQTTClient_deliveryToken token; int rc; //使用参数创建一个client,并将其赋值给之前声明的client MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; conn_opts.username = USERNAME; conn_opts.password = PASSWORD; //使用MQTTClient_connect将client连接到服务器,使用指定的连接选项。成功则返回MQTTCLIENT_SUCCESS if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); } pubmsg.payload = PAYLOAD; pubmsg.payloadlen = strlen(PAYLOAD); pubmsg.qos = QOS; pubmsg.retained = 0; while(CONNECT){ MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token); printf("Waiting for up to %d seconds for publication of %s\n" "on topic %s for client with ClientID: %s\n", (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID); rc = MQTTClient_waitForCompletion(client, token, TIMEOUT); printf("Message with delivery token %d delivered\n", token); usleep(3000000L); } MQTTClient_disconnect(client, 10000); MQTTClient_destroy(&client);}int main(int argc, char* argv[]){ pthread_t threads[NUM_THREADS]; long t; pthread_create(&threads[0], NULL, subClient, (void *)0); pthread_create(&threads[1], NULL, pubClient, (void *)1); pthread_exit(NULL);}
在代码中,我创建了两个线程,分别用来处理订阅客户端和发布客户端。
整体详解
接下来我讲解一下这个简单的客户端,其中,大体的流程如下:
大体的流程如图所示,在客户端启动之后,会启动线程,创建一个订阅客户端,它会监听消息的到达,在消息到达之后会触发相应的回调函数以对消息进行处理;后在启动一个线程,创建一个发送客户端,用来发送消息的,每次发送消息之前会判断是否要掉线,如CONNECT=0则会掉线,否则发送消息给topic01。
订阅客户端详解
-------
以下函数完成的是订阅的功能。
void *subClient(void *threadid)
过程大概如下:
第一步:声明客户端,并通过函数给其赋值;
MQTTClient client;MQTTClient_create(&client, ADDRESS, SUB_CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
第二步:设置连接MQTT服务器的选项;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
第三步:设置回调函数;
MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);//相应的回调函数connlost,msgarrvd,delivered我的代码中都有
第四步:使用客户端和连接选项连接服务器;
MQTTClient_connect(client, &conn_opts))
第五步订阅话题;
MQTTClient_subscribe(client, TOPIC, QOS);
第六步一直等待,知道输入'Q' 或'q';
do { ch = getchar(); } while(ch!='Q' && ch != 'q');
第六步一直等待,直到输入'Q' 或'q';
do { ch = getchar(); } while(ch!='Q' && ch != 'q');
第七步取消订阅;
MQTTClient_unsubscribe(client, TOPIC);
第八步.断开客户端连接;
MQTTClient_disconnect(client, 10000);
第九步.释放客户端使用的所有内存;
MQTTClient_destroy(&client);
至此,订阅客户端就结束了。一般订阅客户端的大体结构都是这样。不同的是回调函数的个性化上。
发送客户端详解
-------
以下函数完成的是发送的功能。
void *pubClient(void *threadid)
过程大概如下:
第一步:声明客户端,并通过函数给其赋值;
MQTTClient client;MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
第二步:设置连接MQTT服务器的选项;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
第三步:使用客户端和连接选项连接服务器;
MQTTClient_connect(client, &conn_opts)
第四步设置发送消息的属性;
pubmsg.payload = PAYLOAD; pubmsg.payloadlen = strlen(PAYLOAD); pubmsg.qos = QOS; pubmsg.retained = 0;
第五步循环发送消息;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
第六步一直等待,当CONNECT=0时退出该客户端;
第七步.断开客户端连接;
MQTTClient_disconnect(client, 10000);
第八步.释放客户端使用的所有内存;
MQTTClient_destroy(&client);
至此,发送客户端就结束了。一般的发送客户端大体结构也如此,但异步客户端可能有些许不同,无非就是设计回调函数,然后在连接,断开连接等时可以使用回调函数做一些操作而已,具体的可以自己研究。
为了让大家能够更深入了解,我把自己学到的一些函数和结构体大致在下面讲解了一下。
相关结构体
-----
MQTTClient
定义:typedef void* MQTTClient;
含义:代表MQTT客户端的句柄。成功调用MQTTClient_create()后,可以得到有效的客户端句柄。
MQTTClient_connectOptions
定义:
typedef struct{char struct_id[4];//结构体的识别序列,必须为MQTCint struct_version;//结构体版本/**在0,1,2,3,4,5中取值:0-表示没有SSL选项且没有serverURIs;1-表示没有serverURIs;2-表示没有MQTTVersion3-表示没有返回值;4-表示没有二进制密码选项*/int keepAliveInterval;/**在这段时间内没有数据相关的消息时,客户端发送一个非常小的MQTT“ping”消息,服务器将会确认这个消息*/int cleansession;/**当cleansession为true时,会话状态信息在连接和断开连接时被丢弃。 将cleansession设置为false将保留会话状态信息*/int reliable;/*将该值设置为true意味着必须完成发布的消息(已收到确认),才能发送另一个消息*/MQTTClient_willOptions* will;/*如果程序不使用最后的意愿和遗嘱功能,请将此指针设置为NULL。*/const char* username;//用户名const char* password;//密码int connectTimeout;//允许尝试连接的过时时间int retryInterval;//尝试重连的时间MQTTClient_SSLOptions* ssl;/*如果程序不使用最后的ssl,请将此指针设置为NULL。*/int serverURIcount;char* const* serverURIs;/*连接服务器的url,以protocol:// host:port为格式*/int MQTTVersion;/*MQTT的版本,MQTTVERSION_3_1(3),MQTTVERSION_3_1_1 (4) */struct{const char* serverURI; int MQTTVersion; int sessionPresent; } returned; struct { int len; const void* data; } binarypwd;} MQTTClient_connectOptions;
含义:用来设置MQTTClient的连接选项的结构体。
MQTTClient_message
定义:
typedef struct{ char struct_id[4];//结构体的识别序列,必须为MQTM int struct_version;//结构体的版本,必须为0 int payloadlen;//MQTT信息的长度 void* payload;//指向消息负载的指针 int qos;//服务质量 int retained;//保留标志 int dup;dup//标志指示这个消息是否是重复的。 只有在收到QoS1消息时才有意义。 如果为true,则客户端应用程序应采取适当的措施来处理重复的消息。 int msgid;//消息标识符通常保留供MQTT客户端和服务器内部使用。} MQTTClient_message;
含义:代表MQTT信息的结构体。
相关函数详解
------MQTTClient_create
定义:
DLLExport int MQTTClient_create( MQTTClient * handle, const char * serverURI, const char * clientId, int persistence_type, void * persistence_context )
作用:该函数创建了一个用于连接到特定服务器,使用特定持久存储的MQTT客户端。
注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。