diff options
Diffstat (limited to 'lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java')
-rw-r--r-- | lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java | 233 |
1 files changed, 99 insertions, 134 deletions
diff --git a/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java index beef954ef..44341d909 100644 --- a/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java +++ b/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java @@ -19,37 +19,35 @@ package org.apache.thrift.server; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import org.apache.thrift.TAsyncProcessor; import org.apache.thrift.TByteArrayOutputStream; import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.layered.TFramedTransport; import org.apache.thrift.transport.TIOStreamTransport; import org.apache.thrift.transport.TMemoryInputTransport; import org.apache.thrift.transport.TNonblockingServerTransport; import org.apache.thrift.transport.TNonblockingTransport; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.layered.TFramedTransport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.spi.SelectorProvider; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Provides common methods and classes used by nonblocking TServer - * implementations. - */ +/** Provides common methods and classes used by nonblocking TServer implementations. */ public abstract class AbstractNonblockingServer extends TServer { protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName()); - public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> { + public abstract static class AbstractNonblockingServerArgs< + T extends AbstractNonblockingServerArgs<T>> + extends AbstractServerArgs<T> { public long maxReadBufferBytes = 256 * 1024 * 1024; public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) { @@ -59,15 +57,13 @@ public abstract class AbstractNonblockingServer extends TServer { } /** - * The maximum amount of memory we will allocate to client IO buffers at a - * time. Without this limit, the server will gladly allocate client buffers - * right into an out of memory exception, rather than waiting. + * The maximum amount of memory we will allocate to client IO buffers at a time. Without this + * limit, the server will gladly allocate client buffers right into an out of memory exception, + * rather than waiting. */ final long MAX_READ_BUFFER_BYTES; - /** - * How many bytes are currently allocated to read buffers. - */ + /** How many bytes are currently allocated to read buffers. */ final AtomicLong readBufferBytesAllocated = new AtomicLong(0); public AbstractNonblockingServer(AbstractNonblockingServerArgs args) { @@ -75,9 +71,7 @@ public abstract class AbstractNonblockingServer extends TServer { MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes; } - /** - * Begin accepting connections and processing invocations. - */ + /** Begin accepting connections and processing invocations. */ public void serve() { // start any IO threads if (!startThreads()) { @@ -107,17 +101,13 @@ public abstract class AbstractNonblockingServer extends TServer { */ protected abstract boolean startThreads(); - /** - * A method that will block until when threads handling the serving have been - * shut down. - */ + /** A method that will block until when threads handling the serving have been shut down. */ protected abstract void waitForShutdown(); /** * Have the server transport start accepting connections. * - * @return true if we started listening successfully, false if something went - * wrong. + * @return true if we started listening successfully, false if something went wrong. */ protected boolean startListening() { try { @@ -129,27 +119,23 @@ public abstract class AbstractNonblockingServer extends TServer { } } - /** - * Stop listening for connections. - */ + /** Stop listening for connections. */ protected void stopListening() { serverTransport_.close(); } /** - * Perform an invocation. This method could behave several different ways - - * invoke immediately inline, queue for separate execution, etc. + * Perform an invocation. This method could behave several different ways - invoke immediately + * inline, queue for separate execution, etc. * - * @return true if invocation was successfully requested, which is not a - * guarantee that invocation has completed. False if the request - * failed. + * @return true if invocation was successfully requested, which is not a guarantee that invocation + * has completed. False if the request failed. */ protected abstract boolean requestInvoke(FrameBuffer frameBuffer); /** - * An abstract thread that handles selecting on a set of transports and - * {@link FrameBuffer FrameBuffers} associated with selected keys - * corresponding to requests. + * An abstract thread that handles selecting on a set of transports and {@link FrameBuffer + * FrameBuffers} associated with selected keys corresponding to requests. */ protected abstract class AbstractSelectThread extends Thread { protected Selector selector; @@ -161,17 +147,15 @@ public abstract class AbstractNonblockingServer extends TServer { this.selector = SelectorProvider.provider().openSelector(); } - /** - * If the selector is blocked, wake it up. - */ + /** If the selector is blocked, wake it up. */ public void wakeupSelector() { selector.wakeup(); } /** - * Add FrameBuffer to the list of select interest changes and wake up the - * selector if it's blocked. When the select() call exits, it'll give the - * FrameBuffer a chance to change its interests. + * Add FrameBuffer to the list of select interest changes and wake up the selector if it's + * blocked. When the select() call exits, it'll give the FrameBuffer a chance to change its + * interests. */ public void requestSelectInterestChange(FrameBuffer frameBuffer) { synchronized (selectInterestChanges) { @@ -182,8 +166,8 @@ public abstract class AbstractNonblockingServer extends TServer { } /** - * Check to see if there are any FrameBuffers that have switched their - * interest type from read to write or vice versa. + * Check to see if there are any FrameBuffers that have switched their interest type from read + * to write or vice versa. */ protected void processInterestChanges() { synchronized (selectInterestChanges) { @@ -195,8 +179,8 @@ public abstract class AbstractNonblockingServer extends TServer { } /** - * Do the work required to read from a readable client. If the frame is - * fully read, then invoke the method call. + * Do the work required to read from a readable client. If the frame is fully read, then invoke + * the method call. */ protected void handleRead(SelectionKey key) { FrameBuffer buffer = (FrameBuffer) key.attachment(); @@ -213,9 +197,7 @@ public abstract class AbstractNonblockingServer extends TServer { } } - /** - * Let a writable client get written, if there's data to be written. - */ + /** Let a writable client get written, if there's data to be written. */ protected void handleWrite(SelectionKey key) { FrameBuffer buffer = (FrameBuffer) key.attachment(); if (!buffer.write()) { @@ -223,9 +205,7 @@ public abstract class AbstractNonblockingServer extends TServer { } } - /** - * Do connection-close cleanup on a given SelectionKey. - */ + /** Do connection-close cleanup on a given SelectionKey. */ protected void cleanupSelectionKey(SelectionKey key) { // remove the records from the two maps FrameBuffer buffer = (FrameBuffer) key.attachment(); @@ -238,9 +218,7 @@ public abstract class AbstractNonblockingServer extends TServer { } } // SelectThread - /** - * Possible states for the FrameBuffer state machine. - */ + /** Possible states for the FrameBuffer state machine. */ private enum FrameBufferState { // in the midst of reading the frame size off the wire READING_FRAME_SIZE, @@ -260,13 +238,12 @@ public abstract class AbstractNonblockingServer extends TServer { } /** - * Class that implements a sort of state machine around the interaction with a - * client and an invoker. It manages reading the frame size and frame data, - * getting it handed off as wrapped transports, and then the writing of - * response data back to the client. In the process it manages flipping the - * read and write bits on the selection key for its client. + * Class that implements a sort of state machine around the interaction with a client and an + * invoker. It manages reading the frame size and frame data, getting it handed off as wrapped + * transports, and then the writing of response data back to the client. In the process it manages + * flipping the read and write bits on the selection key for its client. */ - public class FrameBuffer { + public class FrameBuffer { private final Logger LOGGER = LoggerFactory.getLogger(getClass().getName()); // the actual transport hooked up to the client. @@ -303,9 +280,11 @@ public abstract class AbstractNonblockingServer extends TServer { // context associated with this connection protected final ServerContext context_; - public FrameBuffer(final TNonblockingTransport trans, + public FrameBuffer( + final TNonblockingTransport trans, final SelectionKey selectionKey, - final AbstractSelectThread selectThread) throws TTransportException { + final AbstractSelectThread selectThread) + throws TTransportException { trans_ = trans; selectionKey_ = selectionKey; selectThread_ = selectThread; @@ -321,16 +300,15 @@ public abstract class AbstractNonblockingServer extends TServer { if (eventHandler_ != null) { context_ = eventHandler_.createContext(inProt_, outProt_); } else { - context_ = null; + context_ = null; } } /** - * Give this FrameBuffer a chance to read. The selector loop should have - * received a read event for this FrameBuffer. + * Give this FrameBuffer a chance to read. The selector loop should have received a read event + * for this FrameBuffer. * - * @return true if the connection should live on, false if it should be - * closed + * @return true if the connection should live on, false if it should be closed */ public boolean read() { if (state_ == FrameBufferState.READING_FRAME_SIZE) { @@ -345,17 +323,22 @@ public abstract class AbstractNonblockingServer extends TServer { // pull out the frame size as an integer. int frameSize = buffer_.getInt(0); if (frameSize <= 0) { - LOGGER.error("Read an invalid frame size of " + frameSize - + ". Are you using TFramedTransport on the client side?"); + LOGGER.error( + "Read an invalid frame size of " + + frameSize + + ". Are you using TFramedTransport on the client side?"); return false; } // if this frame will always be too large for this server, log the // error and close the connection. if (frameSize > trans_.getMaxFrameSize()) { - LOGGER.error("Read a frame size of " + frameSize - + ", which is bigger than the maximum allowable frame size " - + trans_.getMaxFrameSize() + " for ALL connections."); + LOGGER.error( + "Read a frame size of " + + frameSize + + ", which is bigger than the maximum allowable frame size " + + trans_.getMaxFrameSize() + + " for ALL connections."); return false; } @@ -406,9 +389,7 @@ public abstract class AbstractNonblockingServer extends TServer { return false; } - /** - * Give this FrameBuffer a chance to write its output to the final client. - */ + /** Give this FrameBuffer a chance to write its output to the final client. */ public boolean write() { if (state_ == FrameBufferState.WRITING) { try { @@ -431,40 +412,33 @@ public abstract class AbstractNonblockingServer extends TServer { return false; } - /** - * Give this FrameBuffer a chance to set its interest to write, once data - * has come in. - */ + /** Give this FrameBuffer a chance to set its interest to write, once data has come in. */ public void changeSelectInterests() { switch (state_) { - case AWAITING_REGISTER_WRITE: - // set the OP_WRITE interest - selectionKey_.interestOps(SelectionKey.OP_WRITE); - state_ = FrameBufferState.WRITING; - break; - case AWAITING_REGISTER_READ: - prepareRead(); - break; - case AWAITING_CLOSE: - close(); - selectionKey_.cancel(); - break; - default: - LOGGER.error( - "changeSelectInterest was called, but state is invalid ({})", - state_); + case AWAITING_REGISTER_WRITE: + // set the OP_WRITE interest + selectionKey_.interestOps(SelectionKey.OP_WRITE); + state_ = FrameBufferState.WRITING; + break; + case AWAITING_REGISTER_READ: + prepareRead(); + break; + case AWAITING_CLOSE: + close(); + selectionKey_.cancel(); + break; + default: + LOGGER.error("changeSelectInterest was called, but state is invalid ({})", state_); } } - /** - * Shut the connection down. - */ + /** Shut the connection down. */ public void close() { // if we're being closed due to an error, we might have allocated a // buffer that we need to subtract for our memory accounting. - if (state_ == FrameBufferState.READING_FRAME || - state_ == FrameBufferState.READ_FRAME_COMPLETE || - state_ == FrameBufferState.AWAITING_CLOSE) { + if (state_ == FrameBufferState.READING_FRAME + || state_ == FrameBufferState.READ_FRAME_COMPLETE + || state_ == FrameBufferState.AWAITING_CLOSE) { readBufferBytesAllocated.addAndGet(-buffer_.array().length); } trans_.close(); @@ -473,19 +447,16 @@ public abstract class AbstractNonblockingServer extends TServer { } } - /** - * Check if this FrameBuffer has a full frame read. - */ + /** Check if this FrameBuffer has a full frame read. */ public boolean isFrameFullyRead() { return state_ == FrameBufferState.READ_FRAME_COMPLETE; } /** - * After the processor has processed the invocation, whatever thread is - * managing invocations should call this method on this FrameBuffer so we - * know it's time to start trying to write again. Also, if it turns out that - * there actually isn't any data in the response buffer, we'll skip trying - * to write and instead go back to reading. + * After the processor has processed the invocation, whatever thread is managing invocations + * should call this method on this FrameBuffer so we know it's time to start trying to write + * again. Also, if it turns out that there actually isn't any data in the response buffer, we'll + * skip trying to write and instead go back to reading. */ public void responseReady() { // the read buffer is definitely no longer in use, so we will decrement @@ -511,9 +482,7 @@ public abstract class AbstractNonblockingServer extends TServer { requestSelectInterestChange(); } - /** - * Actually invoke the method signified by this FrameBuffer. - */ + /** Actually invoke the method signified by this FrameBuffer. */ public void invoke() { frameTrans_.reset(buffer_.array()); response_.reset(); @@ -538,22 +507,18 @@ public abstract class AbstractNonblockingServer extends TServer { /** * Perform a read into buffer. * - * @return true if the read succeeded, false if there was an error or the - * connection closed. + * @return true if the read succeeded, false if there was an error or the connection closed. */ private boolean internalRead() { try { - return trans_.read(buffer_) >= 0; + return trans_.read(buffer_) >= 0; } catch (TTransportException e) { LOGGER.warn("Got an Exception in internalRead", e); return false; } } - /** - * We're done writing, so reset our interest ops and change state - * accordingly. - */ + /** We're done writing, so reset our interest ops and change state accordingly. */ private void prepareRead() { // we can set our interest directly without using the queue because // we're in the select thread. @@ -564,11 +529,10 @@ public abstract class AbstractNonblockingServer extends TServer { } /** - * When this FrameBuffer needs to change its select interests and execution - * might not be in its select thread, then this method will make sure the - * interest change gets done when the select thread wakes back up. When the - * current thread is this FrameBuffer's select thread, then it just does the - * interest change immediately. + * When this FrameBuffer needs to change its select interests and execution might not be in its + * select thread, then this method will make sure the interest change gets done when the select + * thread wakes back up. When the current thread is this FrameBuffer's select thread, then it + * just does the interest change immediately. */ protected void requestSelectInterestChange() { if (Thread.currentThread() == this.selectThread_) { @@ -580,19 +544,20 @@ public abstract class AbstractNonblockingServer extends TServer { } // FrameBuffer public class AsyncFrameBuffer extends FrameBuffer { - public AsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) throws TTransportException { + public AsyncFrameBuffer( + TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) + throws TTransportException { super(trans, selectionKey, selectThread); } public TProtocol getInputProtocol() { - return inProt_; + return inProt_; } public TProtocol getOutputProtocol() { return outProt_; } - public void invoke() { frameTrans_.reset(buffer_.array()); response_.reset(); @@ -601,7 +566,7 @@ public abstract class AbstractNonblockingServer extends TServer { if (eventHandler_ != null) { eventHandler_.processContext(context_, inTrans_, outTrans_); } - ((TAsyncProcessor)processorFactory_.getProcessor(inTrans_)).process(this); + ((TAsyncProcessor) processorFactory_.getProcessor(inTrans_)).process(this); return; } catch (TException te) { LOGGER.warn("Exception while invoking!", te); |