summaryrefslogtreecommitdiff
path: root/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp')
-rw-r--r--src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp93
1 files changed, 62 insertions, 31 deletions
diff --git a/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp b/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp
index ca27502b9..bdd7b2bfd 100644
--- a/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp
+++ b/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp
@@ -58,9 +58,10 @@ ssize_t TcpServer::Send(int fd, const std::string& data) {
bool TcpServer::Recv(int fd) {
DBG_MSG(("TcpServer::Recv(%d)\n", fd));
- ssize_t nb = -1;
std::string* pReceivingBuffer = getBufferFor(fd);
+ bool buffer_was_not_empty = pReceivingBuffer->size() > 0;
+
std::vector<char> buf;
buf.reserve(RECV_BUFFER_LENGTH + pReceivingBuffer->size());
DBG_MSG(("Left in pReceivingBuffer: %d \n",
@@ -68,50 +69,87 @@ bool TcpServer::Recv(int fd) {
buf.assign(pReceivingBuffer->c_str(),
pReceivingBuffer->c_str() + pReceivingBuffer->size());
buf.resize(RECV_BUFFER_LENGTH + pReceivingBuffer->size());
- ssize_t received_bytes = recv(fd, &buf[pReceivingBuffer->size()], MAX_RECV_DATA, 0);
- nb = received_bytes;
+
+ int received_bytes = recv(fd, &buf[pReceivingBuffer->size()], MAX_RECV_DATA, 0);
+ if (received_bytes <= 0) {
+ DBG_MSG(("Received %d bytes from %d; error = %d\n",
+ received_bytes, fd, errno));
+ m_purge.push_back(fd);
+ return false;
+ }
+
+ unsigned int nb = received_bytes;
+ std::vector<char> last_msg_buf(buf.begin()+pReceivingBuffer->size(),
+ buf.begin()+pReceivingBuffer->size()+nb);
DBG_MSG(("Recieved %d from %d\n", nb, fd));
- nb += pReceivingBuffer->size();
+ nb += static_cast<unsigned int>(pReceivingBuffer->size());
DBG_MSG(("Recieved with buffer %d from %d\n", nb, fd));
- if (received_bytes > 0) {
- unsigned int recieved_data = nb;
+ if (nb > 0) { // This is redundant
if (isWebSocket(fd)) {
const unsigned int data_length =
- mWebSocketHandler.parseWebSocketDataLength(&buf[0], recieved_data);
+ mWebSocketHandler.parseWebSocketDataLength(&buf[0], nb);
- DBG_MSG(("Received %d actual data length %d\n",
- recieved_data, data_length));
+ DBG_MSG(("Received %d actual data length %d\n", nb, data_length));
- if (data_length > recieved_data) {
- DBG_MSG_ERROR(("Received %d actual data length %d\n",
- recieved_data, data_length));
+ if (data_length > nb) {
+ DBG_MSG_ERROR(("Received %d actual data length %d\n", nb, data_length));
DBG_MSG_ERROR(("Incomplete message"));
*pReceivingBuffer = std::string(&buf[0], nb);
return false;
}
- unsigned int b_size = static_cast<unsigned int>(nb);
- mWebSocketHandler.parseWebSocketData(&buf[0], b_size);
- nb = b_size;
+ mWebSocketHandler.parseWebSocketData(&buf[0], nb);
}
*pReceivingBuffer = std::string(&buf[0], nb);
- DBG_MSG(("pReceivingBuffer before onMessageReceived:%d : %s",
+ DBG_MSG(("pReceivingBuffer before onMessageReceived:%d : %s\n",
pReceivingBuffer->size(), pReceivingBuffer->c_str()));
- // we need to check websocket clients here
+
+ // we need to check for websocket handshake
if (!checkWebSocketHandShake(fd, pReceivingBuffer))
{ //JSON MESSAGE received. Send data in CMessageBroker.
if (mpMessageBroker) {
- mpMessageBroker->onMessageReceived(fd, *pReceivingBuffer);
+ size_t buffer_size_before = pReceivingBuffer->size();
+ mpMessageBroker->onMessageReceived(fd, *pReceivingBuffer, true);
+
+ if (buffer_was_not_empty && (pReceivingBuffer->size() == buffer_size_before)) {
+ /* We couldn't parse the buffer (with the last message at the end)
+ * Try to parse ONLY the last message */
+ DBG_MSG_ERROR(("Couldn't parse the whole buffer! Try only the last message.\n"));
+
+ nb = static_cast<unsigned int>(last_msg_buf.size());
+ if (isWebSocket(fd)) {
+ const unsigned int data_length =
+ mWebSocketHandler.parseWebSocketDataLength(&last_msg_buf[0], nb);
+ if (data_length > nb) {
+ DBG_MSG_ERROR(("The last message may be incomplete. Don't do anything.\n"));
+ /* Should we replace the buffer with the last message?
+ * Probably not. It may not be a real websocket message.
+ * Wait for a full message. */
+ return false;
+ }
+ mWebSocketHandler.parseWebSocketData(&last_msg_buf[0], nb);
+ }
+
+ std::string last_message = std::string(&last_msg_buf[0], nb);
+ buffer_size_before = last_message.size();
+ mpMessageBroker->onMessageReceived(fd, last_message, false);
+ if ( last_message.size() < buffer_size_before ) {
+ /* Parsing last message successful! Discard the old data and
+ * keep only what is left from the last message */
+ DBG_MSG_ERROR(("Parsing last message successful! Discard the old data.\n"));
+ *pReceivingBuffer = last_message;
+ }
+ }
} else {
return false;
}
- } else { // client is a websocket
- std::string handshakeResponse =
- "HTTP/1.1 101 Switching Protocols\r\nUpgrade: WebSocket\r\n"
- "Connection: Upgrade\r\nSec-WebSocket-Accept: ";
+ } else { // message is a websocket handshake
ssize_t webSocketKeyPos = pReceivingBuffer->find("Sec-WebSocket-Key: ");
if (-1 != webSocketKeyPos) {
+ std::string handshakeResponse =
+ "HTTP/1.1 101 Switching Protocols\r\nUpgrade: WebSocket\r\n"
+ "Connection: Upgrade\r\nSec-WebSocket-Accept: ";
std::string wsKey = pReceivingBuffer->substr(webSocketKeyPos + 19, 24);
mWebSocketHandler.handshake_0405(wsKey);
handshakeResponse += wsKey;
@@ -126,15 +164,8 @@ bool TcpServer::Recv(int fd) {
m_WebSocketClients.push_back(fd);
}
}
-
- return true;
- }
- else {
- DBG_MSG(("Received %d bytes from %d; error = %d\n",
- received_bytes, fd, errno));
- m_purge.push_back(fd);
- return false;
}
+ return true;
}
bool TcpServer::checkWebSocketHandShake(int fd, std::string* pReceivingBuffer) {
@@ -228,9 +259,9 @@ void TcpServer::WaitMessage(uint32_t ms) {
itr = m_receivingBuffers.find((*it));
if (itr != m_receivingBuffers.end())
{ // delete receiving buffer of disconnected client
+ mpMessageBroker->OnSocketClosed(itr->first);
delete itr->second;
m_receivingBuffers.erase(itr);
- mpMessageBroker->OnSocketClosed(itr->first);
}
}