summaryrefslogtreecommitdiff
path: root/lib/java/src/main/java/org/apache/thrift/async
diff options
context:
space:
mode:
Diffstat (limited to 'lib/java/src/main/java/org/apache/thrift/async')
-rw-r--r--lib/java/src/main/java/org/apache/thrift/async/AsyncMethodCallback.java28
-rw-r--r--lib/java/src/main/java/org/apache/thrift/async/AsyncMethodFutureAdapter.java38
-rw-r--r--lib/java/src/main/java/org/apache/thrift/async/TAsyncClient.java23
-rw-r--r--lib/java/src/main/java/org/apache/thrift/async/TAsyncClientManager.java31
-rw-r--r--lib/java/src/main/java/org/apache/thrift/async/TAsyncMethodCall.java46
5 files changed, 94 insertions, 72 deletions
diff --git a/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodCallback.java b/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodCallback.java
index 4ebde0741..3939bc227 100644
--- a/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodCallback.java
+++ b/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodCallback.java
@@ -19,33 +19,29 @@
package org.apache.thrift.async;
/**
- * A handler interface asynchronous clients can implement to receive future
- * notice of the results of an asynchronous method call.
+ * A handler interface asynchronous clients can implement to receive future notice of the results of
+ * an asynchronous method call.
*
* @param <T> The return type of the asynchronously invoked method.
*/
public interface AsyncMethodCallback<T> {
/**
- * This method will be called when the remote side has completed invoking
- * your method call and the result is fully read. For {@code oneway} method
- * calls, this method will be called as soon as we have completed writing out
- * the request.
+ * This method will be called when the remote side has completed invoking your method call and the
+ * result is fully read. For {@code oneway} method calls, this method will be called as soon as we
+ * have completed writing out the request.
*
- * @param response The return value of the asynchronously invoked method;
- * {@code null} for void methods which includes
- * {@code oneway} methods.
+ * @param response The return value of the asynchronously invoked method; {@code null} for void
+ * methods which includes {@code oneway} methods.
*/
void onComplete(T response);
/**
- * This method will be called when there is either an unexpected client-side
- * exception like an IOException or else when the remote method raises an
- * exception, either declared in the IDL or due to an unexpected server-side
- * error.
+ * This method will be called when there is either an unexpected client-side exception like an
+ * IOException or else when the remote method raises an exception, either declared in the IDL or
+ * due to an unexpected server-side error.
*
- * @param exception The exception encountered processing the the asynchronous
- * method call, may be a local exception or an unmarshalled
- * remote exception.
+ * @param exception The exception encountered processing the the asynchronous method call, may be
+ * a local exception or an unmarshalled remote exception.
*/
void onError(Exception exception);
}
diff --git a/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodFutureAdapter.java b/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodFutureAdapter.java
index 0bee3a7cf..202af7bff 100644
--- a/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodFutureAdapter.java
+++ b/lib/java/src/main/java/org/apache/thrift/async/AsyncMethodFutureAdapter.java
@@ -3,33 +3,33 @@ package org.apache.thrift.async;
import java.util.concurrent.CompletableFuture;
/**
- * A simple adapter that bridges {@link AsyncMethodCallback} with {@link CompletableFuture}-returning style clients.
- * Compiler generated code will invoke this adapter to implement {@code FutureClient}s.
+ * A simple adapter that bridges {@link AsyncMethodCallback} with {@link
+ * CompletableFuture}-returning style clients. Compiler generated code will invoke this adapter to
+ * implement {@code FutureClient}s.
*
* @param <T> return type (can be {@link Void}).
*/
public final class AsyncMethodFutureAdapter<T> implements AsyncMethodCallback<T> {
- private AsyncMethodFutureAdapter() {
- }
+ private AsyncMethodFutureAdapter() {}
- public static <T> AsyncMethodFutureAdapter<T> create() {
- return new AsyncMethodFutureAdapter<>();
- }
+ public static <T> AsyncMethodFutureAdapter<T> create() {
+ return new AsyncMethodFutureAdapter<>();
+ }
- private final CompletableFuture<T> future = new CompletableFuture<>();
+ private final CompletableFuture<T> future = new CompletableFuture<>();
- public CompletableFuture<T> getFuture() {
- return future;
- }
+ public CompletableFuture<T> getFuture() {
+ return future;
+ }
- @Override
- public void onComplete(T response) {
- future.complete(response);
- }
+ @Override
+ public void onComplete(T response) {
+ future.complete(response);
+ }
- @Override
- public void onError(Exception exception) {
- future.completeExceptionally(exception);
- }
+ @Override
+ public void onError(Exception exception) {
+ future.completeExceptionally(exception);
+ }
}
diff --git a/lib/java/src/main/java/org/apache/thrift/async/TAsyncClient.java b/lib/java/src/main/java/org/apache/thrift/async/TAsyncClient.java
index 8ba135642..06d6f6382 100644
--- a/lib/java/src/main/java/org/apache/thrift/async/TAsyncClient.java
+++ b/lib/java/src/main/java/org/apache/thrift/async/TAsyncClient.java
@@ -29,11 +29,18 @@ public abstract class TAsyncClient {
private Exception ___error;
private long ___timeout;
- public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport) {
+ public TAsyncClient(
+ TProtocolFactory protocolFactory,
+ TAsyncClientManager manager,
+ TNonblockingTransport transport) {
this(protocolFactory, manager, transport, 0);
}
- public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport, long timeout) {
+ public TAsyncClient(
+ TProtocolFactory protocolFactory,
+ TAsyncClientManager manager,
+ TNonblockingTransport transport,
+ long timeout) {
this.___protocolFactory = protocolFactory;
this.___manager = manager;
this.___transport = transport;
@@ -58,6 +65,7 @@ public abstract class TAsyncClient {
/**
* Is the client in an error state?
+ *
* @return If client in an error state?
*/
public boolean hasError() {
@@ -66,7 +74,9 @@ public abstract class TAsyncClient {
/**
* Get the client's error - returns null if no error
- * @return Get the client's error. <p> returns null if no error
+ *
+ * @return Get the client's error.
+ * <p>returns null if no error
*/
public Exception getError() {
return ___error;
@@ -75,7 +85,8 @@ public abstract class TAsyncClient {
protected void checkReady() {
// Ensure we are not currently executing a method
if (___currentMethod != null) {
- throw new IllegalStateException("Client is currently executing another method: " + ___currentMethod.getClass().getName());
+ throw new IllegalStateException(
+ "Client is currently executing another method: " + ___currentMethod.getClass().getName());
}
// Ensure we're not in an error state
@@ -84,9 +95,7 @@ public abstract class TAsyncClient {
}
}
- /**
- * Called by delegate method when finished
- */
+ /** Called by delegate method when finished */
protected void onComplete() {
___currentMethod = null;
}
diff --git a/lib/java/src/main/java/org/apache/thrift/async/TAsyncClientManager.java b/lib/java/src/main/java/org/apache/thrift/async/TAsyncClientManager.java
index c07ccd540..ba6a5fd2e 100644
--- a/lib/java/src/main/java/org/apache/thrift/async/TAsyncClientManager.java
+++ b/lib/java/src/main/java/org/apache/thrift/async/TAsyncClientManager.java
@@ -29,19 +29,17 @@ import java.util.Iterator;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
-
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * Contains selector thread which transitions method call objects
- */
+/** Contains selector thread which transitions method call objects */
public class TAsyncClientManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TAsyncClientManager.class.getName());
private final SelectThread selectThread;
- private final ConcurrentLinkedQueue<TAsyncMethodCall> pendingCalls = new ConcurrentLinkedQueue<TAsyncMethodCall>();
+ private final ConcurrentLinkedQueue<TAsyncMethodCall> pendingCalls =
+ new ConcurrentLinkedQueue<TAsyncMethodCall>();
public TAsyncClientManager() throws IOException {
this.selectThread = new SelectThread();
@@ -68,7 +66,8 @@ public class TAsyncClientManager {
private class SelectThread extends Thread {
private final Selector selector;
private volatile boolean running;
- private final TreeSet<TAsyncMethodCall> timeoutWatchSet = new TreeSet<TAsyncMethodCall>(new TAsyncMethodCallTimeoutComparator());
+ private final TreeSet<TAsyncMethodCall> timeoutWatchSet =
+ new TreeSet<TAsyncMethodCall>(new TAsyncMethodCallTimeoutComparator());
public SelectThread() throws IOException {
this.selector = SelectorProvider.provider().openSelector();
@@ -96,7 +95,8 @@ public class TAsyncClientManager {
// No timeouts, so select indefinitely
selector.select();
} else {
- // We have a timeout pending, so calculate the time until then and select appropriately
+ // We have a timeout pending, so calculate the time until then and select
+ // appropriately
long nextTimeout = timeoutWatchSet.first().getTimeoutTimestamp();
long selectTime = nextTimeout - System.currentTimeMillis();
if (selectTime > 0) {
@@ -139,7 +139,7 @@ public class TAsyncClientManager {
// just skip
continue;
}
- TAsyncMethodCall methodCall = (TAsyncMethodCall)key.attachment();
+ TAsyncMethodCall methodCall = (TAsyncMethodCall) key.attachment();
methodCall.transition(key);
// If done or error occurred, remove from timeout watch set
@@ -160,7 +160,13 @@ public class TAsyncClientManager {
TAsyncMethodCall methodCall = iterator.next();
if (currentTime >= methodCall.getTimeoutTimestamp()) {
iterator.remove();
- methodCall.onError(new TimeoutException("Operation " + methodCall.getClass() + " timed out after " + (currentTime - methodCall.getStartTime()) + " ms."));
+ methodCall.onError(
+ new TimeoutException(
+ "Operation "
+ + methodCall.getClass()
+ + " timed out after "
+ + (currentTime - methodCall.getStartTime())
+ + " ms."));
} else {
break;
}
@@ -189,12 +195,13 @@ public class TAsyncClientManager {
}
/** Comparator used in TreeSet */
- private static class TAsyncMethodCallTimeoutComparator implements Comparator<TAsyncMethodCall>, Serializable {
+ private static class TAsyncMethodCallTimeoutComparator
+ implements Comparator<TAsyncMethodCall>, Serializable {
public int compare(TAsyncMethodCall left, TAsyncMethodCall right) {
if (left.getTimeoutTimestamp() == right.getTimeoutTimestamp()) {
- return (int)(left.getSequenceId() - right.getSequenceId());
+ return (int) (left.getSequenceId() - right.getSequenceId());
} else {
- return (int)(left.getTimeoutTimestamp() - right.getTimeoutTimestamp());
+ return (int) (left.getTimeoutTimestamp() - right.getTimeoutTimestamp());
}
}
}
diff --git a/lib/java/src/main/java/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/main/java/org/apache/thrift/async/TAsyncMethodCall.java
index a119f23e0..df586b4ec 100644
--- a/lib/java/src/main/java/org/apache/thrift/async/TAsyncMethodCall.java
+++ b/lib/java/src/main/java/org/apache/thrift/async/TAsyncMethodCall.java
@@ -23,22 +23,22 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.layered.TFramedTransport;
import org.apache.thrift.transport.TMemoryBuffer;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.layered.TFramedTransport;
/**
* Encapsulates an async method call.
- * <p>
- * Need to generate:
+ *
+ * <p>Need to generate:
+ *
* <ul>
- * <li>protected abstract void write_args(TProtocol protocol)</li>
- * <li>protected abstract T getResult() throws &lt;Exception_1&gt;, &lt;Exception_2&gt;, ...</li>
+ * <li>protected abstract void write_args(TProtocol protocol)
+ * <li>protected abstract T getResult() throws &lt;Exception_1&gt;, &lt;Exception_2&gt;, ...
* </ul>
*
* @param <T> The return type of the encapsulated method call.
@@ -58,9 +58,7 @@ public abstract class TAsyncMethodCall<T> {
ERROR;
}
- /**
- * Next step in the call, initialized by start()
- */
+ /** Next step in the call, initialized by start() */
private State state = null;
protected final TNonblockingTransport transport;
@@ -77,7 +75,12 @@ public abstract class TAsyncMethodCall<T> {
private long startTime = System.currentTimeMillis();
- protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway) {
+ protected TAsyncMethodCall(
+ TAsyncClient client,
+ TProtocolFactory protocolFactory,
+ TNonblockingTransport transport,
+ AsyncMethodCallback<T> callback,
+ boolean isOneway) {
this.transport = transport;
this.callback = callback;
this.protocolFactory = protocolFactory;
@@ -121,6 +124,7 @@ public abstract class TAsyncMethodCall<T> {
/**
* Initialize buffers.
+ *
* @throws TException if buffer initialization fails
*/
protected void prepareMethodCall() throws TException {
@@ -137,6 +141,7 @@ public abstract class TAsyncMethodCall<T> {
/**
* Register with selector and start first state, which could be either connecting or writing.
+ *
* @throws IOException if register or starting fails
*/
void start(Selector sel) throws IOException {
@@ -168,9 +173,10 @@ public abstract class TAsyncMethodCall<T> {
}
/**
- * Transition to next state, doing whatever work is required. Since this
- * method is only called by the selector thread, we can make changes to our
- * select interests without worrying about concurrency.
+ * Transition to next state, doing whatever work is required. Since this method is only called by
+ * the selector thread, we can make changes to our select interests without worrying about
+ * concurrency.
+ *
* @param key
*/
void transition(SelectionKey key) {
@@ -201,8 +207,10 @@ public abstract class TAsyncMethodCall<T> {
doReadingResponseBody(key);
break;
default: // RESPONSE_READ, ERROR, or bug
- throw new IllegalStateException("Method call in state " + state
- + " but selector called transition method. Seems like a bug...");
+ throw new IllegalStateException(
+ "Method call in state "
+ + state
+ + " but selector called transition method. Seems like a bug...");
}
} catch (Exception e) {
key.cancel();
@@ -260,7 +268,7 @@ public abstract class TAsyncMethodCall<T> {
cleanUpAndFireCallback(key);
} else {
state = State.READING_RESPONSE_SIZE;
- sizeBuffer.rewind(); // Prepare to read incoming frame size
+ sizeBuffer.rewind(); // Prepare to read incoming frame size
key.interestOps(SelectionKey.OP_READ);
}
}
@@ -268,7 +276,8 @@ public abstract class TAsyncMethodCall<T> {
private void doWritingRequestSize() throws TTransportException {
if (transport.write(sizeBuffer) < 0) {
- throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame size failed");
+ throw new TTransportException(
+ TTransportException.END_OF_FILE, "Write call frame size failed");
}
if (sizeBuffer.remaining() == 0) {
state = State.WRITING_REQUEST_BODY;
@@ -277,7 +286,8 @@ public abstract class TAsyncMethodCall<T> {
private void doConnecting(SelectionKey key) throws IOException {
if (!key.isConnectable() || !transport.finishConnect()) {
- throw new IOException("not connectable or finishConnect returned false after we got an OP_CONNECT");
+ throw new IOException(
+ "not connectable or finishConnect returned false after we got an OP_CONNECT");
}
registerForFirstWrite(key);
}