diff options
Diffstat (limited to 'src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp')
-rw-r--r-- | src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp | 93 |
1 files changed, 62 insertions, 31 deletions
diff --git a/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp b/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp index ca27502b9..bdd7b2bfd 100644 --- a/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp +++ b/src/3rd_party-static/MessageBroker/src/server/mb_tcpserver.cpp @@ -58,9 +58,10 @@ ssize_t TcpServer::Send(int fd, const std::string& data) { bool TcpServer::Recv(int fd) { DBG_MSG(("TcpServer::Recv(%d)\n", fd)); - ssize_t nb = -1; std::string* pReceivingBuffer = getBufferFor(fd); + bool buffer_was_not_empty = pReceivingBuffer->size() > 0; + std::vector<char> buf; buf.reserve(RECV_BUFFER_LENGTH + pReceivingBuffer->size()); DBG_MSG(("Left in pReceivingBuffer: %d \n", @@ -68,50 +69,87 @@ bool TcpServer::Recv(int fd) { buf.assign(pReceivingBuffer->c_str(), pReceivingBuffer->c_str() + pReceivingBuffer->size()); buf.resize(RECV_BUFFER_LENGTH + pReceivingBuffer->size()); - ssize_t received_bytes = recv(fd, &buf[pReceivingBuffer->size()], MAX_RECV_DATA, 0); - nb = received_bytes; + + int received_bytes = recv(fd, &buf[pReceivingBuffer->size()], MAX_RECV_DATA, 0); + if (received_bytes <= 0) { + DBG_MSG(("Received %d bytes from %d; error = %d\n", + received_bytes, fd, errno)); + m_purge.push_back(fd); + return false; + } + + unsigned int nb = received_bytes; + std::vector<char> last_msg_buf(buf.begin()+pReceivingBuffer->size(), + buf.begin()+pReceivingBuffer->size()+nb); DBG_MSG(("Recieved %d from %d\n", nb, fd)); - nb += pReceivingBuffer->size(); + nb += static_cast<unsigned int>(pReceivingBuffer->size()); DBG_MSG(("Recieved with buffer %d from %d\n", nb, fd)); - if (received_bytes > 0) { - unsigned int recieved_data = nb; + if (nb > 0) { // This is redundant if (isWebSocket(fd)) { const unsigned int data_length = - mWebSocketHandler.parseWebSocketDataLength(&buf[0], recieved_data); + mWebSocketHandler.parseWebSocketDataLength(&buf[0], nb); - DBG_MSG(("Received %d actual data length %d\n", - recieved_data, data_length)); + DBG_MSG(("Received %d actual data length %d\n", nb, data_length)); - if (data_length > recieved_data) { - DBG_MSG_ERROR(("Received %d actual data length %d\n", - recieved_data, data_length)); + if (data_length > nb) { + DBG_MSG_ERROR(("Received %d actual data length %d\n", nb, data_length)); DBG_MSG_ERROR(("Incomplete message")); *pReceivingBuffer = std::string(&buf[0], nb); return false; } - unsigned int b_size = static_cast<unsigned int>(nb); - mWebSocketHandler.parseWebSocketData(&buf[0], b_size); - nb = b_size; + mWebSocketHandler.parseWebSocketData(&buf[0], nb); } *pReceivingBuffer = std::string(&buf[0], nb); - DBG_MSG(("pReceivingBuffer before onMessageReceived:%d : %s", + DBG_MSG(("pReceivingBuffer before onMessageReceived:%d : %s\n", pReceivingBuffer->size(), pReceivingBuffer->c_str())); - // we need to check websocket clients here + + // we need to check for websocket handshake if (!checkWebSocketHandShake(fd, pReceivingBuffer)) { //JSON MESSAGE received. Send data in CMessageBroker. if (mpMessageBroker) { - mpMessageBroker->onMessageReceived(fd, *pReceivingBuffer); + size_t buffer_size_before = pReceivingBuffer->size(); + mpMessageBroker->onMessageReceived(fd, *pReceivingBuffer, true); + + if (buffer_was_not_empty && (pReceivingBuffer->size() == buffer_size_before)) { + /* We couldn't parse the buffer (with the last message at the end) + * Try to parse ONLY the last message */ + DBG_MSG_ERROR(("Couldn't parse the whole buffer! Try only the last message.\n")); + + nb = static_cast<unsigned int>(last_msg_buf.size()); + if (isWebSocket(fd)) { + const unsigned int data_length = + mWebSocketHandler.parseWebSocketDataLength(&last_msg_buf[0], nb); + if (data_length > nb) { + DBG_MSG_ERROR(("The last message may be incomplete. Don't do anything.\n")); + /* Should we replace the buffer with the last message? + * Probably not. It may not be a real websocket message. + * Wait for a full message. */ + return false; + } + mWebSocketHandler.parseWebSocketData(&last_msg_buf[0], nb); + } + + std::string last_message = std::string(&last_msg_buf[0], nb); + buffer_size_before = last_message.size(); + mpMessageBroker->onMessageReceived(fd, last_message, false); + if ( last_message.size() < buffer_size_before ) { + /* Parsing last message successful! Discard the old data and + * keep only what is left from the last message */ + DBG_MSG_ERROR(("Parsing last message successful! Discard the old data.\n")); + *pReceivingBuffer = last_message; + } + } } else { return false; } - } else { // client is a websocket - std::string handshakeResponse = - "HTTP/1.1 101 Switching Protocols\r\nUpgrade: WebSocket\r\n" - "Connection: Upgrade\r\nSec-WebSocket-Accept: "; + } else { // message is a websocket handshake ssize_t webSocketKeyPos = pReceivingBuffer->find("Sec-WebSocket-Key: "); if (-1 != webSocketKeyPos) { + std::string handshakeResponse = + "HTTP/1.1 101 Switching Protocols\r\nUpgrade: WebSocket\r\n" + "Connection: Upgrade\r\nSec-WebSocket-Accept: "; std::string wsKey = pReceivingBuffer->substr(webSocketKeyPos + 19, 24); mWebSocketHandler.handshake_0405(wsKey); handshakeResponse += wsKey; @@ -126,15 +164,8 @@ bool TcpServer::Recv(int fd) { m_WebSocketClients.push_back(fd); } } - - return true; - } - else { - DBG_MSG(("Received %d bytes from %d; error = %d\n", - received_bytes, fd, errno)); - m_purge.push_back(fd); - return false; } + return true; } bool TcpServer::checkWebSocketHandShake(int fd, std::string* pReceivingBuffer) { @@ -228,9 +259,9 @@ void TcpServer::WaitMessage(uint32_t ms) { itr = m_receivingBuffers.find((*it)); if (itr != m_receivingBuffers.end()) { // delete receiving buffer of disconnected client + mpMessageBroker->OnSocketClosed(itr->first); delete itr->second; m_receivingBuffers.erase(itr); - mpMessageBroker->OnSocketClosed(itr->first); } } |