diff options
author | Zezeng Wang <51382517@qq.com> | 2020-09-18 16:05:44 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-18 16:05:44 +0800 |
commit | 119030848c4296ddef43d66ffa0cca1354fb357b (patch) | |
tree | 5861c617491cf94b83b48f9f4412dd7a1aa6622f /lib/java/src/org/apache/thrift/transport | |
parent | c77941c60da01f466827ff619d571055ff76351f (diff) | |
parent | 63213c17ad3fece91fdaaca8f59165ca3f41c5c1 (diff) | |
download | thrift-119030848c4296ddef43d66ffa0cca1354fb357b.tar.gz |
Merge pull request #2191 from zeshuai007/Implements_TConfig
THRIFT-5237 Implement MAX_MESSAGE_SIZE and consolidate limits into a TConfiguration class(JAVA)
Diffstat (limited to 'lib/java/src/org/apache/thrift/transport')
25 files changed, 578 insertions, 187 deletions
diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java index fc3aa92df..b355d11ca 100644 --- a/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java +++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java @@ -27,7 +27,7 @@ import java.util.Arrays; * rate slightly faster than the requested capacity with the (untested) * objective of avoiding expensive buffer allocations and copies. */ -class AutoExpandingBuffer { +public class AutoExpandingBuffer { private byte[] array; public AutoExpandingBuffer(int initialCapacity) { diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java index d06fec78c..6fd4075b9 100644 --- a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java +++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java @@ -18,17 +18,20 @@ */ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + /** * TTransport for reading from an AutoExpandingBuffer. */ -public class AutoExpandingBufferReadTransport extends TTransport { +public class AutoExpandingBufferReadTransport extends TEndpointTransport { private final AutoExpandingBuffer buf; private int pos = 0; private int limit = 0; - public AutoExpandingBufferReadTransport(int initialCapacity) { + public AutoExpandingBufferReadTransport(TConfiguration config, int initialCapacity) throws TTransportException { + super(config); this.buf = new AutoExpandingBuffer(initialCapacity); } diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java index ec7e7d45a..25f974a73 100644 --- a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java +++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java @@ -18,10 +18,12 @@ */ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + /** * TTransport for writing to an AutoExpandingBuffer. */ -public final class AutoExpandingBufferWriteTransport extends TTransport { +public final class AutoExpandingBufferWriteTransport extends TEndpointTransport { private final AutoExpandingBuffer buf; private int pos; @@ -38,7 +40,8 @@ public final class AutoExpandingBufferWriteTransport extends TTransport { * @throws IllegalArgumentException if frontReserve is less than zero * @throws IllegalArgumentException if frontReserve is greater than initialCapacity */ - public AutoExpandingBufferWriteTransport(int initialCapacity, int frontReserve) { + public AutoExpandingBufferWriteTransport(TConfiguration config, int initialCapacity, int frontReserve) throws TTransportException { + super(config); if (initialCapacity < 1) { throw new IllegalArgumentException("initialCapacity"); } diff --git a/lib/java/src/org/apache/thrift/transport/TByteBuffer.java b/lib/java/src/org/apache/thrift/transport/TByteBuffer.java index b6b065748..c792f3b1c 100644 --- a/lib/java/src/org/apache/thrift/transport/TByteBuffer.java +++ b/lib/java/src/org/apache/thrift/transport/TByteBuffer.java @@ -1,5 +1,7 @@ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; @@ -7,14 +9,16 @@ import java.nio.ByteBuffer; /** * ByteBuffer-backed implementation of TTransport. */ -public final class TByteBuffer extends TTransport { +public final class TByteBuffer extends TEndpointTransport { private final ByteBuffer byteBuffer; /** * Creates a new TByteBuffer wrapping a given NIO ByteBuffer. */ - public TByteBuffer(ByteBuffer byteBuffer) { + public TByteBuffer(ByteBuffer byteBuffer) throws TTransportException { + super(new TConfiguration()); this.byteBuffer = byteBuffer; + updateKnownMessageSize(byteBuffer.capacity()); } @Override @@ -32,6 +36,9 @@ public final class TByteBuffer extends TTransport { @Override public int read(byte[] buf, int off, int len) throws TTransportException { + // + checkReadBytesAvailable(len); + final int n = Math.min(byteBuffer.remaining(), len); if (n > 0) { try { diff --git a/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java b/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java new file mode 100644 index 000000000..f32efae90 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport; + +import org.apache.thrift.TConfiguration; + +import java.util.Objects; + +public abstract class TEndpointTransport extends TTransport{ + + protected long getMaxMessageSize() { return getConfiguration().getMaxMessageSize(); } + + protected long knownMessageSize; + protected long remainingMessageSize; + + private TConfiguration _configuration; + public TConfiguration getConfiguration() { + return _configuration; + } + + public TEndpointTransport( TConfiguration config) throws TTransportException { + _configuration = Objects.isNull(config) ? new TConfiguration() : config; + + resetConsumedMessageSize(-1); + } + + /** + * Resets RemainingMessageSize to the configured maximum + * @param newSize + */ + protected void resetConsumedMessageSize(long newSize) throws TTransportException { + // full reset + if (newSize < 0) + { + knownMessageSize = getMaxMessageSize(); + remainingMessageSize = getMaxMessageSize(); + return; + } + + // update only: message size can shrink, but not grow + if (newSize > knownMessageSize) + throw new TTransportException(TTransportException.END_OF_FILE, "MaxMessageSize reached"); + + knownMessageSize = newSize; + remainingMessageSize = newSize; + } + + /** + * Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport). + * Will throw if we already consumed too many bytes or if the new size is larger than allowed. + * @param size + */ + public void updateKnownMessageSize(long size) throws TTransportException { + long consumed = knownMessageSize - remainingMessageSize; + resetConsumedMessageSize(size == 0 ? -1 : size); + countConsumedMessageBytes(consumed); + } + + /** + * Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data + * @param numBytes + */ + public void checkReadBytesAvailable(long numBytes) throws TTransportException { + if (remainingMessageSize < numBytes) + throw new TTransportException(TTransportException.END_OF_FILE, "MaxMessageSize reached"); + } + + /** + * Consumes numBytes from the RemainingMessageSize. + * @param numBytes + */ + protected void countConsumedMessageBytes(long numBytes) throws TTransportException { + if (remainingMessageSize >= numBytes) + { + remainingMessageSize -= numBytes; + } + else + { + remainingMessageSize = 0; + throw new TTransportException(TTransportException.END_OF_FILE, "MaxMessageSize reached"); + } + } + +} diff --git a/lib/java/src/org/apache/thrift/transport/TFileTransport.java b/lib/java/src/org/apache/thrift/transport/TFileTransport.java index 88b73e54d..85f97084c 100644 --- a/lib/java/src/org/apache/thrift/transport/TFileTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TFileTransport.java @@ -26,13 +26,14 @@ import java.io.OutputStream; import java.io.IOException; import java.util.Random; +import org.apache.thrift.TConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * FileTransport implementation of the TTransport interface. * Currently this is a straightforward port of the cpp implementation - * + * * It may make better sense to provide a basic stream access on top of the framed file format * The FileTransport can then be a user of this framed file format with some additional logic * for chunking. @@ -44,7 +45,7 @@ public class TFileTransport extends TTransport { public static class TruncableBufferedInputStream extends BufferedInputStream { public void trunc() { pos = count = 0; - } + } public TruncableBufferedInputStream(InputStream in) { super(in); } @@ -62,7 +63,7 @@ public class TFileTransport extends TTransport { /** * Initialize an event. Initially, it has no valid contents * - * @param buf byte array buffer to store event + * @param buf byte array buffer to store event */ public Event(byte[] buf) { buf_ = buf; @@ -88,9 +89,9 @@ public class TFileTransport extends TTransport { return(ndesired); } - }; + } - public static class ChunkState { + public static class ChunkState { /** * Chunk Size. Must be same across all implementations */ @@ -111,7 +112,7 @@ public class TFileTransport extends TTransport { public long getOffset() { return (offset_);} } - public static enum TailPolicy { + public enum TailPolicy { NOWAIT(0, 0), WAIT_FOREVER(500, -1); @@ -148,13 +149,13 @@ public class TFileTransport extends TTransport { TailPolicy currentPolicy_ = TailPolicy.NOWAIT; - /** + /** * Underlying file being read */ protected TSeekableFile inputFile_ = null; - /** - * Underlying outputStream + /** + * Underlying outputStream */ protected OutputStream outputStream_ = null; @@ -181,7 +182,7 @@ public class TFileTransport extends TTransport { /** * Get File Tailing Policy - * + * * @return current read policy */ public TailPolicy getTailPolicy() { @@ -190,7 +191,7 @@ public class TFileTransport extends TTransport { /** * Set file Tailing Policy - * + * * @param policy New policy to set * @return Old policy */ @@ -203,7 +204,7 @@ public class TFileTransport extends TTransport { /** * Initialize read input stream - * + * * @return input stream to read from file */ private InputStream createInputStream() throws TTransportException { @@ -223,7 +224,7 @@ public class TFileTransport extends TTransport { /** * Read (potentially tailing) an input stream - * + * * @param is InputStream to read from * @param buf Buffer to read into * @param off Offset in buffer to read into @@ -232,7 +233,7 @@ public class TFileTransport extends TTransport { * * @return number of bytes read */ - private int tailRead(InputStream is, byte[] buf, + private int tailRead(InputStream is, byte[] buf, int off, int len, TailPolicy tp) throws TTransportException { int orig_len = len; try { @@ -322,7 +323,7 @@ public class TFileTransport extends TTransport { // check if event is corrupted and do recovery as required if(esize > cs.getRemaining()) { throw new TTransportException("FileTransport error: bad event size"); - /* + /* if(performRecovery()) { esize=0; } else { @@ -361,7 +362,7 @@ public class TFileTransport extends TTransport { * Files are not opened in ctor - but in explicit open call */ public void open() throws TTransportException { - if (isOpen()) + if (isOpen()) throw new TTransportException(TTransportException.ALREADY_OPEN); try { @@ -406,7 +407,7 @@ public class TFileTransport extends TTransport { * * @param path File path to read and write from * @param readOnly Whether this is a read-only transport - */ + */ public TFileTransport(final String path, boolean readOnly) throws IOException { inputFile_ = new TStandardFile(path); readOnly_ = readOnly; @@ -457,8 +458,8 @@ public class TFileTransport extends TTransport { * @throws TTransportException if there was an error reading data */ public int read(byte[] buf, int off, int len) throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before reading"); if(currentEvent_.getRemaining() == 0) { @@ -471,14 +472,14 @@ public class TFileTransport extends TTransport { } public int getNumChunks() throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before getNumChunks"); try { long len = inputFile_.length(); if(len == 0) return 0; - else + else return (((int)(len/cs.getChunkSize())) + 1); } catch (IOException iox) { @@ -487,8 +488,8 @@ public class TFileTransport extends TTransport { } public int getCurChunk() throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before getCurChunk"); return (cs.getChunkNum()); @@ -496,8 +497,8 @@ public class TFileTransport extends TTransport { public void seekToChunk(int chunk) throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before seeking"); int numChunks = getNumChunks(); @@ -527,7 +528,7 @@ public class TFileTransport extends TTransport { } if(chunk*cs.getChunkSize() != cs.getOffset()) { - try { inputFile_.seek((long)chunk*cs.getChunkSize()); } + try { inputFile_.seek((long)chunk*cs.getChunkSize()); } catch (IOException iox) { throw new TTransportException("Seek to chunk " + chunk + " " +iox.getMessage(), iox); @@ -549,8 +550,8 @@ public class TFileTransport extends TTransport { } public void seekToEnd() throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before seeking"); seekToChunk(getNumChunks()); } @@ -577,9 +578,25 @@ public class TFileTransport extends TTransport { throw new TTransportException("Not Supported"); } + + @Override + public TConfiguration getConfiguration() { + return null; + } + + @Override + public void updateKnownMessageSize(long size) throws TTransportException { + + } + + @Override + public void checkReadBytesAvailable(long numBytes) throws TTransportException { + + } + /** * test program - * + * */ public static void main(String[] args) throws Exception { @@ -594,7 +611,7 @@ public class TFileTransport extends TTransport { try { num_chunks = Integer.parseInt(args[1]); } catch (Exception e) { - LOGGER.error("Cannot parse " + args[1]); + LOGGER.error("Cannot parse " + args[1]); printUsage(); } } diff --git a/lib/java/src/org/apache/thrift/transport/THttpClient.java b/lib/java/src/org/apache/thrift/transport/THttpClient.java index c3063fe43..7d61b5c8e 100644 --- a/lib/java/src/org/apache/thrift/transport/THttpClient.java +++ b/lib/java/src/org/apache/thrift/transport/THttpClient.java @@ -37,6 +37,7 @@ import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.params.CoreConnectionPNames; +import org.apache.thrift.TConfiguration; /** * HTTP implementation of the TTransport interface. Used for working with a @@ -51,7 +52,7 @@ import org.apache.http.params.CoreConnectionPNames; * HttpClient to THttpClient(String url, HttpClient client) will create an * instance which will use HttpURLConnection. * - * When using HttpClient, the following configuration leads to 5-15% + * When using HttpClient, the following configuration leads to 5-15% * better performance than the HttpURLConnection implementation: * * http.protocol.version=HttpVersion.HTTP_1_1 @@ -65,7 +66,7 @@ import org.apache.http.params.CoreConnectionPNames; * @see <a href="https://issues.apache.org/jira/browse/THRIFT-970">THRIFT-970</a> */ -public class THttpClient extends TTransport { +public class THttpClient extends TEndpointTransport { private URL url_ = null; @@ -80,14 +81,14 @@ public class THttpClient extends TTransport { private Map<String,String> customHeaders_ = null; private final HttpHost host; - + private final HttpClient client; - + public static class Factory extends TTransportFactory { - + private final String url; private final HttpClient client; - + public Factory(String url) { this.url = url; this.client = null; @@ -97,14 +98,14 @@ public class THttpClient extends TTransport { this.url = url; this.client = client; } - + @Override public TTransport getTransport(TTransport trans) { try { if (null != client) { - return new THttpClient(url, client); + return new THttpClient(trans.getConfiguration(), url, client); } else { - return new THttpClient(url); + return new THttpClient(trans.getConfiguration(), url); } } catch (TTransportException tte) { return null; @@ -112,7 +113,19 @@ public class THttpClient extends TTransport { } } + public THttpClient(TConfiguration config, String url) throws TTransportException { + super(config); + try { + url_ = new URL(url); + this.client = null; + this.host = null; + } catch (IOException iox) { + throw new TTransportException(iox); + } + } + public THttpClient(String url) throws TTransportException { + super(new TConfiguration()); try { url_ = new URL(url); this.client = null; @@ -122,7 +135,19 @@ public class THttpClient extends TTransport { } } + public THttpClient(TConfiguration config, String url, HttpClient client) throws TTransportException { + super(config); + try { + url_ = new URL(url); + this.client = client; + this.host = new HttpHost(url_.getHost(), -1 == url_.getPort() ? url_.getDefaultPort() : url_.getPort(), url_.getProtocol()); + } catch (IOException iox) { + throw new TTransportException(iox); + } + } + public THttpClient(String url, HttpClient client) throws TTransportException { + super(new TConfiguration()); try { url_ = new URL(url); this.client = client; @@ -168,7 +193,6 @@ public class THttpClient extends TTransport { try { inputStream_.close(); } catch (IOException ioe) { - ; } inputStream_ = null; } @@ -182,11 +206,16 @@ public class THttpClient extends TTransport { if (inputStream_ == null) { throw new TTransportException("Response buffer is empty, no request."); } + + checkReadBytesAvailable(len); + try { int ret = inputStream_.read(buf, off, len); if (ret == -1) { throw new TTransportException("No more data available."); } + countConsumedMessageBytes(ret); + return ret; } catch (IOException iox) { throw new TTransportException(iox); @@ -214,7 +243,7 @@ public class THttpClient extends TTransport { } private void flushUsingHttpClient() throws TTransportException { - + if (null == this.client) { throw new TTransportException("Null HttpClient, aborting."); } @@ -224,22 +253,22 @@ public class THttpClient extends TTransport { requestBuffer_.reset(); HttpPost post = null; - + InputStream is = null; - - try { + + try { // Set request to path + query string post = new HttpPost(this.url_.getFile()); - + // // Headers are added to the HttpPost instance, not // to HttpClient. // - + post.setHeader("Content-Type", "application/x-thrift"); post.setHeader("Accept", "application/x-thrift"); post.setHeader("User-Agent", "Java/THttpClient/HC"); - + if (null != customHeaders_) { for (Map.Entry<String, String> header : customHeaders_.entrySet()) { post.setHeader(header.getKey(), header.getValue()); @@ -247,17 +276,17 @@ public class THttpClient extends TTransport { } post.setEntity(new ByteArrayEntity(data)); - + HttpResponse response = this.client.execute(this.host, post); int responseCode = response.getStatusLine().getStatusCode(); - // + // // Retrieve the inputstream BEFORE checking the status code so // resources get freed in the finally clause. // is = response.getEntity().getContent(); - + if (responseCode != HttpStatus.SC_OK) { throw new TTransportException("HTTP Response code: " + responseCode); } @@ -268,10 +297,10 @@ public class THttpClient extends TTransport { // thrift struct is being read up the chain). // Proceeding differently might lead to exhaustion of connections and thus // to app failure. - + byte[] buf = new byte[1024]; ByteArrayOutputStream baos = new ByteArrayOutputStream(); - + int len = 0; do { len = is.read(buf); @@ -279,7 +308,7 @@ public class THttpClient extends TTransport { baos.write(buf, 0, len); } } while (-1 != len); - + try { // Indicate we're done with the content. consume(response.getEntity()); @@ -287,7 +316,7 @@ public class THttpClient extends TTransport { // We ignore this exception, it might only mean the server has no // keep-alive capability. } - + inputStream_ = new ByteArrayInputStream(baos.toByteArray()); } catch (IOException ioe) { // Abort method so the connection gets released back to the connection manager @@ -296,6 +325,7 @@ public class THttpClient extends TTransport { } throw new TTransportException(ioe); } finally { + resetConsumedMessageSize(-1); if (null != is) { // Close the entity's input stream, this will release the underlying connection try { @@ -357,6 +387,8 @@ public class THttpClient extends TTransport { } catch (IOException iox) { throw new TTransportException(iox); + } finally { + resetConsumedMessageSize(-1); } } } diff --git a/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java b/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java index d97d5063e..763e66ad6 100644 --- a/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java @@ -19,6 +19,7 @@ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +34,7 @@ import java.io.OutputStream; * has to provide a variety of types of streams. * */ -public class TIOStreamTransport extends TTransport { +public class TIOStreamTransport extends TEndpointTransport { private static final Logger LOGGER = LoggerFactory.getLogger(TIOStreamTransport.class.getName()); @@ -47,33 +48,80 @@ public class TIOStreamTransport extends TTransport { * Subclasses can invoke the default constructor and then assign the input * streams in the open method. */ - protected TIOStreamTransport() {} + protected TIOStreamTransport(TConfiguration config) throws TTransportException { + super(config); + } + + /** + * Subclasses can invoke the default constructor and then assign the input + * streams in the open method. + */ + protected TIOStreamTransport() throws TTransportException { + super(new TConfiguration()); + } /** * Input stream constructor, constructs an input only transport. * + * @param config * @param is Input stream to read from */ - public TIOStreamTransport(InputStream is) { + public TIOStreamTransport(TConfiguration config, InputStream is) throws TTransportException { + super(config); inputStream_ = is; } + /** + * Input stream constructor, constructs an input only transport. + * + * @param is Input stream to read from + */ + public TIOStreamTransport(InputStream is) throws TTransportException { + super(new TConfiguration()); + inputStream_ = is; + } + + /** + * Output stream constructor, constructs an output only transport. + * + * @param config + * @param os Output stream to write to + */ + public TIOStreamTransport(TConfiguration config, OutputStream os) throws TTransportException { + super(config); + outputStream_ = os; + } /** * Output stream constructor, constructs an output only transport. * * @param os Output stream to write to */ - public TIOStreamTransport(OutputStream os) { + public TIOStreamTransport(OutputStream os) throws TTransportException { + super(new TConfiguration()); outputStream_ = os; } /** * Two-way stream constructor. * + * @param config * @param is Input stream to read from * @param os Output stream to read from */ - public TIOStreamTransport(InputStream is, OutputStream os) { + public TIOStreamTransport(TConfiguration config, InputStream is, OutputStream os) throws TTransportException { + super(config); + inputStream_ = is; + outputStream_ = os; + } + + /** + * Two-way stream constructor. + * + * @param is Input stream to read from + * @param os Output stream to read from + */ + public TIOStreamTransport(InputStream is, OutputStream os) throws TTransportException { + super(new TConfiguration()); inputStream_ = is; outputStream_ = os; } @@ -158,6 +206,9 @@ public class TIOStreamTransport extends TTransport { } try { outputStream_.flush(); + + resetConsumedMessageSize(-1); + } catch (IOException iox) { throw new TTransportException(TTransportException.UNKNOWN, iox); } diff --git a/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java b/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java index b19ac86d2..c3a3eb4e1 100644 --- a/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java +++ b/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java @@ -20,21 +20,39 @@ package org.apache.thrift.transport; import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TConfiguration; + import java.nio.charset.Charset; /** * Memory buffer-based implementation of the TTransport interface. */ -public class TMemoryBuffer extends TTransport { +public class TMemoryBuffer extends TEndpointTransport { + /** + * Create a TMemoryBuffer with an initial buffer size of <i>size</i>. The + * internal buffer will grow as necessary to accommodate the size of the data + * being written to it. + * + * @param size the initial size of the buffer + */ + public TMemoryBuffer(int size) throws TTransportException { + super(new TConfiguration()); + arr_ = new TByteArrayOutputStream(size); + updateKnownMessageSize(size); + } + /** * Create a TMemoryBuffer with an initial buffer size of <i>size</i>. The * internal buffer will grow as necessary to accommodate the size of the data * being written to it. * + * @param config * @param size the initial size of the buffer */ - public TMemoryBuffer(int size) { + public TMemoryBuffer(TConfiguration config, int size) throws TTransportException { + super(config); arr_ = new TByteArrayOutputStream(size); + updateKnownMessageSize(size); } @Override @@ -53,9 +71,11 @@ public class TMemoryBuffer extends TTransport { } @Override - public int read(byte[] buf, int off, int len) { + public int read(byte[] buf, int off, int len) throws TTransportException { + checkReadBytesAvailable(len); byte[] src = arr_.get(); int amtToRead = (len > arr_.len() - pos_ ? arr_.len() - pos_ : len); + if (amtToRead > 0) { System.arraycopy(src, pos_, buf, off, amtToRead); pos_ += amtToRead; diff --git a/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java b/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java index 2530dcc36..6cb06fc37 100644 --- a/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java @@ -18,21 +18,38 @@ */ package org.apache.thrift.transport; -public final class TMemoryInputTransport extends TTransport { +import org.apache.thrift.TConfiguration; + +public final class TMemoryInputTransport extends TEndpointTransport { private byte[] buf_; private int pos_; private int endPos_; - public TMemoryInputTransport() { + public TMemoryInputTransport() throws TTransportException { + this(new TConfiguration()); + } + + public TMemoryInputTransport(TConfiguration _configuration) throws TTransportException { + this(_configuration, new byte[0]); + } + + public TMemoryInputTransport(byte[] buf) throws TTransportException { + this(new TConfiguration(), buf); } - public TMemoryInputTransport(byte[] buf) { - reset(buf); + public TMemoryInputTransport(TConfiguration _configuration, byte[] buf) throws TTransportException { + this(_configuration, buf, 0, buf.length); } - public TMemoryInputTransport(byte[] buf, int offset, int length) { + public TMemoryInputTransport(byte[] buf, int offset, int length) throws TTransportException { + this(new TConfiguration(), buf, offset, length); + } + + public TMemoryInputTransport(TConfiguration _configuration, byte[] buf, int offset, int length) throws TTransportException { + super(_configuration); reset(buf, offset, length); + updateKnownMessageSize(length); } public void reset(byte[] buf) { @@ -43,10 +60,20 @@ public final class TMemoryInputTransport extends TTransport { buf_ = buf; pos_ = offset; endPos_ = offset + length; + try { + resetConsumedMessageSize(-1); + } catch (TTransportException e) { + // ignore + } } public void clear() { buf_ = null; + try { + resetConsumedMessageSize(-1); + } catch (TTransportException e) { + // ignore + } } @Override @@ -67,6 +94,7 @@ public final class TMemoryInputTransport extends TTransport { if (amtToRead > 0) { System.arraycopy(buf_, pos_, buf, off, amtToRead); consumeBuffer(amtToRead); + countConsumedMessageBytes(amtToRead); } return amtToRead; } diff --git a/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java b/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java index f41bc09c8..0172ca816 100644 --- a/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java @@ -22,18 +22,28 @@ package org.apache.thrift.transport; import java.nio.ByteBuffer; import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TConfiguration; /** * In memory transport with separate buffers for input and output. */ -public class TMemoryTransport extends TTransport { +public class TMemoryTransport extends TEndpointTransport { private final ByteBuffer inputBuffer; private final TByteArrayOutputStream outputBuffer; - public TMemoryTransport(byte[] input) { + public TMemoryTransport(byte[] input) throws TTransportException { + super(new TConfiguration()); inputBuffer = ByteBuffer.wrap(input); outputBuffer = new TByteArrayOutputStream(1024); + updateKnownMessageSize(input.length); + } + + public TMemoryTransport(TConfiguration config, byte[] input) throws TTransportException { + super(config); + inputBuffer = ByteBuffer.wrap(input); + outputBuffer = new TByteArrayOutputStream(1024); + updateKnownMessageSize(input.length); } @Override @@ -56,6 +66,7 @@ public class TMemoryTransport extends TTransport { @Override public int read(byte[] buf, int off, int len) throws TTransportException { + checkReadBytesAvailable(len); int remaining = inputBuffer.remaining(); if (remaining < len) { throw new TTransportException(TTransportException.END_OF_FILE, diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java index 37a66d614..76ed02cbb 100644 --- a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java @@ -30,6 +30,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; +import org.apache.thrift.TConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +48,7 @@ public class TNonblockingSocket extends TNonblockingTransport { private final SocketChannel socketChannel_; - public TNonblockingSocket(String host, int port) throws IOException { + public TNonblockingSocket(String host, int port) throws IOException, TTransportException { this(host, port, 0); } @@ -57,7 +58,7 @@ public class TNonblockingSocket extends TNonblockingTransport { * @param port * @throws IOException */ - public TNonblockingSocket(String host, int port, int timeout) throws IOException { + public TNonblockingSocket(String host, int port, int timeout) throws IOException, TTransportException { this(SocketChannel.open(), timeout, new InetSocketAddress(host, port)); } @@ -67,13 +68,19 @@ public class TNonblockingSocket extends TNonblockingTransport { * @param socketChannel Already created SocketChannel object * @throws IOException if there is an error setting up the streams */ - public TNonblockingSocket(SocketChannel socketChannel) throws IOException { + public TNonblockingSocket(SocketChannel socketChannel) throws IOException, TTransportException { this(socketChannel, 0, null); if (!socketChannel.isConnected()) throw new IOException("Socket must already be connected"); } private TNonblockingSocket(SocketChannel socketChannel, int timeout, SocketAddress socketAddress) - throws IOException { + throws IOException, TTransportException { + this(new TConfiguration(), socketChannel, timeout, socketAddress); + } + + private TNonblockingSocket(TConfiguration config, SocketChannel socketChannel, int timeout, SocketAddress socketAddress) + throws IOException, TTransportException { + super(config); socketChannel_ = socketChannel; socketAddress_ = socketAddress; diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java index 43c130688..255595d6c 100644 --- a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java @@ -19,13 +19,19 @@ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -public abstract class TNonblockingTransport extends TTransport { +public abstract class TNonblockingTransport extends TEndpointTransport { + + public TNonblockingTransport(TConfiguration config) throws TTransportException { + super(config); + } /** * Non-blocking connection initialization. diff --git a/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java b/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java index 570f53373..3389e4d2a 100644 --- a/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java +++ b/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory; * TSocket and TServerSocket */ public class TSSLTransportFactory { - + private static final Logger LOGGER = LoggerFactory.getLogger(TSSLTransportFactory.class); @@ -350,7 +350,7 @@ public class TSSLTransportFactory { } isKeyStoreSet = true; } - + /** * Set the keystore, password, certificate type and the store type * @@ -363,7 +363,7 @@ public class TSSLTransportFactory { this.keyStoreStream = keyStoreStream; setKeyStore("", keyPass, keyManagerType, keyStoreType); } - + /** * Set the keystore and password * @@ -373,7 +373,7 @@ public class TSSLTransportFactory { public void setKeyStore(String keyStore, String keyPass) { setKeyStore(keyStore, keyPass, null, null); } - + /** * Set the keystore and password * @@ -383,7 +383,7 @@ public class TSSLTransportFactory { public void setKeyStore(InputStream keyStoreStream, String keyPass) { setKeyStore(keyStoreStream, keyPass, null, null); } - + /** * Set the truststore, password, certificate type and the store type * @@ -403,7 +403,7 @@ public class TSSLTransportFactory { } isTrustStoreSet = true; } - + /** * Set the truststore, password, certificate type and the store type * @@ -426,7 +426,7 @@ public class TSSLTransportFactory { public void setTrustStore(String trustStore, String trustPass) { setTrustStore(trustStore, trustPass, null, null); } - + /** * Set the truststore and password * diff --git a/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java b/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java index 5fc7cff9b..e5ca41831 100644 --- a/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java @@ -47,14 +47,14 @@ public class TSaslClientTransport extends TSaslTransport { /** * Uses the given <code>SaslClient</code>. - * + * * @param saslClient * The <code>SaslClient</code> to use for the subsequent SASL * negotiation. * @param transport * Transport underlying this one. */ - public TSaslClientTransport(SaslClient saslClient, TTransport transport) { + public TSaslClientTransport(SaslClient saslClient, TTransport transport) throws TTransportException { super(saslClient, transport); mechanism = saslClient.getMechanismName(); } @@ -63,14 +63,14 @@ public class TSaslClientTransport extends TSaslTransport { * Creates a <code>SaslClient</code> using the given SASL-specific parameters. * See the Java documentation for <code>Sasl.createSaslClient</code> for the * details of the parameters. - * + * * @param transport * The underlying Thrift transport. * @throws SaslException */ public TSaslClientTransport(String mechanism, String authorizationId, String protocol, String serverName, Map<String, String> props, CallbackHandler cbh, TTransport transport) - throws SaslException { + throws SaslException, TTransportException { super(Sasl.createSaslClient(new String[] { mechanism }, authorizationId, protocol, serverName, props, cbh), transport); this.mechanism = mechanism; diff --git a/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java b/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java index 31f309ef8..9111712a4 100644 --- a/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java @@ -58,7 +58,7 @@ public class TSaslServerTransport extends TSaslTransport { * @param transport * Transport underlying this one. */ - public TSaslServerTransport(TTransport transport) { + public TSaslServerTransport(TTransport transport) throws TTransportException { super(transport); } @@ -71,12 +71,12 @@ public class TSaslServerTransport extends TSaslTransport { * The underlying Thrift transport. */ public TSaslServerTransport(String mechanism, String protocol, String serverName, - Map<String, String> props, CallbackHandler cbh, TTransport transport) { + Map<String, String> props, CallbackHandler cbh, TTransport transport) throws TTransportException { super(transport); addServerDefinition(mechanism, protocol, serverName, props, cbh); } - private TSaslServerTransport(Map<String, TSaslServerDefinition> serverDefinitionMap, TTransport transport) { + private TSaslServerTransport(Map<String, TSaslServerDefinition> serverDefinitionMap, TTransport transport) throws TTransportException { super(transport); this.serverDefinitionMap.putAll(serverDefinitionMap); } @@ -190,7 +190,7 @@ public class TSaslServerTransport extends TSaslTransport { * receives the same <code>TSaslServerTransport</code>. */ @Override - public TTransport getTransport(TTransport base) { + public TTransport getTransport(TTransport base) throws TTransportException { WeakReference<TSaslServerTransport> ret = transportMap.get(base); if (ret == null || ret.get() == null) { LOGGER.debug("transport map does not contain key", base); diff --git a/lib/java/src/org/apache/thrift/transport/TSaslTransport.java b/lib/java/src/org/apache/thrift/transport/TSaslTransport.java index d1a3d3115..b106c7004 100644 --- a/lib/java/src/org/apache/thrift/transport/TSaslTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TSaslTransport.java @@ -20,6 +20,7 @@ package org.apache.thrift.transport; import java.nio.charset.StandardCharsets; +import java.util.Objects; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; @@ -28,6 +29,8 @@ import javax.security.sasl.SaslServer; import org.apache.thrift.EncodingUtils; import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TConfiguration; +import org.apache.thrift.transport.layered.TFramedTransport; import org.apache.thrift.transport.sasl.NegotiationStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +39,7 @@ import org.slf4j.LoggerFactory; * A superclass for SASL client/server thrift transports. A subclass need only * implement the <code>open</code> method. */ -abstract class TSaslTransport extends TTransport { +abstract class TSaslTransport extends TEndpointTransport { private static final Logger LOGGER = LoggerFactory.getLogger(TSaslTransport.class); @@ -83,7 +86,8 @@ abstract class TSaslTransport extends TTransport { * @param underlyingTransport * The thrift transport which this transport is wrapping. */ - protected TSaslTransport(TTransport underlyingTransport) { + protected TSaslTransport(TTransport underlyingTransport) throws TTransportException { + super(Objects.isNull(underlyingTransport.getConfiguration()) ? new TConfiguration() : underlyingTransport.getConfiguration()); this.underlyingTransport = underlyingTransport; } @@ -96,7 +100,8 @@ abstract class TSaslTransport extends TTransport { * @param underlyingTransport * The thrift transport which this transport is wrapping. */ - protected TSaslTransport(SaslClient saslClient, TTransport underlyingTransport) { + protected TSaslTransport(SaslClient saslClient, TTransport underlyingTransport) throws TTransportException { + super(Objects.isNull(underlyingTransport.getConfiguration()) ? new TConfiguration() : underlyingTransport.getConfiguration()); sasl = new SaslParticipant(saslClient); this.underlyingTransport = underlyingTransport; } @@ -151,7 +156,7 @@ abstract class TSaslTransport extends TTransport { } int payloadBytes = EncodingUtils.decodeBigEndian(messageHeader, STATUS_BYTES); - if (payloadBytes < 0 || payloadBytes > 104857600 /* 100 MB */) { + if (payloadBytes < 0 || payloadBytes > getConfiguration().getMaxMessageSize() /* 100 MB */) { throw sendAndThrowMessage( NegotiationStatus.ERROR, "Invalid payload header length: " + payloadBytes); } diff --git a/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java b/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java index 42102d9e8..c1bbd4853 100644 --- a/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java @@ -18,6 +18,8 @@ */ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + import java.io.IOException; import java.io.RandomAccessFile; @@ -25,26 +27,43 @@ import java.io.RandomAccessFile; /** * Basic file support for the TTransport interface */ -public final class TSimpleFileTransport extends TTransport { +public final class TSimpleFileTransport extends TEndpointTransport { + + private RandomAccessFile file = null; + private boolean readable; + private boolean writable; + private String path_; - private RandomAccessFile file = null; - private boolean readable; - private boolean writable; - private String path_; + /** + * Create a transport backed by a simple file + * + * @param path the path to the file to open/create + * @param read true to support read operations + * @param write true to support write operations + * @param openFile true to open the file on construction + * @throws TTransportException if file open fails + */ + public TSimpleFileTransport(String path, boolean read, + boolean write, boolean openFile) + throws TTransportException { + this(new TConfiguration(), path, read, write, openFile); + } /** - * Create a transport backed by a simple file - * + * Create a transport backed by a simple file + * + * @param config * @param path the path to the file to open/create * @param read true to support read operations * @param write true to support write operations * @param openFile true to open the file on construction * @throws TTransportException if file open fails */ - public TSimpleFileTransport(String path, boolean read, + public TSimpleFileTransport(TConfiguration config, String path, boolean read, boolean write, boolean openFile) throws TTransportException { + super(config); if (path.length() <= 0) { throw new TTransportException("No path specified"); } @@ -58,11 +77,11 @@ public final class TSimpleFileTransport extends TTransport { open(); } } - + /** - * Create a transport backed by a simple file + * Create a transport backed by a simple file * Implicitly opens file to conform to C++ behavior. - * + * * @param path the path to the file to open/create * @param read true to support read operations * @param write true to support write operations @@ -72,7 +91,7 @@ public final class TSimpleFileTransport extends TTransport { throws TTransportException { this(path, read, write, true); } - + /** * Create a transport backed by a simple read only disk file (implicitly opens * file) @@ -95,7 +114,7 @@ public final class TSimpleFileTransport extends TTransport { } /** - * Open file if not previously opened. + * Open file if not previously opened. * * @throws TTransportException if open fails */ @@ -111,7 +130,7 @@ public final class TSimpleFileTransport extends TTransport { } catch (IOException ioe) { file = null; throw new TTransportException(ioe.getMessage()); - } + } } } @@ -131,7 +150,7 @@ public final class TSimpleFileTransport extends TTransport { } /** - * Read up to len many bytes into buf at offset + * Read up to len many bytes into buf at offset * * @param buf houses bytes read * @param off offset into buff to begin writing to @@ -144,6 +163,7 @@ public final class TSimpleFileTransport extends TTransport { if (!readable) { throw new TTransportException("Read operation on write only file"); } + checkReadBytesAvailable(len); int iBytesRead = 0; try { iBytesRead = file.read(buf, off, len); @@ -155,7 +175,7 @@ public final class TSimpleFileTransport extends TTransport { } /** - * Write len many bytes from buff starting at offset + * Write len many bytes from buff starting at offset * * @param buf buffer containing bytes to write * @param off offset into buffer to begin writing from @@ -213,4 +233,4 @@ public final class TSimpleFileTransport extends TTransport { throw new TTransportException(ex.getMessage()); } } -}
\ No newline at end of file +} diff --git a/lib/java/src/org/apache/thrift/transport/TSocket.java b/lib/java/src/org/apache/thrift/transport/TSocket.java index b20b32b78..eb73e8e76 100644 --- a/lib/java/src/org/apache/thrift/transport/TSocket.java +++ b/lib/java/src/org/apache/thrift/transport/TSocket.java @@ -19,6 +19,7 @@ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +70,7 @@ public class TSocket extends TIOStreamTransport { * @throws TTransportException if there is an error setting up the streams */ public TSocket(Socket socket) throws TTransportException { + super(new TConfiguration()); socket_ = socket; try { socket_.setSoLinger(false, 0); @@ -93,23 +95,36 @@ public class TSocket extends TIOStreamTransport { * Creates a new unconnected socket that will connect to the given host * on the given port. * + * @param config check config * @param host Remote host * @param port Remote port */ - public TSocket(String host, int port) { - this(host, port, 0); + public TSocket(TConfiguration config, String host, int port) throws TTransportException { + this(config, host, port, 0); } /** * Creates a new unconnected socket that will connect to the given host * on the given port. * + * @param host Remote host + * @param port Remote port + */ + public TSocket(String host, int port) throws TTransportException { + this(new TConfiguration(), host, port, 0); + } + + /** + * Creates a new unconnected socket that will connect to the given host + * on the given port. + * + * @param config check config * @param host Remote host * @param port Remote port * @param timeout Socket timeout and connection timeout */ - public TSocket(String host, int port, int timeout) { - this(host, port, timeout, timeout); + public TSocket(TConfiguration config, String host, int port, int timeout) throws TTransportException { + this(config, host, port, timeout, timeout); } /** @@ -117,12 +132,14 @@ public class TSocket extends TIOStreamTransport { * on the given port, with a specific connection timeout and a * specific socket timeout. * + * @param config check config * @param host Remote host * @param port Remote port * @param socketTimeout Socket timeout * @param connectTimeout Connection timeout */ - public TSocket(String host, int port, int socketTimeout, int connectTimeout) { + public TSocket(TConfiguration config, String host, int port, int socketTimeout, int connectTimeout) throws TTransportException { + super(config); host_ = host; port_ = port; socketTimeout_ = socketTimeout; diff --git a/lib/java/src/org/apache/thrift/transport/TTransport.java b/lib/java/src/org/apache/thrift/transport/TTransport.java index 73ad730ce..5645f7fa1 100644 --- a/lib/java/src/org/apache/thrift/transport/TTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TTransport.java @@ -19,6 +19,8 @@ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + import java.io.Closeable; /** @@ -160,4 +162,10 @@ public abstract class TTransport implements Closeable { * @param len */ public void consumeBuffer(int len) {} + + public abstract TConfiguration getConfiguration(); + + public abstract void updateKnownMessageSize(long size) throws TTransportException; + + public abstract void checkReadBytesAvailable(long numBytes) throws TTransportException; } diff --git a/lib/java/src/org/apache/thrift/transport/TTransportFactory.java b/lib/java/src/org/apache/thrift/transport/TTransportFactory.java index 3e71630ae..e068b4beb 100644 --- a/lib/java/src/org/apache/thrift/transport/TTransportFactory.java +++ b/lib/java/src/org/apache/thrift/transport/TTransportFactory.java @@ -34,7 +34,7 @@ public class TTransportFactory { * @param trans The base transport * @return Wrapped Transport */ - public TTransport getTransport(TTransport trans) { + public TTransport getTransport(TTransport trans) throws TTransportException { return trans; } diff --git a/lib/java/src/org/apache/thrift/transport/TZlibTransport.java b/lib/java/src/org/apache/thrift/transport/TZlibTransport.java index e755aa532..73b21aa3f 100644 --- a/lib/java/src/org/apache/thrift/transport/TZlibTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TZlibTransport.java @@ -18,9 +18,12 @@ */ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Objects; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.Inflater; @@ -38,7 +41,7 @@ public class TZlibTransport extends TIOStreamTransport { } @Override - public TTransport getTransport(TTransport base) { + public TTransport getTransport(TTransport base) throws TTransportException { return new TZlibTransport(base); } } @@ -47,7 +50,7 @@ public class TZlibTransport extends TIOStreamTransport { * Constructs a new TZlibTransport instance. * @param transport the underlying transport to read from and write to */ - public TZlibTransport(TTransport transport) { + public TZlibTransport(TTransport transport) throws TTransportException { this(transport, Deflater.BEST_COMPRESSION); } @@ -56,7 +59,8 @@ public class TZlibTransport extends TIOStreamTransport { * @param transport the underlying transport to read from and write to * @param compressionLevel 0 for no compression, 9 for maximum compression */ - public TZlibTransport(TTransport transport, int compressionLevel) { + public TZlibTransport(TTransport transport, int compressionLevel) throws TTransportException { + super(Objects.isNull(transport.getConfiguration()) ? new TConfiguration() : transport.getConfiguration()); transport_ = transport; inputStream_ = new InflaterInputStream(new TTransportInputStream(transport_), new Inflater()); outputStream_ = new DeflaterOutputStream(new TTransportOutputStream(transport_), new Deflater(compressionLevel, false), true); diff --git a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java b/lib/java/src/org/apache/thrift/transport/layered/TFastFramedTransport.java index a1fd2490a..29bf39c14 100644 --- a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java +++ b/lib/java/src/org/apache/thrift/transport/layered/TFastFramedTransport.java @@ -16,7 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.thrift.transport; +package org.apache.thrift.transport.layered; + + +import org.apache.thrift.TConfiguration; +import org.apache.thrift.transport.*; + +import java.util.Objects; /** * This transport is wire compatible with {@link TFramedTransport}, but makes @@ -27,18 +33,18 @@ package org.apache.thrift.transport; * * This implementation is NOT threadsafe. */ -public class TFastFramedTransport extends TTransport { +public class TFastFramedTransport extends TLayeredTransport { public static class Factory extends TTransportFactory { private final int initialCapacity; private final int maxLength; public Factory() { - this(DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH); + this(DEFAULT_BUF_CAPACITY, TConfiguration.DEFAULT_MAX_FRAME_SIZE); } public Factory(int initialCapacity) { - this(initialCapacity, DEFAULT_MAX_LENGTH); + this(initialCapacity, TConfiguration.DEFAULT_MAX_FRAME_SIZE); } public Factory(int initialCapacity, int maxLength) { @@ -47,7 +53,7 @@ public class TFastFramedTransport extends TTransport { } @Override - public TTransport getTransport(TTransport trans) { + public TTransport getTransport(TTransport trans) throws TTransportException { return new TFastFramedTransport(trans, initialCapacity, maxLength); @@ -58,12 +64,7 @@ public class TFastFramedTransport extends TTransport { * How big should the default read and write buffers be? */ public static final int DEFAULT_BUF_CAPACITY = 1024; - /** - * How big is the largest allowable frame? Defaults to 16MB. - */ - public static final int DEFAULT_MAX_LENGTH = 16384000; - private final TTransport underlying; private final AutoExpandingBufferWriteTransport writeBuffer; private AutoExpandingBufferReadTransport readBuffer; private final int initialBufferCapacity; @@ -75,8 +76,8 @@ public class TFastFramedTransport extends TTransport { * for initial buffer size and max frame length. * @param underlying Transport that real reads and writes will go through to. */ - public TFastFramedTransport(TTransport underlying) { - this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH); + public TFastFramedTransport(TTransport underlying) throws TTransportException { + this(underlying, DEFAULT_BUF_CAPACITY, TConfiguration.DEFAULT_MAX_FRAME_SIZE); } /** @@ -87,8 +88,8 @@ public class TFastFramedTransport extends TTransport { * In practice, it's not critical to set this unless you know in advance that * your messages are going to be very large. */ - public TFastFramedTransport(TTransport underlying, int initialBufferCapacity) { - this(underlying, initialBufferCapacity, DEFAULT_MAX_LENGTH); + public TFastFramedTransport(TTransport underlying, int initialBufferCapacity) throws TTransportException { + this(underlying, initialBufferCapacity, TConfiguration.DEFAULT_MAX_FRAME_SIZE); } /** @@ -102,27 +103,29 @@ public class TFastFramedTransport extends TTransport { * @param maxLength The max frame size you are willing to read. You can use * this parameter to limit how much memory can be allocated. */ - public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) { - this.underlying = underlying; + public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) throws TTransportException { + super(underlying); + TConfiguration config = Objects.isNull(underlying.getConfiguration()) ? new TConfiguration() : underlying.getConfiguration(); this.maxLength = maxLength; + config.setMaxFrameSize(maxLength); this.initialBufferCapacity = initialBufferCapacity; - readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity); - writeBuffer = new AutoExpandingBufferWriteTransport(initialBufferCapacity, 4); + readBuffer = new AutoExpandingBufferReadTransport(config, initialBufferCapacity); + writeBuffer = new AutoExpandingBufferWriteTransport(config, initialBufferCapacity, 4); } @Override public void close() { - underlying.close(); + getInnerTransport().close(); } @Override public boolean isOpen() { - return underlying.isOpen(); + return getInnerTransport().isOpen(); } @Override public void open() throws TTransportException { - underlying.open(); + getInnerTransport().open(); } @Override @@ -139,7 +142,7 @@ public class TFastFramedTransport extends TTransport { } private void readFrame() throws TTransportException { - underlying.readAll(i32buf , 0, 4); + getInnerTransport().readAll(i32buf , 0, 4); int size = TFramedTransport.decodeFrameSize(i32buf); if (size < 0) { @@ -147,13 +150,13 @@ public class TFastFramedTransport extends TTransport { throw new TTransportException(TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!"); } - if (size > maxLength) { + if (size > getInnerTransport().getConfiguration().getMaxFrameSize()) { close(); throw new TTransportException(TTransportException.CORRUPTED_DATA, "Frame size (" + size + ") larger than max length (" + maxLength + ")!"); } - readBuffer.fill(underlying, size); + readBuffer.fill(getInnerTransport(), size); } @Override @@ -169,18 +172,18 @@ public class TFastFramedTransport extends TTransport { /** * Only clears the read buffer! */ - public void clear() { - readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity); + public void clear() throws TTransportException { + readBuffer = new AutoExpandingBufferReadTransport(getConfiguration(), initialBufferCapacity); } @Override public void flush() throws TTransportException { - int payloadLength = writeBuffer.getLength() - 4; + int payloadLength = writeBuffer.getLength() - 4; byte[] data = writeBuffer.getBuf().array(); TFramedTransport.encodeFrameSize(payloadLength, data); - underlying.write(data, 0, payloadLength + 4); + getInnerTransport().write(data, 0, payloadLength + 4); writeBuffer.reset(); - underlying.flush(); + getInnerTransport().flush(); } @Override diff --git a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java b/lib/java/src/org/apache/thrift/transport/layered/TFramedTransport.java index a006c3a6a..10a9a1c17 100644 --- a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java +++ b/lib/java/src/org/apache/thrift/transport/layered/TFramedTransport.java @@ -17,24 +17,22 @@ * under the License. */ -package org.apache.thrift.transport; +package org.apache.thrift.transport.layered; import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TConfiguration; +import org.apache.thrift.transport.TMemoryInputTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.TTransportFactory; + +import java.util.Objects; /** * TFramedTransport is a buffered TTransport that ensures a fully read message * every time by preceding messages with a 4-byte frame size. */ -public class TFramedTransport extends TTransport { - - protected static final int DEFAULT_MAX_LENGTH = 16384000; - - private int maxLength_; - - /** - * Underlying transport - */ - private TTransport transport_ = null; +public class TFramedTransport extends TLayeredTransport { /** * Buffer for output @@ -45,14 +43,13 @@ public class TFramedTransport extends TTransport { /** * Buffer for input */ - private final TMemoryInputTransport readBuffer_ = - new TMemoryInputTransport(new byte[0]); + private final TMemoryInputTransport readBuffer_; public static class Factory extends TTransportFactory { private int maxLength_; public Factory() { - maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH; + maxLength_ = TConfiguration.DEFAULT_MAX_FRAME_SIZE; } public Factory(int maxLength) { @@ -60,7 +57,7 @@ public class TFramedTransport extends TTransport { } @Override - public TTransport getTransport(TTransport base) { + public TTransport getTransport(TTransport base) throws TTransportException { return new TFramedTransport(base, maxLength_); } } @@ -75,28 +72,28 @@ public class TFramedTransport extends TTransport { /** * Constructor wraps around another transport */ - public TFramedTransport(TTransport transport, int maxLength) { - transport_ = transport; - maxLength_ = maxLength; + public TFramedTransport(TTransport transport, int maxLength) throws TTransportException { + super(transport); + TConfiguration _configuration = Objects.isNull(transport.getConfiguration()) ? new TConfiguration() : transport.getConfiguration(); + _configuration.setMaxFrameSize(maxLength); writeBuffer_.write(sizeFiller_, 0, 4); + readBuffer_= new TMemoryInputTransport(_configuration, new byte[0]); } - public TFramedTransport(TTransport transport) { - transport_ = transport; - maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH; - writeBuffer_.write(sizeFiller_, 0, 4); + public TFramedTransport(TTransport transport) throws TTransportException { + this(transport, TConfiguration.DEFAULT_MAX_FRAME_SIZE); } public void open() throws TTransportException { - transport_.open(); + getInnerTransport().open(); } public boolean isOpen() { - return transport_.isOpen(); + return getInnerTransport().isOpen(); } public void close() { - transport_.close(); + getInnerTransport().close(); } public int read(byte[] buf, int off, int len) throws TTransportException { @@ -138,7 +135,7 @@ public class TFramedTransport extends TTransport { private final byte[] i32buf = new byte[4]; private void readFrame() throws TTransportException { - transport_.readAll(i32buf, 0, 4); + getInnerTransport().readAll(i32buf, 0, 4); int size = decodeFrameSize(i32buf); if (size < 0) { @@ -146,14 +143,14 @@ public class TFramedTransport extends TTransport { throw new TTransportException(TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!"); } - if (size > maxLength_) { + if (size > getInnerTransport().getConfiguration().getMaxFrameSize()) { close(); throw new TTransportException(TTransportException.CORRUPTED_DATA, - "Frame size (" + size + ") larger than max length (" + maxLength_ + ")!"); + "Frame size (" + size + ") larger than max length (" + getInnerTransport().getConfiguration().getMaxFrameSize() + ")!"); } byte[] buff = new byte[size]; - transport_.readAll(buff, 0, size); + getInnerTransport().readAll(buff, 0, size); readBuffer_.reset(buff); } @@ -169,8 +166,8 @@ public class TFramedTransport extends TTransport { writeBuffer_.write(sizeFiller_, 0, 4); // make room for the next frame's size data encodeFrameSize(len, buf); // this is the frame length without the filler - transport_.write(buf, 0, len + 4); // we have to write the frame size and frame data - transport_.flush(); + getInnerTransport().write(buf, 0, len + 4); // we have to write the frame size and frame data + getInnerTransport().flush(); } public static final void encodeFrameSize(final int frameSize, final byte[] buf) { diff --git a/lib/java/src/org/apache/thrift/transport/layered/TLayeredTransport.java b/lib/java/src/org/apache/thrift/transport/layered/TLayeredTransport.java new file mode 100644 index 000000000..69ec824ee --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/layered/TLayeredTransport.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport.layered; + +import org.apache.thrift.TConfiguration; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import java.util.Objects; + +public abstract class TLayeredTransport extends TTransport{ + + private TTransport innerTransport; + + public TConfiguration getConfiguration() { + return innerTransport.getConfiguration(); + } + + public TLayeredTransport(TTransport transport) + { + Objects.requireNonNull(transport, "TTransport cannot be null."); + innerTransport = transport; + } + + public void updateKnownMessageSize(long size) throws TTransportException { + innerTransport.updateKnownMessageSize(size); + } + + public void checkReadBytesAvailable(long numBytes) throws TTransportException { + innerTransport.checkReadBytesAvailable(numBytes); + } + + public TTransport getInnerTransport() { + return innerTransport; + } +} |