diff options
author | Tamas Kovacs <tamas.2.kovacs@nokia-sbell.com> | 2022-02-11 19:31:40 +0800 |
---|---|---|
committer | Jens Geyer <Jens-G@users.noreply.github.com> | 2022-03-14 12:36:23 +0100 |
commit | b941b1124834d38daaa0e4355655b4ce63b80d3e (patch) | |
tree | c98ae9f2beaa6e10d919136a4976e03e8f7b2e29 | |
parent | bbea728aaa9f72bb3b58a1c5448b4e917eaf5796 (diff) | |
download | thrift-b941b1124834d38daaa0e4355655b4ce63b80d3e.tar.gz |
THRIFT-5515: TConnection::workSocket reads all pending oneway requests.
-rw-r--r-- | lib/cpp/src/thrift/server/TNonblockingServer.cpp | 251 |
1 files changed, 129 insertions, 122 deletions
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp index f2b3e708b..ae92da3af 100644 --- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp +++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp @@ -419,154 +419,161 @@ void TNonblockingServer::TConnection::setSocket(std::shared_ptr<TSocket> socket) } void TNonblockingServer::TConnection::workSocket() { - int got = 0, left = 0, sent = 0; - uint32_t fetch = 0; - - switch (socketState_) { - case SOCKET_RECV_FRAMING: - union { - uint8_t buf[sizeof(uint32_t)]; - uint32_t size; - } framing; - - // if we've already received some bytes we kept them here - framing.size = readWant_; - // determine size of this frame - try { - // Read from the socket - fetch = tSocket_->read(&framing.buf[readBufferPos_], - uint32_t(sizeof(framing.size) - readBufferPos_)); - if (fetch == 0) { - // Whenever we get here it means a remote disconnect - close(); - return; + while (true) { + int got = 0, left = 0, sent = 0; + uint32_t fetch = 0; + + switch (socketState_) { + case SOCKET_RECV_FRAMING: + union { + uint8_t buf[sizeof(uint32_t)]; + uint32_t size; + } framing; + + // if we've already received some bytes we kept them here + framing.size = readWant_; + // determine size of this frame + try { + // Read from the socket + fetch = tSocket_->read(&framing.buf[readBufferPos_], + uint32_t(sizeof(framing.size) - readBufferPos_)); + if (fetch == 0) { + // Whenever we get here it means a remote disconnect + close(); + return; + } + readBufferPos_ += fetch; + } catch (TTransportException& te) { + //In Nonblocking SSLSocket some operations need to be retried again. + //Current approach is parsing exception message, but a better solution needs to be investigated. + if(!strstr(te.what(), "retry")) { + GlobalOutput.printf("TConnection::workSocket(): %s", te.what()); + close(); + + return; + } } - readBufferPos_ += fetch; - } catch (TTransportException& te) { - //In Nonblocking SSLSocket some operations need to be retried again. - //Current approach is parsing exception message, but a better solution needs to be investigated. - if(!strstr(te.what(), "retry")) { - GlobalOutput.printf("TConnection::workSocket(): %s", te.what()); - close(); + if (readBufferPos_ < sizeof(framing.size)) { + // more needed before frame size is known -- save what we have so far + readWant_ = framing.size; return; } - } - if (readBufferPos_ < sizeof(framing.size)) { - // more needed before frame size is known -- save what we have so far - readWant_ = framing.size; - return; - } - - readWant_ = ntohl(framing.size); - if (readWant_ > server_->getMaxFrameSize()) { - // Don't allow giant frame sizes. This prevents bad clients from - // causing us to try and allocate a giant buffer. - GlobalOutput.printf( - "TNonblockingServer: frame size too large " - "(%" PRIu32 " > %" PRIu64 - ") from client %s. " - "Remote side not using TFramedTransport?", - readWant_, - (uint64_t)server_->getMaxFrameSize(), - tSocket_->getSocketInfo().c_str()); - close(); - return; - } - // size known; now get the rest of the frame - transition(); - - // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for - // regular sockets, because if there is more data, libevent will fire the event handler registered for read - // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the - // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In - // that case, not having this workSocket() call here would result in a hang as we will never get to work the socket, - // despite having more data. - if (tSocket_->hasPendingDataToRead()) - { - workSocket(); - } + readWant_ = ntohl(framing.size); + if (readWant_ > server_->getMaxFrameSize()) { + // Don't allow giant frame sizes. This prevents bad clients from + // causing us to try and allocate a giant buffer. + GlobalOutput.printf( + "TNonblockingServer: frame size too large " + "(%" PRIu32 " > %" PRIu64 + ") from client %s. " + "Remote side not using TFramedTransport?", + readWant_, + (uint64_t)server_->getMaxFrameSize(), + tSocket_->getSocketInfo().c_str()); + close(); + return; + } + // size known; now get the rest of the frame + transition(); - return; + // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for + // regular sockets, because if there is more data, libevent will fire the event handler registered for read + // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the + // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In + // that case, not trying another processing cycle here would result in a hang as we will never get to work the socket, + // despite having more data. + if (tSocket_->hasPendingDataToRead()) + { + continue; + } - case SOCKET_RECV: - // It is an error to be in this state if we already have all the data - if (!(readBufferPos_ < readWant_)) { - GlobalOutput.printf("TNonblockingServer: frame size too short"); - close(); return; - } - try { - // Read from the socket - fetch = readWant_ - readBufferPos_; - got = tSocket_->read(readBuffer_ + readBufferPos_, fetch); - } catch (TTransportException& te) { - //In Nonblocking SSLSocket some operations need to be retried again. - //Current approach is parsing exception message, but a better solution needs to be investigated. - if(!strstr(te.what(), "retry")) { - GlobalOutput.printf("TConnection::workSocket(): %s", te.what()); + case SOCKET_RECV: + // It is an error to be in this state if we already have all the data + if (!(readBufferPos_ < readWant_)) { + GlobalOutput.printf("TNonblockingServer: frame size too short"); close(); + return; } - return; - } + try { + // Read from the socket + fetch = readWant_ - readBufferPos_; + got = tSocket_->read(readBuffer_ + readBufferPos_, fetch); + } catch (TTransportException& te) { + //In Nonblocking SSLSocket some operations need to be retried again. + //Current approach is parsing exception message, but a better solution needs to be investigated. + if(!strstr(te.what(), "retry")) { + GlobalOutput.printf("TConnection::workSocket(): %s", te.what()); + close(); + } - if (got > 0) { - // Move along in the buffer - readBufferPos_ += got; + return; + } - // Check that we did not overdo it - assert(readBufferPos_ <= readWant_); + if (got > 0) { + // Move along in the buffer + readBufferPos_ += got; - // We are done reading, move onto the next state - if (readBufferPos_ == readWant_) { - transition(); + // Check that we did not overdo it + assert(readBufferPos_ <= readWant_); + + // We are done reading, move onto the next state + if (readBufferPos_ == readWant_) { + transition(); + if (socketState_ == SOCKET_RECV_FRAMING && tSocket_->hasPendingDataToRead()) + { + continue; + } + } + return; } - return; - } - // Whenever we get down here it means a remote disconnect - close(); + // Whenever we get down here it means a remote disconnect + close(); - return; + return; - case SOCKET_SEND: - // Should never have position past size - assert(writeBufferPos_ <= writeBufferSize_); + case SOCKET_SEND: + // Should never have position past size + assert(writeBufferPos_ <= writeBufferSize_); - // If there is no data to send, then let us move on - if (writeBufferPos_ == writeBufferSize_) { - GlobalOutput("WARNING: Send state with no data to send"); - transition(); - return; - } + // If there is no data to send, then let us move on + if (writeBufferPos_ == writeBufferSize_) { + GlobalOutput("WARNING: Send state with no data to send"); + transition(); + return; + } - try { - left = writeBufferSize_ - writeBufferPos_; - sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left); - } catch (TTransportException& te) { - GlobalOutput.printf("TConnection::workSocket(): %s ", te.what()); - close(); - return; - } + try { + left = writeBufferSize_ - writeBufferPos_; + sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left); + } catch (TTransportException& te) { + GlobalOutput.printf("TConnection::workSocket(): %s ", te.what()); + close(); + return; + } - writeBufferPos_ += sent; + writeBufferPos_ += sent; - // Did we overdo it? - assert(writeBufferPos_ <= writeBufferSize_); + // Did we overdo it? + assert(writeBufferPos_ <= writeBufferSize_); - // We are done! - if (writeBufferPos_ == writeBufferSize_) { - transition(); - } + // We are done! + if (writeBufferPos_ == writeBufferSize_) { + transition(); + } - return; + return; - default: - GlobalOutput.printf("Unexpected Socket State %d", socketState_); - assert(0); + default: + GlobalOutput.printf("Unexpected Socket State %d", socketState_); + assert(0); + return; + } } } |