summaryrefslogtreecommitdiff
path: root/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java
diff options
context:
space:
mode:
Diffstat (limited to 'lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java')
-rw-r--r--lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java233
1 files changed, 99 insertions, 134 deletions
diff --git a/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java
index beef954ef..44341d909 100644
--- a/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -19,37 +19,35 @@
package org.apache.thrift.server;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.thrift.TAsyncProcessor;
import org.apache.thrift.TByteArrayOutputStream;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.layered.TFramedTransport;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TMemoryInputTransport;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.layered.TFramedTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Provides common methods and classes used by nonblocking TServer
- * implementations.
- */
+/** Provides common methods and classes used by nonblocking TServer implementations. */
public abstract class AbstractNonblockingServer extends TServer {
protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
- public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
+ public abstract static class AbstractNonblockingServerArgs<
+ T extends AbstractNonblockingServerArgs<T>>
+ extends AbstractServerArgs<T> {
public long maxReadBufferBytes = 256 * 1024 * 1024;
public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
@@ -59,15 +57,13 @@ public abstract class AbstractNonblockingServer extends TServer {
}
/**
- * The maximum amount of memory we will allocate to client IO buffers at a
- * time. Without this limit, the server will gladly allocate client buffers
- * right into an out of memory exception, rather than waiting.
+ * The maximum amount of memory we will allocate to client IO buffers at a time. Without this
+ * limit, the server will gladly allocate client buffers right into an out of memory exception,
+ * rather than waiting.
*/
final long MAX_READ_BUFFER_BYTES;
- /**
- * How many bytes are currently allocated to read buffers.
- */
+ /** How many bytes are currently allocated to read buffers. */
final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
@@ -75,9 +71,7 @@ public abstract class AbstractNonblockingServer extends TServer {
MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
}
- /**
- * Begin accepting connections and processing invocations.
- */
+ /** Begin accepting connections and processing invocations. */
public void serve() {
// start any IO threads
if (!startThreads()) {
@@ -107,17 +101,13 @@ public abstract class AbstractNonblockingServer extends TServer {
*/
protected abstract boolean startThreads();
- /**
- * A method that will block until when threads handling the serving have been
- * shut down.
- */
+ /** A method that will block until when threads handling the serving have been shut down. */
protected abstract void waitForShutdown();
/**
* Have the server transport start accepting connections.
*
- * @return true if we started listening successfully, false if something went
- * wrong.
+ * @return true if we started listening successfully, false if something went wrong.
*/
protected boolean startListening() {
try {
@@ -129,27 +119,23 @@ public abstract class AbstractNonblockingServer extends TServer {
}
}
- /**
- * Stop listening for connections.
- */
+ /** Stop listening for connections. */
protected void stopListening() {
serverTransport_.close();
}
/**
- * Perform an invocation. This method could behave several different ways -
- * invoke immediately inline, queue for separate execution, etc.
+ * Perform an invocation. This method could behave several different ways - invoke immediately
+ * inline, queue for separate execution, etc.
*
- * @return true if invocation was successfully requested, which is not a
- * guarantee that invocation has completed. False if the request
- * failed.
+ * @return true if invocation was successfully requested, which is not a guarantee that invocation
+ * has completed. False if the request failed.
*/
protected abstract boolean requestInvoke(FrameBuffer frameBuffer);
/**
- * An abstract thread that handles selecting on a set of transports and
- * {@link FrameBuffer FrameBuffers} associated with selected keys
- * corresponding to requests.
+ * An abstract thread that handles selecting on a set of transports and {@link FrameBuffer
+ * FrameBuffers} associated with selected keys corresponding to requests.
*/
protected abstract class AbstractSelectThread extends Thread {
protected Selector selector;
@@ -161,17 +147,15 @@ public abstract class AbstractNonblockingServer extends TServer {
this.selector = SelectorProvider.provider().openSelector();
}
- /**
- * If the selector is blocked, wake it up.
- */
+ /** If the selector is blocked, wake it up. */
public void wakeupSelector() {
selector.wakeup();
}
/**
- * Add FrameBuffer to the list of select interest changes and wake up the
- * selector if it's blocked. When the select() call exits, it'll give the
- * FrameBuffer a chance to change its interests.
+ * Add FrameBuffer to the list of select interest changes and wake up the selector if it's
+ * blocked. When the select() call exits, it'll give the FrameBuffer a chance to change its
+ * interests.
*/
public void requestSelectInterestChange(FrameBuffer frameBuffer) {
synchronized (selectInterestChanges) {
@@ -182,8 +166,8 @@ public abstract class AbstractNonblockingServer extends TServer {
}
/**
- * Check to see if there are any FrameBuffers that have switched their
- * interest type from read to write or vice versa.
+ * Check to see if there are any FrameBuffers that have switched their interest type from read
+ * to write or vice versa.
*/
protected void processInterestChanges() {
synchronized (selectInterestChanges) {
@@ -195,8 +179,8 @@ public abstract class AbstractNonblockingServer extends TServer {
}
/**
- * Do the work required to read from a readable client. If the frame is
- * fully read, then invoke the method call.
+ * Do the work required to read from a readable client. If the frame is fully read, then invoke
+ * the method call.
*/
protected void handleRead(SelectionKey key) {
FrameBuffer buffer = (FrameBuffer) key.attachment();
@@ -213,9 +197,7 @@ public abstract class AbstractNonblockingServer extends TServer {
}
}
- /**
- * Let a writable client get written, if there's data to be written.
- */
+ /** Let a writable client get written, if there's data to be written. */
protected void handleWrite(SelectionKey key) {
FrameBuffer buffer = (FrameBuffer) key.attachment();
if (!buffer.write()) {
@@ -223,9 +205,7 @@ public abstract class AbstractNonblockingServer extends TServer {
}
}
- /**
- * Do connection-close cleanup on a given SelectionKey.
- */
+ /** Do connection-close cleanup on a given SelectionKey. */
protected void cleanupSelectionKey(SelectionKey key) {
// remove the records from the two maps
FrameBuffer buffer = (FrameBuffer) key.attachment();
@@ -238,9 +218,7 @@ public abstract class AbstractNonblockingServer extends TServer {
}
} // SelectThread
- /**
- * Possible states for the FrameBuffer state machine.
- */
+ /** Possible states for the FrameBuffer state machine. */
private enum FrameBufferState {
// in the midst of reading the frame size off the wire
READING_FRAME_SIZE,
@@ -260,13 +238,12 @@ public abstract class AbstractNonblockingServer extends TServer {
}
/**
- * Class that implements a sort of state machine around the interaction with a
- * client and an invoker. It manages reading the frame size and frame data,
- * getting it handed off as wrapped transports, and then the writing of
- * response data back to the client. In the process it manages flipping the
- * read and write bits on the selection key for its client.
+ * Class that implements a sort of state machine around the interaction with a client and an
+ * invoker. It manages reading the frame size and frame data, getting it handed off as wrapped
+ * transports, and then the writing of response data back to the client. In the process it manages
+ * flipping the read and write bits on the selection key for its client.
*/
- public class FrameBuffer {
+ public class FrameBuffer {
private final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
// the actual transport hooked up to the client.
@@ -303,9 +280,11 @@ public abstract class AbstractNonblockingServer extends TServer {
// context associated with this connection
protected final ServerContext context_;
- public FrameBuffer(final TNonblockingTransport trans,
+ public FrameBuffer(
+ final TNonblockingTransport trans,
final SelectionKey selectionKey,
- final AbstractSelectThread selectThread) throws TTransportException {
+ final AbstractSelectThread selectThread)
+ throws TTransportException {
trans_ = trans;
selectionKey_ = selectionKey;
selectThread_ = selectThread;
@@ -321,16 +300,15 @@ public abstract class AbstractNonblockingServer extends TServer {
if (eventHandler_ != null) {
context_ = eventHandler_.createContext(inProt_, outProt_);
} else {
- context_ = null;
+ context_ = null;
}
}
/**
- * Give this FrameBuffer a chance to read. The selector loop should have
- * received a read event for this FrameBuffer.
+ * Give this FrameBuffer a chance to read. The selector loop should have received a read event
+ * for this FrameBuffer.
*
- * @return true if the connection should live on, false if it should be
- * closed
+ * @return true if the connection should live on, false if it should be closed
*/
public boolean read() {
if (state_ == FrameBufferState.READING_FRAME_SIZE) {
@@ -345,17 +323,22 @@ public abstract class AbstractNonblockingServer extends TServer {
// pull out the frame size as an integer.
int frameSize = buffer_.getInt(0);
if (frameSize <= 0) {
- LOGGER.error("Read an invalid frame size of " + frameSize
- + ". Are you using TFramedTransport on the client side?");
+ LOGGER.error(
+ "Read an invalid frame size of "
+ + frameSize
+ + ". Are you using TFramedTransport on the client side?");
return false;
}
// if this frame will always be too large for this server, log the
// error and close the connection.
if (frameSize > trans_.getMaxFrameSize()) {
- LOGGER.error("Read a frame size of " + frameSize
- + ", which is bigger than the maximum allowable frame size "
- + trans_.getMaxFrameSize() + " for ALL connections.");
+ LOGGER.error(
+ "Read a frame size of "
+ + frameSize
+ + ", which is bigger than the maximum allowable frame size "
+ + trans_.getMaxFrameSize()
+ + " for ALL connections.");
return false;
}
@@ -406,9 +389,7 @@ public abstract class AbstractNonblockingServer extends TServer {
return false;
}
- /**
- * Give this FrameBuffer a chance to write its output to the final client.
- */
+ /** Give this FrameBuffer a chance to write its output to the final client. */
public boolean write() {
if (state_ == FrameBufferState.WRITING) {
try {
@@ -431,40 +412,33 @@ public abstract class AbstractNonblockingServer extends TServer {
return false;
}
- /**
- * Give this FrameBuffer a chance to set its interest to write, once data
- * has come in.
- */
+ /** Give this FrameBuffer a chance to set its interest to write, once data has come in. */
public void changeSelectInterests() {
switch (state_) {
- case AWAITING_REGISTER_WRITE:
- // set the OP_WRITE interest
- selectionKey_.interestOps(SelectionKey.OP_WRITE);
- state_ = FrameBufferState.WRITING;
- break;
- case AWAITING_REGISTER_READ:
- prepareRead();
- break;
- case AWAITING_CLOSE:
- close();
- selectionKey_.cancel();
- break;
- default:
- LOGGER.error(
- "changeSelectInterest was called, but state is invalid ({})",
- state_);
+ case AWAITING_REGISTER_WRITE:
+ // set the OP_WRITE interest
+ selectionKey_.interestOps(SelectionKey.OP_WRITE);
+ state_ = FrameBufferState.WRITING;
+ break;
+ case AWAITING_REGISTER_READ:
+ prepareRead();
+ break;
+ case AWAITING_CLOSE:
+ close();
+ selectionKey_.cancel();
+ break;
+ default:
+ LOGGER.error("changeSelectInterest was called, but state is invalid ({})", state_);
}
}
- /**
- * Shut the connection down.
- */
+ /** Shut the connection down. */
public void close() {
// if we're being closed due to an error, we might have allocated a
// buffer that we need to subtract for our memory accounting.
- if (state_ == FrameBufferState.READING_FRAME ||
- state_ == FrameBufferState.READ_FRAME_COMPLETE ||
- state_ == FrameBufferState.AWAITING_CLOSE) {
+ if (state_ == FrameBufferState.READING_FRAME
+ || state_ == FrameBufferState.READ_FRAME_COMPLETE
+ || state_ == FrameBufferState.AWAITING_CLOSE) {
readBufferBytesAllocated.addAndGet(-buffer_.array().length);
}
trans_.close();
@@ -473,19 +447,16 @@ public abstract class AbstractNonblockingServer extends TServer {
}
}
- /**
- * Check if this FrameBuffer has a full frame read.
- */
+ /** Check if this FrameBuffer has a full frame read. */
public boolean isFrameFullyRead() {
return state_ == FrameBufferState.READ_FRAME_COMPLETE;
}
/**
- * After the processor has processed the invocation, whatever thread is
- * managing invocations should call this method on this FrameBuffer so we
- * know it's time to start trying to write again. Also, if it turns out that
- * there actually isn't any data in the response buffer, we'll skip trying
- * to write and instead go back to reading.
+ * After the processor has processed the invocation, whatever thread is managing invocations
+ * should call this method on this FrameBuffer so we know it's time to start trying to write
+ * again. Also, if it turns out that there actually isn't any data in the response buffer, we'll
+ * skip trying to write and instead go back to reading.
*/
public void responseReady() {
// the read buffer is definitely no longer in use, so we will decrement
@@ -511,9 +482,7 @@ public abstract class AbstractNonblockingServer extends TServer {
requestSelectInterestChange();
}
- /**
- * Actually invoke the method signified by this FrameBuffer.
- */
+ /** Actually invoke the method signified by this FrameBuffer. */
public void invoke() {
frameTrans_.reset(buffer_.array());
response_.reset();
@@ -538,22 +507,18 @@ public abstract class AbstractNonblockingServer extends TServer {
/**
* Perform a read into buffer.
*
- * @return true if the read succeeded, false if there was an error or the
- * connection closed.
+ * @return true if the read succeeded, false if there was an error or the connection closed.
*/
private boolean internalRead() {
try {
- return trans_.read(buffer_) >= 0;
+ return trans_.read(buffer_) >= 0;
} catch (TTransportException e) {
LOGGER.warn("Got an Exception in internalRead", e);
return false;
}
}
- /**
- * We're done writing, so reset our interest ops and change state
- * accordingly.
- */
+ /** We're done writing, so reset our interest ops and change state accordingly. */
private void prepareRead() {
// we can set our interest directly without using the queue because
// we're in the select thread.
@@ -564,11 +529,10 @@ public abstract class AbstractNonblockingServer extends TServer {
}
/**
- * When this FrameBuffer needs to change its select interests and execution
- * might not be in its select thread, then this method will make sure the
- * interest change gets done when the select thread wakes back up. When the
- * current thread is this FrameBuffer's select thread, then it just does the
- * interest change immediately.
+ * When this FrameBuffer needs to change its select interests and execution might not be in its
+ * select thread, then this method will make sure the interest change gets done when the select
+ * thread wakes back up. When the current thread is this FrameBuffer's select thread, then it
+ * just does the interest change immediately.
*/
protected void requestSelectInterestChange() {
if (Thread.currentThread() == this.selectThread_) {
@@ -580,19 +544,20 @@ public abstract class AbstractNonblockingServer extends TServer {
} // FrameBuffer
public class AsyncFrameBuffer extends FrameBuffer {
- public AsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) throws TTransportException {
+ public AsyncFrameBuffer(
+ TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread)
+ throws TTransportException {
super(trans, selectionKey, selectThread);
}
public TProtocol getInputProtocol() {
- return inProt_;
+ return inProt_;
}
public TProtocol getOutputProtocol() {
return outProt_;
}
-
public void invoke() {
frameTrans_.reset(buffer_.array());
response_.reset();
@@ -601,7 +566,7 @@ public abstract class AbstractNonblockingServer extends TServer {
if (eventHandler_ != null) {
eventHandler_.processContext(context_, inTrans_, outTrans_);
}
- ((TAsyncProcessor)processorFactory_.getProcessor(inTrans_)).process(this);
+ ((TAsyncProcessor) processorFactory_.getProcessor(inTrans_)).process(this);
return;
} catch (TException te) {
LOGGER.warn("Exception while invoking!", te);