diff options
Diffstat (limited to 'lib/mqtt.c')
-rw-r--r-- | lib/mqtt.c | 141 |
1 files changed, 73 insertions, 68 deletions
diff --git a/lib/mqtt.c b/lib/mqtt.c index 71a00cfc2..2134409cd 100644 --- a/lib/mqtt.c +++ b/lib/mqtt.c @@ -5,7 +5,7 @@ * | (__| |_| | _ <| |___ * \___|\___/|_| \_\_____| * - * Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al. + * Copyright (C) 2020 - 2021, Daniel Stenberg, <daniel@haxx.se>, et al. * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se> * * This software is licensed as described in the file COPYING, which @@ -59,10 +59,12 @@ * Forward declarations. */ -static CURLcode mqtt_do(struct connectdata *conn, bool *done); -static CURLcode mqtt_doing(struct connectdata *conn, bool *done); -static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock); -static CURLcode mqtt_setup_conn(struct connectdata *conn); +static CURLcode mqtt_do(struct Curl_easy *data, bool *done); +static CURLcode mqtt_doing(struct Curl_easy *data, bool *done); +static int mqtt_getsock(struct Curl_easy *data, struct connectdata *conn, + curl_socket_t *sock); +static CURLcode mqtt_setup_conn(struct Curl_easy *data, + struct connectdata *conn); /* * MQTT protocol handler. @@ -90,12 +92,13 @@ const struct Curl_handler Curl_handler_mqtt = { PROTOPT_NONE /* flags */ }; -static CURLcode mqtt_setup_conn(struct connectdata *conn) +static CURLcode mqtt_setup_conn(struct Curl_easy *data, + struct connectdata *conn) { /* allocate the HTTP-specific struct for the Curl_easy, only to survive during this request */ struct MQTT *mq; - struct Curl_easy *data = conn->data; + (void)conn; DEBUGASSERT(data->req.p.mqtt == NULL); mq = calloc(1, sizeof(struct MQTT)); @@ -105,15 +108,15 @@ static CURLcode mqtt_setup_conn(struct connectdata *conn) return CURLE_OK; } -static CURLcode mqtt_send(struct connectdata *conn, +static CURLcode mqtt_send(struct Curl_easy *data, char *buf, size_t len) { CURLcode result = CURLE_OK; + struct connectdata *conn = data->conn; curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; - struct Curl_easy *data = conn->data; struct MQTT *mq = data->req.p.mqtt; ssize_t n; - result = Curl_write(conn, sockfd, buf, len, &n); + result = Curl_write(data, sockfd, buf, len, &n); if(!result) Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n); if(len != (size_t)n) { @@ -130,14 +133,16 @@ static CURLcode mqtt_send(struct connectdata *conn, /* Generic function called by the multi interface to figure out what socket(s) to wait for and for what actions during the DOING and PROTOCONNECT states */ -static int mqtt_getsock(struct connectdata *conn, +static int mqtt_getsock(struct Curl_easy *data, + struct connectdata *conn, curl_socket_t *sock) { + (void)data; sock[0] = conn->sock[FIRSTSOCKET]; return GETSOCK_READSOCK(FIRSTSOCKET); } -static CURLcode mqtt_connect(struct connectdata *conn) +static CURLcode mqtt_connect(struct Curl_easy *data) { CURLcode result = CURLE_OK; const size_t client_id_offset = 14; @@ -157,31 +162,31 @@ static CURLcode mqtt_connect(struct connectdata *conn) packet[1] = (packetlen - 2) & 0x7f; packet[client_id_offset - 1] = MQTT_CLIENTID_LEN; - result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[clen], + result = Curl_rand_hex(data, (unsigned char *)&client_id[clen], MQTT_CLIENTID_LEN - clen + 1); memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN); - infof(conn->data, "Using client id '%s'\n", client_id); + infof(data, "Using client id '%s'\n", client_id); if(!result) - result = mqtt_send(conn, packet, packetlen); + result = mqtt_send(data, packet, packetlen); return result; } -static CURLcode mqtt_disconnect(struct connectdata *conn) +static CURLcode mqtt_disconnect(struct Curl_easy *data) { CURLcode result = CURLE_OK; - result = mqtt_send(conn, (char *)"\xe0\x00", 2); + result = mqtt_send(data, (char *)"\xe0\x00", 2); return result; } -static CURLcode mqtt_verify_connack(struct connectdata *conn) +static CURLcode mqtt_verify_connack(struct Curl_easy *data) { CURLcode result; + struct connectdata *conn = data->conn; curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; unsigned char readbuf[MQTT_CONNACK_LEN]; ssize_t nread; - struct Curl_easy *data = conn->data; - result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread); + result = Curl_read(data, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread); if(result) goto fail; @@ -204,18 +209,18 @@ fail: return result; } -static CURLcode mqtt_get_topic(struct connectdata *conn, +static CURLcode mqtt_get_topic(struct Curl_easy *data, char **topic, size_t *topiclen) { CURLcode result = CURLE_OK; - char *path = conn->data->state.up.path; + char *path = data->state.up.path; if(strlen(path) > 1) { - result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen, + result = Curl_urldecode(data, path + 1, 0, topic, topiclen, REJECT_NADA); } else { - failf(conn->data, "Error: No topic specified."); + failf(data, "Error: No topic specified."); result = CURLE_URL_MALFORMAT; } return result; @@ -238,7 +243,7 @@ static int mqtt_encode_len(char *buf, size_t len) return i; } -static CURLcode mqtt_subscribe(struct connectdata *conn) +static CURLcode mqtt_subscribe(struct Curl_easy *data) { CURLcode result = CURLE_OK; char *topic = NULL; @@ -247,8 +252,9 @@ static CURLcode mqtt_subscribe(struct connectdata *conn) size_t packetlen; char encodedsize[4]; size_t n; + struct connectdata *conn = data->conn; - result = mqtt_get_topic(conn, &topic, &topiclen); + result = mqtt_get_topic(data, &topic, &topiclen); if(result) goto fail; @@ -274,7 +280,7 @@ static CURLcode mqtt_subscribe(struct connectdata *conn) memcpy(&packet[5 + n], topic, topiclen); packet[5 + n + topiclen] = 0; /* QoS zero */ - result = mqtt_send(conn, (char *)packet, packetlen); + result = mqtt_send(data, (char *)packet, packetlen); fail: free(topic); @@ -285,19 +291,20 @@ fail: /* * Called when the first byte was already read. */ -static CURLcode mqtt_verify_suback(struct connectdata *conn) +static CURLcode mqtt_verify_suback(struct Curl_easy *data) { CURLcode result; + struct connectdata *conn = data->conn; curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; unsigned char readbuf[MQTT_SUBACK_LEN]; ssize_t nread; struct mqtt_conn *mqtt = &conn->proto.mqtt; - result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread); + result = Curl_read(data, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread); if(result) goto fail; - Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread); + Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread); /* fixme */ if(nread < MQTT_SUBACK_LEN) { @@ -315,10 +322,10 @@ fail: return result; } -static CURLcode mqtt_publish(struct connectdata *conn) +static CURLcode mqtt_publish(struct Curl_easy *data) { CURLcode result; - char *payload = conn->data->set.postfields; + char *payload = data->set.postfields; size_t payloadlen; char *topic = NULL; size_t topiclen; @@ -327,7 +334,7 @@ static CURLcode mqtt_publish(struct connectdata *conn) size_t remaininglength; size_t encodelen; char encodedbytes[4]; - curl_off_t postfieldsize = conn->data->set.postfieldsize; + curl_off_t postfieldsize = data->set.postfieldsize; if(!payload) return CURLE_BAD_FUNCTION_ARGUMENT; @@ -336,7 +343,7 @@ static CURLcode mqtt_publish(struct connectdata *conn) else payloadlen = (size_t)postfieldsize; - result = mqtt_get_topic(conn, &topic, &topiclen); + result = mqtt_get_topic(data, &topic, &topiclen); if(result) goto fail; @@ -360,7 +367,7 @@ static CURLcode mqtt_publish(struct connectdata *conn) i += topiclen; memcpy(&pkt[i], payload, payloadlen); i += payloadlen; - result = mqtt_send(conn, (char *)pkt, i); + result = mqtt_send(data, (char *)pkt, i); fail: free(pkt); @@ -403,13 +410,14 @@ static const char *statenames[]={ #endif /* The only way to change state */ -static void mqstate(struct connectdata *conn, +static void mqstate(struct Curl_easy *data, enum mqttstate state, enum mqttstate nextstate) /* used if state == FIRST */ { + struct connectdata *conn = data->conn; struct mqtt_conn *mqtt = &conn->proto.mqtt; #ifdef CURLDEBUG - infof(conn->data, "%s (from %s) (next is %s)\n", + infof(data, "%s (from %s) (next is %s)\n", statenames[state], statenames[mqtt->state], (state == MQTT_FIRST)? statenames[nextstate] : ""); @@ -423,13 +431,12 @@ static void mqstate(struct connectdata *conn, /* for the publish packet */ #define MQTT_HEADER_LEN 5 /* max 5 bytes */ -static CURLcode mqtt_read_publish(struct connectdata *conn, - bool *done) +static CURLcode mqtt_read_publish(struct Curl_easy *data, bool *done) { CURLcode result = CURLE_OK; + struct connectdata *conn = data->conn; curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; ssize_t nread; - struct Curl_easy *data = conn->data; unsigned char *pkt = (unsigned char *)data->state.buffer; size_t remlen; struct mqtt_conn *mqtt = &conn->proto.mqtt; @@ -439,11 +446,11 @@ static CURLcode mqtt_read_publish(struct connectdata *conn, switch(mqtt->state) { MQTT_SUBACK_COMING: case MQTT_SUBACK_COMING: - result = mqtt_verify_suback(conn); + result = mqtt_verify_suback(data); if(result) break; - mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT); + mqstate(data, MQTT_FIRST, MQTT_PUBWAIT); break; case MQTT_SUBACK: @@ -451,9 +458,9 @@ static CURLcode mqtt_read_publish(struct connectdata *conn, /* we are expecting PUBLISH or SUBACK */ packet = mq->firstbyte & 0xf0; if(packet == MQTT_MSG_PUBLISH) - mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE); + mqstate(data, MQTT_PUB_REMAIN, MQTT_NOSTATE); else if(packet == MQTT_MSG_SUBACK) { - mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE); + mqstate(data, MQTT_SUBACK_COMING, MQTT_NOSTATE); goto MQTT_SUBACK_COMING; } else if(packet == MQTT_MSG_DISCONNECT) { @@ -480,7 +487,7 @@ static CURLcode mqtt_read_publish(struct connectdata *conn, size_t rest = mq->npacket; if(rest > (size_t)data->set.buffer_size) rest = (size_t)data->set.buffer_size; - result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread); + result = Curl_read(data, sockfd, (char *)pkt, rest, &nread); if(result) { if(CURLE_AGAIN == result) { infof(data, "EEEE AAAAGAIN\n"); @@ -500,13 +507,13 @@ static CURLcode mqtt_read_publish(struct connectdata *conn, /* if QoS is set, message contains packet id */ - result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread); + result = Curl_client_write(data, CLIENTWRITE_BODY, (char *)pkt, nread); if(result) goto end; if(!mq->npacket) /* no more PUBLISH payload, back to subscribe wait state */ - mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT); + mqstate(data, MQTT_FIRST, MQTT_PUBWAIT); break; } default: @@ -518,27 +525,25 @@ static CURLcode mqtt_read_publish(struct connectdata *conn, return result; } -static CURLcode mqtt_do(struct connectdata *conn, bool *done) +static CURLcode mqtt_do(struct Curl_easy *data, bool *done) { CURLcode result = CURLE_OK; - struct Curl_easy *data = conn->data; - *done = FALSE; /* unconditionally */ - result = mqtt_connect(conn); + result = mqtt_connect(data); if(result) { failf(data, "Error %d sending MQTT CONN request", result); return result; } - mqstate(conn, MQTT_FIRST, MQTT_CONNACK); + mqstate(data, MQTT_FIRST, MQTT_CONNACK); return CURLE_OK; } -static CURLcode mqtt_doing(struct connectdata *conn, bool *done) +static CURLcode mqtt_doing(struct Curl_easy *data, bool *done) { CURLcode result = CURLE_OK; + struct connectdata *conn = data->conn; struct mqtt_conn *mqtt = &conn->proto.mqtt; - struct Curl_easy *data = conn->data; struct MQTT *mq = data->req.p.mqtt; ssize_t nread; curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; @@ -550,7 +555,7 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done) if(mq->nsend) { /* send the remainder of an outgoing packet */ char *ptr = mq->sendleftovers; - result = mqtt_send(conn, mq->sendleftovers, mq->nsend); + result = mqtt_send(data, mq->sendleftovers, mq->nsend); free(ptr); if(result) return result; @@ -560,17 +565,17 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done) switch(mqtt->state) { case MQTT_FIRST: /* Read the initial byte only */ - result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread); + result = Curl_read(data, sockfd, (char *)&mq->firstbyte, 1, &nread); if(!nread) break; Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1); /* remember the first byte */ mq->npacket = 0; - mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE); + mqstate(data, MQTT_REMAINING_LENGTH, MQTT_NOSTATE); /* FALLTHROUGH */ case MQTT_REMAINING_LENGTH: do { - result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread); + result = Curl_read(data, sockfd, (char *)&byte, 1, &nread); if(!nread) break; Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1); @@ -581,10 +586,10 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done) mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL); mq->npacket = 0; if(mq->remaining_length) { - mqstate(conn, mqtt->nextstate, MQTT_NOSTATE); + mqstate(data, mqtt->nextstate, MQTT_NOSTATE); break; } - mqstate(conn, MQTT_FIRST, MQTT_FIRST); + mqstate(data, MQTT_FIRST, MQTT_FIRST); if(mq->firstbyte == MQTT_MSG_DISCONNECT) { infof(data, "Got DISCONNECT\n"); @@ -592,22 +597,22 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done) } break; case MQTT_CONNACK: - result = mqtt_verify_connack(conn); + result = mqtt_verify_connack(data); if(result) break; - if(conn->data->state.httpreq == HTTPREQ_POST) { - result = mqtt_publish(conn); + if(data->state.httpreq == HTTPREQ_POST) { + result = mqtt_publish(data); if(!result) { - result = mqtt_disconnect(conn); + result = mqtt_disconnect(data); *done = TRUE; } mqtt->nextstate = MQTT_FIRST; } else { - result = mqtt_subscribe(conn); + result = mqtt_subscribe(data); if(!result) { - mqstate(conn, MQTT_FIRST, MQTT_SUBACK); + mqstate(data, MQTT_FIRST, MQTT_SUBACK); } } break; @@ -615,11 +620,11 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done) case MQTT_SUBACK: case MQTT_PUBWAIT: case MQTT_PUB_REMAIN: - result = mqtt_read_publish(conn, done); + result = mqtt_read_publish(data, done); break; default: - failf(conn->data, "State not handled yet"); + failf(data, "State not handled yet"); *done = TRUE; break; } |