summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Mollitor <dmollitor@apache.org>2020-10-02 21:10:50 +0200
committerJens Geyer <jensg@apache.org>2020-11-19 22:35:37 +0100
commit6e6bb84be9d8ace4be9744d5637fbb59f58db463 (patch)
tree4bc659965da5d206e8d6326ed43d22225a75ddc5
parent47e4923a9967b6ba3bbb5377809075c1940ecfc1 (diff)
downloadthrift-6e6bb84be9d8ace4be9744d5637fbb59f58db463.tar.gz
THRIFT-5288: Move Support for ByteBuffer into TTransport
Client: Java Patch: David Mollitor This closes #2254
-rw-r--r--lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java16
-rw-r--r--lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java20
-rw-r--r--lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java8
-rw-r--r--lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java23
-rw-r--r--lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java4
-rw-r--r--lib/java/src/org/apache/thrift/transport/TTransport.java39
-rw-r--r--lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java5
-rw-r--r--lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java9
-rw-r--r--lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java2
9 files changed, 81 insertions, 45 deletions
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
index d5c608d87..a119f23e0 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
@@ -217,9 +217,9 @@ public abstract class TAsyncMethodCall<T> {
state = State.ERROR;
}
- private void doReadingResponseBody(SelectionKey key) throws IOException {
+ private void doReadingResponseBody(SelectionKey key) throws TTransportException {
if (transport.read(frameBuffer) < 0) {
- throw new IOException("Read call frame failed");
+ throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame failed");
}
if (frameBuffer.remaining() == 0) {
cleanUpAndFireCallback(key);
@@ -241,9 +241,9 @@ public abstract class TAsyncMethodCall<T> {
}
}
- private void doReadingResponseSize() throws IOException {
+ private void doReadingResponseSize() throws TTransportException {
if (transport.read(sizeBuffer) < 0) {
- throw new IOException("Read call frame size failed");
+ throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame size failed");
}
if (sizeBuffer.remaining() == 0) {
state = State.READING_RESPONSE_BODY;
@@ -251,9 +251,9 @@ public abstract class TAsyncMethodCall<T> {
}
}
- private void doWritingRequestBody(SelectionKey key) throws IOException {
+ private void doWritingRequestBody(SelectionKey key) throws TTransportException {
if (transport.write(frameBuffer) < 0) {
- throw new IOException("Write call frame failed");
+ throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame failed");
}
if (frameBuffer.remaining() == 0) {
if (isOneway) {
@@ -266,9 +266,9 @@ public abstract class TAsyncMethodCall<T> {
}
}
- private void doWritingRequestSize() throws IOException {
+ private void doWritingRequestSize() throws TTransportException {
if (transport.write(sizeBuffer) < 0) {
- throw new IOException("Write call frame size failed");
+ throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame size failed");
}
if (sizeBuffer.remaining() == 0) {
state = State.WRITING_REQUEST_BODY;
diff --git a/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java
index 0dfcf25d1..4f4e21f50 100644
--- a/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java
+++ b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java
@@ -363,20 +363,17 @@ public class TCompactProtocol extends TProtocol {
*/
public void writeString(String str) throws TException {
byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
- writeBinary(bytes, 0, bytes.length);
+ writeVarint32(bytes.length);
+ trans_.write(bytes, 0, bytes.length);
}
/**
* Write a byte array, using a varint for the size.
*/
public void writeBinary(ByteBuffer bin) throws TException {
- int length = bin.limit() - bin.position();
- writeBinary(bin.array(), bin.position() + bin.arrayOffset(), length);
- }
-
- private void writeBinary(byte[] buf, int offset, int length) throws TException {
- writeVarint32(length);
- trans_.write(buf, offset, length);
+ ByteBuffer bb = bin.asReadOnlyBuffer();
+ writeVarint32(bb.remaining());
+ trans_.write(bb);
}
//
@@ -694,12 +691,13 @@ public class TCompactProtocol extends TProtocol {
}
/**
- * Read a byte[] from the wire.
+ * Read a ByteBuffer from the wire.
*/
public ByteBuffer readBinary() throws TException {
int length = readVarint32();
-
- if (length == 0) return EMPTY_BUFFER;
+ if (length == 0) {
+ return EMPTY_BUFFER;
+ }
getTransport().checkReadBytesAvailable(length);
if (trans_.getBytesRemainingInBuffer() >= length) {
ByteBuffer bb = ByteBuffer.wrap(trans_.getBuffer(), trans_.getBufferPosition(), length);
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
index 4aae803f2..f91e8254f 100644
--- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -414,8 +414,8 @@ public abstract class AbstractNonblockingServer extends TServer {
if (trans_.write(buffer_) < 0) {
return false;
}
- } catch (IOException e) {
- LOGGER.warn("Got an IOException during write!", e);
+ } catch (TTransportException e) {
+ LOGGER.warn("Got an Exception during write", e);
return false;
}
@@ -543,8 +543,8 @@ public abstract class AbstractNonblockingServer extends TServer {
private boolean internalRead() {
try {
return trans_.read(buffer_) >= 0;
- } catch (IOException e) {
- LOGGER.warn("Got an IOException in internalRead!", e);
+ } catch (TTransportException e) {
+ LOGGER.warn("Got an Exception in internalRead", e);
return false;
}
}
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
index 76ed02cbb..13c858648 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
@@ -144,11 +144,14 @@ public class TNonblockingSocket extends TNonblockingTransport {
/**
* Perform a nonblocking read into buffer.
*/
- public int read(ByteBuffer buffer) throws IOException {
- return socketChannel_.read(buffer);
+ public int read(ByteBuffer buffer) throws TTransportException {
+ try {
+ return socketChannel_.read(buffer);
+ } catch (IOException iox) {
+ throw new TTransportException(TTransportException.UNKNOWN, iox);
+ }
}
-
/**
* Reads from the underlying input stream if not null.
*/
@@ -167,8 +170,12 @@ public class TNonblockingSocket extends TNonblockingTransport {
/**
* Perform a nonblocking write of the data in buffer;
*/
- public int write(ByteBuffer buffer) throws IOException {
- return socketChannel_.write(buffer);
+ public int write(ByteBuffer buffer) throws TTransportException {
+ try {
+ return socketChannel_.write(buffer);
+ } catch (IOException iox) {
+ throw new TTransportException(TTransportException.UNKNOWN, iox);
+ }
}
/**
@@ -179,11 +186,7 @@ public class TNonblockingSocket extends TNonblockingTransport {
throw new TTransportException(TTransportException.NOT_OPEN,
"Cannot write to write-only socket channel");
}
- try {
- socketChannel_.write(ByteBuffer.wrap(buf, off, len));
- } catch (IOException iox) {
- throw new TTransportException(TTransportException.UNKNOWN, iox);
- }
+ write(ByteBuffer.wrap(buf, off, len));
}
/**
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
index 255595d6c..30ec9d25c 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
@@ -23,7 +23,6 @@ import org.apache.thrift.TConfiguration;
import java.io.IOException;
import java.net.SocketAddress;
-import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@@ -47,7 +46,4 @@ public abstract class TNonblockingTransport extends TEndpointTransport {
public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException;
- public abstract int read(ByteBuffer buffer) throws IOException;
-
- public abstract int write(ByteBuffer buffer) throws IOException;
}
diff --git a/lib/java/src/org/apache/thrift/transport/TTransport.java b/lib/java/src/org/apache/thrift/transport/TTransport.java
index 5645f7fa1..ee070243d 100644
--- a/lib/java/src/org/apache/thrift/transport/TTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TTransport.java
@@ -22,6 +22,7 @@ package org.apache.thrift.transport;
import org.apache.thrift.TConfiguration;
import java.io.Closeable;
+import java.nio.ByteBuffer;
/**
* Generic class that encapsulates the I/O layer. This is basically a thin
@@ -60,6 +61,26 @@ public abstract class TTransport implements Closeable {
public abstract void close();
/**
+ * Reads a sequence of bytes from this channel into the given buffer. An
+ * attempt is made to read up to the number of bytes remaining in the buffer,
+ * that is, dst.remaining(), at the moment this method is invoked. Upon return
+ * the buffer's position will move forward the number of bytes read; its limit
+ * will not have changed. Subclasses are encouraged to provide a more
+ * efficient implementation of this method.
+ *
+ * @param dst The buffer into which bytes are to be transferred
+ * @return The number of bytes read, possibly zero, or -1 if the channel has
+ * reached end-of-stream
+ * @throws TTransportException if there was an error reading data
+ */
+ public int read(ByteBuffer dst) throws TTransportException {
+ byte[] arr = new byte[dst.remaining()];
+ int n = read(arr, 0, arr.length);
+ dst.put(arr, 0, n);
+ return n;
+ }
+
+ /**
* Reads up to len bytes into buffer buf, starting at offset off.
*
* @param buf Array to read into
@@ -121,6 +142,24 @@ public abstract class TTransport implements Closeable {
throws TTransportException;
/**
+ * Writes a sequence of bytes to the buffer. An attempt is made to write all
+ * remaining bytes in the buffer, that is, src.remaining(), at the moment this
+ * method is invoked. Upon return the buffer's position will updated; its limit
+ * will not have changed. Subclasses are encouraged to provide a more efficient
+ * implementation of this method.
+ *
+ * @param src The buffer from which bytes are to be retrieved
+ * @return The number of bytes written, possibly zero
+ * @throws TTransportException if there was an error writing data
+ */
+ public int write(ByteBuffer src) throws TTransportException {
+ byte[] arr = new byte[src.remaining()];
+ src.get(arr);
+ write(arr, 0, arr.length);
+ return arr.length;
+ }
+
+ /**
* Flush any pending data out of a transport buffer.
*
* @throws TTransportException if there was an error writing out data.
diff --git a/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java b/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java
index e5feba01f..4357f13e1 100644
--- a/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java
+++ b/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
/**
* Write frame (header and payload) to transport in a nonblocking way.
@@ -99,9 +100,9 @@ public abstract class FrameWriter {
/**
* Nonblocking write to the underlying transport.
*
- * @throws IOException
+ * @throws TTransportException
*/
- public void write(TNonblockingTransport transport) throws IOException {
+ public void write(TNonblockingTransport transport) throws TTransportException {
transport.write(frameBytes);
}
diff --git a/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java b/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java
index 45571469b..d73c3ec18 100644
--- a/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java
+++ b/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java
@@ -19,7 +19,6 @@
package org.apache.thrift.transport.sasl;
-import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.charset.StandardCharsets;
@@ -364,7 +363,7 @@ public class NonblockingSaslHandler {
saslChallenge.clear();
nextPhase = Phase.READING_SASL_RESPONSE;
}
- } catch (IOException e) {
+ } catch (TTransportException e) {
fail(e);
}
}
@@ -378,7 +377,7 @@ public class NonblockingSaslHandler {
saslResponse = null;
nextPhase = Phase.READING_REQUEST;
}
- } catch (IOException e) {
+ } catch (TTransportException e) {
fail(e);
}
}
@@ -389,7 +388,7 @@ public class NonblockingSaslHandler {
if (saslChallenge.isComplete()) {
nextPhase = Phase.CLOSING;
}
- } catch (IOException e) {
+ } catch (TTransportException e) {
fail(e);
}
}
@@ -401,7 +400,7 @@ public class NonblockingSaslHandler {
responseWriter.clear();
nextPhase = Phase.READING_REQUEST;
}
- } catch (IOException e) {
+ } catch (TTransportException e) {
fail(e);
}
}
diff --git a/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java b/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java
index d2425939f..60fe5c96b 100644
--- a/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java
+++ b/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java
@@ -72,7 +72,7 @@ public class TestDataFrameWriter {
}
@Test
- public void testWrite() throws IOException {
+ public void testWrite() throws Exception {
DataFrameWriter frameWriter = new DataFrameWriter();
frameWriter.withOnlyPayload(BYTES);
// Slow socket which writes one byte per call.