summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsen Kirov <akirov@luxoft.com>2015-09-14 18:15:23 +0300
committerAsen Kirov <akirov@luxoft.com>2015-09-14 18:15:23 +0300
commit1182cde5e1e8a878029a68a2dc333add5e8257a7 (patch)
tree918954984ceb459950be96f7640c6171513886d4
parent12d569a8ae8f8822c3bba13c82f3667780d94250 (diff)
downloadsmartdevicelink-1182cde5e1e8a878029a68a2dc333add5e8257a7.tar.gz
Remove parsed JSON and invalid messages from MessageBroker's buffer
With the previous code, messages that cannot be parsed (pure garbage, invalid JSONs, JSONs that are not objects or does not have jsonrpc member set correctly) remained in MesageBrocker's TcpServer receive buffer for the corresponding file descriptor forever. Because the garbage is in front of the buffer and next messages are appended at the end, these next messages are not processed, since parsing stops on first error. Also if parsed message has different length than the original (this is possible if there are floating point numbers for example), the original message is not deleted properly and next message may be broken. The fix is to handle invalid messages and remove invalid or parsed data from the buffer, without removing any potentially valid, but incomplete data (masked websocket frames or SDL messages). It is still possible to misinterpred garbage as a beginning of a incomplete valid frame. In this case we are doing this: when we have old data in the buffer and a new message arrives, if we can't parse the buffer WITH the new message - try to parse only the new message and clear the buffer on success, because if the new message can be parsed alone, then the old must be garbage. Fixed problem when deleting receiving buffer of disconnected client.
-rw-r--r--src/3rd_party-static/MessageBroker/include/CMessageBroker.hpp5
-rw-r--r--src/3rd_party-static/MessageBroker/src/lib_messagebroker/CMessageBroker.cpp222
-rw-r--r--src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp93
3 files changed, 263 insertions, 57 deletions
diff --git a/src/3rd_party-static/MessageBroker/include/CMessageBroker.hpp b/src/3rd_party-static/MessageBroker/include/CMessageBroker.hpp
index 952b250ae..c54204379 100644
--- a/src/3rd_party-static/MessageBroker/include/CMessageBroker.hpp
+++ b/src/3rd_party-static/MessageBroker/include/CMessageBroker.hpp
@@ -61,9 +61,10 @@ namespace NsMessageBroker
* \brief Receive data from TCP server (from client).
* \param fd FileDescriptor of socket.
* \param aJSONData JSON string.
+ * \param tryHard give up on first JSON parse error or try to workaround it.
*/
- void onMessageReceived(int fd, std::string& aJSONData);
-
+ void onMessageReceived(int fd, std::string& aJSONData, bool tryHard);
+
/**
* \brief Test of buffer parsing.
*/
diff --git a/src/3rd_party-static/MessageBroker/src/lib_messagebroker/CMessageBroker.cpp b/src/3rd_party-static/MessageBroker/src/lib_messagebroker/CMessageBroker.cpp
index 07f01ea12..3b14489a8 100644
--- a/src/3rd_party-static/MessageBroker/src/lib_messagebroker/CMessageBroker.cpp
+++ b/src/3rd_party-static/MessageBroker/src/lib_messagebroker/CMessageBroker.cpp
@@ -4,6 +4,7 @@
* \author AKara
*/
+#include <cassert>
#include <stdio.h>
#include <vector>
@@ -216,6 +217,31 @@ class CMessageBroker_Private {
int popMessageFromWaitQue(CMessage* pMessage);
/**
+ * \brief Tries to remove the parsed part of the buffer
+ * \param root Parsed JSON value
+ * \param aJSONData The string buffer
+ * \return true on success, false on failure
+ */
+ bool cutParsedJSON(const Json::Value& root, std::string& aJSONData);
+
+ /**
+ * \brief Finds the position just after a JSON object or array in a buffer
+ * \param isObject Must be true for object, false for array
+ * \param aJSONData The string buffer
+ * \return The position in the buffer after the object or array on success,
+ * std::strin::npos on failure
+ */
+ size_t jumpOverJSONObjectOrArray(bool isObject, const std::string& aJSONData);
+
+ /**
+ * \brief Finds the position just after a JSON string in a buffer
+ * \param aJSONData The string buffer
+ * \return The position in the buffer after the string on success,
+ * std::strin::npos on failure
+ */
+ size_t jumpOverJSONString(const std::string& aJSONData);
+
+ /**
* \brief Que of messages.
*/
std::deque<CMessage*> mMessagesQueue;
@@ -288,35 +314,183 @@ CMessageBroker* CMessageBroker::getInstance() {
return &instance;
}
-void CMessageBroker::onMessageReceived(int fd, std::string& aJSONData) {
- DBG_MSG(("CMessageBroker::onMessageReceived()\n"));
- while (!aJSONData.empty()) {
- Json::Value root;
- if (!p->m_reader.parse(aJSONData, root)) {
- DBG_MSG(("Received not JSON string! %s\n", aJSONData.c_str()));
- return;
+
+size_t CMessageBroker_Private::jumpOverJSONObjectOrArray(bool isObject,
+ const std::string& aJSONData) {
+ const char openBracket = isObject? '{' : '[';
+ const char closeBracket = isObject? '}' : ']';
+ int open_minus_close_brackets(1);
+ size_t position = aJSONData.find(openBracket); // Find the beginning of the object
+
+ while ((position != std::string::npos) && (open_minus_close_brackets > 0)) {
+ position = aJSONData.find_first_of(std::string("\"")+openBracket+closeBracket,
+ position+1);
+ if (std::string::npos == position) {
+ break;
}
- if(root["jsonrpc"]!="2.0") {
- DBG_MSG(("\t Json::Reader::parce didn't set up jsonrpc! jsonrpc = '%s'\n", root["jsonrpc"].asString().c_str()));
- return;
+ if ('"' == aJSONData[position]) {
+ // Ignore string interior, which might contain brackets and escaped "-s
+ do {
+ position = aJSONData.find('"', position+1); // Find the closing quote
+ } while ((std::string::npos != position) && ('\\' == aJSONData[position-1]));
+ } else if (openBracket == aJSONData[position]) {
+ ++open_minus_close_brackets;
+ } else if (closeBracket == aJSONData[position]) {
+ --open_minus_close_brackets;
}
- std::string wmes = p->m_recieverWriter.write(root);
- DBG_MSG(("Parsed JSON string %d : %s\n", wmes.length(),
- wmes.c_str()));
- DBG_MSG(("Buffer is:%s\n", aJSONData.c_str()));
- if (aJSONData.length() > wmes.length()) {
- // wmes string length can differ from buffer substr length
- size_t offset = wmes.length();
- char msg_begin = '{';
- if (aJSONData.at(offset) != msg_begin) {
- offset -= 1; // wmes can contain redudant \n in the tail.
+ }
+
+ if ((0 == open_minus_close_brackets) && (std::string::npos != position)) {
+ ++position; // Move after the closing bracket
+ } else {
+ position = std::string::npos;
+ }
+
+ return position;
+}
+
+
+size_t CMessageBroker_Private::jumpOverJSONString(const std::string& aJSONData) {
+ size_t position = aJSONData.find('"'); // Find the beginning of the string
+
+ do {
+ position = aJSONData.find('"', position+1); // Find the closing quote
+ } while ((std::string::npos != position) && ('\\' == aJSONData[position-1]));
+
+ if (std::string::npos != position) {
+ ++position; // Move after the closing quote
+ }
+
+ return position;
+}
+
+
+bool CMessageBroker_Private::cutParsedJSON(const Json::Value& root,
+ std::string& aJSONData) {
+ if (root.isNull() || aJSONData.empty()) {
+ DBG_MSG_ERROR(("JSON is null or the buffer is empty!\n"));
+ return false;
+ }
+
+ std::string parsed_json_str = m_recieverWriter.write(root);
+ DBG_MSG(("Parsed JSON string: '%s'\n", parsed_json_str.c_str()));
+
+ // Trim front spaces (if any)
+ const size_t nonempty_position = aJSONData.find_first_not_of(" \t\n\v\f\r");
+ aJSONData.erase(0, nonempty_position);
+ if (std::string::npos == nonempty_position) {
+ DBG_MSG_ERROR(("Buffer contains only blanks!\n"));
+ return false;
+ }
+
+ // JSON writer puts '\n' at the end. Remove it.
+ const size_t final_lf_pos = parsed_json_str.rfind('\n');
+ if (final_lf_pos == parsed_json_str.length()-1) {
+ parsed_json_str.erase(final_lf_pos, 1);
+ }
+
+ /* RFC 4627: "A JSON value MUST be an object, array, number, or string, or
+ * one of the following three literal names: false null true"
+ * So we will try to find the borders of the parsed part based on its type. */
+
+ size_t position(std::string::npos);
+
+ if (0 == aJSONData.find(parsed_json_str)) {
+ // If by chance parsed JSON is the same in the buffer and is at the beginning
+ position = parsed_json_str.length();
+ } else if (root.isObject() || root.isArray()) {
+ position = jumpOverJSONObjectOrArray(root.isObject(), aJSONData);
+ } else if (root.isString()) {
+ position = jumpOverJSONString(aJSONData);
+ } else if (root.isNumeric()) {
+ position = aJSONData.find_first_not_of("+-0123456789.eE");
+ } else if (root.isBool() || ("null" == parsed_json_str)) {
+ position = aJSONData.find(parsed_json_str);
+ if (std::string::npos != position) {
+ position += parsed_json_str.length();
+ }
+ } else {
+ DBG_MSG_ERROR(("Unknown JSON type!\n"));
+ }
+
+ if (std::string::npos == position) {
+ DBG_MSG_ERROR(("Error finding JSON object boundaries!\n"));
+ /* This should not happen, because the string is already parsed as a
+ * valid JSON. If this happens then above code is wrong. It is better
+ * to assert() than just return here, because otherwise we may enter an
+ * endless cycle - fail to process one and the same message again and
+ * again. Or we may clear the buffer and return, but in this way we will
+ * loose the next messages, miss a bug here, and create another bug. */
+ assert(std::string::npos != position);
+ return false; // For release version
+ }
+
+ if ((position >= aJSONData.length()) ||
+ ((position == aJSONData.length()-1) && isspace(aJSONData[position]))) {
+ // No next object. Clear entire aJSONData.
+ aJSONData = "";
+ } else {
+ // There is another object. Clear the current one.
+ aJSONData.erase(0, position);
+ }
+
+ return true;
+}
+
+
+void CMessageBroker::onMessageReceived(int fd, std::string& aJSONData, bool tryHard) {
+ DBG_MSG(("CMessageBroker::onMessageReceived(%d, '%s')\n", fd, aJSONData.c_str()));
+
+ while (! aJSONData.empty()) {
+ Json::Value root;
+ if ((! p->m_reader.parse(aJSONData, root)) || root.isNull()) {
+ DBG_MSG_ERROR(("Unable to parse JSON!"));
+ if (! tryHard) {
+ return;
+ }
+ uint8_t first_byte = static_cast<uint8_t>(aJSONData[0]);
+ if ((first_byte <= 0x08) || ((first_byte >= 0x80) && (first_byte <= 0x88))) {
+ DBG_MSG((" There is an unparsed websocket header probably.\n"));
+ /* Websocket headers can have FIN flag set in the first byte (0x80).
+ * Then there are 3 zero bits and 4 bits for opcode (from 0x00 to 0x0A).
+ * But actually we don't use opcodes above 0x08.
+ * Use this fact to distinguish websocket header from payload text data.
+ * It can be a coincidence of course, but we have to give it a try. */
+ return;
+ } else if ('{' == aJSONData[0]) {
+ DBG_MSG_ERROR((" Incomplete JSON object probably.\n"));
+ return;
+ } else {
+ DBG_MSG_ERROR((" Step in the buffer and try again...\n"));
+ aJSONData.erase(0, 1);
+ DBG_MSG_ERROR(("Buffer after cut is: '%s'\n", aJSONData.c_str()));
+ continue;
}
- aJSONData.erase(aJSONData.begin(), aJSONData.begin() + offset);
- DBG_MSG(("Buffer after cut is:%s\n", aJSONData.c_str()));
+
+ } else if (! root.isObject()) {
+ /* JSON RPC 2.0 messages are objects. Batch calls must be pre-rpocessed,
+ * so no need for "and !root.isArray()" */
+ DBG_MSG_ERROR(("Parsed JSON is not an object!\n"));
+ if (! tryHard) {
+ return;
+ }
+ // Cut parsed data from the buffer below and continue
+
+ } else if ((!root.isMember("jsonrpc")) || (root["jsonrpc"]!="2.0")) {
+ DBG_MSG_ERROR(("'jsonrpc' is not set correctly in parsed JSON!\n"));
+ if (! tryHard) {
+ return;
+ }
+ // Cut parsed object from the buffer below and continue
+
} else {
- aJSONData = "";
+ // Parsing successful. Pass the message up.
+ p->pushMessage(new CMessage(fd, root));
}
- p->pushMessage(new CMessage(fd, root));
+
+ p->cutParsedJSON(root, aJSONData);
+
+ DBG_MSG(("Buffer after cut is: '%s'\n", aJSONData.c_str()));
}
}
diff --git a/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp b/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp
index ca27502b9..bdd7b2bfd 100644
--- a/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp
+++ b/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp
@@ -58,9 +58,10 @@ ssize_t TcpServer::Send(int fd, const std::string& data) {
bool TcpServer::Recv(int fd) {
DBG_MSG(("TcpServer::Recv(%d)\n", fd));
- ssize_t nb = -1;
std::string* pReceivingBuffer = getBufferFor(fd);
+ bool buffer_was_not_empty = pReceivingBuffer->size() > 0;
+
std::vector<char> buf;
buf.reserve(RECV_BUFFER_LENGTH + pReceivingBuffer->size());
DBG_MSG(("Left in pReceivingBuffer: %d \n",
@@ -68,50 +69,87 @@ bool TcpServer::Recv(int fd) {
buf.assign(pReceivingBuffer->c_str(),
pReceivingBuffer->c_str() + pReceivingBuffer->size());
buf.resize(RECV_BUFFER_LENGTH + pReceivingBuffer->size());
- ssize_t received_bytes = recv(fd, &buf[pReceivingBuffer->size()], MAX_RECV_DATA, 0);
- nb = received_bytes;
+
+ int received_bytes = recv(fd, &buf[pReceivingBuffer->size()], MAX_RECV_DATA, 0);
+ if (received_bytes <= 0) {
+ DBG_MSG(("Received %d bytes from %d; error = %d\n",
+ received_bytes, fd, errno));
+ m_purge.push_back(fd);
+ return false;
+ }
+
+ unsigned int nb = received_bytes;
+ std::vector<char> last_msg_buf(buf.begin()+pReceivingBuffer->size(),
+ buf.begin()+pReceivingBuffer->size()+nb);
DBG_MSG(("Recieved %d from %d\n", nb, fd));
- nb += pReceivingBuffer->size();
+ nb += static_cast<unsigned int>(pReceivingBuffer->size());
DBG_MSG(("Recieved with buffer %d from %d\n", nb, fd));
- if (received_bytes > 0) {
- unsigned int recieved_data = nb;
+ if (nb > 0) { // This is redundant
if (isWebSocket(fd)) {
const unsigned int data_length =
- mWebSocketHandler.parseWebSocketDataLength(&buf[0], recieved_data);
+ mWebSocketHandler.parseWebSocketDataLength(&buf[0], nb);
- DBG_MSG(("Received %d actual data length %d\n",
- recieved_data, data_length));
+ DBG_MSG(("Received %d actual data length %d\n", nb, data_length));
- if (data_length > recieved_data) {
- DBG_MSG_ERROR(("Received %d actual data length %d\n",
- recieved_data, data_length));
+ if (data_length > nb) {
+ DBG_MSG_ERROR(("Received %d actual data length %d\n", nb, data_length));
DBG_MSG_ERROR(("Incomplete message"));
*pReceivingBuffer = std::string(&buf[0], nb);
return false;
}
- unsigned int b_size = static_cast<unsigned int>(nb);
- mWebSocketHandler.parseWebSocketData(&buf[0], b_size);
- nb = b_size;
+ mWebSocketHandler.parseWebSocketData(&buf[0], nb);
}
*pReceivingBuffer = std::string(&buf[0], nb);
- DBG_MSG(("pReceivingBuffer before onMessageReceived:%d : %s",
+ DBG_MSG(("pReceivingBuffer before onMessageReceived:%d : %s\n",
pReceivingBuffer->size(), pReceivingBuffer->c_str()));
- // we need to check websocket clients here
+
+ // we need to check for websocket handshake
if (!checkWebSocketHandShake(fd, pReceivingBuffer))
{ //JSON MESSAGE received. Send data in CMessageBroker.
if (mpMessageBroker) {
- mpMessageBroker->onMessageReceived(fd, *pReceivingBuffer);
+ size_t buffer_size_before = pReceivingBuffer->size();
+ mpMessageBroker->onMessageReceived(fd, *pReceivingBuffer, true);
+
+ if (buffer_was_not_empty && (pReceivingBuffer->size() == buffer_size_before)) {
+ /* We couldn't parse the buffer (with the last message at the end)
+ * Try to parse ONLY the last message */
+ DBG_MSG_ERROR(("Couldn't parse the whole buffer! Try only the last message.\n"));
+
+ nb = static_cast<unsigned int>(last_msg_buf.size());
+ if (isWebSocket(fd)) {
+ const unsigned int data_length =
+ mWebSocketHandler.parseWebSocketDataLength(&last_msg_buf[0], nb);
+ if (data_length > nb) {
+ DBG_MSG_ERROR(("The last message may be incomplete. Don't do anything.\n"));
+ /* Should we replace the buffer with the last message?
+ * Probably not. It may not be a real websocket message.
+ * Wait for a full message. */
+ return false;
+ }
+ mWebSocketHandler.parseWebSocketData(&last_msg_buf[0], nb);
+ }
+
+ std::string last_message = std::string(&last_msg_buf[0], nb);
+ buffer_size_before = last_message.size();
+ mpMessageBroker->onMessageReceived(fd, last_message, false);
+ if ( last_message.size() < buffer_size_before ) {
+ /* Parsing last message successful! Discard the old data and
+ * keep only what is left from the last message */
+ DBG_MSG_ERROR(("Parsing last message successful! Discard the old data.\n"));
+ *pReceivingBuffer = last_message;
+ }
+ }
} else {
return false;
}
- } else { // client is a websocket
- std::string handshakeResponse =
- "HTTP/1.1 101 Switching Protocols\r\nUpgrade: WebSocket\r\n"
- "Connection: Upgrade\r\nSec-WebSocket-Accept: ";
+ } else { // message is a websocket handshake
ssize_t webSocketKeyPos = pReceivingBuffer->find("Sec-WebSocket-Key: ");
if (-1 != webSocketKeyPos) {
+ std::string handshakeResponse =
+ "HTTP/1.1 101 Switching Protocols\r\nUpgrade: WebSocket\r\n"
+ "Connection: Upgrade\r\nSec-WebSocket-Accept: ";
std::string wsKey = pReceivingBuffer->substr(webSocketKeyPos + 19, 24);
mWebSocketHandler.handshake_0405(wsKey);
handshakeResponse += wsKey;
@@ -126,15 +164,8 @@ bool TcpServer::Recv(int fd) {
m_WebSocketClients.push_back(fd);
}
}
-
- return true;
- }
- else {
- DBG_MSG(("Received %d bytes from %d; error = %d\n",
- received_bytes, fd, errno));
- m_purge.push_back(fd);
- return false;
}
+ return true;
}
bool TcpServer::checkWebSocketHandShake(int fd, std::string* pReceivingBuffer) {
@@ -228,9 +259,9 @@ void TcpServer::WaitMessage(uint32_t ms) {
itr = m_receivingBuffers.find((*it));
if (itr != m_receivingBuffers.end())
{ // delete receiving buffer of disconnected client
+ mpMessageBroker->OnSocketClosed(itr->first);
delete itr->second;
m_receivingBuffers.erase(itr);
- mpMessageBroker->OnSocketClosed(itr->first);
}
}