diff options
Diffstat (limited to 'lib/java/src/main/java/org/apache/thrift/async')
5 files changed, 94 insertions, 72 deletions
diff --git a/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodCallback.java b/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodCallback.java index 4ebde0741..3939bc227 100644 --- a/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodCallback.java +++ b/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodCallback.java @@ -19,33 +19,29 @@ package org.apache.thrift.async; /** - * A handler interface asynchronous clients can implement to receive future - * notice of the results of an asynchronous method call. + * A handler interface asynchronous clients can implement to receive future notice of the results of + * an asynchronous method call. * * @param <T> The return type of the asynchronously invoked method. */ public interface AsyncMethodCallback<T> { /** - * This method will be called when the remote side has completed invoking - * your method call and the result is fully read. For {@code oneway} method - * calls, this method will be called as soon as we have completed writing out - * the request. + * This method will be called when the remote side has completed invoking your method call and the + * result is fully read. For {@code oneway} method calls, this method will be called as soon as we + * have completed writing out the request. * - * @param response The return value of the asynchronously invoked method; - * {@code null} for void methods which includes - * {@code oneway} methods. + * @param response The return value of the asynchronously invoked method; {@code null} for void + * methods which includes {@code oneway} methods. */ void onComplete(T response); /** - * This method will be called when there is either an unexpected client-side - * exception like an IOException or else when the remote method raises an - * exception, either declared in the IDL or due to an unexpected server-side - * error. + * This method will be called when there is either an unexpected client-side exception like an + * IOException or else when the remote method raises an exception, either declared in the IDL or + * due to an unexpected server-side error. * - * @param exception The exception encountered processing the the asynchronous - * method call, may be a local exception or an unmarshalled - * remote exception. + * @param exception The exception encountered processing the the asynchronous method call, may be + * a local exception or an unmarshalled remote exception. */ void onError(Exception exception); } diff --git a/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodFutureAdapter.java b/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodFutureAdapter.java index 0bee3a7cf..202af7bff 100644 --- a/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodFutureAdapter.java +++ b/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodFutureAdapter.java @@ -3,33 +3,33 @@ package org.apache.thrift.async; import java.util.concurrent.CompletableFuture; /** - * A simple adapter that bridges {@link AsyncMethodCallback} with {@link CompletableFuture}-returning style clients. - * Compiler generated code will invoke this adapter to implement {@code FutureClient}s. + * A simple adapter that bridges {@link AsyncMethodCallback} with {@link + * CompletableFuture}-returning style clients. Compiler generated code will invoke this adapter to + * implement {@code FutureClient}s. * * @param <T> return type (can be {@link Void}). */ public final class AsyncMethodFutureAdapter<T> implements AsyncMethodCallback<T> { - private AsyncMethodFutureAdapter() { - } + private AsyncMethodFutureAdapter() {} - public static <T> AsyncMethodFutureAdapter<T> create() { - return new AsyncMethodFutureAdapter<>(); - } + public static <T> AsyncMethodFutureAdapter<T> create() { + return new AsyncMethodFutureAdapter<>(); + } - private final CompletableFuture<T> future = new CompletableFuture<>(); + private final CompletableFuture<T> future = new CompletableFuture<>(); - public CompletableFuture<T> getFuture() { - return future; - } + public CompletableFuture<T> getFuture() { + return future; + } - @Override - public void onComplete(T response) { - future.complete(response); - } + @Override + public void onComplete(T response) { + future.complete(response); + } - @Override - public void onError(Exception exception) { - future.completeExceptionally(exception); - } + @Override + public void onError(Exception exception) { + future.completeExceptionally(exception); + } } diff --git a/lib/java/src/main/java/org/apache/thrift/async/TAsyncClient.java b/lib/java/src/main/java/org/apache/thrift/async/TAsyncClient.java index 8ba135642..06d6f6382 100644 --- a/lib/java/src/main/java/org/apache/thrift/async/TAsyncClient.java +++ b/lib/java/src/main/java/org/apache/thrift/async/TAsyncClient.java @@ -29,11 +29,18 @@ public abstract class TAsyncClient { private Exception ___error; private long ___timeout; - public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport) { + public TAsyncClient( + TProtocolFactory protocolFactory, + TAsyncClientManager manager, + TNonblockingTransport transport) { this(protocolFactory, manager, transport, 0); } - public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport, long timeout) { + public TAsyncClient( + TProtocolFactory protocolFactory, + TAsyncClientManager manager, + TNonblockingTransport transport, + long timeout) { this.___protocolFactory = protocolFactory; this.___manager = manager; this.___transport = transport; @@ -58,6 +65,7 @@ public abstract class TAsyncClient { /** * Is the client in an error state? + * * @return If client in an error state? */ public boolean hasError() { @@ -66,7 +74,9 @@ public abstract class TAsyncClient { /** * Get the client's error - returns null if no error - * @return Get the client's error. <p> returns null if no error + * + * @return Get the client's error. + * <p>returns null if no error */ public Exception getError() { return ___error; @@ -75,7 +85,8 @@ public abstract class TAsyncClient { protected void checkReady() { // Ensure we are not currently executing a method if (___currentMethod != null) { - throw new IllegalStateException("Client is currently executing another method: " + ___currentMethod.getClass().getName()); + throw new IllegalStateException( + "Client is currently executing another method: " + ___currentMethod.getClass().getName()); } // Ensure we're not in an error state @@ -84,9 +95,7 @@ public abstract class TAsyncClient { } } - /** - * Called by delegate method when finished - */ + /** Called by delegate method when finished */ protected void onComplete() { ___currentMethod = null; } diff --git a/lib/java/src/main/java/org/apache/thrift/async/TAsyncClientManager.java b/lib/java/src/main/java/org/apache/thrift/async/TAsyncClientManager.java index c07ccd540..ba6a5fd2e 100644 --- a/lib/java/src/main/java/org/apache/thrift/async/TAsyncClientManager.java +++ b/lib/java/src/main/java/org/apache/thrift/async/TAsyncClientManager.java @@ -29,19 +29,17 @@ import java.util.Iterator; import java.util.TreeSet; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeoutException; - import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Contains selector thread which transitions method call objects - */ +/** Contains selector thread which transitions method call objects */ public class TAsyncClientManager { private static final Logger LOGGER = LoggerFactory.getLogger(TAsyncClientManager.class.getName()); private final SelectThread selectThread; - private final ConcurrentLinkedQueue<TAsyncMethodCall> pendingCalls = new ConcurrentLinkedQueue<TAsyncMethodCall>(); + private final ConcurrentLinkedQueue<TAsyncMethodCall> pendingCalls = + new ConcurrentLinkedQueue<TAsyncMethodCall>(); public TAsyncClientManager() throws IOException { this.selectThread = new SelectThread(); @@ -68,7 +66,8 @@ public class TAsyncClientManager { private class SelectThread extends Thread { private final Selector selector; private volatile boolean running; - private final TreeSet<TAsyncMethodCall> timeoutWatchSet = new TreeSet<TAsyncMethodCall>(new TAsyncMethodCallTimeoutComparator()); + private final TreeSet<TAsyncMethodCall> timeoutWatchSet = + new TreeSet<TAsyncMethodCall>(new TAsyncMethodCallTimeoutComparator()); public SelectThread() throws IOException { this.selector = SelectorProvider.provider().openSelector(); @@ -96,7 +95,8 @@ public class TAsyncClientManager { // No timeouts, so select indefinitely selector.select(); } else { - // We have a timeout pending, so calculate the time until then and select appropriately + // We have a timeout pending, so calculate the time until then and select + // appropriately long nextTimeout = timeoutWatchSet.first().getTimeoutTimestamp(); long selectTime = nextTimeout - System.currentTimeMillis(); if (selectTime > 0) { @@ -139,7 +139,7 @@ public class TAsyncClientManager { // just skip continue; } - TAsyncMethodCall methodCall = (TAsyncMethodCall)key.attachment(); + TAsyncMethodCall methodCall = (TAsyncMethodCall) key.attachment(); methodCall.transition(key); // If done or error occurred, remove from timeout watch set @@ -160,7 +160,13 @@ public class TAsyncClientManager { TAsyncMethodCall methodCall = iterator.next(); if (currentTime >= methodCall.getTimeoutTimestamp()) { iterator.remove(); - methodCall.onError(new TimeoutException("Operation " + methodCall.getClass() + " timed out after " + (currentTime - methodCall.getStartTime()) + " ms.")); + methodCall.onError( + new TimeoutException( + "Operation " + + methodCall.getClass() + + " timed out after " + + (currentTime - methodCall.getStartTime()) + + " ms.")); } else { break; } @@ -189,12 +195,13 @@ public class TAsyncClientManager { } /** Comparator used in TreeSet */ - private static class TAsyncMethodCallTimeoutComparator implements Comparator<TAsyncMethodCall>, Serializable { + private static class TAsyncMethodCallTimeoutComparator + implements Comparator<TAsyncMethodCall>, Serializable { public int compare(TAsyncMethodCall left, TAsyncMethodCall right) { if (left.getTimeoutTimestamp() == right.getTimeoutTimestamp()) { - return (int)(left.getSequenceId() - right.getSequenceId()); + return (int) (left.getSequenceId() - right.getSequenceId()); } else { - return (int)(left.getTimeoutTimestamp() - right.getTimeoutTimestamp()); + return (int) (left.getTimeoutTimestamp() - right.getTimeoutTimestamp()); } } } diff --git a/lib/java/src/main/java/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/main/java/org/apache/thrift/async/TAsyncMethodCall.java index a119f23e0..df586b4ec 100644 --- a/lib/java/src/main/java/org/apache/thrift/async/TAsyncMethodCall.java +++ b/lib/java/src/main/java/org/apache/thrift/async/TAsyncMethodCall.java @@ -23,22 +23,22 @@ import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.util.concurrent.atomic.AtomicLong; - import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.transport.layered.TFramedTransport; import org.apache.thrift.transport.TMemoryBuffer; import org.apache.thrift.transport.TNonblockingTransport; import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.layered.TFramedTransport; /** * Encapsulates an async method call. - * <p> - * Need to generate: + * + * <p>Need to generate: + * * <ul> - * <li>protected abstract void write_args(TProtocol protocol)</li> - * <li>protected abstract T getResult() throws <Exception_1>, <Exception_2>, ...</li> + * <li>protected abstract void write_args(TProtocol protocol) + * <li>protected abstract T getResult() throws <Exception_1>, <Exception_2>, ... * </ul> * * @param <T> The return type of the encapsulated method call. @@ -58,9 +58,7 @@ public abstract class TAsyncMethodCall<T> { ERROR; } - /** - * Next step in the call, initialized by start() - */ + /** Next step in the call, initialized by start() */ private State state = null; protected final TNonblockingTransport transport; @@ -77,7 +75,12 @@ public abstract class TAsyncMethodCall<T> { private long startTime = System.currentTimeMillis(); - protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway) { + protected TAsyncMethodCall( + TAsyncClient client, + TProtocolFactory protocolFactory, + TNonblockingTransport transport, + AsyncMethodCallback<T> callback, + boolean isOneway) { this.transport = transport; this.callback = callback; this.protocolFactory = protocolFactory; @@ -121,6 +124,7 @@ public abstract class TAsyncMethodCall<T> { /** * Initialize buffers. + * * @throws TException if buffer initialization fails */ protected void prepareMethodCall() throws TException { @@ -137,6 +141,7 @@ public abstract class TAsyncMethodCall<T> { /** * Register with selector and start first state, which could be either connecting or writing. + * * @throws IOException if register or starting fails */ void start(Selector sel) throws IOException { @@ -168,9 +173,10 @@ public abstract class TAsyncMethodCall<T> { } /** - * Transition to next state, doing whatever work is required. Since this - * method is only called by the selector thread, we can make changes to our - * select interests without worrying about concurrency. + * Transition to next state, doing whatever work is required. Since this method is only called by + * the selector thread, we can make changes to our select interests without worrying about + * concurrency. + * * @param key */ void transition(SelectionKey key) { @@ -201,8 +207,10 @@ public abstract class TAsyncMethodCall<T> { doReadingResponseBody(key); break; default: // RESPONSE_READ, ERROR, or bug - throw new IllegalStateException("Method call in state " + state - + " but selector called transition method. Seems like a bug..."); + throw new IllegalStateException( + "Method call in state " + + state + + " but selector called transition method. Seems like a bug..."); } } catch (Exception e) { key.cancel(); @@ -260,7 +268,7 @@ public abstract class TAsyncMethodCall<T> { cleanUpAndFireCallback(key); } else { state = State.READING_RESPONSE_SIZE; - sizeBuffer.rewind(); // Prepare to read incoming frame size + sizeBuffer.rewind(); // Prepare to read incoming frame size key.interestOps(SelectionKey.OP_READ); } } @@ -268,7 +276,8 @@ public abstract class TAsyncMethodCall<T> { private void doWritingRequestSize() throws TTransportException { if (transport.write(sizeBuffer) < 0) { - throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame size failed"); + throw new TTransportException( + TTransportException.END_OF_FILE, "Write call frame size failed"); } if (sizeBuffer.remaining() == 0) { state = State.WRITING_REQUEST_BODY; @@ -277,7 +286,8 @@ public abstract class TAsyncMethodCall<T> { private void doConnecting(SelectionKey key) throws IOException { if (!key.isConnectable() || !transport.finishConnect()) { - throw new IOException("not connectable or finishConnect returned false after we got an OP_CONNECT"); + throw new IOException( + "not connectable or finishConnect returned false after we got an OP_CONNECT"); } registerForFirstWrite(key); } |