summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Mollitor <dmollitor@apache.org>2020-10-05 20:14:43 -0400
committerDavid Mollitor <dmollitor@apache.org>2020-10-05 20:14:43 -0400
commit475ae3ef495c9c77efe4da7c4806e95d8fa20ce5 (patch)
tree68eb7e443b3133b718206777c688a2c878fd2995
parenta73edc55a3e540f8e9a37fe48486b92a1738eef5 (diff)
downloadthrift-THRIFT-5288.tar.gz
Changed Read/Write ByteBuffer to more closely match ReadableByteChannel/WriteableByteChannelTHRIFT-5288
-rw-r--r--lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java16
-rw-r--r--lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java8
-rw-r--r--lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java23
-rw-r--r--lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java4
-rw-r--r--lib/java/src/org/apache/thrift/transport/TTransport.java24
-rw-r--r--lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java3
-rw-r--r--lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java9
7 files changed, 54 insertions, 33 deletions
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
index d5c608d87..a4e51cd36 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
@@ -217,9 +217,9 @@ public abstract class TAsyncMethodCall<T> {
state = State.ERROR;
}
- private void doReadingResponseBody(SelectionKey key) throws IOException {
+ private void doReadingResponseBody(SelectionKey key) throws TTransportException {
if (transport.read(frameBuffer) < 0) {
- throw new IOException("Read call frame failed");
+ throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame failed");
}
if (frameBuffer.remaining() == 0) {
cleanUpAndFireCallback(key);
@@ -241,9 +241,9 @@ public abstract class TAsyncMethodCall<T> {
}
}
- private void doReadingResponseSize() throws IOException {
+ private void doReadingResponseSize() throws TTransportException {
if (transport.read(sizeBuffer) < 0) {
- throw new IOException("Read call frame size failed");
+ throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame failed");
}
if (sizeBuffer.remaining() == 0) {
state = State.READING_RESPONSE_BODY;
@@ -251,9 +251,9 @@ public abstract class TAsyncMethodCall<T> {
}
}
- private void doWritingRequestBody(SelectionKey key) throws IOException {
+ private void doWritingRequestBody(SelectionKey key) throws TTransportException {
if (transport.write(frameBuffer) < 0) {
- throw new IOException("Write call frame failed");
+ throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame failed");
}
if (frameBuffer.remaining() == 0) {
if (isOneway) {
@@ -266,9 +266,9 @@ public abstract class TAsyncMethodCall<T> {
}
}
- private void doWritingRequestSize() throws IOException {
+ private void doWritingRequestSize() throws TTransportException {
if (transport.write(sizeBuffer) < 0) {
- throw new IOException("Write call frame size failed");
+ throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame failed");
}
if (sizeBuffer.remaining() == 0) {
state = State.WRITING_REQUEST_BODY;
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
index 4aae803f2..f91e8254f 100644
--- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -414,8 +414,8 @@ public abstract class AbstractNonblockingServer extends TServer {
if (trans_.write(buffer_) < 0) {
return false;
}
- } catch (IOException e) {
- LOGGER.warn("Got an IOException during write!", e);
+ } catch (TTransportException e) {
+ LOGGER.warn("Got an Exception during write", e);
return false;
}
@@ -543,8 +543,8 @@ public abstract class AbstractNonblockingServer extends TServer {
private boolean internalRead() {
try {
return trans_.read(buffer_) >= 0;
- } catch (IOException e) {
- LOGGER.warn("Got an IOException in internalRead!", e);
+ } catch (TTransportException e) {
+ LOGGER.warn("Got an Exception in internalRead", e);
return false;
}
}
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
index 76ed02cbb..13c858648 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
@@ -144,11 +144,14 @@ public class TNonblockingSocket extends TNonblockingTransport {
/**
* Perform a nonblocking read into buffer.
*/
- public int read(ByteBuffer buffer) throws IOException {
- return socketChannel_.read(buffer);
+ public int read(ByteBuffer buffer) throws TTransportException {
+ try {
+ return socketChannel_.read(buffer);
+ } catch (IOException iox) {
+ throw new TTransportException(TTransportException.UNKNOWN, iox);
+ }
}
-
/**
* Reads from the underlying input stream if not null.
*/
@@ -167,8 +170,12 @@ public class TNonblockingSocket extends TNonblockingTransport {
/**
* Perform a nonblocking write of the data in buffer;
*/
- public int write(ByteBuffer buffer) throws IOException {
- return socketChannel_.write(buffer);
+ public int write(ByteBuffer buffer) throws TTransportException {
+ try {
+ return socketChannel_.write(buffer);
+ } catch (IOException iox) {
+ throw new TTransportException(TTransportException.UNKNOWN, iox);
+ }
}
/**
@@ -179,11 +186,7 @@ public class TNonblockingSocket extends TNonblockingTransport {
throw new TTransportException(TTransportException.NOT_OPEN,
"Cannot write to write-only socket channel");
}
- try {
- socketChannel_.write(ByteBuffer.wrap(buf, off, len));
- } catch (IOException iox) {
- throw new TTransportException(TTransportException.UNKNOWN, iox);
- }
+ write(ByteBuffer.wrap(buf, off, len));
}
/**
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
index 255595d6c..30ec9d25c 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
@@ -23,7 +23,6 @@ import org.apache.thrift.TConfiguration;
import java.io.IOException;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@@ -47,7 +46,4 @@ public abstract class TNonblockingTransport extends TEndpointTransport {
public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException;
- public abstract int read(ByteBuffer buffer) throws IOException;
-
- public abstract int write(ByteBuffer buffer) throws IOException;
}
diff --git a/lib/java/src/org/apache/thrift/transport/TTransport.java b/lib/java/src/org/apache/thrift/transport/TTransport.java
index 3811acd7e..992ad1661 100644
--- a/lib/java/src/org/apache/thrift/transport/TTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TTransport.java
@@ -61,6 +61,26 @@ public abstract class TTransport implements Closeable {
public abstract void close();
/**
+ * Reads a sequence of bytes from this channel into the given buffer. An
+ * attempt is made to read up to the number of bytes remaining in the buffer,
+ * that is, dst.remaining(), at the moment this method is invoked. Upon return
+ * the buffer's position will move forward the number of bytes read; its limit
+ * will not have changed. Subclasses are encouraged to provide a more
+ * efficient implementation of this method.
+ *
+ * @param dst The buffer into which bytes are to be transferred
+ * @return The number of bytes read, possibly zero, or -1 if the channel has
+ * reached end-of-stream
+ * @throws TTransportException if there was an error reading data
+ */
+ public int read(ByteBuffer dst) throws TTransportException {
+ byte[] arr = new byte[dst.remaining()];
+ this.readAll(arr, 0, arr.length);
+ dst.put(arr);
+ return arr.length;
+ }
+
+ /**
* Reads up to len bytes into buffer buf, starting at offset off.
*
* @param buf Array to read into
@@ -129,12 +149,14 @@ public abstract class TTransport implements Closeable {
* implementation of this method.
*
* @param src The buffer from which bytes are to be retrieved
+ * @return The number of bytes written, possibly zero
* @throws TTransportException if there was an error writing data
*/
- public void write(ByteBuffer src) throws TTransportException {
+ public int write(ByteBuffer src) throws TTransportException {
byte[] arr = new byte[src.remaining()];
src.get(arr);
write(arr);
+ return arr.length;
}
/**
diff --git a/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java b/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java
index e5feba01f..df1d93230 100644
--- a/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java
+++ b/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
/**
* Write frame (header and payload) to transport in a nonblocking way.
@@ -101,7 +102,7 @@ public abstract class FrameWriter {
*
* @throws IOException
*/
- public void write(TNonblockingTransport transport) throws IOException {
+ public void write(TNonblockingTransport transport) throws TTransportException {
transport.write(frameBytes);
}
diff --git a/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java b/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java
index 45571469b..d73c3ec18 100644
--- a/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java
+++ b/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java
@@ -19,7 +19,6 @@
package org.apache.thrift.transport.sasl;
-import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.charset.StandardCharsets;
@@ -364,7 +363,7 @@ public class NonblockingSaslHandler {
saslChallenge.clear();
nextPhase = Phase.READING_SASL_RESPONSE;
}
- } catch (IOException e) {
+ } catch (TTransportException e) {
fail(e);
}
}
@@ -378,7 +377,7 @@ public class NonblockingSaslHandler {
saslResponse = null;
nextPhase = Phase.READING_REQUEST;
}
- } catch (IOException e) {
+ } catch (TTransportException e) {
fail(e);
}
}
@@ -389,7 +388,7 @@ public class NonblockingSaslHandler {
if (saslChallenge.isComplete()) {
nextPhase = Phase.CLOSING;
}
- } catch (IOException e) {
+ } catch (TTransportException e) {
fail(e);
}
}
@@ -401,7 +400,7 @@ public class NonblockingSaslHandler {
responseWriter.clear();
nextPhase = Phase.READING_REQUEST;
}
- } catch (IOException e) {
+ } catch (TTransportException e) {
fail(e);
}
}