summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTamas Kovacs <tamas.2.kovacs@nokia-sbell.com>2022-02-11 19:31:40 +0800
committerJens Geyer <Jens-G@users.noreply.github.com>2022-03-14 12:36:23 +0100
commitb941b1124834d38daaa0e4355655b4ce63b80d3e (patch)
treec98ae9f2beaa6e10d919136a4976e03e8f7b2e29
parentbbea728aaa9f72bb3b58a1c5448b4e917eaf5796 (diff)
downloadthrift-b941b1124834d38daaa0e4355655b4ce63b80d3e.tar.gz
THRIFT-5515: TConnection::workSocket reads all pending oneway requests.
-rw-r--r--lib/cpp/src/thrift/server/TNonblockingServer.cpp251
1 files changed, 129 insertions, 122 deletions
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index f2b3e708b..ae92da3af 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -419,154 +419,161 @@ void TNonblockingServer::TConnection::setSocket(std::shared_ptr<TSocket> socket)
}
void TNonblockingServer::TConnection::workSocket() {
- int got = 0, left = 0, sent = 0;
- uint32_t fetch = 0;
-
- switch (socketState_) {
- case SOCKET_RECV_FRAMING:
- union {
- uint8_t buf[sizeof(uint32_t)];
- uint32_t size;
- } framing;
-
- // if we've already received some bytes we kept them here
- framing.size = readWant_;
- // determine size of this frame
- try {
- // Read from the socket
- fetch = tSocket_->read(&framing.buf[readBufferPos_],
- uint32_t(sizeof(framing.size) - readBufferPos_));
- if (fetch == 0) {
- // Whenever we get here it means a remote disconnect
- close();
- return;
+ while (true) {
+ int got = 0, left = 0, sent = 0;
+ uint32_t fetch = 0;
+
+ switch (socketState_) {
+ case SOCKET_RECV_FRAMING:
+ union {
+ uint8_t buf[sizeof(uint32_t)];
+ uint32_t size;
+ } framing;
+
+ // if we've already received some bytes we kept them here
+ framing.size = readWant_;
+ // determine size of this frame
+ try {
+ // Read from the socket
+ fetch = tSocket_->read(&framing.buf[readBufferPos_],
+ uint32_t(sizeof(framing.size) - readBufferPos_));
+ if (fetch == 0) {
+ // Whenever we get here it means a remote disconnect
+ close();
+ return;
+ }
+ readBufferPos_ += fetch;
+ } catch (TTransportException& te) {
+ //In Nonblocking SSLSocket some operations need to be retried again.
+ //Current approach is parsing exception message, but a better solution needs to be investigated.
+ if(!strstr(te.what(), "retry")) {
+ GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+ close();
+
+ return;
+ }
}
- readBufferPos_ += fetch;
- } catch (TTransportException& te) {
- //In Nonblocking SSLSocket some operations need to be retried again.
- //Current approach is parsing exception message, but a better solution needs to be investigated.
- if(!strstr(te.what(), "retry")) {
- GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
- close();
+ if (readBufferPos_ < sizeof(framing.size)) {
+ // more needed before frame size is known -- save what we have so far
+ readWant_ = framing.size;
return;
}
- }
- if (readBufferPos_ < sizeof(framing.size)) {
- // more needed before frame size is known -- save what we have so far
- readWant_ = framing.size;
- return;
- }
-
- readWant_ = ntohl(framing.size);
- if (readWant_ > server_->getMaxFrameSize()) {
- // Don't allow giant frame sizes. This prevents bad clients from
- // causing us to try and allocate a giant buffer.
- GlobalOutput.printf(
- "TNonblockingServer: frame size too large "
- "(%" PRIu32 " > %" PRIu64
- ") from client %s. "
- "Remote side not using TFramedTransport?",
- readWant_,
- (uint64_t)server_->getMaxFrameSize(),
- tSocket_->getSocketInfo().c_str());
- close();
- return;
- }
- // size known; now get the rest of the frame
- transition();
-
- // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for
- // regular sockets, because if there is more data, libevent will fire the event handler registered for read
- // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the
- // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In
- // that case, not having this workSocket() call here would result in a hang as we will never get to work the socket,
- // despite having more data.
- if (tSocket_->hasPendingDataToRead())
- {
- workSocket();
- }
+ readWant_ = ntohl(framing.size);
+ if (readWant_ > server_->getMaxFrameSize()) {
+ // Don't allow giant frame sizes. This prevents bad clients from
+ // causing us to try and allocate a giant buffer.
+ GlobalOutput.printf(
+ "TNonblockingServer: frame size too large "
+ "(%" PRIu32 " > %" PRIu64
+ ") from client %s. "
+ "Remote side not using TFramedTransport?",
+ readWant_,
+ (uint64_t)server_->getMaxFrameSize(),
+ tSocket_->getSocketInfo().c_str());
+ close();
+ return;
+ }
+ // size known; now get the rest of the frame
+ transition();
- return;
+ // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for
+ // regular sockets, because if there is more data, libevent will fire the event handler registered for read
+ // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the
+ // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In
+ // that case, not trying another processing cycle here would result in a hang as we will never get to work the socket,
+ // despite having more data.
+ if (tSocket_->hasPendingDataToRead())
+ {
+ continue;
+ }
- case SOCKET_RECV:
- // It is an error to be in this state if we already have all the data
- if (!(readBufferPos_ < readWant_)) {
- GlobalOutput.printf("TNonblockingServer: frame size too short");
- close();
return;
- }
- try {
- // Read from the socket
- fetch = readWant_ - readBufferPos_;
- got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
- } catch (TTransportException& te) {
- //In Nonblocking SSLSocket some operations need to be retried again.
- //Current approach is parsing exception message, but a better solution needs to be investigated.
- if(!strstr(te.what(), "retry")) {
- GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+ case SOCKET_RECV:
+ // It is an error to be in this state if we already have all the data
+ if (!(readBufferPos_ < readWant_)) {
+ GlobalOutput.printf("TNonblockingServer: frame size too short");
close();
+ return;
}
- return;
- }
+ try {
+ // Read from the socket
+ fetch = readWant_ - readBufferPos_;
+ got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
+ } catch (TTransportException& te) {
+ //In Nonblocking SSLSocket some operations need to be retried again.
+ //Current approach is parsing exception message, but a better solution needs to be investigated.
+ if(!strstr(te.what(), "retry")) {
+ GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+ close();
+ }
- if (got > 0) {
- // Move along in the buffer
- readBufferPos_ += got;
+ return;
+ }
- // Check that we did not overdo it
- assert(readBufferPos_ <= readWant_);
+ if (got > 0) {
+ // Move along in the buffer
+ readBufferPos_ += got;
- // We are done reading, move onto the next state
- if (readBufferPos_ == readWant_) {
- transition();
+ // Check that we did not overdo it
+ assert(readBufferPos_ <= readWant_);
+
+ // We are done reading, move onto the next state
+ if (readBufferPos_ == readWant_) {
+ transition();
+ if (socketState_ == SOCKET_RECV_FRAMING && tSocket_->hasPendingDataToRead())
+ {
+ continue;
+ }
+ }
+ return;
}
- return;
- }
- // Whenever we get down here it means a remote disconnect
- close();
+ // Whenever we get down here it means a remote disconnect
+ close();
- return;
+ return;
- case SOCKET_SEND:
- // Should never have position past size
- assert(writeBufferPos_ <= writeBufferSize_);
+ case SOCKET_SEND:
+ // Should never have position past size
+ assert(writeBufferPos_ <= writeBufferSize_);
- // If there is no data to send, then let us move on
- if (writeBufferPos_ == writeBufferSize_) {
- GlobalOutput("WARNING: Send state with no data to send");
- transition();
- return;
- }
+ // If there is no data to send, then let us move on
+ if (writeBufferPos_ == writeBufferSize_) {
+ GlobalOutput("WARNING: Send state with no data to send");
+ transition();
+ return;
+ }
- try {
- left = writeBufferSize_ - writeBufferPos_;
- sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
- } catch (TTransportException& te) {
- GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
- close();
- return;
- }
+ try {
+ left = writeBufferSize_ - writeBufferPos_;
+ sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
+ } catch (TTransportException& te) {
+ GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
+ close();
+ return;
+ }
- writeBufferPos_ += sent;
+ writeBufferPos_ += sent;
- // Did we overdo it?
- assert(writeBufferPos_ <= writeBufferSize_);
+ // Did we overdo it?
+ assert(writeBufferPos_ <= writeBufferSize_);
- // We are done!
- if (writeBufferPos_ == writeBufferSize_) {
- transition();
- }
+ // We are done!
+ if (writeBufferPos_ == writeBufferSize_) {
+ transition();
+ }
- return;
+ return;
- default:
- GlobalOutput.printf("Unexpected Socket State %d", socketState_);
- assert(0);
+ default:
+ GlobalOutput.printf("Unexpected Socket State %d", socketState_);
+ assert(0);
+ return;
+ }
}
}