summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Stenberg <daniel@haxx.se>2020-04-16 13:20:52 +0200
committerDaniel Stenberg <daniel@haxx.se>2020-04-19 21:06:40 +0200
commit4c721fb379176181a853592e6e88d16607fcfcf0 (patch)
tree98ab7c5dc6f761affb45978b94cd45d0c47680cc
parent5cade4ffabc1616fd6a77e7c6ca13e1dbb896242 (diff)
downloadcurl-bagder/mqtt-fixups.tar.gz
mqtt: improve the state machinebagder/mqtt-fixups
To handle PUBLISH before SUBACK and more. Updated the existing tests and added three new ones. Reported-by: Christoph Krey Bug: https://curl.haxx.se/mail/lib-2020-04/0021.html
-rw-r--r--lib/mqtt.c180
-rw-r--r--lib/mqtt.h24
-rw-r--r--tests/data/Makefile.inc2
-rw-r--r--tests/data/test11902
-rw-r--r--tests/data/test11912
-rw-r--r--tests/data/test11922
-rw-r--r--tests/data/test11932
-rw-r--r--tests/data/test119459
-rw-r--r--tests/data/test119563
-rw-r--r--tests/data/test119662
-rw-r--r--tests/server/mqttd.c103
11 files changed, 402 insertions, 99 deletions
diff --git a/lib/mqtt.c b/lib/mqtt.c
index 3e244694d..35c1b3e83 100644
--- a/lib/mqtt.c
+++ b/lib/mqtt.c
@@ -51,8 +51,8 @@
#define MQTT_MSG_SUBACK 0x90
#define MQTT_MSG_DISCONNECT 0xe0
-#define MQTT_CONNACK_LEN 4
-#define MQTT_SUBACK_LEN 5
+#define MQTT_CONNACK_LEN 2
+#define MQTT_SUBACK_LEN 3
#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
/*
@@ -194,13 +194,9 @@ static CURLcode mqtt_verify_connack(struct connectdata *conn)
}
/* verify CONNACK */
- if(readbuf[0] != MQTT_MSG_CONNACK ||
- readbuf[1] != 0x02 ||
- readbuf[2] != 0x00 ||
- readbuf[3] != 0x00) {
- failf(data, "Expected %02x%02x%02x%02x but got %02x%02x%02x%02x",
- MQTT_MSG_CONNACK, 0x02, 0x00, 0x00,
- readbuf[0], readbuf[1], readbuf[2], readbuf[3]);
+ if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
+ failf(data, "Expected %02x%02x but got %02x%02x",
+ 0x00, 0x00, readbuf[0], readbuf[1]);
result = CURLE_WEIRD_SERVER_REPLY;
}
@@ -285,6 +281,9 @@ fail:
return result;
}
+/*
+ * Called when the first byte was already read.
+ */
static CURLcode mqtt_verify_suback(struct connectdata *conn)
{
CURLcode result;
@@ -307,11 +306,9 @@ static CURLcode mqtt_verify_suback(struct connectdata *conn)
}
/* verify SUBACK */
- if(readbuf[0] != MQTT_MSG_SUBACK ||
- readbuf[1] != 0x03 ||
- readbuf[2] != ((mqtt->packetid >> 8) & 0xff) ||
- readbuf[3] != (mqtt->packetid & 0xff) ||
- readbuf[4] != 0x00)
+ if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
+ readbuf[1] != (mqtt->packetid & 0xff) ||
+ readbuf[2] != 0x00)
result = CURLE_WEIRD_SERVER_REPLY;
fail:
@@ -377,67 +374,97 @@ static size_t mqtt_decode_len(unsigned char *buf,
mult *= 128;
}
- *lenbytes = i;
+ if(lenbytes)
+ *lenbytes = i;
return len;
}
+#ifdef CURLDEBUG
+static const char *statenames[]={
+ "MQTT_FIRST",
+ "MQTT_REMAINING_LENGTH",
+ "MQTT_CONNACK",
+ "MQTT_SUBACK",
+ "MQTT_SUBACK_COMING",
+ "MQTT_PUBWAIT",
+ "MQTT_PUB_REMAIN"
+};
+#endif
+
+/* The only way to change state */
+static void mqstate(struct connectdata *conn,
+ enum mqttstate state,
+ enum mqttstate nextstate) /* used if state == FIRST */
+{
+ struct mqtt_conn *mqtt = &conn->proto.mqtt;
+#ifdef CURLDEBUG
+ infof(conn->data, "%s (from %s) (next is %s)\n",
+ statenames[state],
+ statenames[mqtt->state],
+ (state == MQTT_FIRST)? statenames[nextstate] : "");
+#endif
+ mqtt->state = state;
+ if(state == MQTT_FIRST)
+ mqtt->nextstate = nextstate;
+}
+
+
/* for the publish packet */
#define MQTT_HEADER_LEN 5 /* max 5 bytes */
static CURLcode mqtt_read_publish(struct connectdata *conn,
bool *done)
{
- CURLcode result;
+ CURLcode result = CURLE_OK;
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
ssize_t nread;
struct Curl_easy *data = conn->data;
unsigned char *pkt = (unsigned char *)data->state.buffer;
- size_t remlen, lenbytes;
+ size_t remlen;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
struct MQTT *mq = data->req.protop;
+ unsigned char packet;
switch(mqtt->state) {
- case MQTT_SUBWAIT:
- /* Read the initial byte and the entire Remaining Length field
- in this state */
- result = Curl_read(conn, sockfd, (char *)&pkt[mq->npacket], 1, &nread);
+ MQTT_SUBACK_COMING:
+ case MQTT_SUBACK_COMING:
+ result = mqtt_verify_suback(conn);
if(result)
+ break;
+
+ mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
+ break;
+
+ case MQTT_SUBACK:
+ case MQTT_PUBWAIT:
+ /* we are expecting PUBLISH or SUBACK */
+ packet = mq->firstbyte & 0xf0;
+ if(packet == MQTT_MSG_PUBLISH)
+ mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE);
+ else if(packet == MQTT_MSG_SUBACK) {
+ mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE);
+ goto MQTT_SUBACK_COMING;
+ }
+ else if(packet == MQTT_MSG_DISCONNECT) {
+ infof(data, "Got DISCONNECT\n");
+ *done = TRUE;
goto end;
- if(data->set.verbose)
- Curl_debug(data, CURLINFO_HEADER_IN, (char *)&pkt[mq->npacket], 1);
- /* we are expecting a PUBLISH message */
- if(!mq->npacket && ((pkt[0] & 0xf0) != MQTT_MSG_PUBLISH)) {
- if(pkt[0] == MQTT_MSG_DISCONNECT) {
- infof(data, "Got DISCONNECT\n");
- *done = TRUE;
- goto end;
- }
+ }
+ else {
result = CURLE_WEIRD_SERVER_REPLY;
goto end;
}
- else if((mq->npacket >= 1) && !(pkt[mq->npacket] & 0x80))
- /* as long as the high bit is set in the length byte, we read one more
- byte, then get the remainder of the PUBLISH */
- mqtt->state = MQTT_SUB_REMAIN;
- mq->npacket++;
- if(mqtt->state == MQTT_SUBWAIT)
- return result;
/* -- switched state -- */
-
- /* remember the first byte */
- mq->firstbyte = pkt[0];
-
- remlen = mqtt_decode_len(&pkt[1], 4, &lenbytes);
-
+ remlen = mq->remaining_length;
infof(data, "Remaining length: %zd bytes\n", remlen);
Curl_pgrsSetDownloadSize(data, remlen);
data->req.bytecount = 0;
data->req.size = remlen;
mq->npacket = remlen; /* get this many bytes */
/* FALLTHROUGH */
- case MQTT_SUB_REMAIN: {
+ case MQTT_PUB_REMAIN: {
/* read rest of packet, but no more. Cap to buffer size */
struct SingleRequest *k = &data->req;
size_t rest = mq->npacket;
@@ -450,6 +477,11 @@ static CURLcode mqtt_read_publish(struct connectdata *conn,
}
goto end;
}
+ if(!nread) {
+ infof(data, "server disconnected\n");
+ result = CURLE_PARTIAL_FILE;
+ goto end;
+ }
if(data->set.verbose)
Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
@@ -465,7 +497,7 @@ static CURLcode mqtt_read_publish(struct connectdata *conn,
if(!mq->npacket)
/* no more PUBLISH payload, back to subscribe wait state */
- mqtt->state = MQTT_SUBWAIT;
+ mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT);
break;
}
default:
@@ -481,7 +513,6 @@ static CURLcode mqtt_do(struct connectdata *conn, bool *done)
{
CURLcode result = CURLE_OK;
struct Curl_easy *data = conn->data;
- struct mqtt_conn *mqtt = &conn->proto.mqtt;
*done = FALSE; /* unconditionally */
@@ -490,7 +521,7 @@ static CURLcode mqtt_do(struct connectdata *conn, bool *done)
failf(data, "Error %d sending MQTT CONN request", result);
return result;
}
- mqtt->state = MQTT_CONNACK;
+ mqstate(conn, MQTT_FIRST, MQTT_CONNACK);
return CURLE_OK;
}
@@ -500,6 +531,10 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
struct mqtt_conn *mqtt = &conn->proto.mqtt;
struct Curl_easy *data = conn->data;
struct MQTT *mq = data->req.protop;
+ ssize_t nread;
+ curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
+ unsigned char *pkt = (unsigned char *)data->state.buffer;
+ unsigned char byte;
*done = FALSE;
@@ -512,7 +547,41 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
return result;
}
+ infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state);
switch(mqtt->state) {
+ case MQTT_FIRST:
+ /* Read the initial byte only */
+ result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread);
+ if(result)
+ break;
+ if(data->set.verbose)
+ Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1);
+ /* remember the first byte */
+ mq->npacket = 0;
+ mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE);
+ /* FALLTHROUGH */
+ case MQTT_REMAINING_LENGTH:
+ do {
+ result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread);
+ if(result)
+ break;
+ if(data->set.verbose)
+ Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1);
+ pkt[mq->npacket++] = byte;
+ } while((byte & 0x80) && (mq->npacket < 4));
+ mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL);
+ mq->npacket = 0;
+ if(mq->remaining_length) {
+ mqstate(conn, mqtt->nextstate, MQTT_NOSTATE);
+ break;
+ }
+ mqstate(conn, MQTT_FIRST, MQTT_FIRST);
+
+ if(mq->firstbyte == MQTT_MSG_DISCONNECT) {
+ infof(data, "Got DISCONNECT\n");
+ *done = TRUE;
+ }
+ break;
case MQTT_CONNACK:
result = mqtt_verify_connack(conn);
if(result)
@@ -524,24 +593,19 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
result = mqtt_disconnect(conn);
*done = TRUE;
}
+ mqtt->nextstate = MQTT_FIRST;
}
else {
result = mqtt_subscribe(conn);
- if(!result)
- mqtt->state = MQTT_SUBACK;
+ if(!result) {
+ mqstate(conn, MQTT_FIRST, MQTT_SUBACK);
+ }
}
break;
case MQTT_SUBACK:
- result = mqtt_verify_suback(conn);
- if(result)
- break;
-
- mqtt->state = MQTT_SUBWAIT;
- break;
-
- case MQTT_SUBWAIT:
- case MQTT_SUB_REMAIN:
+ case MQTT_PUBWAIT:
+ case MQTT_PUB_REMAIN:
result = mqtt_read_publish(conn, done);
if(result)
break;
diff --git a/lib/mqtt.h b/lib/mqtt.h
index b5e447be5..155fbd60a 100644
--- a/lib/mqtt.h
+++ b/lib/mqtt.h
@@ -26,13 +26,22 @@
extern const struct Curl_handler Curl_handler_mqtt;
#endif
+enum mqttstate {
+ MQTT_FIRST, /* 0 */
+ MQTT_REMAINING_LENGTH, /* 1 */
+ MQTT_CONNACK, /* 2 */
+ MQTT_SUBACK, /* 3 */
+ MQTT_SUBACK_COMING, /* 4 - the SUBACK remainder */
+ MQTT_PUBWAIT, /* 5 - wait for publish */
+ MQTT_PUB_REMAIN, /* 6 - wait for the remainder of the publish */
+
+ MQTT_NOSTATE = 99 /* never an actual state */
+};
+
struct mqtt_conn {
- enum {
- MQTT_CONNACK,
- MQTT_SUBACK,
- MQTT_SUBWAIT, /* wait for subscribe response */
- MQTT_SUB_REMAIN /* wait for the remainder of the subscribe response */
- } state;
+ enum mqttstate state;
+ enum mqttstate nextstate; /* switch to this after remaining length is
+ done */
unsigned int packetid;
};
@@ -41,9 +50,10 @@ struct MQTT {
char *sendleftovers;
size_t nsend; /* size of sendleftovers */
- /* when receving a PUBLISH */
+ /* when receving */
size_t npacket; /* byte counter */
unsigned char firstbyte;
+ size_t remaining_length;
};
#endif /* HEADER_CURL_MQTT_H */
diff --git a/tests/data/Makefile.inc b/tests/data/Makefile.inc
index aabe1e6d9..e474bbb9b 100644
--- a/tests/data/Makefile.inc
+++ b/tests/data/Makefile.inc
@@ -138,7 +138,7 @@ test1160 test1161 test1162 test1163 test1164 test1165 test1166 test1167 \
\
test1170 test1171 test1172 test1173 test1174 test1175 test1176 test1177 \
\
-test1190 test1191 test1192 test1193 \
+test1190 test1191 test1192 test1193 test1194 test1195 test1196 \
\
test1200 test1201 test1202 test1203 test1204 test1205 test1206 test1207 \
test1208 test1209 test1210 test1211 test1212 test1213 test1214 test1215 \
diff --git a/tests/data/test1190 b/tests/data/test1190
index 491f2b843..007a15013 100644
--- a/tests/data/test1190
+++ b/tests/data/test1190
@@ -46,7 +46,7 @@ s/^(.* 00044d5154540402003c000c6375726c).*/$1/
</strippart>
<protocol>
client CONNECT 18 00044d5154540402003c000c6375726c
-server CONACK 2 20020000
+server CONNACK 2 20020000
client SUBSCRIBE 9 000100043131393000
server SUBACK 3 9003000100
server PUBLISH c 300c00043131393068656c6c6f0a
diff --git a/tests/data/test1191 b/tests/data/test1191
index fc8c68bb2..a36bc3113 100644
--- a/tests/data/test1191
+++ b/tests/data/test1191
@@ -42,7 +42,7 @@ s/^(.* 00044d5154540402003c000c6375726c).*/$1/
</strippart>
<protocol>
client CONNECT 18 00044d5154540402003c000c6375726c
-server CONACK 2 20020000
+server CONNACK 2 20020000
client PUBLISH f 000431313931736f6d657468696e67
client DISCONNECT 0 e000
</protocol>
diff --git a/tests/data/test1192 b/tests/data/test1192
index 92b96c3fd..691c7783f 100644
--- a/tests/data/test1192
+++ b/tests/data/test1192
@@ -46,7 +46,7 @@ s/^(.* 00044d5154540402003c000c6375726c).*/$1/
</strippart>
<protocol>
client CONNECT 18 00044d5154540402003c000c6375726c
-server CONACK 2 20020000
+server CONNACK 2 20020000
client SUBSCRIBE 80af3131393200
server SUBACK 3 9003000100
server PUBLISH 80d 308df3131393268656c6c6f0a
diff --git a/tests/data/test1193 b/tests/data/test1193
index 479ed5fe3..8da9abb21 100644
--- a/tests/data/test1193
+++ b/tests/data/test1193
@@ -64,7 +64,7 @@ s/^(.* 00044d5154540402003c000c6375726c).*/$1/
</strippart>
<protocol>
client CONNECT 18 00044d5154540402003c000c6375726c
-server CONACK 2 20020000
+server CONNACK 2 20020000
client PUBLISH 7c
client DISCONNECT 0 e000
</protocol>
diff --git a/tests/data/test1194 b/tests/data/test1194
new file mode 100644
index 000000000..497891add
--- /dev/null
+++ b/tests/data/test1194
@@ -0,0 +1,59 @@
+<testcase>
+<info>
+<keywords>
+MQTT
+MQTT SUBSCRIBE
+</keywords>
+</info>
+
+#
+# Server-side
+<reply>
+<data nocheck="yes">
+hello
+</data>
+<datacheck hex="yes">
+00 04 31 31 39 30 68 65 6c 6c 6f 5b 4c 46 5d 0a
+</datacheck>
+<servercmd>
+PUBLISH-before-SUBACK TRUE
+</servercmd>
+</reply>
+
+#
+# Client-side
+<client>
+<features>
+mqtt
+</features>
+<server>
+mqtt
+</server>
+<name>
+MQTT SUBSCRIBE with PUBLISH befoire SUBACK
+</name>
+<command option="binary-trace">
+mqtt://%HOSTIP:%MQTTPORT/1194
+</command>
+</client>
+
+#
+# Verify data after the test has been "shot"
+<verify>
+# These are hexadecimal protocol dumps from the client
+#
+# Strip out the random part of the client id from the CONNECT message
+# before comparison
+<strippart>
+s/^(.* 00044d5154540402003c000c6375726c).*/$1/
+</strippart>
+<protocol>
+client CONNECT 18 00044d5154540402003c000c6375726c
+server CONNACK 2 20020000
+client SUBSCRIBE 9 000100043131393400
+server PUBLISH c 300c00043131393468656c6c6f0a
+server SUBACK 3 9003000100
+server DISCONNECT 0 e000
+</protocol>
+</verify>
+</testcase>
diff --git a/tests/data/test1195 b/tests/data/test1195
new file mode 100644
index 000000000..0dfaccd53
--- /dev/null
+++ b/tests/data/test1195
@@ -0,0 +1,63 @@
+<testcase>
+<info>
+<keywords>
+MQTT
+MQTT SUBSCRIBE
+</keywords>
+</info>
+
+#
+# Server-side
+<reply>
+<data nocheck="yes">
+hello
+</data>
+<datacheck hex="yes">
+00 04 31 31 39 30 68 65 6c 6c 6f 5b 4c 46 5d 0a
+</datacheck>
+<servercmd>
+PUBLISH-before-SUBACK TRUE
+short-PUBLISH TRUE
+</servercmd>
+</reply>
+
+#
+# Client-side
+<client>
+<features>
+mqtt
+</features>
+<server>
+mqtt
+</server>
+<name>
+MQTT SUBSCRIBE with short PUBLISH
+</name>
+<command option="binary-trace">
+mqtt://%HOSTIP:%MQTTPORT/1195
+</command>
+</client>
+
+#
+# Verify data after the test has been "shot"
+<verify>
+# These are hexadecimal protocol dumps from the client
+#
+# Strip out the random part of the client id from the CONNECT message
+# before comparison
+<strippart>
+s/^(.* 00044d5154540402003c000c6375726c).*/$1/
+</strippart>
+<protocol>
+client CONNECT 18 00044d5154540402003c000c6375726c
+server CONNACK 2 20020000
+client SUBSCRIBE 9 000100043131393500
+server PUBLISH c 300c00043131393568656c6c
+</protocol>
+
+# 18 is CURLE_PARTIAL_FILE
+<errorcode>
+18
+</errorcode>
+</verify>
+</testcase>
diff --git a/tests/data/test1196 b/tests/data/test1196
new file mode 100644
index 000000000..c07efd927
--- /dev/null
+++ b/tests/data/test1196
@@ -0,0 +1,62 @@
+<testcase>
+<info>
+<keywords>
+MQTT
+MQTT SUBSCRIBE
+</keywords>
+</info>
+
+#
+# Server-side
+<reply>
+<data nocheck="yes">
+hello
+</data>
+<datacheck hex="yes">
+00 04 31 31 39 30 68 65 6c 6c 6f 5b 4c 46 5d 0a
+</datacheck>
+
+# error 1 - "Connection Refused, unacceptable protocol version"
+<servercmd>
+error-CONNACK 1
+</servercmd>
+</reply>
+
+#
+# Client-side
+<client>
+<features>
+mqtt
+</features>
+<server>
+mqtt
+</server>
+<name>
+MQTT with error in CONNACK
+</name>
+<command option="binary-trace">
+mqtt://%HOSTIP:%MQTTPORT/1196
+</command>
+</client>
+
+#
+# Verify data after the test has been "shot"
+<verify>
+# These are hexadecimal protocol dumps from the client
+#
+# Strip out the random part of the client id from the CONNECT message
+# before comparison
+<strippart>
+s/^(.* 00044d5154540402003c000c6375726c).*/$1/
+</strippart>
+<protocol>
+client CONNECT 18 00044d5154540402003c000c6375726c
+server CONNACK 2 20020001
+</protocol>
+
+# 8 is CURLE_WEIRD_SERVER_REPLY
+<errorcode>
+8
+</errorcode>
+</verify>
+</testcase>
diff --git a/tests/server/mqttd.c b/tests/server/mqttd.c
index db5723cdd..6785b0014 100644
--- a/tests/server/mqttd.c
+++ b/tests/server/mqttd.c
@@ -104,6 +104,10 @@
struct configurable {
unsigned char version; /* initial version byte in the request must match
this */
+ bool publish_before_suback;
+ bool short_publish;
+ unsigned char error_connack;
+ int testnum;
};
#define REQUEST_DUMP "log/server.input"
@@ -124,6 +128,10 @@ static void resetdefaults(void)
{
logmsg("Reset to defaults");
config.version = CONFIG_VERSION;
+ config.publish_before_suback = FALSE;
+ config.short_publish = FALSE;
+ config.error_connack = 0;
+ config.testnum = 0;
}
static unsigned char byteval(char *value)
@@ -147,10 +155,29 @@ static void getconfig(void)
config.version = byteval(value);
logmsg("version [%d] set", config.version);
}
+ else if(!strcmp(key, "PUBLISH-before-SUBACK")) {
+ logmsg("PUBLISH-before-SUBACK set");
+ config.publish_before_suback = TRUE;
+ }
+ else if(!strcmp(key, "short-PUBLISH")) {
+ logmsg("short-PUBLISH set");
+ config.short_publish = TRUE;
+ }
+ else if(!strcmp(key, "error-CONNACK")) {
+ config.error_connack = byteval(value);
+ logmsg("error-CONNACK = %d", config.error_connack);
+ }
+ else if(!strcmp(key, "Testnum")) {
+ config.testnum = atoi(value);
+ logmsg("testnum = %d", config.testnum);
+ }
}
}
fclose(fp);
}
+ else {
+ logmsg("No config file '%s' to read", configfile);
+ }
}
static void loghex(unsigned char *buffer, ssize_t len)
@@ -209,11 +236,17 @@ static int connack(FILE *dump, curl_socket_t fd)
MQTT_MSG_CONNACK, 0x02,
0x00, 0x00
};
- ssize_t rc = swrite(fd, (char *)packet, sizeof(packet));
- if(rc == sizeof(packet)) {
- logmsg("WROTE %d bytes [CONACK]", rc);
+ ssize_t rc;
+
+ packet[3] = config.error_connack;
+
+ rc = swrite(fd, (char *)packet, sizeof(packet));
+ if(rc > 0) {
+ logmsg("WROTE %d bytes [CONNACK]", rc);
loghex(packet, rc);
- logprotocol(FROM_SERVER, "CONACK", 2, dump, packet, sizeof(packet));
+ logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet));
+ }
+ if(rc == sizeof(packet)) {
return 0;
}
return 1;
@@ -360,6 +393,7 @@ static int publish(FILE *dump,
size_t payloadindex;
ssize_t remaininglength = topiclen + 2 + payloadlen;
ssize_t packetlen;
+ ssize_t sendamount;
ssize_t rc;
char rembuffer[4];
int encodedlen;
@@ -385,13 +419,18 @@ static int publish(FILE *dump,
payloadindex = 3 + topiclen + encodedlen;
memcpy(&packet[payloadindex], payload, payloadlen);
- rc = swrite(fd, (char *)packet, packetlen);
- if(rc == packetlen) {
+ sendamount = packetlen;
+ if(config.short_publish)
+ sendamount -= 2;
+
+ rc = swrite(fd, (char *)packet, sendamount);
+ if(rc > 0) {
logmsg("WROTE %d bytes [PUBLISH]", rc);
loghex(packet, rc);
logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc);
- return 0;
}
+ if(rc == packetlen)
+ return 0;
return 1;
}
@@ -459,6 +498,11 @@ static curl_socket_t mqttit(curl_socket_t fd)
getconfig();
+ testno = config.testnum;
+
+ if(testno)
+ logmsg("Found test number %ld", testno);
+
do {
/* get the fixed header */
rc = fixedheader(fd, &byte, &remaining_length, &bytes);
@@ -506,8 +550,10 @@ static curl_socket_t mqttit(curl_socket_t fd)
}
}
else if(byte == MQTT_MSG_SUBSCRIBE) {
- char *testnop;
-
+ FILE *stream;
+ int error;
+ char *data;
+ size_t datalen;
logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length,
dump, buffer, rc);
logmsg("Incoming SUBSCRIBE");
@@ -533,26 +579,25 @@ static curl_socket_t mqttit(curl_socket_t fd)
/* there's a QoS byte (two bits) after the topic */
logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id);
- if(suback(dump, fd, packet_id)) {
- logmsg("failed sending SUBACK");
- goto end;
- }
- testnop = strrchr(topic, '/');
- if(!testnop)
- testnop = topic;
- else
- testnop++; /* pass the slash */
- testno = strtol(testnop, NULL, 10);
- if(testno) {
- FILE *stream;
- int error;
- char *data;
- size_t datalen;
- logmsg("Found test number %ld", testno);
- stream = test2fopen(testno);
- error = getpart(&data, &datalen, "reply", "data", stream);
- if(!error)
- publish(dump, fd, packet_id, topic, data, datalen);
+ stream = test2fopen(testno);
+ error = getpart(&data, &datalen, "reply", "data", stream);
+ if(!error) {
+ if(!config.publish_before_suback) {
+ if(suback(dump, fd, packet_id)) {
+ logmsg("failed sending SUBACK");
+ goto end;
+ }
+ }
+ if(publish(dump, fd, packet_id, topic, data, datalen)) {
+ logmsg("PUBLISH failed");
+ goto end;
+ }
+ if(config.publish_before_suback) {
+ if(suback(dump, fd, packet_id)) {
+ logmsg("failed sending SUBACK");
+ goto end;
+ }
+ }
}
else {
char *def = (char *)"this is random payload yes yes it is";