From 66ac7b46fab85f175aec601cb48ea05408a1c186 Mon Sep 17 00:00:00 2001 From: wangfan Date: Thu, 24 Feb 2022 12:14:45 +0800 Subject: THRIFT-5494 fix cpu full caused by infinite select() when frameSize < maxReadBufferBytes but readBufferBytesAllocated.get() + frameSize always greater than MAX_READ_BUFFER_BYTES Client: Java Patch: wangfan This closes #2533 --- .../thrift/server/AbstractNonblockingServer.java | 5 +++-- .../apache/thrift/transport/TEndpointTransport.java | 7 ++++++- .../thrift/transport/TNonblockingServerSocket.java | 21 +++++++++++++++++++-- .../apache/thrift/transport/TServerTransport.java | 8 ++++++++ 4 files changed, 36 insertions(+), 5 deletions(-) diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java index f91e8254f..beef954ef 100644 --- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java @@ -352,9 +352,10 @@ public abstract class AbstractNonblockingServer extends TServer { // if this frame will always be too large for this server, log the // error and close the connection. - if (frameSize > MAX_READ_BUFFER_BYTES) { + if (frameSize > trans_.getMaxFrameSize()) { LOGGER.error("Read a frame size of " + frameSize - + ", which is bigger than the maximum allowable buffer size for ALL connections."); + + ", which is bigger than the maximum allowable frame size " + + trans_.getMaxFrameSize() + " for ALL connections."); return false; } diff --git a/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java b/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java index f32efae90..f33b8b72d 100644 --- a/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java @@ -26,15 +26,20 @@ public abstract class TEndpointTransport extends TTransport{ protected long getMaxMessageSize() { return getConfiguration().getMaxMessageSize(); } + public int getMaxFrameSize() { return getConfiguration().getMaxFrameSize(); } + + public void setMaxFrameSize(int maxFrameSize) { getConfiguration().setMaxFrameSize(maxFrameSize); } + protected long knownMessageSize; protected long remainingMessageSize; private TConfiguration _configuration; + public TConfiguration getConfiguration() { return _configuration; } - public TEndpointTransport( TConfiguration config) throws TTransportException { + public TEndpointTransport(TConfiguration config) throws TTransportException { _configuration = Objects.isNull(config) ? new TConfiguration() : config; resetConsumedMessageSize(-1); diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java index 163189233..535fd6f51 100644 --- a/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java @@ -30,6 +30,8 @@ import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import org.apache.thrift.TConfiguration; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +56,11 @@ public class TNonblockingServerSocket extends TNonblockingServerTransport { */ private int clientTimeout_ = 0; + /** + * Limit for client sockets request size + */ + private int maxFrameSize_ = 0; + public static class NonblockingAbstractServerSocketArgs extends AbstractServerTransportArgs {} @@ -68,7 +75,11 @@ public class TNonblockingServerSocket extends TNonblockingServerTransport { * Creates just a port listening server socket */ public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException { - this(new NonblockingAbstractServerSocketArgs().port(port).clientTimeout(clientTimeout)); + this(port, clientTimeout, TConfiguration.DEFAULT_MAX_FRAME_SIZE); + } + + public TNonblockingServerSocket(int port, int clientTimeout, int maxFrameSize) throws TTransportException { + this(new NonblockingAbstractServerSocketArgs().port(port).clientTimeout(clientTimeout).maxFrameSize(maxFrameSize)); } public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException { @@ -76,11 +87,16 @@ public class TNonblockingServerSocket extends TNonblockingServerTransport { } public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException { - this(new NonblockingAbstractServerSocketArgs().bindAddr(bindAddr).clientTimeout(clientTimeout)); + this(bindAddr, clientTimeout, TConfiguration.DEFAULT_MAX_FRAME_SIZE); + } + + public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout, int maxFrameSize) throws TTransportException { + this(new NonblockingAbstractServerSocketArgs().bindAddr(bindAddr).clientTimeout(clientTimeout).maxFrameSize(maxFrameSize)); } public TNonblockingServerSocket(NonblockingAbstractServerSocketArgs args) throws TTransportException { clientTimeout_ = args.clientTimeout; + maxFrameSize_ = args.maxFrameSize; try { serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); @@ -121,6 +137,7 @@ public class TNonblockingServerSocket extends TNonblockingServerTransport { TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel); tsocket.setTimeout(clientTimeout_); + tsocket.setMaxFrameSize(maxFrameSize_); return tsocket; } catch (IOException iox) { throw new TTransportException(iox); diff --git a/lib/java/src/org/apache/thrift/transport/TServerTransport.java b/lib/java/src/org/apache/thrift/transport/TServerTransport.java index 55ef0c4c4..3a7b49a31 100644 --- a/lib/java/src/org/apache/thrift/transport/TServerTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TServerTransport.java @@ -22,6 +22,8 @@ package org.apache.thrift.transport; import java.io.Closeable; import java.net.InetSocketAddress; +import org.apache.thrift.TConfiguration; + /** * Server transport. Object which provides client transports. * @@ -32,6 +34,7 @@ public abstract class TServerTransport implements Closeable { int backlog = 0; // A value of 0 means the default value will be used (currently set at 50) int clientTimeout = 0; InetSocketAddress bindAddr; + int maxFrameSize = TConfiguration.DEFAULT_MAX_FRAME_SIZE; public T backlog(int backlog) { this.backlog = backlog; @@ -52,6 +55,11 @@ public abstract class TServerTransport implements Closeable { this.bindAddr = bindAddr; return (T) this; } + + public T maxFrameSize(int maxFrameSize) { + this.maxFrameSize = maxFrameSize; + return (T) this; + } } public abstract void listen() throws TTransportException; -- cgit v1.2.1