diff options
author | David Mollitor <dmollitor@apache.org> | 2020-10-02 21:10:50 +0200 |
---|---|---|
committer | Jens Geyer <jensg@apache.org> | 2020-11-19 22:35:37 +0100 |
commit | 6e6bb84be9d8ace4be9744d5637fbb59f58db463 (patch) | |
tree | 4bc659965da5d206e8d6326ed43d22225a75ddc5 | |
parent | 47e4923a9967b6ba3bbb5377809075c1940ecfc1 (diff) | |
download | thrift-6e6bb84be9d8ace4be9744d5637fbb59f58db463.tar.gz |
THRIFT-5288: Move Support for ByteBuffer into TTransport
Client: Java
Patch: David Mollitor
This closes #2254
9 files changed, 81 insertions, 45 deletions
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java index d5c608d87..a119f23e0 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java @@ -217,9 +217,9 @@ public abstract class TAsyncMethodCall<T> { state = State.ERROR; } - private void doReadingResponseBody(SelectionKey key) throws IOException { + private void doReadingResponseBody(SelectionKey key) throws TTransportException { if (transport.read(frameBuffer) < 0) { - throw new IOException("Read call frame failed"); + throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame failed"); } if (frameBuffer.remaining() == 0) { cleanUpAndFireCallback(key); @@ -241,9 +241,9 @@ public abstract class TAsyncMethodCall<T> { } } - private void doReadingResponseSize() throws IOException { + private void doReadingResponseSize() throws TTransportException { if (transport.read(sizeBuffer) < 0) { - throw new IOException("Read call frame size failed"); + throw new TTransportException(TTransportException.END_OF_FILE, "Read call frame size failed"); } if (sizeBuffer.remaining() == 0) { state = State.READING_RESPONSE_BODY; @@ -251,9 +251,9 @@ public abstract class TAsyncMethodCall<T> { } } - private void doWritingRequestBody(SelectionKey key) throws IOException { + private void doWritingRequestBody(SelectionKey key) throws TTransportException { if (transport.write(frameBuffer) < 0) { - throw new IOException("Write call frame failed"); + throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame failed"); } if (frameBuffer.remaining() == 0) { if (isOneway) { @@ -266,9 +266,9 @@ public abstract class TAsyncMethodCall<T> { } } - private void doWritingRequestSize() throws IOException { + private void doWritingRequestSize() throws TTransportException { if (transport.write(sizeBuffer) < 0) { - throw new IOException("Write call frame size failed"); + throw new TTransportException(TTransportException.END_OF_FILE, "Write call frame size failed"); } if (sizeBuffer.remaining() == 0) { state = State.WRITING_REQUEST_BODY; diff --git a/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java index 0dfcf25d1..4f4e21f50 100644 --- a/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java +++ b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java @@ -363,20 +363,17 @@ public class TCompactProtocol extends TProtocol { */ public void writeString(String str) throws TException { byte[] bytes = str.getBytes(StandardCharsets.UTF_8); - writeBinary(bytes, 0, bytes.length); + writeVarint32(bytes.length); + trans_.write(bytes, 0, bytes.length); } /** * Write a byte array, using a varint for the size. */ public void writeBinary(ByteBuffer bin) throws TException { - int length = bin.limit() - bin.position(); - writeBinary(bin.array(), bin.position() + bin.arrayOffset(), length); - } - - private void writeBinary(byte[] buf, int offset, int length) throws TException { - writeVarint32(length); - trans_.write(buf, offset, length); + ByteBuffer bb = bin.asReadOnlyBuffer(); + writeVarint32(bb.remaining()); + trans_.write(bb); } // @@ -694,12 +691,13 @@ public class TCompactProtocol extends TProtocol { } /** - * Read a byte[] from the wire. + * Read a ByteBuffer from the wire. */ public ByteBuffer readBinary() throws TException { int length = readVarint32(); - - if (length == 0) return EMPTY_BUFFER; + if (length == 0) { + return EMPTY_BUFFER; + } getTransport().checkReadBytesAvailable(length); if (trans_.getBytesRemainingInBuffer() >= length) { ByteBuffer bb = ByteBuffer.wrap(trans_.getBuffer(), trans_.getBufferPosition(), length); diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java index 4aae803f2..f91e8254f 100644 --- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java @@ -414,8 +414,8 @@ public abstract class AbstractNonblockingServer extends TServer { if (trans_.write(buffer_) < 0) { return false; } - } catch (IOException e) { - LOGGER.warn("Got an IOException during write!", e); + } catch (TTransportException e) { + LOGGER.warn("Got an Exception during write", e); return false; } @@ -543,8 +543,8 @@ public abstract class AbstractNonblockingServer extends TServer { private boolean internalRead() { try { return trans_.read(buffer_) >= 0; - } catch (IOException e) { - LOGGER.warn("Got an IOException in internalRead!", e); + } catch (TTransportException e) { + LOGGER.warn("Got an Exception in internalRead", e); return false; } } diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java index 76ed02cbb..13c858648 100644 --- a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java @@ -144,11 +144,14 @@ public class TNonblockingSocket extends TNonblockingTransport { /** * Perform a nonblocking read into buffer. */ - public int read(ByteBuffer buffer) throws IOException { - return socketChannel_.read(buffer); + public int read(ByteBuffer buffer) throws TTransportException { + try { + return socketChannel_.read(buffer); + } catch (IOException iox) { + throw new TTransportException(TTransportException.UNKNOWN, iox); + } } - /** * Reads from the underlying input stream if not null. */ @@ -167,8 +170,12 @@ public class TNonblockingSocket extends TNonblockingTransport { /** * Perform a nonblocking write of the data in buffer; */ - public int write(ByteBuffer buffer) throws IOException { - return socketChannel_.write(buffer); + public int write(ByteBuffer buffer) throws TTransportException { + try { + return socketChannel_.write(buffer); + } catch (IOException iox) { + throw new TTransportException(TTransportException.UNKNOWN, iox); + } } /** @@ -179,11 +186,7 @@ public class TNonblockingSocket extends TNonblockingTransport { throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to write-only socket channel"); } - try { - socketChannel_.write(ByteBuffer.wrap(buf, off, len)); - } catch (IOException iox) { - throw new TTransportException(TTransportException.UNKNOWN, iox); - } + write(ByteBuffer.wrap(buf, off, len)); } /** diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java index 255595d6c..30ec9d25c 100644 --- a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java @@ -23,7 +23,6 @@ import org.apache.thrift.TConfiguration; import java.io.IOException; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -47,7 +46,4 @@ public abstract class TNonblockingTransport extends TEndpointTransport { public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException; - public abstract int read(ByteBuffer buffer) throws IOException; - - public abstract int write(ByteBuffer buffer) throws IOException; } diff --git a/lib/java/src/org/apache/thrift/transport/TTransport.java b/lib/java/src/org/apache/thrift/transport/TTransport.java index 5645f7fa1..ee070243d 100644 --- a/lib/java/src/org/apache/thrift/transport/TTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TTransport.java @@ -22,6 +22,7 @@ package org.apache.thrift.transport; import org.apache.thrift.TConfiguration; import java.io.Closeable; +import java.nio.ByteBuffer; /** * Generic class that encapsulates the I/O layer. This is basically a thin @@ -60,6 +61,26 @@ public abstract class TTransport implements Closeable { public abstract void close(); /** + * Reads a sequence of bytes from this channel into the given buffer. An + * attempt is made to read up to the number of bytes remaining in the buffer, + * that is, dst.remaining(), at the moment this method is invoked. Upon return + * the buffer's position will move forward the number of bytes read; its limit + * will not have changed. Subclasses are encouraged to provide a more + * efficient implementation of this method. + * + * @param dst The buffer into which bytes are to be transferred + * @return The number of bytes read, possibly zero, or -1 if the channel has + * reached end-of-stream + * @throws TTransportException if there was an error reading data + */ + public int read(ByteBuffer dst) throws TTransportException { + byte[] arr = new byte[dst.remaining()]; + int n = read(arr, 0, arr.length); + dst.put(arr, 0, n); + return n; + } + + /** * Reads up to len bytes into buffer buf, starting at offset off. * * @param buf Array to read into @@ -121,6 +142,24 @@ public abstract class TTransport implements Closeable { throws TTransportException; /** + * Writes a sequence of bytes to the buffer. An attempt is made to write all + * remaining bytes in the buffer, that is, src.remaining(), at the moment this + * method is invoked. Upon return the buffer's position will updated; its limit + * will not have changed. Subclasses are encouraged to provide a more efficient + * implementation of this method. + * + * @param src The buffer from which bytes are to be retrieved + * @return The number of bytes written, possibly zero + * @throws TTransportException if there was an error writing data + */ + public int write(ByteBuffer src) throws TTransportException { + byte[] arr = new byte[src.remaining()]; + src.get(arr); + write(arr, 0, arr.length); + return arr.length; + } + + /** * Flush any pending data out of a transport buffer. * * @throws TTransportException if there was an error writing out data. diff --git a/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java b/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java index e5feba01f..4357f13e1 100644 --- a/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java +++ b/lib/java/src/org/apache/thrift/transport/sasl/FrameWriter.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.thrift.transport.TNonblockingTransport; +import org.apache.thrift.transport.TTransportException; /** * Write frame (header and payload) to transport in a nonblocking way. @@ -99,9 +100,9 @@ public abstract class FrameWriter { /** * Nonblocking write to the underlying transport. * - * @throws IOException + * @throws TTransportException */ - public void write(TNonblockingTransport transport) throws IOException { + public void write(TNonblockingTransport transport) throws TTransportException { transport.write(frameBytes); } diff --git a/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java b/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java index 45571469b..d73c3ec18 100644 --- a/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java +++ b/lib/java/src/org/apache/thrift/transport/sasl/NonblockingSaslHandler.java @@ -19,7 +19,6 @@ package org.apache.thrift.transport.sasl; -import java.io.IOException; import java.nio.channels.SelectionKey; import java.nio.charset.StandardCharsets; @@ -364,7 +363,7 @@ public class NonblockingSaslHandler { saslChallenge.clear(); nextPhase = Phase.READING_SASL_RESPONSE; } - } catch (IOException e) { + } catch (TTransportException e) { fail(e); } } @@ -378,7 +377,7 @@ public class NonblockingSaslHandler { saslResponse = null; nextPhase = Phase.READING_REQUEST; } - } catch (IOException e) { + } catch (TTransportException e) { fail(e); } } @@ -389,7 +388,7 @@ public class NonblockingSaslHandler { if (saslChallenge.isComplete()) { nextPhase = Phase.CLOSING; } - } catch (IOException e) { + } catch (TTransportException e) { fail(e); } } @@ -401,7 +400,7 @@ public class NonblockingSaslHandler { responseWriter.clear(); nextPhase = Phase.READING_REQUEST; } - } catch (IOException e) { + } catch (TTransportException e) { fail(e); } } diff --git a/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java b/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java index d2425939f..60fe5c96b 100644 --- a/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java +++ b/lib/java/test/org/apache/thrift/transport/sasl/TestDataFrameWriter.java @@ -72,7 +72,7 @@ public class TestDataFrameWriter { } @Test - public void testWrite() throws IOException { + public void testWrite() throws Exception { DataFrameWriter frameWriter = new DataFrameWriter(); frameWriter.withOnlyPayload(BYTES); // Slow socket which writes one byte per call. |