summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorDaniel Stenberg <daniel@haxx.se>2020-04-16 13:20:52 +0200
committerDaniel Stenberg <daniel@haxx.se>2020-04-20 08:09:33 +0200
commit5811beba391baefad41cd6f8f4fa4e3862098813 (patch)
tree572fc2d609408300d6d4946476d5f2f41d09e96e /lib
parentd1a2816b4128faa8ebc50ce93285c7364652856e (diff)
downloadcurl-5811beba391baefad41cd6f8f4fa4e3862098813.tar.gz
mqtt: improve the state machine
To handle PUBLISH before SUBACK and more. Updated the existing tests and added three new ones. Reported-by: Christoph Krey Bug: https://curl.haxx.se/mail/lib-2020-04/0021.html Closes #5246
Diffstat (limited to 'lib')
-rw-r--r--lib/mqtt.c180
-rw-r--r--lib/mqtt.h24
2 files changed, 139 insertions, 65 deletions
diff --git a/lib/mqtt.c b/lib/mqtt.c
index 3e244694d..35c1b3e83 100644
--- a/lib/mqtt.c
+++ b/lib/mqtt.c
@@ -51,8 +51,8 @@
#define MQTT_MSG_SUBACK 0x90
#define MQTT_MSG_DISCONNECT 0xe0
-#define MQTT_CONNACK_LEN 4
-#define MQTT_SUBACK_LEN 5
+#define MQTT_CONNACK_LEN 2
+#define MQTT_SUBACK_LEN 3
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
/*
@@ -194,13 +194,9 @@ static CURLcode mqtt_verify_connack(struct connectdata *conn)
}
/* verify CONNACK */
- if(readbuf[0] != MQTT_MSG_CONNACK ||
- readbuf[1] != 0x02 ||
- readbuf[2] != 0x00 ||
- readbuf[3] != 0x00) {
- failf(data, "Expected %02x%02x%02x%02x but got %02x%02x%02x%02x",
- MQTT_MSG_CONNACK, 0x02, 0x00, 0x00,
- readbuf[0], readbuf[1], readbuf[2], readbuf[3]);
+ if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
+ failf(data, "Expected %02x%02x but got %02x%02x",
+ 0x00, 0x00, readbuf[0], readbuf[1]);
result = CURLE_WEIRD_SERVER_REPLY;
}
@@ -285,6 +281,9 @@ fail:
return result;
}
+/*
+ * Called when the first byte was already read.
+ */
static CURLcode mqtt_verify_suback(struct connectdata *conn)
{
CURLcode result;
@@ -307,11 +306,9 @@ static CURLcode mqtt_verify_suback(struct connectdata *conn)
}
/* verify SUBACK */
- if(readbuf[0] != MQTT_MSG_SUBACK ||
- readbuf[1] != 0x03 ||
- readbuf[2] != ((mqtt->packetid >> 8) & 0xff) ||
- readbuf[3] != (mqtt->packetid & 0xff) ||
- readbuf[4] != 0x00)
+ if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
+ readbuf[1] != (mqtt->packetid & 0xff) ||
+ readbuf[2] != 0x00)
result = CURLE_WEIRD_SERVER_REPLY;
fail:
@@ -377,67 +374,97 @@ static size_t mqtt_decode_len(unsigned char *buf,
mult *= 128;
}
- *lenbytes = i;
+ if(lenbytes)
+ *lenbytes = i;
return len;
}
+#ifdef CURLDEBUG
+static const char *statenames[]={
+ "MQTT_FIRST",
+ "MQTT_REMAINING_LENGTH",
+ "MQTT_CONNACK",
+ "MQTT_SUBACK",
+ "MQTT_SUBACK_COMING",
+ "MQTT_PUBWAIT",
+ "MQTT_PUB_REMAIN"
+};
+#endif
+
+/* The only way to change state */
+static void mqstate(struct connectdata *conn,
+ enum mqttstate state,
+ enum mqttstate nextstate) /* used if state == FIRST */
+{
+ struct mqtt_conn *mqtt = &conn->proto.mqtt;
+#ifdef CURLDEBUG
+ infof(conn->data, "%s (from %s) (next is %s)\n",
+ statenames[state],
+ statenames[mqtt->state],
+ (state == MQTT_FIRST)? statenames[nextstate] : "");
+#endif
+ mqtt->state = state;
+ if(state == MQTT_FIRST)
+ mqtt->nextstate = nextstate;
+}
+
+
/* for the publish packet */
#define MQTT_HEADER_LEN 5 /* max 5 bytes */
static CURLcode mqtt_read_publish(struct connectdata *conn,
bool *done)
{
- CURLcode result;
+ CURLcode result = CURLE_OK;
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
ssize_t nread;
struct Curl_easy *data = conn->data;
unsigned char *pkt = (unsigned char *)data->state.buffer;
- size_t remlen, lenbytes;
+ size_t remlen;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
struct MQTT *mq = data->req.protop;
+ unsigned char packet;
switch(mqtt->state) {
- case MQTT_SUBWAIT:
- /* Read the initial byte and the entire Remaining Length field
- in this state */
- result = Curl_read(conn, sockfd, (char *)&pkt[mq->npacket], 1, &nread);
+ MQTT_SUBACK_COMING:
+ case MQTT_SUBACK_COMING:
+ result = mqtt_verify_suback(conn);
if(result)
+ break;
+
+ mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
+ break;
+
+ case MQTT_SUBACK:
+ case MQTT_PUBWAIT:
+ /* we are expecting PUBLISH or SUBACK */
+ packet = mq->firstbyte & 0xf0;
+ if(packet == MQTT_MSG_PUBLISH)
+ mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
+ else if(packet == MQTT_MSG_SUBACK) {
+ mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
+ goto MQTT_SUBACK_COMING;
+ }
+ else if(packet == MQTT_MSG_DISCONNECT) {
+ infof(data, "Got DISCONNECT\n");
+ *done = TRUE;
goto end;
- if(data->set.verbose)
- Curl_debug(data, CURLINFO_HEADER_IN, (char *)&pkt[mq->npacket], 1);
- /* we are expecting a PUBLISH message */
- if(!mq->npacket && ((pkt[0] & 0xf0) != MQTT_MSG_PUBLISH)) {
- if(pkt[0] == MQTT_MSG_DISCONNECT) {
- infof(data, "Got DISCONNECT\n");
- *done = TRUE;
- goto end;
- }
+ }
+ else {
result = CURLE_WEIRD_SERVER_REPLY;
goto end;
}
- else if((mq->npacket >= 1) && !(pkt[mq->npacket] & 0x80))
- /* as long as the high bit is set in the length byte, we read one more
- byte, then get the remainder of the PUBLISH */
- mqtt->state = MQTT_SUB_REMAIN;
- mq->npacket++;
- if(mqtt->state == MQTT_SUBWAIT)
- return result;
/* -- switched state -- */
-
- /* remember the first byte */
- mq->firstbyte = pkt[0];
-
- remlen = mqtt_decode_len(&pkt[1], 4, &lenbytes);
-
+ remlen = mq->remaining_length;
infof(data, "Remaining length: %zd bytes\n", remlen);
Curl_pgrsSetDownloadSize(data, remlen);
data->req.bytecount = 0;
data->req.size = remlen;
mq->npacket = remlen; /* get this many bytes */
/* FALLTHROUGH */
- case MQTT_SUB_REMAIN: {
+ case MQTT_PUB_REMAIN: {
/* read rest of packet, but no more. Cap to buffer size */
struct SingleRequest *k = &data->req;
size_t rest = mq->npacket;
@@ -450,6 +477,11 @@ static CURLcode mqtt_read_publish(struct connectdata *conn,
}
goto end;
}
+ if(!nread) {
+ infof(data, "server disconnected\n");
+ result = CURLE_PARTIAL_FILE;
+ goto end;
+ }
if(data->set.verbose)
Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
@@ -465,7 +497,7 @@ static CURLcode mqtt_read_publish(struct connectdata *conn,
if(!mq->npacket)
/* no more PUBLISH payload, back to subscribe wait state */
- mqtt->state = MQTT_SUBWAIT;
+ mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
break;
}
default:
@@ -481,7 +513,6 @@ static CURLcode mqtt_do(struct connectdata *conn, bool *done)
{
CURLcode result = CURLE_OK;
struct Curl_easy *data = conn->data;
- struct mqtt_conn *mqtt = &conn->proto.mqtt;
*done = FALSE; /* unconditionally */
@@ -490,7 +521,7 @@ static CURLcode mqtt_do(struct connectdata *conn, bool *done)
failf(data, "Error %d sending MQTT CONN request", result);
return result;
}
- mqtt->state = MQTT_CONNACK;
+ mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
return CURLE_OK;
}
@@ -500,6 +531,10 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
struct mqtt_conn *mqtt = &conn->proto.mqtt;
struct Curl_easy *data = conn->data;
struct MQTT *mq = data->req.protop;
+ ssize_t nread;
+ curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
+ unsigned char *pkt = (unsigned char *)data->state.buffer;
+ unsigned char byte;
*done = FALSE;
@@ -512,7 +547,41 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
return result;
}
+ infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state);
switch(mqtt->state) {
+ case MQTT_FIRST:
+ /* Read the initial byte only */
+ result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
+ if(result)
+ break;
+ if(data->set.verbose)
+ Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
+ /* remember the first byte */
+ mq->npacket = 0;
+ mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
+ /* FALLTHROUGH */
+ case MQTT_REMAINING_LENGTH:
+ do {
+ result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
+ if(result)
+ break;
+ if(data->set.verbose)
+ Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
+ pkt[mq->npacket++] = byte;
+ } while((byte & 0x80) && (mq->npacket < 4));
+ mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
+ mq->npacket = 0;
+ if(mq->remaining_length) {
+ mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
+ break;
+ }
+ mqstate(conn, MQTT_FIRST, MQTT_FIRST);
+
+ if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
+ infof(data, "Got DISCONNECT\n");
+ *done = TRUE;
+ }
+ break;
case MQTT_CONNACK:
result = mqtt_verify_connack(conn);
if(result)
@@ -524,24 +593,19 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
result = mqtt_disconnect(conn);
*done = TRUE;
}
+ mqtt->nextstate = MQTT_FIRST;
}
else {
result = mqtt_subscribe(conn);
- if(!result)
- mqtt->state = MQTT_SUBACK;
+ if(!result) {
+ mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
+ }
}
break;
case MQTT_SUBACK:
- result = mqtt_verify_suback(conn);
- if(result)
- break;
-
- mqtt->state = MQTT_SUBWAIT;
- break;
-
- case MQTT_SUBWAIT:
- case MQTT_SUB_REMAIN:
+ case MQTT_PUBWAIT:
+ case MQTT_PUB_REMAIN:
result = mqtt_read_publish(conn, done);
if(result)
break;
diff --git a/lib/mqtt.h b/lib/mqtt.h
index b5e447be5..155fbd60a 100644
--- a/lib/mqtt.h
+++ b/lib/mqtt.h
@@ -26,13 +26,22 @@
extern const struct Curl_handler Curl_handler_mqtt;
#endif
+enum mqttstate {
+ MQTT_FIRST, /* 0 */
+ MQTT_REMAINING_LENGTH, /* 1 */
+ MQTT_CONNACK, /* 2 */
+ MQTT_SUBACK, /* 3 */
+ MQTT_SUBACK_COMING, /* 4 - the SUBACK remainder */
+ MQTT_PUBWAIT, /* 5 - wait for publish */
+ MQTT_PUB_REMAIN, /* 6 - wait for the remainder of the publish */
+
+ MQTT_NOSTATE = 99 /* never an actual state */
+};
+
struct mqtt_conn {
- enum {
- MQTT_CONNACK,
- MQTT_SUBACK,
- MQTT_SUBWAIT, /* wait for subscribe response */
- MQTT_SUB_REMAIN /* wait for the remainder of the subscribe response */
- } state;
+ enum mqttstate state;
+ enum mqttstate nextstate; /* switch to this after remaining length is
+ done */
unsigned int packetid;
};
@@ -41,9 +50,10 @@ struct MQTT {
char *sendleftovers;
size_t nsend; /* size of sendleftovers */
- /* when receving a PUBLISH */
+ /* when receving */
size_t npacket; /* byte counter */
unsigned char firstbyte;
+ size_t remaining_length;
};
#endif /* HEADER_CURL_MQTT_H */