[聚合文章] Paho -物联网 MQTT C Cient的实现和详解

消息系统 2018-01-05 24 阅读

概述

  在文章Paho - MQTT C Cient的实现中,我介绍了如何使用Paho开源项目创建MQTTClient_pulish客户端。但只是简单的介绍了使用方法,而且客户端的结果与之前介绍的并不吻合,今天我就结合新的例子,给大家讲解一下Paho使用MQTT客户端的主要过程。
  如同前面介绍的,MQTT客户端分为同步客户端和异步客户端。今天主要讲解的是同步客户端,结构还是如同步客户端中介绍的:

  1.创建一个客户端对象;
  2.设置连接MQTT服务器的选项;
  3.如果多线程(异步模式)操作被使用则设置回调函数(详见 Asynchronous >vs synchronous client applications);
  4.订阅客户端需要接收的任意话题;
  5.重复以下操作直到结束:
    a.发布客户端需要的任意信息;
    b.处理所有接收到的信息;
  6.断开客户端连接;
  7.释放客户端使用的所有内存。

实现

  好,直接上代码,MQTT简单的同步客户端。

#include <pthread.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include "MQTTClient.h"#if !defined(WIN32)#include <unistd.h>#else#include <windows.h>#endif#define NUM_THREADS 2#define ADDRESS     "tcp://localhost:1883" //更改此处地址#define CLIENTID    "aaabbbccc_pub" //更改此处客户端ID#define SUB_CLIENTID    "aaabbbccc_sub" //更改此处客户端ID#define TOPIC       "topic01"  //更改发送的话题#define PAYLOAD     "Hello Man, Can you see me ?!" //#define QOS         1#define TIMEOUT     10000L#define USERNAME    "test_user"#define PASSWORD    "jim777"#define DISCONNECT  "out"int CONNECT = 1;volatile MQTTClient_deliveryToken deliveredtoken;void delivered(void *context, MQTTClient_deliveryToken dt){    printf("Message with token value %d delivery confirmed\n", dt);    deliveredtoken = dt;}int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message){    int i;    char* payloadptr;    printf("Message arrived\n");    printf("     topic: %s\n", topicName);    printf("   message: ");    payloadptr = message->payload;    if(strcmp(payloadptr, DISCONNECT) == 0){        printf(" \n out!!");        CONNECT = 0;    }        for(i=0; i<message->payloadlen; i++)    {        putchar(*payloadptr++);    }    printf("\n");        MQTTClient_freeMessage(&message);    MQTTClient_free(topicName);    return 1;}void connlost(void *context, char *cause){    printf("\nConnection lost\n");    printf("     cause: %s\n", cause);}void *subClient(void *threadid){   long tid;   tid = (long)threadid;   printf("Hello World! It's me, thread #%ld!\n", tid);       MQTTClient client;    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;    int rc;    int ch;    MQTTClient_create(&client, ADDRESS, SUB_CLIENTID,        MQTTCLIENT_PERSISTENCE_NONE, NULL);    conn_opts.keepAliveInterval = 20;    conn_opts.cleansession = 1;    conn_opts.username = USERNAME;    conn_opts.password = PASSWORD;        MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)    {        printf("Failed to connect, return code %d\n", rc);        exit(EXIT_FAILURE);    }    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"           "Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);    MQTTClient_subscribe(client, TOPIC, QOS);    do     {        ch = getchar();    } while(ch!='Q' && ch != 'q');    MQTTClient_unsubscribe(client, TOPIC);    MQTTClient_disconnect(client, 10000);    MQTTClient_destroy(&client);      pthread_exit(NULL);}void *pubClient(void *threadid){   long tid;   tid = (long)threadid;   int count = 0;   printf("Hello World! It's me, thread #%ld!\n", tid);//声明一个MQTTClient    MQTTClient client;    //初始化MQTT Client选项    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;    //#define MQTTClient_message_initializer { {'M', 'Q', 'T', 'M'}, 0, 0, NULL, 0, 0, 0, 0 }    MQTTClient_message pubmsg = MQTTClient_message_initializer;    //声明消息token    MQTTClient_deliveryToken token;    int rc;    //使用参数创建一个client,并将其赋值给之前声明的client    MQTTClient_create(&client, ADDRESS, CLIENTID,        MQTTCLIENT_PERSISTENCE_NONE, NULL);    conn_opts.keepAliveInterval = 20;    conn_opts.cleansession = 1;    conn_opts.username = USERNAME;    conn_opts.password = PASSWORD;     //使用MQTTClient_connect将client连接到服务器,使用指定的连接选项。成功则返回MQTTCLIENT_SUCCESS    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)    {        printf("Failed to connect, return code %d\n", rc);        exit(EXIT_FAILURE);    }    pubmsg.payload = PAYLOAD;    pubmsg.payloadlen = strlen(PAYLOAD);    pubmsg.qos = QOS;    pubmsg.retained = 0;    while(CONNECT){    MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);    printf("Waiting for up to %d seconds for publication of %s\n"            "on topic %s for client with ClientID: %s\n",            (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);    rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);    printf("Message with delivery token %d delivered\n", token);    usleep(3000000L);    }            MQTTClient_disconnect(client, 10000);    MQTTClient_destroy(&client);}int main(int argc, char* argv[]){    pthread_t threads[NUM_THREADS];    long t;    pthread_create(&threads[0], NULL, subClient, (void *)0);    pthread_create(&threads[1], NULL, pubClient, (void *)1);    pthread_exit(NULL);}

  在代码中,我创建了两个线程,分别用来处理订阅客户端和发布客户端。

整体详解

接下来我讲解一下这个简单的客户端,其中,大体的流程如下:
客户端大体流程
  大体的流程如图所示,在客户端启动之后,会启动线程,创建一个订阅客户端,它会监听消息的到达,在消息到达之后会触发相应的回调函数以对消息进行处理;后在启动一个线程,创建一个发送客户端,用来发送消息的,每次发送消息之前会判断是否要掉线,如CONNECT=0则会掉线,否则发送消息给topic01。
订阅客户端详解
-------
  以下函数完成的是订阅的功能。

void *subClient(void *threadid)

过程大概如下:

  第一步:声明客户端,并通过函数给其赋值;

MQTTClient client;MQTTClient_create(&client, ADDRESS, SUB_CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);

  第二步:设置连接MQTT服务器的选项;

MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;

  第三步:设置回调函数;

MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);//相应的回调函数connlost,msgarrvd,delivered我的代码中都有

  第四步:使用客户端和连接选项连接服务器;

MQTTClient_connect(client, &conn_opts))

  第五步订阅话题;

MQTTClient_subscribe(client, TOPIC, QOS);

  第六步一直等待,知道输入'Q' 或'q';

    do     {        ch = getchar();    } while(ch!='Q' && ch != 'q');

  第六步一直等待,直到输入'Q' 或'q';

    do     {        ch = getchar();    } while(ch!='Q' && ch != 'q');

  第七步取消订阅;

MQTTClient_unsubscribe(client, TOPIC);

  第八步.断开客户端连接;

 MQTTClient_disconnect(client, 10000);

  第九步.释放客户端使用的所有内存;

MQTTClient_destroy(&client);

  至此,订阅客户端就结束了。一般订阅客户端的大体结构都是这样。不同的是回调函数的个性化上。
发送客户端详解
-------
  以下函数完成的是发送的功能。

void *pubClient(void *threadid)

过程大概如下:

  第一步:声明客户端,并通过函数给其赋值;

MQTTClient client;MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);

  第二步:设置连接MQTT服务器的选项;

MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;

  第三步:使用客户端和连接选项连接服务器;

MQTTClient_connect(client, &conn_opts)

  第四步设置发送消息的属性;

    pubmsg.payload = PAYLOAD;    pubmsg.payloadlen = strlen(PAYLOAD);    pubmsg.qos = QOS;    pubmsg.retained = 0;

  第五步循环发送消息;

   MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);

  第六步一直等待,当CONNECT=0时退出该客户端;

  第七步.断开客户端连接;

    MQTTClient_disconnect(client, 10000);

  第八步.释放客户端使用的所有内存;

 MQTTClient_destroy(&client);

  至此,发送客户端就结束了。一般的发送客户端大体结构也如此,但异步客户端可能有些许不同,无非就是设计回调函数,然后在连接,断开连接等时可以使用回调函数做一些操作而已,具体的可以自己研究。


  为了让大家能够更深入了解,我把自己学到的一些函数和结构体大致在下面讲解了一下。
相关结构体
-----

MQTTClient

定义:typedef void* MQTTClient;
含义:代表MQTT客户端的句柄。成功调用MQTTClient_create()后,可以得到有效的客户端句柄。


MQTTClient_connectOptions

定义:

typedef struct{char struct_id[4];//结构体的识别序列,必须为MQTCint struct_version;//结构体版本/**在0,1,2,3,4,5中取值:0-表示没有SSL选项且没有serverURIs;1-表示没有serverURIs;2-表示没有MQTTVersion3-表示没有返回值;4-表示没有二进制密码选项*/int keepAliveInterval;/**在这段时间内没有数据相关的消息时,客户端发送一个非常小的MQTT“ping”消息,服务器将会确认这个消息*/int cleansession;/**当cleansession为true时,会话状态信息在连接和断开连接时被丢弃。 将cleansession设置为false将保留会话状态信息*/int reliable;/*将该值设置为true意味着必须完成发布的消息(已收到确认),才能发送另一个消息*/MQTTClient_willOptions* will;/*如果程序不使用最后的意愿和遗嘱功能,请将此指针设置为NULL。*/const char* username;//用户名const char* password;//密码int connectTimeout;//允许尝试连接的过时时间int retryInterval;//尝试重连的时间MQTTClient_SSLOptions* ssl;/*如果程序不使用最后的ssl,请将此指针设置为NULL。*/int serverURIcount;char* const* serverURIs;/*连接服务器的url,以protocol:// host:port为格式*/int MQTTVersion;/*MQTT的版本,MQTTVERSION_3_1(3),MQTTVERSION_3_1_1 (4) */struct{const char* serverURI;   int MQTTVersion;     int sessionPresent;  } returned;  struct {  int len;            const void* data;  } binarypwd;} MQTTClient_connectOptions;

含义:用来设置MQTTClient的连接选项的结构体。


MQTTClient_message

定义:

typedef struct{    char struct_id[4];//结构体的识别序列,必须为MQTM    int struct_version;//结构体的版本,必须为0    int payloadlen;//MQTT信息的长度    void* payload;//指向消息负载的指针    int qos;//服务质量    int retained;//保留标志    int dup;dup//标志指示这个消息是否是重复的。 只有在收到QoS1消息时才有意义。 如果为true,则客户端应用程序应采取适当的措施来处理重复的消息。    int msgid;//消息标识符通常保留供MQTT客户端和服务器内部使用。} MQTTClient_message;

含义:代表MQTT信息的结构体。
相关函数详解
------

MQTTClient_create

定义:

DLLExport int MQTTClient_create(            MQTTClient *    handle,        const char *    serverURI,        const char *    clientId,        int     persistence_type,        void *      persistence_context     ) 

作用:该函数创建了一个用于连接到特定服务器,使用特定持久存储的MQTT客户端。

注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。