From 63213c17ad3fece91fdaaca8f59165ca3f41c5c1 Mon Sep 17 00:00:00 2001 From: zeshuai007 <51382517@qq.com> Date: Wed, 16 Sep 2020 16:56:51 +0800 Subject: Implements TConfiguration for java --- .../thrift/transport/AutoExpandingBuffer.java | 2 +- .../AutoExpandingBufferReadTransport.java | 7 +- .../AutoExpandingBufferWriteTransport.java | 7 +- .../org/apache/thrift/transport/TByteBuffer.java | 11 +- .../thrift/transport/TEndpointTransport.java | 100 ++++++++++ .../thrift/transport/TFastFramedTransport.java | 200 -------------------- .../apache/thrift/transport/TFileTransport.java | 79 ++++---- .../apache/thrift/transport/TFramedTransport.java | 190 ------------------- .../org/apache/thrift/transport/THttpClient.java | 80 +++++--- .../thrift/transport/TIOStreamTransport.java | 61 ++++++- .../org/apache/thrift/transport/TMemoryBuffer.java | 26 ++- .../thrift/transport/TMemoryInputTransport.java | 38 +++- .../apache/thrift/transport/TMemoryTransport.java | 15 +- .../thrift/transport/TNonblockingSocket.java | 15 +- .../thrift/transport/TNonblockingTransport.java | 8 +- .../thrift/transport/TSSLTransportFactory.java | 14 +- .../thrift/transport/TSaslClientTransport.java | 8 +- .../thrift/transport/TSaslServerTransport.java | 8 +- .../apache/thrift/transport/TSaslTransport.java | 13 +- .../thrift/transport/TSimpleFileTransport.java | 54 ++++-- .../src/org/apache/thrift/transport/TSocket.java | 27 ++- .../org/apache/thrift/transport/TTransport.java | 8 + .../apache/thrift/transport/TTransportFactory.java | 2 +- .../apache/thrift/transport/TZlibTransport.java | 10 +- .../transport/layered/TFastFramedTransport.java | 203 +++++++++++++++++++++ .../thrift/transport/layered/TFramedTransport.java | 187 +++++++++++++++++++ .../transport/layered/TLayeredTransport.java | 52 ++++++ 27 files changed, 908 insertions(+), 517 deletions(-) create mode 100644 lib/java/src/org/apache/thrift/transport/TEndpointTransport.java delete mode 100644 lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java delete mode 100644 lib/java/src/org/apache/thrift/transport/TFramedTransport.java create mode 100644 lib/java/src/org/apache/thrift/transport/layered/TFastFramedTransport.java create mode 100644 lib/java/src/org/apache/thrift/transport/layered/TFramedTransport.java create mode 100644 lib/java/src/org/apache/thrift/transport/layered/TLayeredTransport.java (limited to 'lib/java/src/org/apache/thrift/transport') diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java index fc3aa92df..b355d11ca 100644 --- a/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java +++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java @@ -27,7 +27,7 @@ import java.util.Arrays; * rate slightly faster than the requested capacity with the (untested) * objective of avoiding expensive buffer allocations and copies. */ -class AutoExpandingBuffer { +public class AutoExpandingBuffer { private byte[] array; public AutoExpandingBuffer(int initialCapacity) { diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java index d06fec78c..6fd4075b9 100644 --- a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java +++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java @@ -18,17 +18,20 @@ */ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + /** * TTransport for reading from an AutoExpandingBuffer. */ -public class AutoExpandingBufferReadTransport extends TTransport { +public class AutoExpandingBufferReadTransport extends TEndpointTransport { private final AutoExpandingBuffer buf; private int pos = 0; private int limit = 0; - public AutoExpandingBufferReadTransport(int initialCapacity) { + public AutoExpandingBufferReadTransport(TConfiguration config, int initialCapacity) throws TTransportException { + super(config); this.buf = new AutoExpandingBuffer(initialCapacity); } diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java index ec7e7d45a..25f974a73 100644 --- a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java +++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java @@ -18,10 +18,12 @@ */ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + /** * TTransport for writing to an AutoExpandingBuffer. */ -public final class AutoExpandingBufferWriteTransport extends TTransport { +public final class AutoExpandingBufferWriteTransport extends TEndpointTransport { private final AutoExpandingBuffer buf; private int pos; @@ -38,7 +40,8 @@ public final class AutoExpandingBufferWriteTransport extends TTransport { * @throws IllegalArgumentException if frontReserve is less than zero * @throws IllegalArgumentException if frontReserve is greater than initialCapacity */ - public AutoExpandingBufferWriteTransport(int initialCapacity, int frontReserve) { + public AutoExpandingBufferWriteTransport(TConfiguration config, int initialCapacity, int frontReserve) throws TTransportException { + super(config); if (initialCapacity < 1) { throw new IllegalArgumentException("initialCapacity"); } diff --git a/lib/java/src/org/apache/thrift/transport/TByteBuffer.java b/lib/java/src/org/apache/thrift/transport/TByteBuffer.java index b6b065748..c792f3b1c 100644 --- a/lib/java/src/org/apache/thrift/transport/TByteBuffer.java +++ b/lib/java/src/org/apache/thrift/transport/TByteBuffer.java @@ -1,5 +1,7 @@ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; @@ -7,14 +9,16 @@ import java.nio.ByteBuffer; /** * ByteBuffer-backed implementation of TTransport. */ -public final class TByteBuffer extends TTransport { +public final class TByteBuffer extends TEndpointTransport { private final ByteBuffer byteBuffer; /** * Creates a new TByteBuffer wrapping a given NIO ByteBuffer. */ - public TByteBuffer(ByteBuffer byteBuffer) { + public TByteBuffer(ByteBuffer byteBuffer) throws TTransportException { + super(new TConfiguration()); this.byteBuffer = byteBuffer; + updateKnownMessageSize(byteBuffer.capacity()); } @Override @@ -32,6 +36,9 @@ public final class TByteBuffer extends TTransport { @Override public int read(byte[] buf, int off, int len) throws TTransportException { + // + checkReadBytesAvailable(len); + final int n = Math.min(byteBuffer.remaining(), len); if (n > 0) { try { diff --git a/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java b/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java new file mode 100644 index 000000000..f32efae90 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport; + +import org.apache.thrift.TConfiguration; + +import java.util.Objects; + +public abstract class TEndpointTransport extends TTransport{ + + protected long getMaxMessageSize() { return getConfiguration().getMaxMessageSize(); } + + protected long knownMessageSize; + protected long remainingMessageSize; + + private TConfiguration _configuration; + public TConfiguration getConfiguration() { + return _configuration; + } + + public TEndpointTransport( TConfiguration config) throws TTransportException { + _configuration = Objects.isNull(config) ? new TConfiguration() : config; + + resetConsumedMessageSize(-1); + } + + /** + * Resets RemainingMessageSize to the configured maximum + * @param newSize + */ + protected void resetConsumedMessageSize(long newSize) throws TTransportException { + // full reset + if (newSize < 0) + { + knownMessageSize = getMaxMessageSize(); + remainingMessageSize = getMaxMessageSize(); + return; + } + + // update only: message size can shrink, but not grow + if (newSize > knownMessageSize) + throw new TTransportException(TTransportException.END_OF_FILE, "MaxMessageSize reached"); + + knownMessageSize = newSize; + remainingMessageSize = newSize; + } + + /** + * Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport). + * Will throw if we already consumed too many bytes or if the new size is larger than allowed. + * @param size + */ + public void updateKnownMessageSize(long size) throws TTransportException { + long consumed = knownMessageSize - remainingMessageSize; + resetConsumedMessageSize(size == 0 ? -1 : size); + countConsumedMessageBytes(consumed); + } + + /** + * Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data + * @param numBytes + */ + public void checkReadBytesAvailable(long numBytes) throws TTransportException { + if (remainingMessageSize < numBytes) + throw new TTransportException(TTransportException.END_OF_FILE, "MaxMessageSize reached"); + } + + /** + * Consumes numBytes from the RemainingMessageSize. + * @param numBytes + */ + protected void countConsumedMessageBytes(long numBytes) throws TTransportException { + if (remainingMessageSize >= numBytes) + { + remainingMessageSize -= numBytes; + } + else + { + remainingMessageSize = 0; + throw new TTransportException(TTransportException.END_OF_FILE, "MaxMessageSize reached"); + } + } + +} diff --git a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java deleted file mode 100644 index a1fd2490a..000000000 --- a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.thrift.transport; - -/** - * This transport is wire compatible with {@link TFramedTransport}, but makes - * use of reusable, expanding read and write buffers in order to avoid - * allocating new byte[]s all the time. Since the buffers only expand, you - * should probably only use this transport if your messages are not too variably - * large, unless the persistent memory cost is not an issue. - * - * This implementation is NOT threadsafe. - */ -public class TFastFramedTransport extends TTransport { - - public static class Factory extends TTransportFactory { - private final int initialCapacity; - private final int maxLength; - - public Factory() { - this(DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH); - } - - public Factory(int initialCapacity) { - this(initialCapacity, DEFAULT_MAX_LENGTH); - } - - public Factory(int initialCapacity, int maxLength) { - this.initialCapacity = initialCapacity; - this.maxLength = maxLength; - } - - @Override - public TTransport getTransport(TTransport trans) { - return new TFastFramedTransport(trans, - initialCapacity, - maxLength); - } - } - - /** - * How big should the default read and write buffers be? - */ - public static final int DEFAULT_BUF_CAPACITY = 1024; - /** - * How big is the largest allowable frame? Defaults to 16MB. - */ - public static final int DEFAULT_MAX_LENGTH = 16384000; - - private final TTransport underlying; - private final AutoExpandingBufferWriteTransport writeBuffer; - private AutoExpandingBufferReadTransport readBuffer; - private final int initialBufferCapacity; - private final byte[] i32buf = new byte[4]; - private final int maxLength; - - /** - * Create a new {@link TFastFramedTransport}. Use the defaults - * for initial buffer size and max frame length. - * @param underlying Transport that real reads and writes will go through to. - */ - public TFastFramedTransport(TTransport underlying) { - this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH); - } - - /** - * Create a new {@link TFastFramedTransport}. Use the specified - * initial buffer capacity and the default max frame length. - * @param underlying Transport that real reads and writes will go through to. - * @param initialBufferCapacity The initial size of the read and write buffers. - * In practice, it's not critical to set this unless you know in advance that - * your messages are going to be very large. - */ - public TFastFramedTransport(TTransport underlying, int initialBufferCapacity) { - this(underlying, initialBufferCapacity, DEFAULT_MAX_LENGTH); - } - - /** - * - * @param underlying Transport that real reads and writes will go through to. - * @param initialBufferCapacity The initial size of the read and write buffers. - * In practice, it's not critical to set this unless you know in advance that - * your messages are going to be very large. (You can pass - * TFramedTransportWithReusableBuffer.DEFAULT_BUF_CAPACITY if you're only - * using this constructor because you want to set the maxLength.) - * @param maxLength The max frame size you are willing to read. You can use - * this parameter to limit how much memory can be allocated. - */ - public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) { - this.underlying = underlying; - this.maxLength = maxLength; - this.initialBufferCapacity = initialBufferCapacity; - readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity); - writeBuffer = new AutoExpandingBufferWriteTransport(initialBufferCapacity, 4); - } - - @Override - public void close() { - underlying.close(); - } - - @Override - public boolean isOpen() { - return underlying.isOpen(); - } - - @Override - public void open() throws TTransportException { - underlying.open(); - } - - @Override - public int read(byte[] buf, int off, int len) throws TTransportException { - int got = readBuffer.read(buf, off, len); - if (got > 0) { - return got; - } - - // Read another frame of data - readFrame(); - - return readBuffer.read(buf, off, len); - } - - private void readFrame() throws TTransportException { - underlying.readAll(i32buf , 0, 4); - int size = TFramedTransport.decodeFrameSize(i32buf); - - if (size < 0) { - close(); - throw new TTransportException(TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!"); - } - - if (size > maxLength) { - close(); - throw new TTransportException(TTransportException.CORRUPTED_DATA, - "Frame size (" + size + ") larger than max length (" + maxLength + ")!"); - } - - readBuffer.fill(underlying, size); - } - - @Override - public void write(byte[] buf, int off, int len) throws TTransportException { - writeBuffer.write(buf, off, len); - } - - @Override - public void consumeBuffer(int len) { - readBuffer.consumeBuffer(len); - } - - /** - * Only clears the read buffer! - */ - public void clear() { - readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity); - } - - @Override - public void flush() throws TTransportException { - int payloadLength = writeBuffer.getLength() - 4; - byte[] data = writeBuffer.getBuf().array(); - TFramedTransport.encodeFrameSize(payloadLength, data); - underlying.write(data, 0, payloadLength + 4); - writeBuffer.reset(); - underlying.flush(); - } - - @Override - public byte[] getBuffer() { - return readBuffer.getBuffer(); - } - - @Override - public int getBufferPosition() { - return readBuffer.getBufferPosition(); - } - - @Override - public int getBytesRemainingInBuffer() { - return readBuffer.getBytesRemainingInBuffer(); - } -} diff --git a/lib/java/src/org/apache/thrift/transport/TFileTransport.java b/lib/java/src/org/apache/thrift/transport/TFileTransport.java index 88b73e54d..85f97084c 100644 --- a/lib/java/src/org/apache/thrift/transport/TFileTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TFileTransport.java @@ -26,13 +26,14 @@ import java.io.OutputStream; import java.io.IOException; import java.util.Random; +import org.apache.thrift.TConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * FileTransport implementation of the TTransport interface. * Currently this is a straightforward port of the cpp implementation - * + * * It may make better sense to provide a basic stream access on top of the framed file format * The FileTransport can then be a user of this framed file format with some additional logic * for chunking. @@ -44,7 +45,7 @@ public class TFileTransport extends TTransport { public static class TruncableBufferedInputStream extends BufferedInputStream { public void trunc() { pos = count = 0; - } + } public TruncableBufferedInputStream(InputStream in) { super(in); } @@ -62,7 +63,7 @@ public class TFileTransport extends TTransport { /** * Initialize an event. Initially, it has no valid contents * - * @param buf byte array buffer to store event + * @param buf byte array buffer to store event */ public Event(byte[] buf) { buf_ = buf; @@ -88,9 +89,9 @@ public class TFileTransport extends TTransport { return(ndesired); } - }; + } - public static class ChunkState { + public static class ChunkState { /** * Chunk Size. Must be same across all implementations */ @@ -111,7 +112,7 @@ public class TFileTransport extends TTransport { public long getOffset() { return (offset_);} } - public static enum TailPolicy { + public enum TailPolicy { NOWAIT(0, 0), WAIT_FOREVER(500, -1); @@ -148,13 +149,13 @@ public class TFileTransport extends TTransport { TailPolicy currentPolicy_ = TailPolicy.NOWAIT; - /** + /** * Underlying file being read */ protected TSeekableFile inputFile_ = null; - /** - * Underlying outputStream + /** + * Underlying outputStream */ protected OutputStream outputStream_ = null; @@ -181,7 +182,7 @@ public class TFileTransport extends TTransport { /** * Get File Tailing Policy - * + * * @return current read policy */ public TailPolicy getTailPolicy() { @@ -190,7 +191,7 @@ public class TFileTransport extends TTransport { /** * Set file Tailing Policy - * + * * @param policy New policy to set * @return Old policy */ @@ -203,7 +204,7 @@ public class TFileTransport extends TTransport { /** * Initialize read input stream - * + * * @return input stream to read from file */ private InputStream createInputStream() throws TTransportException { @@ -223,7 +224,7 @@ public class TFileTransport extends TTransport { /** * Read (potentially tailing) an input stream - * + * * @param is InputStream to read from * @param buf Buffer to read into * @param off Offset in buffer to read into @@ -232,7 +233,7 @@ public class TFileTransport extends TTransport { * * @return number of bytes read */ - private int tailRead(InputStream is, byte[] buf, + private int tailRead(InputStream is, byte[] buf, int off, int len, TailPolicy tp) throws TTransportException { int orig_len = len; try { @@ -322,7 +323,7 @@ public class TFileTransport extends TTransport { // check if event is corrupted and do recovery as required if(esize > cs.getRemaining()) { throw new TTransportException("FileTransport error: bad event size"); - /* + /* if(performRecovery()) { esize=0; } else { @@ -361,7 +362,7 @@ public class TFileTransport extends TTransport { * Files are not opened in ctor - but in explicit open call */ public void open() throws TTransportException { - if (isOpen()) + if (isOpen()) throw new TTransportException(TTransportException.ALREADY_OPEN); try { @@ -406,7 +407,7 @@ public class TFileTransport extends TTransport { * * @param path File path to read and write from * @param readOnly Whether this is a read-only transport - */ + */ public TFileTransport(final String path, boolean readOnly) throws IOException { inputFile_ = new TStandardFile(path); readOnly_ = readOnly; @@ -457,8 +458,8 @@ public class TFileTransport extends TTransport { * @throws TTransportException if there was an error reading data */ public int read(byte[] buf, int off, int len) throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before reading"); if(currentEvent_.getRemaining() == 0) { @@ -471,14 +472,14 @@ public class TFileTransport extends TTransport { } public int getNumChunks() throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before getNumChunks"); try { long len = inputFile_.length(); if(len == 0) return 0; - else + else return (((int)(len/cs.getChunkSize())) + 1); } catch (IOException iox) { @@ -487,8 +488,8 @@ public class TFileTransport extends TTransport { } public int getCurChunk() throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before getCurChunk"); return (cs.getChunkNum()); @@ -496,8 +497,8 @@ public class TFileTransport extends TTransport { public void seekToChunk(int chunk) throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before seeking"); int numChunks = getNumChunks(); @@ -527,7 +528,7 @@ public class TFileTransport extends TTransport { } if(chunk*cs.getChunkSize() != cs.getOffset()) { - try { inputFile_.seek((long)chunk*cs.getChunkSize()); } + try { inputFile_.seek((long)chunk*cs.getChunkSize()); } catch (IOException iox) { throw new TTransportException("Seek to chunk " + chunk + " " +iox.getMessage(), iox); @@ -549,8 +550,8 @@ public class TFileTransport extends TTransport { } public void seekToEnd() throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before seeking"); seekToChunk(getNumChunks()); } @@ -577,9 +578,25 @@ public class TFileTransport extends TTransport { throw new TTransportException("Not Supported"); } + + @Override + public TConfiguration getConfiguration() { + return null; + } + + @Override + public void updateKnownMessageSize(long size) throws TTransportException { + + } + + @Override + public void checkReadBytesAvailable(long numBytes) throws TTransportException { + + } + /** * test program - * + * */ public static void main(String[] args) throws Exception { @@ -594,7 +611,7 @@ public class TFileTransport extends TTransport { try { num_chunks = Integer.parseInt(args[1]); } catch (Exception e) { - LOGGER.error("Cannot parse " + args[1]); + LOGGER.error("Cannot parse " + args[1]); printUsage(); } } diff --git a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFramedTransport.java deleted file mode 100644 index a006c3a6a..000000000 --- a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.thrift.transport; - -import org.apache.thrift.TByteArrayOutputStream; - -/** - * TFramedTransport is a buffered TTransport that ensures a fully read message - * every time by preceding messages with a 4-byte frame size. - */ -public class TFramedTransport extends TTransport { - - protected static final int DEFAULT_MAX_LENGTH = 16384000; - - private int maxLength_; - - /** - * Underlying transport - */ - private TTransport transport_ = null; - - /** - * Buffer for output - */ - private final TByteArrayOutputStream writeBuffer_ = - new TByteArrayOutputStream(1024); - - /** - * Buffer for input - */ - private final TMemoryInputTransport readBuffer_ = - new TMemoryInputTransport(new byte[0]); - - public static class Factory extends TTransportFactory { - private int maxLength_; - - public Factory() { - maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH; - } - - public Factory(int maxLength) { - maxLength_ = maxLength; - } - - @Override - public TTransport getTransport(TTransport base) { - return new TFramedTransport(base, maxLength_); - } - } - - /** - * Something to fill in the first four bytes of the buffer - * to make room for the frame size. This allows the - * implementation to write once instead of twice. - */ - private static final byte[] sizeFiller_ = new byte[] { 0x00, 0x00, 0x00, 0x00 }; - - /** - * Constructor wraps around another transport - */ - public TFramedTransport(TTransport transport, int maxLength) { - transport_ = transport; - maxLength_ = maxLength; - writeBuffer_.write(sizeFiller_, 0, 4); - } - - public TFramedTransport(TTransport transport) { - transport_ = transport; - maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH; - writeBuffer_.write(sizeFiller_, 0, 4); - } - - public void open() throws TTransportException { - transport_.open(); - } - - public boolean isOpen() { - return transport_.isOpen(); - } - - public void close() { - transport_.close(); - } - - public int read(byte[] buf, int off, int len) throws TTransportException { - int got = readBuffer_.read(buf, off, len); - if (got > 0) { - return got; - } - - // Read another frame of data - readFrame(); - - return readBuffer_.read(buf, off, len); - } - - @Override - public byte[] getBuffer() { - return readBuffer_.getBuffer(); - } - - @Override - public int getBufferPosition() { - return readBuffer_.getBufferPosition(); - } - - @Override - public int getBytesRemainingInBuffer() { - return readBuffer_.getBytesRemainingInBuffer(); - } - - @Override - public void consumeBuffer(int len) { - readBuffer_.consumeBuffer(len); - } - - public void clear() { - readBuffer_.clear(); - } - - private final byte[] i32buf = new byte[4]; - - private void readFrame() throws TTransportException { - transport_.readAll(i32buf, 0, 4); - int size = decodeFrameSize(i32buf); - - if (size < 0) { - close(); - throw new TTransportException(TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!"); - } - - if (size > maxLength_) { - close(); - throw new TTransportException(TTransportException.CORRUPTED_DATA, - "Frame size (" + size + ") larger than max length (" + maxLength_ + ")!"); - } - - byte[] buff = new byte[size]; - transport_.readAll(buff, 0, size); - readBuffer_.reset(buff); - } - - public void write(byte[] buf, int off, int len) throws TTransportException { - writeBuffer_.write(buf, off, len); - } - - @Override - public void flush() throws TTransportException { - byte[] buf = writeBuffer_.get(); - int len = writeBuffer_.len() - 4; // account for the prepended frame size - writeBuffer_.reset(); - writeBuffer_.write(sizeFiller_, 0, 4); // make room for the next frame's size data - - encodeFrameSize(len, buf); // this is the frame length without the filler - transport_.write(buf, 0, len + 4); // we have to write the frame size and frame data - transport_.flush(); - } - - public static final void encodeFrameSize(final int frameSize, final byte[] buf) { - buf[0] = (byte)(0xff & (frameSize >> 24)); - buf[1] = (byte)(0xff & (frameSize >> 16)); - buf[2] = (byte)(0xff & (frameSize >> 8)); - buf[3] = (byte)(0xff & (frameSize)); - } - - public static final int decodeFrameSize(final byte[] buf) { - return - ((buf[0] & 0xff) << 24) | - ((buf[1] & 0xff) << 16) | - ((buf[2] & 0xff) << 8) | - ((buf[3] & 0xff)); - } -} diff --git a/lib/java/src/org/apache/thrift/transport/THttpClient.java b/lib/java/src/org/apache/thrift/transport/THttpClient.java index c3063fe43..7d61b5c8e 100644 --- a/lib/java/src/org/apache/thrift/transport/THttpClient.java +++ b/lib/java/src/org/apache/thrift/transport/THttpClient.java @@ -37,6 +37,7 @@ import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.params.CoreConnectionPNames; +import org.apache.thrift.TConfiguration; /** * HTTP implementation of the TTransport interface. Used for working with a @@ -51,7 +52,7 @@ import org.apache.http.params.CoreConnectionPNames; * HttpClient to THttpClient(String url, HttpClient client) will create an * instance which will use HttpURLConnection. * - * When using HttpClient, the following configuration leads to 5-15% + * When using HttpClient, the following configuration leads to 5-15% * better performance than the HttpURLConnection implementation: * * http.protocol.version=HttpVersion.HTTP_1_1 @@ -65,7 +66,7 @@ import org.apache.http.params.CoreConnectionPNames; * @see THRIFT-970 */ -public class THttpClient extends TTransport { +public class THttpClient extends TEndpointTransport { private URL url_ = null; @@ -80,14 +81,14 @@ public class THttpClient extends TTransport { private Map customHeaders_ = null; private final HttpHost host; - + private final HttpClient client; - + public static class Factory extends TTransportFactory { - + private final String url; private final HttpClient client; - + public Factory(String url) { this.url = url; this.client = null; @@ -97,14 +98,14 @@ public class THttpClient extends TTransport { this.url = url; this.client = client; } - + @Override public TTransport getTransport(TTransport trans) { try { if (null != client) { - return new THttpClient(url, client); + return new THttpClient(trans.getConfiguration(), url, client); } else { - return new THttpClient(url); + return new THttpClient(trans.getConfiguration(), url); } } catch (TTransportException tte) { return null; @@ -112,7 +113,19 @@ public class THttpClient extends TTransport { } } + public THttpClient(TConfiguration config, String url) throws TTransportException { + super(config); + try { + url_ = new URL(url); + this.client = null; + this.host = null; + } catch (IOException iox) { + throw new TTransportException(iox); + } + } + public THttpClient(String url) throws TTransportException { + super(new TConfiguration()); try { url_ = new URL(url); this.client = null; @@ -122,7 +135,19 @@ public class THttpClient extends TTransport { } } + public THttpClient(TConfiguration config, String url, HttpClient client) throws TTransportException { + super(config); + try { + url_ = new URL(url); + this.client = client; + this.host = new HttpHost(url_.getHost(), -1 == url_.getPort() ? url_.getDefaultPort() : url_.getPort(), url_.getProtocol()); + } catch (IOException iox) { + throw new TTransportException(iox); + } + } + public THttpClient(String url, HttpClient client) throws TTransportException { + super(new TConfiguration()); try { url_ = new URL(url); this.client = client; @@ -168,7 +193,6 @@ public class THttpClient extends TTransport { try { inputStream_.close(); } catch (IOException ioe) { - ; } inputStream_ = null; } @@ -182,11 +206,16 @@ public class THttpClient extends TTransport { if (inputStream_ == null) { throw new TTransportException("Response buffer is empty, no request."); } + + checkReadBytesAvailable(len); + try { int ret = inputStream_.read(buf, off, len); if (ret == -1) { throw new TTransportException("No more data available."); } + countConsumedMessageBytes(ret); + return ret; } catch (IOException iox) { throw new TTransportException(iox); @@ -214,7 +243,7 @@ public class THttpClient extends TTransport { } private void flushUsingHttpClient() throws TTransportException { - + if (null == this.client) { throw new TTransportException("Null HttpClient, aborting."); } @@ -224,22 +253,22 @@ public class THttpClient extends TTransport { requestBuffer_.reset(); HttpPost post = null; - + InputStream is = null; - - try { + + try { // Set request to path + query string post = new HttpPost(this.url_.getFile()); - + // // Headers are added to the HttpPost instance, not // to HttpClient. // - + post.setHeader("Content-Type", "application/x-thrift"); post.setHeader("Accept", "application/x-thrift"); post.setHeader("User-Agent", "Java/THttpClient/HC"); - + if (null != customHeaders_) { for (Map.Entry header : customHeaders_.entrySet()) { post.setHeader(header.getKey(), header.getValue()); @@ -247,17 +276,17 @@ public class THttpClient extends TTransport { } post.setEntity(new ByteArrayEntity(data)); - + HttpResponse response = this.client.execute(this.host, post); int responseCode = response.getStatusLine().getStatusCode(); - // + // // Retrieve the inputstream BEFORE checking the status code so // resources get freed in the finally clause. // is = response.getEntity().getContent(); - + if (responseCode != HttpStatus.SC_OK) { throw new TTransportException("HTTP Response code: " + responseCode); } @@ -268,10 +297,10 @@ public class THttpClient extends TTransport { // thrift struct is being read up the chain). // Proceeding differently might lead to exhaustion of connections and thus // to app failure. - + byte[] buf = new byte[1024]; ByteArrayOutputStream baos = new ByteArrayOutputStream(); - + int len = 0; do { len = is.read(buf); @@ -279,7 +308,7 @@ public class THttpClient extends TTransport { baos.write(buf, 0, len); } } while (-1 != len); - + try { // Indicate we're done with the content. consume(response.getEntity()); @@ -287,7 +316,7 @@ public class THttpClient extends TTransport { // We ignore this exception, it might only mean the server has no // keep-alive capability. } - + inputStream_ = new ByteArrayInputStream(baos.toByteArray()); } catch (IOException ioe) { // Abort method so the connection gets released back to the connection manager @@ -296,6 +325,7 @@ public class THttpClient extends TTransport { } throw new TTransportException(ioe); } finally { + resetConsumedMessageSize(-1); if (null != is) { // Close the entity's input stream, this will release the underlying connection try { @@ -357,6 +387,8 @@ public class THttpClient extends TTransport { } catch (IOException iox) { throw new TTransportException(iox); + } finally { + resetConsumedMessageSize(-1); } } } diff --git a/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java b/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java index d97d5063e..763e66ad6 100644 --- a/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java @@ -19,6 +19,7 @@ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +34,7 @@ import java.io.OutputStream; * has to provide a variety of types of streams. * */ -public class TIOStreamTransport extends TTransport { +public class TIOStreamTransport extends TEndpointTransport { private static final Logger LOGGER = LoggerFactory.getLogger(TIOStreamTransport.class.getName()); @@ -47,33 +48,80 @@ public class TIOStreamTransport extends TTransport { * Subclasses can invoke the default constructor and then assign the input * streams in the open method. */ - protected TIOStreamTransport() {} + protected TIOStreamTransport(TConfiguration config) throws TTransportException { + super(config); + } + + /** + * Subclasses can invoke the default constructor and then assign the input + * streams in the open method. + */ + protected TIOStreamTransport() throws TTransportException { + super(new TConfiguration()); + } /** * Input stream constructor, constructs an input only transport. * + * @param config * @param is Input stream to read from */ - public TIOStreamTransport(InputStream is) { + public TIOStreamTransport(TConfiguration config, InputStream is) throws TTransportException { + super(config); inputStream_ = is; } + /** + * Input stream constructor, constructs an input only transport. + * + * @param is Input stream to read from + */ + public TIOStreamTransport(InputStream is) throws TTransportException { + super(new TConfiguration()); + inputStream_ = is; + } + + /** + * Output stream constructor, constructs an output only transport. + * + * @param config + * @param os Output stream to write to + */ + public TIOStreamTransport(TConfiguration config, OutputStream os) throws TTransportException { + super(config); + outputStream_ = os; + } /** * Output stream constructor, constructs an output only transport. * * @param os Output stream to write to */ - public TIOStreamTransport(OutputStream os) { + public TIOStreamTransport(OutputStream os) throws TTransportException { + super(new TConfiguration()); outputStream_ = os; } /** * Two-way stream constructor. * + * @param config * @param is Input stream to read from * @param os Output stream to read from */ - public TIOStreamTransport(InputStream is, OutputStream os) { + public TIOStreamTransport(TConfiguration config, InputStream is, OutputStream os) throws TTransportException { + super(config); + inputStream_ = is; + outputStream_ = os; + } + + /** + * Two-way stream constructor. + * + * @param is Input stream to read from + * @param os Output stream to read from + */ + public TIOStreamTransport(InputStream is, OutputStream os) throws TTransportException { + super(new TConfiguration()); inputStream_ = is; outputStream_ = os; } @@ -158,6 +206,9 @@ public class TIOStreamTransport extends TTransport { } try { outputStream_.flush(); + + resetConsumedMessageSize(-1); + } catch (IOException iox) { throw new TTransportException(TTransportException.UNKNOWN, iox); } diff --git a/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java b/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java index b19ac86d2..c3a3eb4e1 100644 --- a/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java +++ b/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java @@ -20,21 +20,39 @@ package org.apache.thrift.transport; import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TConfiguration; + import java.nio.charset.Charset; /** * Memory buffer-based implementation of the TTransport interface. */ -public class TMemoryBuffer extends TTransport { +public class TMemoryBuffer extends TEndpointTransport { + /** + * Create a TMemoryBuffer with an initial buffer size of size. The + * internal buffer will grow as necessary to accommodate the size of the data + * being written to it. + * + * @param size the initial size of the buffer + */ + public TMemoryBuffer(int size) throws TTransportException { + super(new TConfiguration()); + arr_ = new TByteArrayOutputStream(size); + updateKnownMessageSize(size); + } + /** * Create a TMemoryBuffer with an initial buffer size of size. The * internal buffer will grow as necessary to accommodate the size of the data * being written to it. * + * @param config * @param size the initial size of the buffer */ - public TMemoryBuffer(int size) { + public TMemoryBuffer(TConfiguration config, int size) throws TTransportException { + super(config); arr_ = new TByteArrayOutputStream(size); + updateKnownMessageSize(size); } @Override @@ -53,9 +71,11 @@ public class TMemoryBuffer extends TTransport { } @Override - public int read(byte[] buf, int off, int len) { + public int read(byte[] buf, int off, int len) throws TTransportException { + checkReadBytesAvailable(len); byte[] src = arr_.get(); int amtToRead = (len > arr_.len() - pos_ ? arr_.len() - pos_ : len); + if (amtToRead > 0) { System.arraycopy(src, pos_, buf, off, amtToRead); pos_ += amtToRead; diff --git a/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java b/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java index 2530dcc36..6cb06fc37 100644 --- a/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java @@ -18,21 +18,38 @@ */ package org.apache.thrift.transport; -public final class TMemoryInputTransport extends TTransport { +import org.apache.thrift.TConfiguration; + +public final class TMemoryInputTransport extends TEndpointTransport { private byte[] buf_; private int pos_; private int endPos_; - public TMemoryInputTransport() { + public TMemoryInputTransport() throws TTransportException { + this(new TConfiguration()); + } + + public TMemoryInputTransport(TConfiguration _configuration) throws TTransportException { + this(_configuration, new byte[0]); + } + + public TMemoryInputTransport(byte[] buf) throws TTransportException { + this(new TConfiguration(), buf); } - public TMemoryInputTransport(byte[] buf) { - reset(buf); + public TMemoryInputTransport(TConfiguration _configuration, byte[] buf) throws TTransportException { + this(_configuration, buf, 0, buf.length); } - public TMemoryInputTransport(byte[] buf, int offset, int length) { + public TMemoryInputTransport(byte[] buf, int offset, int length) throws TTransportException { + this(new TConfiguration(), buf, offset, length); + } + + public TMemoryInputTransport(TConfiguration _configuration, byte[] buf, int offset, int length) throws TTransportException { + super(_configuration); reset(buf, offset, length); + updateKnownMessageSize(length); } public void reset(byte[] buf) { @@ -43,10 +60,20 @@ public final class TMemoryInputTransport extends TTransport { buf_ = buf; pos_ = offset; endPos_ = offset + length; + try { + resetConsumedMessageSize(-1); + } catch (TTransportException e) { + // ignore + } } public void clear() { buf_ = null; + try { + resetConsumedMessageSize(-1); + } catch (TTransportException e) { + // ignore + } } @Override @@ -67,6 +94,7 @@ public final class TMemoryInputTransport extends TTransport { if (amtToRead > 0) { System.arraycopy(buf_, pos_, buf, off, amtToRead); consumeBuffer(amtToRead); + countConsumedMessageBytes(amtToRead); } return amtToRead; } diff --git a/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java b/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java index f41bc09c8..0172ca816 100644 --- a/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java @@ -22,18 +22,28 @@ package org.apache.thrift.transport; import java.nio.ByteBuffer; import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TConfiguration; /** * In memory transport with separate buffers for input and output. */ -public class TMemoryTransport extends TTransport { +public class TMemoryTransport extends TEndpointTransport { private final ByteBuffer inputBuffer; private final TByteArrayOutputStream outputBuffer; - public TMemoryTransport(byte[] input) { + public TMemoryTransport(byte[] input) throws TTransportException { + super(new TConfiguration()); inputBuffer = ByteBuffer.wrap(input); outputBuffer = new TByteArrayOutputStream(1024); + updateKnownMessageSize(input.length); + } + + public TMemoryTransport(TConfiguration config, byte[] input) throws TTransportException { + super(config); + inputBuffer = ByteBuffer.wrap(input); + outputBuffer = new TByteArrayOutputStream(1024); + updateKnownMessageSize(input.length); } @Override @@ -56,6 +66,7 @@ public class TMemoryTransport extends TTransport { @Override public int read(byte[] buf, int off, int len) throws TTransportException { + checkReadBytesAvailable(len); int remaining = inputBuffer.remaining(); if (remaining < len) { throw new TTransportException(TTransportException.END_OF_FILE, diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java index 37a66d614..76ed02cbb 100644 --- a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java @@ -30,6 +30,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; +import org.apache.thrift.TConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +48,7 @@ public class TNonblockingSocket extends TNonblockingTransport { private final SocketChannel socketChannel_; - public TNonblockingSocket(String host, int port) throws IOException { + public TNonblockingSocket(String host, int port) throws IOException, TTransportException { this(host, port, 0); } @@ -57,7 +58,7 @@ public class TNonblockingSocket extends TNonblockingTransport { * @param port * @throws IOException */ - public TNonblockingSocket(String host, int port, int timeout) throws IOException { + public TNonblockingSocket(String host, int port, int timeout) throws IOException, TTransportException { this(SocketChannel.open(), timeout, new InetSocketAddress(host, port)); } @@ -67,13 +68,19 @@ public class TNonblockingSocket extends TNonblockingTransport { * @param socketChannel Already created SocketChannel object * @throws IOException if there is an error setting up the streams */ - public TNonblockingSocket(SocketChannel socketChannel) throws IOException { + public TNonblockingSocket(SocketChannel socketChannel) throws IOException, TTransportException { this(socketChannel, 0, null); if (!socketChannel.isConnected()) throw new IOException("Socket must already be connected"); } private TNonblockingSocket(SocketChannel socketChannel, int timeout, SocketAddress socketAddress) - throws IOException { + throws IOException, TTransportException { + this(new TConfiguration(), socketChannel, timeout, socketAddress); + } + + private TNonblockingSocket(TConfiguration config, SocketChannel socketChannel, int timeout, SocketAddress socketAddress) + throws IOException, TTransportException { + super(config); socketChannel_ = socketChannel; socketAddress_ = socketAddress; diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java index 43c130688..255595d6c 100644 --- a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java @@ -19,13 +19,19 @@ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -public abstract class TNonblockingTransport extends TTransport { +public abstract class TNonblockingTransport extends TEndpointTransport { + + public TNonblockingTransport(TConfiguration config) throws TTransportException { + super(config); + } /** * Non-blocking connection initialization. diff --git a/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java b/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java index 570f53373..3389e4d2a 100644 --- a/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java +++ b/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory; * TSocket and TServerSocket */ public class TSSLTransportFactory { - + private static final Logger LOGGER = LoggerFactory.getLogger(TSSLTransportFactory.class); @@ -350,7 +350,7 @@ public class TSSLTransportFactory { } isKeyStoreSet = true; } - + /** * Set the keystore, password, certificate type and the store type * @@ -363,7 +363,7 @@ public class TSSLTransportFactory { this.keyStoreStream = keyStoreStream; setKeyStore("", keyPass, keyManagerType, keyStoreType); } - + /** * Set the keystore and password * @@ -373,7 +373,7 @@ public class TSSLTransportFactory { public void setKeyStore(String keyStore, String keyPass) { setKeyStore(keyStore, keyPass, null, null); } - + /** * Set the keystore and password * @@ -383,7 +383,7 @@ public class TSSLTransportFactory { public void setKeyStore(InputStream keyStoreStream, String keyPass) { setKeyStore(keyStoreStream, keyPass, null, null); } - + /** * Set the truststore, password, certificate type and the store type * @@ -403,7 +403,7 @@ public class TSSLTransportFactory { } isTrustStoreSet = true; } - + /** * Set the truststore, password, certificate type and the store type * @@ -426,7 +426,7 @@ public class TSSLTransportFactory { public void setTrustStore(String trustStore, String trustPass) { setTrustStore(trustStore, trustPass, null, null); } - + /** * Set the truststore and password * diff --git a/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java b/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java index 5fc7cff9b..e5ca41831 100644 --- a/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java @@ -47,14 +47,14 @@ public class TSaslClientTransport extends TSaslTransport { /** * Uses the given SaslClient. - * + * * @param saslClient * The SaslClient to use for the subsequent SASL * negotiation. * @param transport * Transport underlying this one. */ - public TSaslClientTransport(SaslClient saslClient, TTransport transport) { + public TSaslClientTransport(SaslClient saslClient, TTransport transport) throws TTransportException { super(saslClient, transport); mechanism = saslClient.getMechanismName(); } @@ -63,14 +63,14 @@ public class TSaslClientTransport extends TSaslTransport { * Creates a SaslClient using the given SASL-specific parameters. * See the Java documentation for Sasl.createSaslClient for the * details of the parameters. - * + * * @param transport * The underlying Thrift transport. * @throws SaslException */ public TSaslClientTransport(String mechanism, String authorizationId, String protocol, String serverName, Map props, CallbackHandler cbh, TTransport transport) - throws SaslException { + throws SaslException, TTransportException { super(Sasl.createSaslClient(new String[] { mechanism }, authorizationId, protocol, serverName, props, cbh), transport); this.mechanism = mechanism; diff --git a/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java b/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java index 31f309ef8..9111712a4 100644 --- a/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java @@ -58,7 +58,7 @@ public class TSaslServerTransport extends TSaslTransport { * @param transport * Transport underlying this one. */ - public TSaslServerTransport(TTransport transport) { + public TSaslServerTransport(TTransport transport) throws TTransportException { super(transport); } @@ -71,12 +71,12 @@ public class TSaslServerTransport extends TSaslTransport { * The underlying Thrift transport. */ public TSaslServerTransport(String mechanism, String protocol, String serverName, - Map props, CallbackHandler cbh, TTransport transport) { + Map props, CallbackHandler cbh, TTransport transport) throws TTransportException { super(transport); addServerDefinition(mechanism, protocol, serverName, props, cbh); } - private TSaslServerTransport(Map serverDefinitionMap, TTransport transport) { + private TSaslServerTransport(Map serverDefinitionMap, TTransport transport) throws TTransportException { super(transport); this.serverDefinitionMap.putAll(serverDefinitionMap); } @@ -190,7 +190,7 @@ public class TSaslServerTransport extends TSaslTransport { * receives the same TSaslServerTransport. */ @Override - public TTransport getTransport(TTransport base) { + public TTransport getTransport(TTransport base) throws TTransportException { WeakReference ret = transportMap.get(base); if (ret == null || ret.get() == null) { LOGGER.debug("transport map does not contain key", base); diff --git a/lib/java/src/org/apache/thrift/transport/TSaslTransport.java b/lib/java/src/org/apache/thrift/transport/TSaslTransport.java index d1a3d3115..b106c7004 100644 --- a/lib/java/src/org/apache/thrift/transport/TSaslTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TSaslTransport.java @@ -20,6 +20,7 @@ package org.apache.thrift.transport; import java.nio.charset.StandardCharsets; +import java.util.Objects; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; @@ -28,6 +29,8 @@ import javax.security.sasl.SaslServer; import org.apache.thrift.EncodingUtils; import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TConfiguration; +import org.apache.thrift.transport.layered.TFramedTransport; import org.apache.thrift.transport.sasl.NegotiationStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +39,7 @@ import org.slf4j.LoggerFactory; * A superclass for SASL client/server thrift transports. A subclass need only * implement the open method. */ -abstract class TSaslTransport extends TTransport { +abstract class TSaslTransport extends TEndpointTransport { private static final Logger LOGGER = LoggerFactory.getLogger(TSaslTransport.class); @@ -83,7 +86,8 @@ abstract class TSaslTransport extends TTransport { * @param underlyingTransport * The thrift transport which this transport is wrapping. */ - protected TSaslTransport(TTransport underlyingTransport) { + protected TSaslTransport(TTransport underlyingTransport) throws TTransportException { + super(Objects.isNull(underlyingTransport.getConfiguration()) ? new TConfiguration() : underlyingTransport.getConfiguration()); this.underlyingTransport = underlyingTransport; } @@ -96,7 +100,8 @@ abstract class TSaslTransport extends TTransport { * @param underlyingTransport * The thrift transport which this transport is wrapping. */ - protected TSaslTransport(SaslClient saslClient, TTransport underlyingTransport) { + protected TSaslTransport(SaslClient saslClient, TTransport underlyingTransport) throws TTransportException { + super(Objects.isNull(underlyingTransport.getConfiguration()) ? new TConfiguration() : underlyingTransport.getConfiguration()); sasl = new SaslParticipant(saslClient); this.underlyingTransport = underlyingTransport; } @@ -151,7 +156,7 @@ abstract class TSaslTransport extends TTransport { } int payloadBytes = EncodingUtils.decodeBigEndian(messageHeader, STATUS_BYTES); - if (payloadBytes < 0 || payloadBytes > 104857600 /* 100 MB */) { + if (payloadBytes < 0 || payloadBytes > getConfiguration().getMaxMessageSize() /* 100 MB */) { throw sendAndThrowMessage( NegotiationStatus.ERROR, "Invalid payload header length: " + payloadBytes); } diff --git a/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java b/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java index 42102d9e8..c1bbd4853 100644 --- a/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java @@ -18,6 +18,8 @@ */ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + import java.io.IOException; import java.io.RandomAccessFile; @@ -25,26 +27,43 @@ import java.io.RandomAccessFile; /** * Basic file support for the TTransport interface */ -public final class TSimpleFileTransport extends TTransport { +public final class TSimpleFileTransport extends TEndpointTransport { + + private RandomAccessFile file = null; + private boolean readable; + private boolean writable; + private String path_; - private RandomAccessFile file = null; - private boolean readable; - private boolean writable; - private String path_; + /** + * Create a transport backed by a simple file + * + * @param path the path to the file to open/create + * @param read true to support read operations + * @param write true to support write operations + * @param openFile true to open the file on construction + * @throws TTransportException if file open fails + */ + public TSimpleFileTransport(String path, boolean read, + boolean write, boolean openFile) + throws TTransportException { + this(new TConfiguration(), path, read, write, openFile); + } /** - * Create a transport backed by a simple file - * + * Create a transport backed by a simple file + * + * @param config * @param path the path to the file to open/create * @param read true to support read operations * @param write true to support write operations * @param openFile true to open the file on construction * @throws TTransportException if file open fails */ - public TSimpleFileTransport(String path, boolean read, + public TSimpleFileTransport(TConfiguration config, String path, boolean read, boolean write, boolean openFile) throws TTransportException { + super(config); if (path.length() <= 0) { throw new TTransportException("No path specified"); } @@ -58,11 +77,11 @@ public final class TSimpleFileTransport extends TTransport { open(); } } - + /** - * Create a transport backed by a simple file + * Create a transport backed by a simple file * Implicitly opens file to conform to C++ behavior. - * + * * @param path the path to the file to open/create * @param read true to support read operations * @param write true to support write operations @@ -72,7 +91,7 @@ public final class TSimpleFileTransport extends TTransport { throws TTransportException { this(path, read, write, true); } - + /** * Create a transport backed by a simple read only disk file (implicitly opens * file) @@ -95,7 +114,7 @@ public final class TSimpleFileTransport extends TTransport { } /** - * Open file if not previously opened. + * Open file if not previously opened. * * @throws TTransportException if open fails */ @@ -111,7 +130,7 @@ public final class TSimpleFileTransport extends TTransport { } catch (IOException ioe) { file = null; throw new TTransportException(ioe.getMessage()); - } + } } } @@ -131,7 +150,7 @@ public final class TSimpleFileTransport extends TTransport { } /** - * Read up to len many bytes into buf at offset + * Read up to len many bytes into buf at offset * * @param buf houses bytes read * @param off offset into buff to begin writing to @@ -144,6 +163,7 @@ public final class TSimpleFileTransport extends TTransport { if (!readable) { throw new TTransportException("Read operation on write only file"); } + checkReadBytesAvailable(len); int iBytesRead = 0; try { iBytesRead = file.read(buf, off, len); @@ -155,7 +175,7 @@ public final class TSimpleFileTransport extends TTransport { } /** - * Write len many bytes from buff starting at offset + * Write len many bytes from buff starting at offset * * @param buf buffer containing bytes to write * @param off offset into buffer to begin writing from @@ -213,4 +233,4 @@ public final class TSimpleFileTransport extends TTransport { throw new TTransportException(ex.getMessage()); } } -} \ No newline at end of file +} diff --git a/lib/java/src/org/apache/thrift/transport/TSocket.java b/lib/java/src/org/apache/thrift/transport/TSocket.java index b20b32b78..eb73e8e76 100644 --- a/lib/java/src/org/apache/thrift/transport/TSocket.java +++ b/lib/java/src/org/apache/thrift/transport/TSocket.java @@ -19,6 +19,7 @@ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +70,7 @@ public class TSocket extends TIOStreamTransport { * @throws TTransportException if there is an error setting up the streams */ public TSocket(Socket socket) throws TTransportException { + super(new TConfiguration()); socket_ = socket; try { socket_.setSoLinger(false, 0); @@ -93,23 +95,36 @@ public class TSocket extends TIOStreamTransport { * Creates a new unconnected socket that will connect to the given host * on the given port. * + * @param config check config * @param host Remote host * @param port Remote port */ - public TSocket(String host, int port) { - this(host, port, 0); + public TSocket(TConfiguration config, String host, int port) throws TTransportException { + this(config, host, port, 0); } /** * Creates a new unconnected socket that will connect to the given host * on the given port. * + * @param host Remote host + * @param port Remote port + */ + public TSocket(String host, int port) throws TTransportException { + this(new TConfiguration(), host, port, 0); + } + + /** + * Creates a new unconnected socket that will connect to the given host + * on the given port. + * + * @param config check config * @param host Remote host * @param port Remote port * @param timeout Socket timeout and connection timeout */ - public TSocket(String host, int port, int timeout) { - this(host, port, timeout, timeout); + public TSocket(TConfiguration config, String host, int port, int timeout) throws TTransportException { + this(config, host, port, timeout, timeout); } /** @@ -117,12 +132,14 @@ public class TSocket extends TIOStreamTransport { * on the given port, with a specific connection timeout and a * specific socket timeout. * + * @param config check config * @param host Remote host * @param port Remote port * @param socketTimeout Socket timeout * @param connectTimeout Connection timeout */ - public TSocket(String host, int port, int socketTimeout, int connectTimeout) { + public TSocket(TConfiguration config, String host, int port, int socketTimeout, int connectTimeout) throws TTransportException { + super(config); host_ = host; port_ = port; socketTimeout_ = socketTimeout; diff --git a/lib/java/src/org/apache/thrift/transport/TTransport.java b/lib/java/src/org/apache/thrift/transport/TTransport.java index 73ad730ce..5645f7fa1 100644 --- a/lib/java/src/org/apache/thrift/transport/TTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TTransport.java @@ -19,6 +19,8 @@ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + import java.io.Closeable; /** @@ -160,4 +162,10 @@ public abstract class TTransport implements Closeable { * @param len */ public void consumeBuffer(int len) {} + + public abstract TConfiguration getConfiguration(); + + public abstract void updateKnownMessageSize(long size) throws TTransportException; + + public abstract void checkReadBytesAvailable(long numBytes) throws TTransportException; } diff --git a/lib/java/src/org/apache/thrift/transport/TTransportFactory.java b/lib/java/src/org/apache/thrift/transport/TTransportFactory.java index 3e71630ae..e068b4beb 100644 --- a/lib/java/src/org/apache/thrift/transport/TTransportFactory.java +++ b/lib/java/src/org/apache/thrift/transport/TTransportFactory.java @@ -34,7 +34,7 @@ public class TTransportFactory { * @param trans The base transport * @return Wrapped Transport */ - public TTransport getTransport(TTransport trans) { + public TTransport getTransport(TTransport trans) throws TTransportException { return trans; } diff --git a/lib/java/src/org/apache/thrift/transport/TZlibTransport.java b/lib/java/src/org/apache/thrift/transport/TZlibTransport.java index e755aa532..73b21aa3f 100644 --- a/lib/java/src/org/apache/thrift/transport/TZlibTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TZlibTransport.java @@ -18,9 +18,12 @@ */ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Objects; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.Inflater; @@ -38,7 +41,7 @@ public class TZlibTransport extends TIOStreamTransport { } @Override - public TTransport getTransport(TTransport base) { + public TTransport getTransport(TTransport base) throws TTransportException { return new TZlibTransport(base); } } @@ -47,7 +50,7 @@ public class TZlibTransport extends TIOStreamTransport { * Constructs a new TZlibTransport instance. * @param transport the underlying transport to read from and write to */ - public TZlibTransport(TTransport transport) { + public TZlibTransport(TTransport transport) throws TTransportException { this(transport, Deflater.BEST_COMPRESSION); } @@ -56,7 +59,8 @@ public class TZlibTransport extends TIOStreamTransport { * @param transport the underlying transport to read from and write to * @param compressionLevel 0 for no compression, 9 for maximum compression */ - public TZlibTransport(TTransport transport, int compressionLevel) { + public TZlibTransport(TTransport transport, int compressionLevel) throws TTransportException { + super(Objects.isNull(transport.getConfiguration()) ? new TConfiguration() : transport.getConfiguration()); transport_ = transport; inputStream_ = new InflaterInputStream(new TTransportInputStream(transport_), new Inflater()); outputStream_ = new DeflaterOutputStream(new TTransportOutputStream(transport_), new Deflater(compressionLevel, false), true); diff --git a/lib/java/src/org/apache/thrift/transport/layered/TFastFramedTransport.java b/lib/java/src/org/apache/thrift/transport/layered/TFastFramedTransport.java new file mode 100644 index 000000000..29bf39c14 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/layered/TFastFramedTransport.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport.layered; + + +import org.apache.thrift.TConfiguration; +import org.apache.thrift.transport.*; + +import java.util.Objects; + +/** + * This transport is wire compatible with {@link TFramedTransport}, but makes + * use of reusable, expanding read and write buffers in order to avoid + * allocating new byte[]s all the time. Since the buffers only expand, you + * should probably only use this transport if your messages are not too variably + * large, unless the persistent memory cost is not an issue. + * + * This implementation is NOT threadsafe. + */ +public class TFastFramedTransport extends TLayeredTransport { + + public static class Factory extends TTransportFactory { + private final int initialCapacity; + private final int maxLength; + + public Factory() { + this(DEFAULT_BUF_CAPACITY, TConfiguration.DEFAULT_MAX_FRAME_SIZE); + } + + public Factory(int initialCapacity) { + this(initialCapacity, TConfiguration.DEFAULT_MAX_FRAME_SIZE); + } + + public Factory(int initialCapacity, int maxLength) { + this.initialCapacity = initialCapacity; + this.maxLength = maxLength; + } + + @Override + public TTransport getTransport(TTransport trans) throws TTransportException { + return new TFastFramedTransport(trans, + initialCapacity, + maxLength); + } + } + + /** + * How big should the default read and write buffers be? + */ + public static final int DEFAULT_BUF_CAPACITY = 1024; + + private final AutoExpandingBufferWriteTransport writeBuffer; + private AutoExpandingBufferReadTransport readBuffer; + private final int initialBufferCapacity; + private final byte[] i32buf = new byte[4]; + private final int maxLength; + + /** + * Create a new {@link TFastFramedTransport}. Use the defaults + * for initial buffer size and max frame length. + * @param underlying Transport that real reads and writes will go through to. + */ + public TFastFramedTransport(TTransport underlying) throws TTransportException { + this(underlying, DEFAULT_BUF_CAPACITY, TConfiguration.DEFAULT_MAX_FRAME_SIZE); + } + + /** + * Create a new {@link TFastFramedTransport}. Use the specified + * initial buffer capacity and the default max frame length. + * @param underlying Transport that real reads and writes will go through to. + * @param initialBufferCapacity The initial size of the read and write buffers. + * In practice, it's not critical to set this unless you know in advance that + * your messages are going to be very large. + */ + public TFastFramedTransport(TTransport underlying, int initialBufferCapacity) throws TTransportException { + this(underlying, initialBufferCapacity, TConfiguration.DEFAULT_MAX_FRAME_SIZE); + } + + /** + * + * @param underlying Transport that real reads and writes will go through to. + * @param initialBufferCapacity The initial size of the read and write buffers. + * In practice, it's not critical to set this unless you know in advance that + * your messages are going to be very large. (You can pass + * TFramedTransportWithReusableBuffer.DEFAULT_BUF_CAPACITY if you're only + * using this constructor because you want to set the maxLength.) + * @param maxLength The max frame size you are willing to read. You can use + * this parameter to limit how much memory can be allocated. + */ + public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) throws TTransportException { + super(underlying); + TConfiguration config = Objects.isNull(underlying.getConfiguration()) ? new TConfiguration() : underlying.getConfiguration(); + this.maxLength = maxLength; + config.setMaxFrameSize(maxLength); + this.initialBufferCapacity = initialBufferCapacity; + readBuffer = new AutoExpandingBufferReadTransport(config, initialBufferCapacity); + writeBuffer = new AutoExpandingBufferWriteTransport(config, initialBufferCapacity, 4); + } + + @Override + public void close() { + getInnerTransport().close(); + } + + @Override + public boolean isOpen() { + return getInnerTransport().isOpen(); + } + + @Override + public void open() throws TTransportException { + getInnerTransport().open(); + } + + @Override + public int read(byte[] buf, int off, int len) throws TTransportException { + int got = readBuffer.read(buf, off, len); + if (got > 0) { + return got; + } + + // Read another frame of data + readFrame(); + + return readBuffer.read(buf, off, len); + } + + private void readFrame() throws TTransportException { + getInnerTransport().readAll(i32buf , 0, 4); + int size = TFramedTransport.decodeFrameSize(i32buf); + + if (size < 0) { + close(); + throw new TTransportException(TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!"); + } + + if (size > getInnerTransport().getConfiguration().getMaxFrameSize()) { + close(); + throw new TTransportException(TTransportException.CORRUPTED_DATA, + "Frame size (" + size + ") larger than max length (" + maxLength + ")!"); + } + + readBuffer.fill(getInnerTransport(), size); + } + + @Override + public void write(byte[] buf, int off, int len) throws TTransportException { + writeBuffer.write(buf, off, len); + } + + @Override + public void consumeBuffer(int len) { + readBuffer.consumeBuffer(len); + } + + /** + * Only clears the read buffer! + */ + public void clear() throws TTransportException { + readBuffer = new AutoExpandingBufferReadTransport(getConfiguration(), initialBufferCapacity); + } + + @Override + public void flush() throws TTransportException { + int payloadLength = writeBuffer.getLength() - 4; + byte[] data = writeBuffer.getBuf().array(); + TFramedTransport.encodeFrameSize(payloadLength, data); + getInnerTransport().write(data, 0, payloadLength + 4); + writeBuffer.reset(); + getInnerTransport().flush(); + } + + @Override + public byte[] getBuffer() { + return readBuffer.getBuffer(); + } + + @Override + public int getBufferPosition() { + return readBuffer.getBufferPosition(); + } + + @Override + public int getBytesRemainingInBuffer() { + return readBuffer.getBytesRemainingInBuffer(); + } +} diff --git a/lib/java/src/org/apache/thrift/transport/layered/TFramedTransport.java b/lib/java/src/org/apache/thrift/transport/layered/TFramedTransport.java new file mode 100644 index 000000000..10a9a1c17 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/layered/TFramedTransport.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport.layered; + +import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TConfiguration; +import org.apache.thrift.transport.TMemoryInputTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.TTransportFactory; + +import java.util.Objects; + +/** + * TFramedTransport is a buffered TTransport that ensures a fully read message + * every time by preceding messages with a 4-byte frame size. + */ +public class TFramedTransport extends TLayeredTransport { + + /** + * Buffer for output + */ + private final TByteArrayOutputStream writeBuffer_ = + new TByteArrayOutputStream(1024); + + /** + * Buffer for input + */ + private final TMemoryInputTransport readBuffer_; + + public static class Factory extends TTransportFactory { + private int maxLength_; + + public Factory() { + maxLength_ = TConfiguration.DEFAULT_MAX_FRAME_SIZE; + } + + public Factory(int maxLength) { + maxLength_ = maxLength; + } + + @Override + public TTransport getTransport(TTransport base) throws TTransportException { + return new TFramedTransport(base, maxLength_); + } + } + + /** + * Something to fill in the first four bytes of the buffer + * to make room for the frame size. This allows the + * implementation to write once instead of twice. + */ + private static final byte[] sizeFiller_ = new byte[] { 0x00, 0x00, 0x00, 0x00 }; + + /** + * Constructor wraps around another transport + */ + public TFramedTransport(TTransport transport, int maxLength) throws TTransportException { + super(transport); + TConfiguration _configuration = Objects.isNull(transport.getConfiguration()) ? new TConfiguration() : transport.getConfiguration(); + _configuration.setMaxFrameSize(maxLength); + writeBuffer_.write(sizeFiller_, 0, 4); + readBuffer_= new TMemoryInputTransport(_configuration, new byte[0]); + } + + public TFramedTransport(TTransport transport) throws TTransportException { + this(transport, TConfiguration.DEFAULT_MAX_FRAME_SIZE); + } + + public void open() throws TTransportException { + getInnerTransport().open(); + } + + public boolean isOpen() { + return getInnerTransport().isOpen(); + } + + public void close() { + getInnerTransport().close(); + } + + public int read(byte[] buf, int off, int len) throws TTransportException { + int got = readBuffer_.read(buf, off, len); + if (got > 0) { + return got; + } + + // Read another frame of data + readFrame(); + + return readBuffer_.read(buf, off, len); + } + + @Override + public byte[] getBuffer() { + return readBuffer_.getBuffer(); + } + + @Override + public int getBufferPosition() { + return readBuffer_.getBufferPosition(); + } + + @Override + public int getBytesRemainingInBuffer() { + return readBuffer_.getBytesRemainingInBuffer(); + } + + @Override + public void consumeBuffer(int len) { + readBuffer_.consumeBuffer(len); + } + + public void clear() { + readBuffer_.clear(); + } + + private final byte[] i32buf = new byte[4]; + + private void readFrame() throws TTransportException { + getInnerTransport().readAll(i32buf, 0, 4); + int size = decodeFrameSize(i32buf); + + if (size < 0) { + close(); + throw new TTransportException(TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!"); + } + + if (size > getInnerTransport().getConfiguration().getMaxFrameSize()) { + close(); + throw new TTransportException(TTransportException.CORRUPTED_DATA, + "Frame size (" + size + ") larger than max length (" + getInnerTransport().getConfiguration().getMaxFrameSize() + ")!"); + } + + byte[] buff = new byte[size]; + getInnerTransport().readAll(buff, 0, size); + readBuffer_.reset(buff); + } + + public void write(byte[] buf, int off, int len) throws TTransportException { + writeBuffer_.write(buf, off, len); + } + + @Override + public void flush() throws TTransportException { + byte[] buf = writeBuffer_.get(); + int len = writeBuffer_.len() - 4; // account for the prepended frame size + writeBuffer_.reset(); + writeBuffer_.write(sizeFiller_, 0, 4); // make room for the next frame's size data + + encodeFrameSize(len, buf); // this is the frame length without the filler + getInnerTransport().write(buf, 0, len + 4); // we have to write the frame size and frame data + getInnerTransport().flush(); + } + + public static final void encodeFrameSize(final int frameSize, final byte[] buf) { + buf[0] = (byte)(0xff & (frameSize >> 24)); + buf[1] = (byte)(0xff & (frameSize >> 16)); + buf[2] = (byte)(0xff & (frameSize >> 8)); + buf[3] = (byte)(0xff & (frameSize)); + } + + public static final int decodeFrameSize(final byte[] buf) { + return + ((buf[0] & 0xff) << 24) | + ((buf[1] & 0xff) << 16) | + ((buf[2] & 0xff) << 8) | + ((buf[3] & 0xff)); + } +} diff --git a/lib/java/src/org/apache/thrift/transport/layered/TLayeredTransport.java b/lib/java/src/org/apache/thrift/transport/layered/TLayeredTransport.java new file mode 100644 index 000000000..69ec824ee --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/layered/TLayeredTransport.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport.layered; + +import org.apache.thrift.TConfiguration; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import java.util.Objects; + +public abstract class TLayeredTransport extends TTransport{ + + private TTransport innerTransport; + + public TConfiguration getConfiguration() { + return innerTransport.getConfiguration(); + } + + public TLayeredTransport(TTransport transport) + { + Objects.requireNonNull(transport, "TTransport cannot be null."); + innerTransport = transport; + } + + public void updateKnownMessageSize(long size) throws TTransportException { + innerTransport.updateKnownMessageSize(size); + } + + public void checkReadBytesAvailable(long numBytes) throws TTransportException { + innerTransport.checkReadBytesAvailable(numBytes); + } + + public TTransport getInnerTransport() { + return innerTransport; + } +} -- cgit v1.2.1