summaryrefslogtreecommitdiff
path: root/lib/mqtt.c
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mqtt.c')
-rw-r--r--lib/mqtt.c141
1 files changed, 73 insertions, 68 deletions
diff --git a/lib/mqtt.c b/lib/mqtt.c
index 71a00cfc2..2134409cd 100644
--- a/lib/mqtt.c
+++ b/lib/mqtt.c
@@ -5,7 +5,7 @@
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
- * Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
+ * Copyright (C) 2020 - 2021, Daniel Stenberg, <daniel@haxx.se>, et al.
* Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
*
* This software is licensed as described in the file COPYING, which
@@ -59,10 +59,12 @@
* Forward declarations.
*/
-static CURLcode mqtt_do(struct connectdata *conn, bool *done);
-static CURLcode mqtt_doing(struct connectdata *conn, bool *done);
-static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock);
-static CURLcode mqtt_setup_conn(struct connectdata *conn);
+static CURLcode mqtt_do(struct Curl_easy *data, bool *done);
+static CURLcode mqtt_doing(struct Curl_easy *data, bool *done);
+static int mqtt_getsock(struct Curl_easy *data, struct connectdata *conn,
+ curl_socket_t *sock);
+static CURLcode mqtt_setup_conn(struct Curl_easy *data,
+ struct connectdata *conn);
/*
* MQTT protocol handler.
@@ -90,12 +92,13 @@ const struct Curl_handler Curl_handler_mqtt = {
PROTOPT_NONE /* flags */
};
-static CURLcode mqtt_setup_conn(struct connectdata *conn)
+static CURLcode mqtt_setup_conn(struct Curl_easy *data,
+ struct connectdata *conn)
{
/* allocate the HTTP-specific struct for the Curl_easy, only to survive
during this request */
struct MQTT *mq;
- struct Curl_easy *data = conn->data;
+ (void)conn;
DEBUGASSERT(data->req.p.mqtt == NULL);
mq = calloc(1, sizeof(struct MQTT));
@@ -105,15 +108,15 @@ static CURLcode mqtt_setup_conn(struct connectdata *conn)
return CURLE_OK;
}
-static CURLcode mqtt_send(struct connectdata *conn,
+static CURLcode mqtt_send(struct Curl_easy *data,
char *buf, size_t len)
{
CURLcode result = CURLE_OK;
+ struct connectdata *conn = data->conn;
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
- struct Curl_easy *data = conn->data;
struct MQTT *mq = data->req.p.mqtt;
ssize_t n;
- result = Curl_write(conn, sockfd, buf, len, &n);
+ result = Curl_write(data, sockfd, buf, len, &n);
if(!result)
Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
if(len != (size_t)n) {
@@ -130,14 +133,16 @@ static CURLcode mqtt_send(struct connectdata *conn,
/* Generic function called by the multi interface to figure out what socket(s)
to wait for and for what actions during the DOING and PROTOCONNECT
states */
-static int mqtt_getsock(struct connectdata *conn,
+static int mqtt_getsock(struct Curl_easy *data,
+ struct connectdata *conn,
curl_socket_t *sock)
{
+ (void)data;
sock[0] = conn->sock[FIRSTSOCKET];
return GETSOCK_READSOCK(FIRSTSOCKET);
}
-static CURLcode mqtt_connect(struct connectdata *conn)
+static CURLcode mqtt_connect(struct Curl_easy *data)
{
CURLcode result = CURLE_OK;
const size_t client_id_offset = 14;
@@ -157,31 +162,31 @@ static CURLcode mqtt_connect(struct connectdata *conn)
packet[1] = (packetlen - 2) & 0x7f;
packet[client_id_offset - 1] = MQTT_CLIENTID_LEN;
- result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[clen],
+ result = Curl_rand_hex(data, (unsigned char *)&client_id[clen],
MQTT_CLIENTID_LEN - clen + 1);
memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN);
- infof(conn->data, "Using client id '%s'\n", client_id);
+ infof(data, "Using client id '%s'\n", client_id);
if(!result)
- result = mqtt_send(conn, packet, packetlen);
+ result = mqtt_send(data, packet, packetlen);
return result;
}
-static CURLcode mqtt_disconnect(struct connectdata *conn)
+static CURLcode mqtt_disconnect(struct Curl_easy *data)
{
CURLcode result = CURLE_OK;
- result = mqtt_send(conn, (char *)"\xe0\x00", 2);
+ result = mqtt_send(data, (char *)"\xe0\x00", 2);
return result;
}
-static CURLcode mqtt_verify_connack(struct connectdata *conn)
+static CURLcode mqtt_verify_connack(struct Curl_easy *data)
{
CURLcode result;
+ struct connectdata *conn = data->conn;
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
unsigned char readbuf[MQTT_CONNACK_LEN];
ssize_t nread;
- struct Curl_easy *data = conn->data;
- result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
+ result = Curl_read(data, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
if(result)
goto fail;
@@ -204,18 +209,18 @@ fail:
return result;
}
-static CURLcode mqtt_get_topic(struct connectdata *conn,
+static CURLcode mqtt_get_topic(struct Curl_easy *data,
char **topic, size_t *topiclen)
{
CURLcode result = CURLE_OK;
- char *path = conn->data->state.up.path;
+ char *path = data->state.up.path;
if(strlen(path) > 1) {
- result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen,
+ result = Curl_urldecode(data, path + 1, 0, topic, topiclen,
REJECT_NADA);
}
else {
- failf(conn->data, "Error: No topic specified.");
+ failf(data, "Error: No topic specified.");
result = CURLE_URL_MALFORMAT;
}
return result;
@@ -238,7 +243,7 @@ static int mqtt_encode_len(char *buf, size_t len)
return i;
}
-static CURLcode mqtt_subscribe(struct connectdata *conn)
+static CURLcode mqtt_subscribe(struct Curl_easy *data)
{
CURLcode result = CURLE_OK;
char *topic = NULL;
@@ -247,8 +252,9 @@ static CURLcode mqtt_subscribe(struct connectdata *conn)
size_t packetlen;
char encodedsize[4];
size_t n;
+ struct connectdata *conn = data->conn;
- result = mqtt_get_topic(conn, &topic, &topiclen);
+ result = mqtt_get_topic(data, &topic, &topiclen);
if(result)
goto fail;
@@ -274,7 +280,7 @@ static CURLcode mqtt_subscribe(struct connectdata *conn)
memcpy(&packet[5 + n], topic, topiclen);
packet[5 + n + topiclen] = 0; /* QoS zero */
- result = mqtt_send(conn, (char *)packet, packetlen);
+ result = mqtt_send(data, (char *)packet, packetlen);
fail:
free(topic);
@@ -285,19 +291,20 @@ fail:
/*
* Called when the first byte was already read.
*/
-static CURLcode mqtt_verify_suback(struct connectdata *conn)
+static CURLcode mqtt_verify_suback(struct Curl_easy *data)
{
CURLcode result;
+ struct connectdata *conn = data->conn;
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
unsigned char readbuf[MQTT_SUBACK_LEN];
ssize_t nread;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
- result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
+ result = Curl_read(data, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
if(result)
goto fail;
- Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
+ Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
/* fixme */
if(nread < MQTT_SUBACK_LEN) {
@@ -315,10 +322,10 @@ fail:
return result;
}
-static CURLcode mqtt_publish(struct connectdata *conn)
+static CURLcode mqtt_publish(struct Curl_easy *data)
{
CURLcode result;
- char *payload = conn->data->set.postfields;
+ char *payload = data->set.postfields;
size_t payloadlen;
char *topic = NULL;
size_t topiclen;
@@ -327,7 +334,7 @@ static CURLcode mqtt_publish(struct connectdata *conn)
size_t remaininglength;
size_t encodelen;
char encodedbytes[4];
- curl_off_t postfieldsize = conn->data->set.postfieldsize;
+ curl_off_t postfieldsize = data->set.postfieldsize;
if(!payload)
return CURLE_BAD_FUNCTION_ARGUMENT;
@@ -336,7 +343,7 @@ static CURLcode mqtt_publish(struct connectdata *conn)
else
payloadlen = (size_t)postfieldsize;
- result = mqtt_get_topic(conn, &topic, &topiclen);
+ result = mqtt_get_topic(data, &topic, &topiclen);
if(result)
goto fail;
@@ -360,7 +367,7 @@ static CURLcode mqtt_publish(struct connectdata *conn)
i += topiclen;
memcpy(&pkt[i], payload, payloadlen);
i += payloadlen;
- result = mqtt_send(conn, (char *)pkt, i);
+ result = mqtt_send(data, (char *)pkt, i);
fail:
free(pkt);
@@ -403,13 +410,14 @@ static const char *statenames[]={
#endif
/* The only way to change state */
-static void mqstate(struct connectdata *conn,
+static void mqstate(struct Curl_easy *data,
enum mqttstate state,
enum mqttstate nextstate) /* used if state == FIRST */
{
+ struct connectdata *conn = data->conn;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
#ifdef CURLDEBUG
- infof(conn->data, "%s (from %s) (next is %s)\n",
+ infof(data, "%s (from %s) (next is %s)\n",
statenames[state],
statenames[mqtt->state],
(state == MQTT_FIRST)? statenames[nextstate] : "");
@@ -423,13 +431,12 @@ static void mqstate(struct connectdata *conn,
/* for the publish packet */
#define MQTT_HEADER_LEN 5 /* max 5 bytes */
-static CURLcode mqtt_read_publish(struct connectdata *conn,
- bool *done)
+static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done)
{
CURLcode result = CURLE_OK;
+ struct connectdata *conn = data->conn;
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
ssize_t nread;
- struct Curl_easy *data = conn->data;
unsigned char *pkt = (unsigned char *)data->state.buffer;
size_t remlen;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
@@ -439,11 +446,11 @@ static CURLcode mqtt_read_publish(struct connectdata *conn,
switch(mqtt->state) {
MQTT_SUBACK_COMING:
case MQTT_SUBACK_COMING:
- result = mqtt_verify_suback(conn);
+ result = mqtt_verify_suback(data);
if(result)
break;
- mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
+ mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
break;
case MQTT_SUBACK:
@@ -451,9 +458,9 @@ static CURLcode mqtt_read_publish(struct connectdata *conn,
/* we are expecting PUBLISH or SUBACK */
packet = mq->firstbyte & 0xf0;
if(packet == MQTT_MSG_PUBLISH)
- mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
+ mqstate(data, MQTT_PUB_REMAIN, MQTT_NOSTATE);
else if(packet == MQTT_MSG_SUBACK) {
- mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
+ mqstate(data, MQTT_SUBACK_COMING, MQTT_NOSTATE);
goto MQTT_SUBACK_COMING;
}
else if(packet == MQTT_MSG_DISCONNECT) {
@@ -480,7 +487,7 @@ static CURLcode mqtt_read_publish(struct connectdata *conn,
size_t rest = mq->npacket;
if(rest > (size_t)data->set.buffer_size)
rest = (size_t)data->set.buffer_size;
- result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread);
+ result = Curl_read(data, sockfd, (char *)pkt, rest, &nread);
if(result) {
if(CURLE_AGAIN == result) {
infof(data, "EEEE AAAAGAIN\n");
@@ -500,13 +507,13 @@ static CURLcode mqtt_read_publish(struct connectdata *conn,
/* if QoS is set, message contains packet id */
- result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread);
+ result = Curl_client_write(data, CLIENTWRITE_BODY, (char *)pkt, nread);
if(result)
goto end;
if(!mq->npacket)
/* no more PUBLISH payload, back to subscribe wait state */
- mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
+ mqstate(data, MQTT_FIRST, MQTT_PUBWAIT);
break;
}
default:
@@ -518,27 +525,25 @@ static CURLcode mqtt_read_publish(struct connectdata *conn,
return result;
}
-static CURLcode mqtt_do(struct connectdata *conn, bool *done)
+static CURLcode mqtt_do(struct Curl_easy *data, bool *done)
{
CURLcode result = CURLE_OK;
- struct Curl_easy *data = conn->data;
-
*done = FALSE; /* unconditionally */
- result = mqtt_connect(conn);
+ result = mqtt_connect(data);
if(result) {
failf(data, "Error %d sending MQTT CONN request", result);
return result;
}
- mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
+ mqstate(data, MQTT_FIRST, MQTT_CONNACK);
return CURLE_OK;
}
-static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
+static CURLcode mqtt_doing(struct Curl_easy *data, bool *done)
{
CURLcode result = CURLE_OK;
+ struct connectdata *conn = data->conn;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
- struct Curl_easy *data = conn->data;
struct MQTT *mq = data->req.p.mqtt;
ssize_t nread;
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
@@ -550,7 +555,7 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
if(mq->nsend) {
/* send the remainder of an outgoing packet */
char *ptr = mq->sendleftovers;
- result = mqtt_send(conn, mq->sendleftovers, mq->nsend);
+ result = mqtt_send(data, mq->sendleftovers, mq->nsend);
free(ptr);
if(result)
return result;
@@ -560,17 +565,17 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
switch(mqtt->state) {
case MQTT_FIRST:
/* Read the initial byte only */
- result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
+ result = Curl_read(data, sockfd, (char *)&mq->firstbyte, 1, &nread);
if(!nread)
break;
Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
/* remember the first byte */
mq->npacket = 0;
- mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
+ mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
/* FALLTHROUGH */
case MQTT_REMAINING_LENGTH:
do {
- result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
+ result = Curl_read(data, sockfd, (char *)&byte, 1, &nread);
if(!nread)
break;
Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
@@ -581,10 +586,10 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
mq->npacket = 0;
if(mq->remaining_length) {
- mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
+ mqstate(data, mqtt->nextstate, MQTT_NOSTATE);
break;
}
- mqstate(conn, MQTT_FIRST, MQTT_FIRST);
+ mqstate(data, MQTT_FIRST, MQTT_FIRST);
if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
infof(data, "Got DISCONNECT\n");
@@ -592,22 +597,22 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
}
break;
case MQTT_CONNACK:
- result = mqtt_verify_connack(conn);
+ result = mqtt_verify_connack(data);
if(result)
break;
- if(conn->data->state.httpreq == HTTPREQ_POST) {
- result = mqtt_publish(conn);
+ if(data->state.httpreq == HTTPREQ_POST) {
+ result = mqtt_publish(data);
if(!result) {
- result = mqtt_disconnect(conn);
+ result = mqtt_disconnect(data);
*done = TRUE;
}
mqtt->nextstate = MQTT_FIRST;
}
else {
- result = mqtt_subscribe(conn);
+ result = mqtt_subscribe(data);
if(!result) {
- mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
+ mqstate(data, MQTT_FIRST, MQTT_SUBACK);
}
}
break;
@@ -615,11 +620,11 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
case MQTT_SUBACK:
case MQTT_PUBWAIT:
case MQTT_PUB_REMAIN:
- result = mqtt_read_publish(conn, done);
+ result = mqtt_read_publish(data, done);
break;
default:
- failf(conn->data, "State not handled yet");
+ failf(data, "State not handled yet");
*done = TRUE;
break;
}