Skip to content

Commit

Permalink
MQTT 5.0 connect and connack eclipse-paho#417
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian Craggs committed Mar 8, 2018
1 parent 0e21156 commit 6a1f39f
Show file tree
Hide file tree
Showing 18 changed files with 1,632 additions and 266 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ SYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_C}}
SAMPLE_FILES_A = paho_c_pub paho_c_sub MQTTAsync_subscribe MQTTAsync_publish
ASYNC_SAMPLES = ${addprefix ${blddir}/samples/,${SAMPLE_FILES_A}}

TEST_FILES_C = test1 test2 sync_client_test test_mqtt4sync
TEST_FILES_C = test1 test15 test2 sync_client_test test_mqtt4sync
SYNC_TESTS = ${addprefix ${blddir}/test/,${TEST_FILES_C}}

TEST_FILES_CS = test3
Expand Down
3 changes: 1 addition & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ SET(common_src
SocketBuffer.c
Heap.c
LinkedList.c
MQTTV5Packet.c
MQTTV5Properties.c
MQTTProperties.c
)

IF (WIN32)
Expand Down
13 changes: 8 additions & 5 deletions src/MQTTAsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* Ian Craggs - auto reconnect timing fix #218
* Ian Craggs - fix for issue #190
* Ian Craggs - check for NULL SSL options #334
* Ian Craggs - MQTT 5.0 support
*******************************************************************************/

/**
Expand Down Expand Up @@ -1237,9 +1238,11 @@ static int MQTTAsync_processCommand(void)

Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, command->command.details.conn.MQTTVersion);
#if defined(OPENSSL)
rc = MQTTProtocol_connect(serverURI, command->client->c, command->client->ssl, command->command.details.conn.MQTTVersion);
rc = MQTTProtocol_connect(serverURI, command->client->c, command->client->ssl, command->command.details.conn.MQTTVersion,
NULL, NULL);
#else
rc = MQTTProtocol_connect(serverURI, command->client->c, command->command.details.conn.MQTTVersion);
rc = MQTTProtocol_connect(serverURI, command->client->c, command->command.details.conn.MQTTVersion,
NULL, NULL);
#endif
if (command->client->c->connect_state == 0)
rc = SOCKET_ERROR;
Expand Down Expand Up @@ -2921,7 +2924,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
{
rc = MQTTCLIENT_SUCCESS;
m->c->connect_state = 3;
if (MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion) == SOCKET_ERROR)
if (MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion, NULL, NULL) == SOCKET_ERROR)
{
rc = SOCKET_ERROR;
goto exit;
Expand All @@ -2940,7 +2943,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
{
#endif
m->c->connect_state = 3; /* TCP/SSL connect completed, in which case send the MQTT connect packet */
if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion)) == SOCKET_ERROR)
if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion, NULL, NULL)) == SOCKET_ERROR)
goto exit;
#if defined(OPENSSL)
}
Expand All @@ -2956,7 +2959,7 @@ static int MQTTAsync_connecting(MQTTAsyncs* m)
if(!m->c->cleansession && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl);
m->c->connect_state = 3; /* SSL connect completed, in which case send the MQTT connect packet */
if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion)) == SOCKET_ERROR)
if ((rc = MQTTPacket_send_connect(m->c, m->connect.details.conn.MQTTVersion, NULL, NULL)) == SOCKET_ERROR)
goto exit;
}
#endif
Expand Down
84 changes: 55 additions & 29 deletions src/MQTTClient.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
* Ian Craggs - binary will message support
* Ian Craggs - waitforCompletion fix #240
* Ian Craggs - check for NULL SSL options #334
* Ian Craggs - MQTT 5.0 support
*******************************************************************************/

/**
Expand Down Expand Up @@ -284,11 +285,13 @@ static thread_return_type WINAPI MQTTClient_run(void* n);
static void MQTTClient_stop(void);
static void MQTTClient_closeSession(Clients* client);
static int MQTTClient_cleanSession(Clients* client);
static int MQTTClient_connectURIVersion(
static MQTTResponse MQTTClient_connectURIVersion(
MQTTClient handle, MQTTClient_connectOptions* options,
const char* serverURI, int MQTTVersion,
START_TIME_TYPE start, long millisecsTimeout);
static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI);
START_TIME_TYPE start, long millisecsTimeout,
MQTTProperties* connectProperties, MQTTProperties* willProperties);
static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
MQTTProperties* connectProperties, MQTTProperties* willProperties);
static int MQTTClient_disconnect1(MQTTClient handle, int timeout, int internal, int stop);
static int MQTTClient_disconnect_internal(MQTTClient handle, int timeout);
static void MQTTClient_retry(void);
Expand Down Expand Up @@ -848,12 +851,13 @@ void Protocol_processPublication(Publish* publish, Clients* client)
}


static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI, int MQTTVersion,
START_TIME_TYPE start, long millisecsTimeout)
static MQTTResponse MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI, int MQTTVersion,
START_TIME_TYPE start, long millisecsTimeout, MQTTProperties* connectProperties, MQTTProperties* willProperties)
{
MQTTClients* m = handle;
int rc = SOCKET_ERROR;
int sessionPresent = 0;
MQTTResponse resp = {SOCKET_ERROR, NULL};

FUNC_ENTRY;
if (m->ma && !running)
Expand All @@ -869,9 +873,9 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt

Log(TRACE_MIN, -1, "Connecting to serverURI %s with MQTT version %d", serverURI, MQTTVersion);
#if defined(OPENSSL)
rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, MQTTVersion);
rc = MQTTProtocol_connect(serverURI, m->c, m->ssl, MQTTVersion, connectProperties, willProperties);
#else
rc = MQTTProtocol_connect(serverURI, m->c, MQTTVersion);
rc = MQTTProtocol_connect(serverURI, m->c, MQTTVersion, connectProperties, willProperties);
#endif
if (rc == SOCKET_ERROR)
goto exit;
Expand Down Expand Up @@ -923,7 +927,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
{
rc = MQTTCLIENT_SUCCESS;
m->c->connect_state = 3;
if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR)
if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
{
rc = SOCKET_ERROR;
goto exit;
Expand All @@ -942,7 +946,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
{
#endif
m->c->connect_state = 3; /* TCP connect completed, in which case send the MQTT connect packet */
if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR)
if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
{
rc = SOCKET_ERROR;
goto exit;
Expand All @@ -966,7 +970,7 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
if(!m->c->cleansession && m->c->session == NULL)
m->c->session = SSL_get1_session(m->c->net.ssl);
m->c->connect_state = 3; /* TCP connect completed, in which case send the MQTT connect packet */
if (MQTTPacket_send_connect(m->c, MQTTVersion) == SOCKET_ERROR)
if (MQTTPacket_send_connect(m->c, MQTTVersion, connectProperties, willProperties) == SOCKET_ERROR)
{
rc = SOCKET_ERROR;
goto exit;
Expand Down Expand Up @@ -1009,6 +1013,8 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
if (m->c->connected != 1)
rc = MQTTCLIENT_DISCONNECTED;
}
if (m->c->MQTTVersion == MQTTVERSION_5)
resp.properties = &connack->properties;
}
free(connack);
m->pack = NULL;
Expand All @@ -1026,8 +1032,10 @@ static int MQTTClient_connectURIVersion(MQTTClient handle, MQTTClient_connectOpt
}
else
MQTTClient_disconnect1(handle, 0, 0, (MQTTVersion == 3)); /* don't want to call connection lost */
FUNC_EXIT_RC(rc);
return rc;

resp.reasonCode = rc;
FUNC_EXIT_RC(resp.reasonCode);
return resp;
}

static int retryLoopInterval = 5;
Expand All @@ -1045,12 +1053,13 @@ static void setRetryLoopInterval(int keepalive)
}


static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI)
static MQTTResponse MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* options, const char* serverURI,
MQTTProperties* connectProperties, MQTTProperties* willProperties)
{
MQTTClients* m = handle;
START_TIME_TYPE start;
long millisecsTimeout = 30000L;
int rc = SOCKET_ERROR;
MQTTResponse rc = {SOCKET_ERROR, NULL};
int MQTTVersion = 0;

FUNC_ENTRY;
Expand All @@ -1061,6 +1070,7 @@ static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* o
setRetryLoopInterval(options->keepAliveInterval);
m->c->cleansession = options->cleansession;
m->c->maxInflightMessages = (options->reliable) ? 1 : 10;
m->c->MQTTVersion = options->MQTTVersion;

if (m->c->will)
{
Expand Down Expand Up @@ -1167,42 +1177,57 @@ static int MQTTClient_connectURI(MQTTClient handle, MQTTClient_connectOptions* o

if (MQTTVersion == MQTTVERSION_DEFAULT)
{
if ((rc = MQTTClient_connectURIVersion(handle, options, serverURI, 4, start, millisecsTimeout)) != MQTTCLIENT_SUCCESS)
rc = MQTTClient_connectURIVersion(handle, options, serverURI, 3, start, millisecsTimeout);
rc = MQTTClient_connectURIVersion(handle, options, serverURI, 4, start, millisecsTimeout,
connectProperties, willProperties);
if (rc.reasonCode != MQTTCLIENT_SUCCESS)
{
rc = MQTTClient_connectURIVersion(handle, options, serverURI, 3, start, millisecsTimeout,
connectProperties, willProperties);
}
}
else
rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout);
rc = MQTTClient_connectURIVersion(handle, options, serverURI, MQTTVersion, start, millisecsTimeout,
connectProperties, willProperties);

FUNC_EXIT_RC(rc);
FUNC_EXIT_RC(rc.reasonCode);
return rc;
}


int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
{
MQTTResponse response = MQTTClient_connect5(handle, options, NULL, NULL);

return response.reasonCode;
}


MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* options,
MQTTProperties* connectProperties, MQTTProperties* willProperties)
{
MQTTClients* m = handle;
int rc = SOCKET_ERROR;
MQTTResponse rc = {SOCKET_ERROR, NULL};

FUNC_ENTRY;
Thread_lock_mutex(connect_mutex);
Thread_lock_mutex(mqttclient_mutex);

if (options == NULL)
{
rc = MQTTCLIENT_NULL_PARAMETER;
rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
goto exit;
}

if (strncmp(options->struct_id, "MQTC", 4) != 0 || options->struct_version < 0 || options->struct_version > 5)
{
rc = MQTTCLIENT_BAD_STRUCTURE;
rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
}

#if defined(OPENSSL)
if (m->ssl && options->ssl == NULL)
{
rc = MQTTCLIENT_NULL_PARAMETER;
rc.reasonCode = MQTTCLIENT_NULL_PARAMETER;
goto exit;
}
#endif
Expand All @@ -1211,7 +1236,7 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
{
if (strncmp(options->will->struct_id, "MQTW", 4) != 0 || (options->will->struct_version != 0 && options->will->struct_version != 1))
{
rc = MQTTCLIENT_BAD_STRUCTURE;
rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
}
}
Expand All @@ -1222,7 +1247,7 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
{
if (strncmp(options->ssl->struct_id, "MQTS", 4) != 0 || options->ssl->struct_version < 0 || options->ssl->struct_version > 2)
{
rc = MQTTCLIENT_BAD_STRUCTURE;
rc.reasonCode = MQTTCLIENT_BAD_STRUCTURE;
goto exit;
}
}
Expand All @@ -1231,19 +1256,19 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
if ((options->username && !UTF8_validateString(options->username)) ||
(options->password && !UTF8_validateString(options->password)))
{
rc = MQTTCLIENT_BAD_UTF8_STRING;
rc.reasonCode = MQTTCLIENT_BAD_UTF8_STRING;
goto exit;
}

if (options->MQTTVersion != MQTTVERSION_DEFAULT &&
(options->MQTTVersion < MQTTVERSION_3_1 || options->MQTTVersion > MQTTVERSION_5))
{
rc = MQTTCLIENT_BAD_MQTT_VERSION;
rc.reasonCode = MQTTCLIENT_BAD_MQTT_VERSION;
goto exit;
}

if (options->struct_version < 2 || options->serverURIcount == 0)
rc = MQTTClient_connectURI(handle, options, m->serverURI);
rc = MQTTClient_connectURI(handle, options, m->serverURI, connectProperties, willProperties);
else
{
int i;
Expand All @@ -1261,7 +1286,8 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
m->ssl = 1;
}
#endif
if ((rc = MQTTClient_connectURI(handle, options, serverURI)) == MQTTCLIENT_SUCCESS)
rc = MQTTClient_connectURI(handle, options, serverURI, connectProperties, willProperties);
if (rc.reasonCode == MQTTCLIENT_SUCCESS)
break;
}
}
Expand All @@ -1278,7 +1304,7 @@ int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options)
}
Thread_unlock_mutex(mqttclient_mutex);
Thread_unlock_mutex(connect_mutex);
FUNC_EXIT_RC(rc);
FUNC_EXIT_RC(rc.reasonCode);
return rc;
}

Expand Down
12 changes: 12 additions & 0 deletions src/MQTTClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@
/// @endcond
*/

#include "MQTTProperties.h"
#include "MQTTReasonCodes.h"
#if !defined(NO_PERSISTENCE)
#include "MQTTClientPersistence.h"
#endif
Expand Down Expand Up @@ -753,6 +755,16 @@ DLLExport MQTTClient_nameValue* MQTTClient_getVersionInfo(void);
*/
DLLExport int MQTTClient_connect(MQTTClient handle, MQTTClient_connectOptions* options);


typedef struct MQTTResponse
{
enum MQTTReasonCodes reasonCode;
MQTTProperties* properties; /* optional */
} MQTTResponse;

DLLExport MQTTResponse MQTTClient_connect5(MQTTClient handle, MQTTClient_connectOptions* options,
MQTTProperties* connectProperties, MQTTProperties* willProperties);

/**
* This function attempts to disconnect the client from the MQTT
* server. In order to allow the client time to complete handling of messages
Expand Down
Loading

0 comments on commit 6a1f39f

Please sign in to comment.