summaryrefslogtreecommitdiff
path: root/lib/java/src/org/apache/thrift/transport
diff options
context:
space:
mode:
authorZezeng Wang <51382517@qq.com>2020-09-18 16:05:44 +0800
committerGitHub <noreply@github.com>2020-09-18 16:05:44 +0800
commit119030848c4296ddef43d66ffa0cca1354fb357b (patch)
tree5861c617491cf94b83b48f9f4412dd7a1aa6622f /lib/java/src/org/apache/thrift/transport
parentc77941c60da01f466827ff619d571055ff76351f (diff)
parent63213c17ad3fece91fdaaca8f59165ca3f41c5c1 (diff)
downloadthrift-119030848c4296ddef43d66ffa0cca1354fb357b.tar.gz
Merge pull request #2191 from zeshuai007/Implements_TConfig
THRIFT-5237 Implement MAX_MESSAGE_SIZE and consolidate limits into a TConfiguration class(JAVA)
Diffstat (limited to 'lib/java/src/org/apache/thrift/transport')
-rw-r--r--lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java2
-rw-r--r--lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java7
-rw-r--r--lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java7
-rw-r--r--lib/java/src/org/apache/thrift/transport/TByteBuffer.java11
-rw-r--r--lib/java/src/org/apache/thrift/transport/TEndpointTransport.java100
-rw-r--r--lib/java/src/org/apache/thrift/transport/TFileTransport.java79
-rw-r--r--lib/java/src/org/apache/thrift/transport/THttpClient.java80
-rw-r--r--lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java61
-rw-r--r--lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java26
-rw-r--r--lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java38
-rw-r--r--lib/java/src/org/apache/thrift/transport/TMemoryTransport.java15
-rw-r--r--lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java15
-rw-r--r--lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java8
-rw-r--r--lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java14
-rw-r--r--lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java8
-rw-r--r--lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java8
-rw-r--r--lib/java/src/org/apache/thrift/transport/TSaslTransport.java13
-rw-r--r--lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java54
-rw-r--r--lib/java/src/org/apache/thrift/transport/TSocket.java27
-rw-r--r--lib/java/src/org/apache/thrift/transport/TTransport.java8
-rw-r--r--lib/java/src/org/apache/thrift/transport/TTransportFactory.java2
-rw-r--r--lib/java/src/org/apache/thrift/transport/TZlibTransport.java10
-rw-r--r--lib/java/src/org/apache/thrift/transport/layered/TFastFramedTransport.java (renamed from lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java)61
-rw-r--r--lib/java/src/org/apache/thrift/transport/layered/TFramedTransport.java (renamed from lib/java/src/org/apache/thrift/transport/TFramedTransport.java)59
-rw-r--r--lib/java/src/org/apache/thrift/transport/layered/TLayeredTransport.java52
25 files changed, 578 insertions, 187 deletions
diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java
index fc3aa92df..b355d11ca 100644
--- a/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java
+++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java
@@ -27,7 +27,7 @@ import java.util.Arrays;
* rate slightly faster than the requested capacity with the (untested)
* objective of avoiding expensive buffer allocations and copies.
*/
-class AutoExpandingBuffer {
+public class AutoExpandingBuffer {
private byte[] array;
public AutoExpandingBuffer(int initialCapacity) {
diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java
index d06fec78c..6fd4075b9 100644
--- a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java
@@ -18,17 +18,20 @@
*/
package org.apache.thrift.transport;
+import org.apache.thrift.TConfiguration;
+
/**
* TTransport for reading from an AutoExpandingBuffer.
*/
-public class AutoExpandingBufferReadTransport extends TTransport {
+public class AutoExpandingBufferReadTransport extends TEndpointTransport {
private final AutoExpandingBuffer buf;
private int pos = 0;
private int limit = 0;
- public AutoExpandingBufferReadTransport(int initialCapacity) {
+ public AutoExpandingBufferReadTransport(TConfiguration config, int initialCapacity) throws TTransportException {
+ super(config);
this.buf = new AutoExpandingBuffer(initialCapacity);
}
diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java
index ec7e7d45a..25f974a73 100644
--- a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java
@@ -18,10 +18,12 @@
*/
package org.apache.thrift.transport;
+import org.apache.thrift.TConfiguration;
+
/**
* TTransport for writing to an AutoExpandingBuffer.
*/
-public final class AutoExpandingBufferWriteTransport extends TTransport {
+public final class AutoExpandingBufferWriteTransport extends TEndpointTransport {
private final AutoExpandingBuffer buf;
private int pos;
@@ -38,7 +40,8 @@ public final class AutoExpandingBufferWriteTransport extends TTransport {
* @throws IllegalArgumentException if frontReserve is less than zero
* @throws IllegalArgumentException if frontReserve is greater than initialCapacity
*/
- public AutoExpandingBufferWriteTransport(int initialCapacity, int frontReserve) {
+ public AutoExpandingBufferWriteTransport(TConfiguration config, int initialCapacity, int frontReserve) throws TTransportException {
+ super(config);
if (initialCapacity < 1) {
throw new IllegalArgumentException("initialCapacity");
}
diff --git a/lib/java/src/org/apache/thrift/transport/TByteBuffer.java b/lib/java/src/org/apache/thrift/transport/TByteBuffer.java
index b6b065748..c792f3b1c 100644
--- a/lib/java/src/org/apache/thrift/transport/TByteBuffer.java
+++ b/lib/java/src/org/apache/thrift/transport/TByteBuffer.java
@@ -1,5 +1,7 @@
package org.apache.thrift.transport;
+import org.apache.thrift.TConfiguration;
+
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
@@ -7,14 +9,16 @@ import java.nio.ByteBuffer;
/**
* ByteBuffer-backed implementation of TTransport.
*/
-public final class TByteBuffer extends TTransport {
+public final class TByteBuffer extends TEndpointTransport {
private final ByteBuffer byteBuffer;
/**
* Creates a new TByteBuffer wrapping a given NIO ByteBuffer.
*/
- public TByteBuffer(ByteBuffer byteBuffer) {
+ public TByteBuffer(ByteBuffer byteBuffer) throws TTransportException {
+ super(new TConfiguration());
this.byteBuffer = byteBuffer;
+ updateKnownMessageSize(byteBuffer.capacity());
}
@Override
@@ -32,6 +36,9 @@ public final class TByteBuffer extends TTransport {
@Override
public int read(byte[] buf, int off, int len) throws TTransportException {
+ //
+ checkReadBytesAvailable(len);
+
final int n = Math.min(byteBuffer.remaining(), len);
if (n > 0) {
try {
diff --git a/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java b/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java
new file mode 100644
index 000000000..f32efae90
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+import org.apache.thrift.TConfiguration;
+
+import java.util.Objects;
+
+public abstract class TEndpointTransport extends TTransport{
+
+ protected long getMaxMessageSize() { return getConfiguration().getMaxMessageSize(); }
+
+ protected long knownMessageSize;
+ protected long remainingMessageSize;
+
+ private TConfiguration _configuration;
+ public TConfiguration getConfiguration() {
+ return _configuration;
+ }
+
+ public TEndpointTransport( TConfiguration config) throws TTransportException {
+ _configuration = Objects.isNull(config) ? new TConfiguration() : config;
+
+ resetConsumedMessageSize(-1);
+ }
+
+ /**
+ * Resets RemainingMessageSize to the configured maximum
+ * @param newSize
+ */
+ protected void resetConsumedMessageSize(long newSize) throws TTransportException {
+ // full reset
+ if (newSize < 0)
+ {
+ knownMessageSize = getMaxMessageSize();
+ remainingMessageSize = getMaxMessageSize();
+ return;
+ }
+
+ // update only: message size can shrink, but not grow
+ if (newSize > knownMessageSize)
+ throw new TTransportException(TTransportException.END_OF_FILE, "MaxMessageSize reached");
+
+ knownMessageSize = newSize;
+ remainingMessageSize = newSize;
+ }
+
+ /**
+ * Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport).
+ * Will throw if we already consumed too many bytes or if the new size is larger than allowed.
+ * @param size
+ */
+ public void updateKnownMessageSize(long size) throws TTransportException {
+ long consumed = knownMessageSize - remainingMessageSize;
+ resetConsumedMessageSize(size == 0 ? -1 : size);
+ countConsumedMessageBytes(consumed);
+ }
+
+ /**
+ * Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data
+ * @param numBytes
+ */
+ public void checkReadBytesAvailable(long numBytes) throws TTransportException {
+ if (remainingMessageSize < numBytes)
+ throw new TTransportException(TTransportException.END_OF_FILE, "MaxMessageSize reached");
+ }
+
+ /**
+ * Consumes numBytes from the RemainingMessageSize.
+ * @param numBytes
+ */
+ protected void countConsumedMessageBytes(long numBytes) throws TTransportException {
+ if (remainingMessageSize >= numBytes)
+ {
+ remainingMessageSize -= numBytes;
+ }
+ else
+ {
+ remainingMessageSize = 0;
+ throw new TTransportException(TTransportException.END_OF_FILE, "MaxMessageSize reached");
+ }
+ }
+
+}
diff --git a/lib/java/src/org/apache/thrift/transport/TFileTransport.java b/lib/java/src/org/apache/thrift/transport/TFileTransport.java
index 88b73e54d..85f97084c 100644
--- a/lib/java/src/org/apache/thrift/transport/TFileTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TFileTransport.java
@@ -26,13 +26,14 @@ import java.io.OutputStream;
import java.io.IOException;
import java.util.Random;
+import org.apache.thrift.TConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* FileTransport implementation of the TTransport interface.
* Currently this is a straightforward port of the cpp implementation
- *
+ *
* It may make better sense to provide a basic stream access on top of the framed file format
* The FileTransport can then be a user of this framed file format with some additional logic
* for chunking.
@@ -44,7 +45,7 @@ public class TFileTransport extends TTransport {
public static class TruncableBufferedInputStream extends BufferedInputStream {
public void trunc() {
pos = count = 0;
- }
+ }
public TruncableBufferedInputStream(InputStream in) {
super(in);
}
@@ -62,7 +63,7 @@ public class TFileTransport extends TTransport {
/**
* Initialize an event. Initially, it has no valid contents
*
- * @param buf byte array buffer to store event
+ * @param buf byte array buffer to store event
*/
public Event(byte[] buf) {
buf_ = buf;
@@ -88,9 +89,9 @@ public class TFileTransport extends TTransport {
return(ndesired);
}
- };
+ }
- public static class ChunkState {
+ public static class ChunkState {
/**
* Chunk Size. Must be same across all implementations
*/
@@ -111,7 +112,7 @@ public class TFileTransport extends TTransport {
public long getOffset() { return (offset_);}
}
- public static enum TailPolicy {
+ public enum TailPolicy {
NOWAIT(0, 0),
WAIT_FOREVER(500, -1);
@@ -148,13 +149,13 @@ public class TFileTransport extends TTransport {
TailPolicy currentPolicy_ = TailPolicy.NOWAIT;
- /**
+ /**
* Underlying file being read
*/
protected TSeekableFile inputFile_ = null;
- /**
- * Underlying outputStream
+ /**
+ * Underlying outputStream
*/
protected OutputStream outputStream_ = null;
@@ -181,7 +182,7 @@ public class TFileTransport extends TTransport {
/**
* Get File Tailing Policy
- *
+ *
* @return current read policy
*/
public TailPolicy getTailPolicy() {
@@ -190,7 +191,7 @@ public class TFileTransport extends TTransport {
/**
* Set file Tailing Policy
- *
+ *
* @param policy New policy to set
* @return Old policy
*/
@@ -203,7 +204,7 @@ public class TFileTransport extends TTransport {
/**
* Initialize read input stream
- *
+ *
* @return input stream to read from file
*/
private InputStream createInputStream() throws TTransportException {
@@ -223,7 +224,7 @@ public class TFileTransport extends TTransport {
/**
* Read (potentially tailing) an input stream
- *
+ *
* @param is InputStream to read from
* @param buf Buffer to read into
* @param off Offset in buffer to read into
@@ -232,7 +233,7 @@ public class TFileTransport extends TTransport {
*
* @return number of bytes read
*/
- private int tailRead(InputStream is, byte[] buf,
+ private int tailRead(InputStream is, byte[] buf,
int off, int len, TailPolicy tp) throws TTransportException {
int orig_len = len;
try {
@@ -322,7 +323,7 @@ public class TFileTransport extends TTransport {
// check if event is corrupted and do recovery as required
if(esize > cs.getRemaining()) {
throw new TTransportException("FileTransport error: bad event size");
- /*
+ /*
if(performRecovery()) {
esize=0;
} else {
@@ -361,7 +362,7 @@ public class TFileTransport extends TTransport {
* Files are not opened in ctor - but in explicit open call
*/
public void open() throws TTransportException {
- if (isOpen())
+ if (isOpen())
throw new TTransportException(TTransportException.ALREADY_OPEN);
try {
@@ -406,7 +407,7 @@ public class TFileTransport extends TTransport {
*
* @param path File path to read and write from
* @param readOnly Whether this is a read-only transport
- */
+ */
public TFileTransport(final String path, boolean readOnly) throws IOException {
inputFile_ = new TStandardFile(path);
readOnly_ = readOnly;
@@ -457,8 +458,8 @@ public class TFileTransport extends TTransport {
* @throws TTransportException if there was an error reading data
*/
public int read(byte[] buf, int off, int len) throws TTransportException {
- if(!isOpen())
- throw new TTransportException(TTransportException.NOT_OPEN,
+ if(!isOpen())
+ throw new TTransportException(TTransportException.NOT_OPEN,
"Must open before reading");
if(currentEvent_.getRemaining() == 0) {
@@ -471,14 +472,14 @@ public class TFileTransport extends TTransport {
}
public int getNumChunks() throws TTransportException {
- if(!isOpen())
- throw new TTransportException(TTransportException.NOT_OPEN,
+ if(!isOpen())
+ throw new TTransportException(TTransportException.NOT_OPEN,
"Must open before getNumChunks");
try {
long len = inputFile_.length();
if(len == 0)
return 0;
- else
+ else
return (((int)(len/cs.getChunkSize())) + 1);
} catch (IOException iox) {
@@ -487,8 +488,8 @@ public class TFileTransport extends TTransport {
}
public int getCurChunk() throws TTransportException {
- if(!isOpen())
- throw new TTransportException(TTransportException.NOT_OPEN,
+ if(!isOpen())
+ throw new TTransportException(TTransportException.NOT_OPEN,
"Must open before getCurChunk");
return (cs.getChunkNum());
@@ -496,8 +497,8 @@ public class TFileTransport extends TTransport {
public void seekToChunk(int chunk) throws TTransportException {
- if(!isOpen())
- throw new TTransportException(TTransportException.NOT_OPEN,
+ if(!isOpen())
+ throw new TTransportException(TTransportException.NOT_OPEN,
"Must open before seeking");
int numChunks = getNumChunks();
@@ -527,7 +528,7 @@ public class TFileTransport extends TTransport {
}
if(chunk*cs.getChunkSize() != cs.getOffset()) {
- try { inputFile_.seek((long)chunk*cs.getChunkSize()); }
+ try { inputFile_.seek((long)chunk*cs.getChunkSize()); }
catch (IOException iox) {
throw new TTransportException("Seek to chunk " +
chunk + " " +iox.getMessage(), iox);
@@ -549,8 +550,8 @@ public class TFileTransport extends TTransport {
}
public void seekToEnd() throws TTransportException {
- if(!isOpen())
- throw new TTransportException(TTransportException.NOT_OPEN,
+ if(!isOpen())
+ throw new TTransportException(TTransportException.NOT_OPEN,
"Must open before seeking");
seekToChunk(getNumChunks());
}
@@ -577,9 +578,25 @@ public class TFileTransport extends TTransport {
throw new TTransportException("Not Supported");
}
+
+ @Override
+ public TConfiguration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public void updateKnownMessageSize(long size) throws TTransportException {
+
+ }
+
+ @Override
+ public void checkReadBytesAvailable(long numBytes) throws TTransportException {
+
+ }
+
/**
* test program
- *
+ *
*/
public static void main(String[] args) throws Exception {
@@ -594,7 +611,7 @@ public class TFileTransport extends TTransport {
try {
num_chunks = Integer.parseInt(args[1]);
} catch (Exception e) {
- LOGGER.error("Cannot parse " + args[1]);
+ LOGGER.error("Cannot parse " + args[1]);
printUsage();
}
}
diff --git a/lib/java/src/org/apache/thrift/transport/THttpClient.java b/lib/java/src/org/apache/thrift/transport/THttpClient.java
index c3063fe43..7d61b5c8e 100644
--- a/lib/java/src/org/apache/thrift/transport/THttpClient.java
+++ b/lib/java/src/org/apache/thrift/transport/THttpClient.java
@@ -37,6 +37,7 @@ import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.params.CoreConnectionPNames;
+import org.apache.thrift.TConfiguration;
/**
* HTTP implementation of the TTransport interface. Used for working with a
@@ -51,7 +52,7 @@ import org.apache.http.params.CoreConnectionPNames;
* HttpClient to THttpClient(String url, HttpClient client) will create an
* instance which will use HttpURLConnection.
*
- * When using HttpClient, the following configuration leads to 5-15%
+ * When using HttpClient, the following configuration leads to 5-15%
* better performance than the HttpURLConnection implementation:
*
* http.protocol.version=HttpVersion.HTTP_1_1
@@ -65,7 +66,7 @@ import org.apache.http.params.CoreConnectionPNames;
* @see <a href="https://issues.apache.org/jira/browse/THRIFT-970">THRIFT-970</a>
*/
-public class THttpClient extends TTransport {
+public class THttpClient extends TEndpointTransport {
private URL url_ = null;
@@ -80,14 +81,14 @@ public class THttpClient extends TTransport {
private Map<String,String> customHeaders_ = null;
private final HttpHost host;
-
+
private final HttpClient client;
-
+
public static class Factory extends TTransportFactory {
-
+
private final String url;
private final HttpClient client;
-
+
public Factory(String url) {
this.url = url;
this.client = null;
@@ -97,14 +98,14 @@ public class THttpClient extends TTransport {
this.url = url;
this.client = client;
}
-
+
@Override
public TTransport getTransport(TTransport trans) {
try {
if (null != client) {
- return new THttpClient(url, client);
+ return new THttpClient(trans.getConfiguration(), url, client);
} else {
- return new THttpClient(url);
+ return new THttpClient(trans.getConfiguration(), url);
}
} catch (TTransportException tte) {
return null;
@@ -112,7 +113,19 @@ public class THttpClient extends TTransport {
}
}
+ public THttpClient(TConfiguration config, String url) throws TTransportException {
+ super(config);
+ try {
+ url_ = new URL(url);
+ this.client = null;
+ this.host = null;
+ } catch (IOException iox) {
+ throw new TTransportException(iox);
+ }
+ }
+
public THttpClient(String url) throws TTransportException {
+ super(new TConfiguration());
try {
url_ = new URL(url);
this.client = null;
@@ -122,7 +135,19 @@ public class THttpClient extends TTransport {
}
}
+ public THttpClient(TConfiguration config, String url, HttpClient client) throws TTransportException {
+ super(config);
+ try {
+ url_ = new URL(url);
+ this.client = client;
+ this.host = new HttpHost(url_.getHost(), -1 == url_.getPort() ? url_.getDefaultPort() : url_.getPort(), url_.getProtocol());
+ } catch (IOException iox) {
+ throw new TTransportException(iox);
+ }
+ }
+
public THttpClient(String url, HttpClient client) throws TTransportException {
+ super(new TConfiguration());
try {
url_ = new URL(url);
this.client = client;
@@ -168,7 +193,6 @@ public class THttpClient extends TTransport {
try {
inputStream_.close();
} catch (IOException ioe) {
- ;
}
inputStream_ = null;
}
@@ -182,11 +206,16 @@ public class THttpClient extends TTransport {
if (inputStream_ == null) {
throw new TTransportException("Response buffer is empty, no request.");
}
+
+ checkReadBytesAvailable(len);
+
try {
int ret = inputStream_.read(buf, off, len);
if (ret == -1) {
throw new TTransportException("No more data available.");
}
+ countConsumedMessageBytes(ret);
+
return ret;
} catch (IOException iox) {
throw new TTransportException(iox);
@@ -214,7 +243,7 @@ public class THttpClient extends TTransport {
}
private void flushUsingHttpClient() throws TTransportException {
-
+
if (null == this.client) {
throw new TTransportException("Null HttpClient, aborting.");
}
@@ -224,22 +253,22 @@ public class THttpClient extends TTransport {
requestBuffer_.reset();
HttpPost post = null;
-
+
InputStream is = null;
-
- try {
+
+ try {
// Set request to path + query string
post = new HttpPost(this.url_.getFile());
-
+
//
// Headers are added to the HttpPost instance, not
// to HttpClient.
//
-
+
post.setHeader("Content-Type", "application/x-thrift");
post.setHeader("Accept", "application/x-thrift");
post.setHeader("User-Agent", "Java/THttpClient/HC");
-
+
if (null != customHeaders_) {
for (Map.Entry<String, String> header : customHeaders_.entrySet()) {
post.setHeader(header.getKey(), header.getValue());
@@ -247,17 +276,17 @@ public class THttpClient extends TTransport {
}
post.setEntity(new ByteArrayEntity(data));
-
+
HttpResponse response = this.client.execute(this.host, post);
int responseCode = response.getStatusLine().getStatusCode();
- //
+ //
// Retrieve the inputstream BEFORE checking the status code so
// resources get freed in the finally clause.
//
is = response.getEntity().getContent();
-
+
if (responseCode != HttpStatus.SC_OK) {
throw new TTransportException("HTTP Response code: " + responseCode);
}
@@ -268,10 +297,10 @@ public class THttpClient extends TTransport {
// thrift struct is being read up the chain).
// Proceeding differently might lead to exhaustion of connections and thus
// to app failure.
-
+
byte[] buf = new byte[1024];
ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
+
int len = 0;
do {
len = is.read(buf);
@@ -279,7 +308,7 @@ public class THttpClient extends TTransport {
baos.write(buf, 0, len);
}
} while (-1 != len);
-
+
try {
// Indicate we're done with the content.
consume(response.getEntity());
@@ -287,7 +316,7 @@ public class THttpClient extends TTransport {
// We ignore this exception, it might only mean the server has no
// keep-alive capability.
}
-
+
inputStream_ = new ByteArrayInputStream(baos.toByteArray());
} catch (IOException ioe) {
// Abort method so the connection gets released back to the connection manager
@@ -296,6 +325,7 @@ public class THttpClient extends TTransport {
}
throw new TTransportException(ioe);
} finally {
+ resetConsumedMessageSize(-1);
if (null != is) {
// Close the entity's input stream, this will release the underlying connection
try {
@@ -357,6 +387,8 @@ public class THttpClient extends TTransport {
} catch (IOException iox) {
throw new TTransportException(iox);
+ } finally {
+ resetConsumedMessageSize(-1);
}
}
}
diff --git a/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java b/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java
index d97d5063e..763e66ad6 100644
--- a/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java
@@ -19,6 +19,7 @@
package org.apache.thrift.transport;
+import org.apache.thrift.TConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +34,7 @@ import java.io.OutputStream;
* has to provide a variety of types of streams.
*
*/
-public class TIOStreamTransport extends TTransport {
+public class TIOStreamTransport extends TEndpointTransport {
private static final Logger LOGGER = LoggerFactory.getLogger(TIOStreamTransport.class.getName());
@@ -47,33 +48,80 @@ public class TIOStreamTransport extends TTransport {
* Subclasses can invoke the default constructor and then assign the input
* streams in the open method.
*/
- protected TIOStreamTransport() {}
+ protected TIOStreamTransport(TConfiguration config) throws TTransportException {
+ super(config);
+ }
+
+ /**
+ * Subclasses can invoke the default constructor and then assign the input
+ * streams in the open method.
+ */
+ protected TIOStreamTransport() throws TTransportException {
+ super(new TConfiguration());
+ }
/**
* Input stream constructor, constructs an input only transport.
*
+ * @param config
* @param is Input stream to read from
*/
- public TIOStreamTransport(InputStream is) {
+ public TIOStreamTransport(TConfiguration config, InputStream is) throws TTransportException {
+ super(config);
inputStream_ = is;
}
+ /**
+ * Input stream constructor, constructs an input only transport.
+ *
+ * @param is Input stream to read from
+ */
+ public TIOStreamTransport(InputStream is) throws TTransportException {
+ super(new TConfiguration());
+ inputStream_ = is;
+ }
+
+ /**
+ * Output stream constructor, constructs an output only transport.
+ *
+ * @param config
+ * @param os Output stream to write to
+ */
+ public TIOStreamTransport(TConfiguration config, OutputStream os) throws TTransportException {
+ super(config);
+ outputStream_ = os;
+ }
/**
* Output stream constructor, constructs an output only transport.
*
* @param os Output stream to write to
*/
- public TIOStreamTransport(OutputStream os) {
+ public TIOStreamTransport(OutputStream os) throws TTransportException {
+ super(new TConfiguration());
outputStream_ = os;
}
/**
* Two-way stream constructor.
*
+ * @param config
* @param is Input stream to read from
* @param os Output stream to read from
*/
- public TIOStreamTransport(InputStream is, OutputStream os) {
+ public TIOStreamTransport(TConfiguration config, InputStream is, OutputStream os) throws TTransportException {
+ super(config);
+ inputStream_ = is;
+ outputStream_ = os;
+ }
+
+ /**
+ * Two-way stream constructor.
+ *
+ * @param is Input stream to read from
+ * @param os Output stream to read from
+ */
+ public TIOStreamTransport(InputStream is, OutputStream os) throws TTransportException {
+ super(new TConfiguration());
inputStream_ = is;
outputStream_ = os;
}
@@ -158,6 +206,9 @@ public class TIOStreamTransport extends TTransport {
}
try {
outputStream_.flush();
+
+ resetConsumedMessageSize(-1);
+
} catch (IOException iox) {
throw new TTransportException(TTransportException.UNKNOWN, iox);
}
diff --git a/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java b/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java
index b19ac86d2..c3a3eb4e1 100644
--- a/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java
+++ b/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java
@@ -20,21 +20,39 @@
package org.apache.thrift.transport;
import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TConfiguration;
+
import java.nio.charset.Charset;
/**
* Memory buffer-based implementation of the TTransport interface.
*/
-public class TMemoryBuffer extends TTransport {
+public class TMemoryBuffer extends TEndpointTransport {
+ /**
+ * Create a TMemoryBuffer with an initial buffer size of <i>size</i>. The
+ * internal buffer will grow as necessary to accommodate the size of the data
+ * being written to it.
+ *
+ * @param size the initial size of the buffer
+ */
+ public TMemoryBuffer(int size) throws TTransportException {
+ super(new TConfiguration());
+ arr_ = new TByteArrayOutputStream(size);
+ updateKnownMessageSize(size);
+ }
+
/**
* Create a TMemoryBuffer with an initial buffer size of <i>size</i>. The
* internal buffer will grow as necessary to accommodate the size of the data
* being written to it.
*
+ * @param config
* @param size the initial size of the buffer
*/
- public TMemoryBuffer(int size) {
+ public TMemoryBuffer(TConfiguration config, int size) throws TTransportException {
+ super(config);
arr_ = new TByteArrayOutputStream(size);
+ updateKnownMessageSize(size);
}
@Override
@@ -53,9 +71,11 @@ public class TMemoryBuffer extends TTransport {
}
@Override
- public int read(byte[] buf, int off, int len) {
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ checkReadBytesAvailable(len);
byte[] src = arr_.get();
int amtToRead = (len > arr_.len() - pos_ ? arr_.len() - pos_ : len);
+
if (amtToRead > 0) {
System.arraycopy(src, pos_, buf, off, amtToRead);
pos_ += amtToRead;
diff --git a/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java b/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java
index 2530dcc36..6cb06fc37 100644
--- a/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java
@@ -18,21 +18,38 @@
*/
package org.apache.thrift.transport;
-public final class TMemoryInputTransport extends TTransport {
+import org.apache.thrift.TConfiguration;
+
+public final class TMemoryInputTransport extends TEndpointTransport {
private byte[] buf_;
private int pos_;
private int endPos_;
- public TMemoryInputTransport() {
+ public TMemoryInputTransport() throws TTransportException {
+ this(new TConfiguration());
+ }
+
+ public TMemoryInputTransport(TConfiguration _configuration) throws TTransportException {
+ this(_configuration, new byte[0]);
+ }
+
+ public TMemoryInputTransport(byte[] buf) throws TTransportException {
+ this(new TConfiguration(), buf);
}
- public TMemoryInputTransport(byte[] buf) {
- reset(buf);
+ public TMemoryInputTransport(TConfiguration _configuration, byte[] buf) throws TTransportException {
+ this(_configuration, buf, 0, buf.length);
}
- public TMemoryInputTransport(byte[] buf, int offset, int length) {
+ public TMemoryInputTransport(byte[] buf, int offset, int length) throws TTransportException {
+ this(new TConfiguration(), buf, offset, length);
+ }
+
+ public TMemoryInputTransport(TConfiguration _configuration, byte[] buf, int offset, int length) throws TTransportException {
+ super(_configuration);
reset(buf, offset, length);
+ updateKnownMessageSize(length);
}
public void reset(byte[] buf) {
@@ -43,10 +60,20 @@ public final class TMemoryInputTransport extends TTransport {
buf_ = buf;
pos_ = offset;
endPos_ = offset + length;
+ try {
+ resetConsumedMessageSize(-1);
+ } catch (TTransportException e) {
+ // ignore
+ }
}
public void clear() {
buf_ = null;
+ try {
+ resetConsumedMessageSize(-1);
+ } catch (TTransportException e) {
+ // ignore
+ }
}
@Override
@@ -67,6 +94,7 @@ public final class TMemoryInputTransport extends TTransport {
if (amtToRead > 0) {
System.arraycopy(buf_, pos_, buf, off, amtToRead);
consumeBuffer(amtToRead);
+ countConsumedMessageBytes(amtToRead);
}
return amtToRead;
}
diff --git a/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java b/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java
index f41bc09c8..0172ca816 100644
--- a/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java
@@ -22,18 +22,28 @@ package org.apache.thrift.transport;
import java.nio.ByteBuffer;
import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TConfiguration;
/**
* In memory transport with separate buffers for input and output.
*/
-public class TMemoryTransport extends TTransport {
+public class TMemoryTransport extends TEndpointTransport {
private final ByteBuffer inputBuffer;
private final TByteArrayOutputStream outputBuffer;
- public TMemoryTransport(byte[] input) {
+ public TMemoryTransport(byte[] input) throws TTransportException {
+ super(new TConfiguration());
inputBuffer = ByteBuffer.wrap(input);
outputBuffer = new TByteArrayOutputStream(1024);
+ updateKnownMessageSize(input.length);
+ }
+
+ public TMemoryTransport(TConfiguration config, byte[] input) throws TTransportException {
+ super(config);
+ inputBuffer = ByteBuffer.wrap(input);
+ outputBuffer = new TByteArrayOutputStream(1024);
+ updateKnownMessageSize(input.length);
}
@Override
@@ -56,6 +66,7 @@ public class TMemoryTransport extends TTransport {
@Override
public int read(byte[] buf, int off, int len) throws TTransportException {
+ checkReadBytesAvailable(len);
int remaining = inputBuffer.remaining();
if (remaining < len) {
throw new TTransportException(TTransportException.END_OF_FILE,
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
index 37a66d614..76ed02cbb 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
@@ -30,6 +30,7 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
+import org.apache.thrift.TConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ public class TNonblockingSocket extends TNonblockingTransport {
private final SocketChannel socketChannel_;
- public TNonblockingSocket(String host, int port) throws IOException {
+ public TNonblockingSocket(String host, int port) throws IOException, TTransportException {
this(host, port, 0);
}
@@ -57,7 +58,7 @@ public class TNonblockingSocket extends TNonblockingTransport {
* @param port
* @throws IOException
*/
- public TNonblockingSocket(String host, int port, int timeout) throws IOException {
+ public TNonblockingSocket(String host, int port, int timeout) throws IOException, TTransportException {
this(SocketChannel.open(), timeout, new InetSocketAddress(host, port));
}
@@ -67,13 +68,19 @@ public class TNonblockingSocket extends TNonblockingTransport {
* @param socketChannel Already created SocketChannel object
* @throws IOException if there is an error setting up the streams
*/
- public TNonblockingSocket(SocketChannel socketChannel) throws IOException {
+ public TNonblockingSocket(SocketChannel socketChannel) throws IOException, TTransportException {
this(socketChannel, 0, null);
if (!socketChannel.isConnected()) throw new IOException("Socket must already be connected");
}
private TNonblockingSocket(SocketChannel socketChannel, int timeout, SocketAddress socketAddress)
- throws IOException {
+ throws IOException, TTransportException {
+ this(new TConfiguration(), socketChannel, timeout, socketAddress);
+ }
+
+ private TNonblockingSocket(TConfiguration config, SocketChannel socketChannel, int timeout, SocketAddress socketAddress)
+ throws IOException, TTransportException {
+ super(config);
socketChannel_ = socketChannel;
socketAddress_ = socketAddress;
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
index 43c130688..255595d6c 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
@@ -19,13 +19,19 @@
package org.apache.thrift.transport;
+import org.apache.thrift.TConfiguration;
+
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
-public abstract class TNonblockingTransport extends TTransport {
+public abstract class TNonblockingTransport extends TEndpointTransport {
+
+ public TNonblockingTransport(TConfiguration config) throws TTransportException {
+ super(config);
+ }
/**
* Non-blocking connection initialization.
diff --git a/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java b/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java
index 570f53373..3389e4d2a 100644
--- a/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java
+++ b/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
* TSocket and TServerSocket
*/
public class TSSLTransportFactory {
-
+
private static final Logger LOGGER =
LoggerFactory.getLogger(TSSLTransportFactory.class);
@@ -350,7 +350,7 @@ public class TSSLTransportFactory {
}
isKeyStoreSet = true;
}
-
+
/**
* Set the keystore, password, certificate type and the store type
*
@@ -363,7 +363,7 @@ public class TSSLTransportFactory {
this.keyStoreStream = keyStoreStream;
setKeyStore("", keyPass, keyManagerType, keyStoreType);
}
-
+
/**
* Set the keystore and password
*
@@ -373,7 +373,7 @@ public class TSSLTransportFactory {
public void setKeyStore(String keyStore, String keyPass) {
setKeyStore(keyStore, keyPass, null, null);
}
-
+
/**
* Set the keystore and password
*
@@ -383,7 +383,7 @@ public class TSSLTransportFactory {
public void setKeyStore(InputStream keyStoreStream, String keyPass) {
setKeyStore(keyStoreStream, keyPass, null, null);
}
-
+
/**
* Set the truststore, password, certificate type and the store type
*
@@ -403,7 +403,7 @@ public class TSSLTransportFactory {
}
isTrustStoreSet = true;
}
-
+
/**
* Set the truststore, password, certificate type and the store type
*
@@ -426,7 +426,7 @@ public class TSSLTransportFactory {
public void setTrustStore(String trustStore, String trustPass) {
setTrustStore(trustStore, trustPass, null, null);
}
-
+
/**
* Set the truststore and password
*
diff --git a/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java b/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java
index 5fc7cff9b..e5ca41831 100644
--- a/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java
@@ -47,14 +47,14 @@ public class TSaslClientTransport extends TSaslTransport {
/**
* Uses the given <code>SaslClient</code>.
- *
+ *
* @param saslClient
* The <code>SaslClient</code> to use for the subsequent SASL
* negotiation.
* @param transport
* Transport underlying this one.
*/
- public TSaslClientTransport(SaslClient saslClient, TTransport transport) {
+ public TSaslClientTransport(SaslClient saslClient, TTransport transport) throws TTransportException {
super(saslClient, transport);
mechanism = saslClient.getMechanismName();
}
@@ -63,14 +63,14 @@ public class TSaslClientTransport extends TSaslTransport {
* Creates a <code>SaslClient</code> using the given SASL-specific parameters.
* See the Java documentation for <code>Sasl.createSaslClient</code> for the
* details of the parameters.
- *
+ *
* @param transport
* The underlying Thrift transport.
* @throws SaslException
*/
public TSaslClientTransport(String mechanism, String authorizationId, String protocol,
String serverName, Map<String, String> props, CallbackHandler cbh, TTransport transport)
- throws SaslException {
+ throws SaslException, TTransportException {
super(Sasl.createSaslClient(new String[] { mechanism }, authorizationId, protocol, serverName,
props, cbh), transport);
this.mechanism = mechanism;
diff --git a/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java b/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java
index 31f309ef8..9111712a4 100644
--- a/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java
@@ -58,7 +58,7 @@ public class TSaslServerTransport extends TSaslTransport {
* @param transport
* Transport underlying this one.
*/
- public TSaslServerTransport(TTransport transport) {
+ public TSaslServerTransport(TTransport transport) throws TTransportException {
super(transport);
}
@@ -71,12 +71,12 @@ public class TSaslServerTransport extends TSaslTransport {
* The underlying Thrift transport.
*/
public TSaslServerTransport(String mechanism, String protocol, String serverName,
- Map<String, String> props, CallbackHandler cbh, TTransport transport) {
+ Map<String, String> props, CallbackHandler cbh, TTransport transport) throws TTransportException {
super(transport);
addServerDefinition(mechanism, protocol, serverName, props, cbh);
}
- private TSaslServerTransport(Map<String, TSaslServerDefinition> serverDefinitionMap, TTransport transport) {
+ private TSaslServerTransport(Map<String, TSaslServerDefinition> serverDefinitionMap, TTransport transport) throws TTransportException {
super(transport);
this.serverDefinitionMap.putAll(serverDefinitionMap);
}
@@ -190,7 +190,7 @@ public class TSaslServerTransport extends TSaslTransport {
* receives the same <code>TSaslServerTransport</code>.
*/
@Override
- public TTransport getTransport(TTransport base) {
+ public TTransport getTransport(TTransport base) throws TTransportException {
WeakReference<TSaslServerTransport> ret = transportMap.get(base);
if (ret == null || ret.get() == null) {
LOGGER.debug("transport map does not contain key", base);
diff --git a/lib/java/src/org/apache/thrift/transport/TSaslTransport.java b/lib/java/src/org/apache/thrift/transport/TSaslTransport.java
index d1a3d3115..b106c7004 100644
--- a/lib/java/src/org/apache/thrift/transport/TSaslTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TSaslTransport.java
@@ -20,6 +20,7 @@
package org.apache.thrift.transport;
import java.nio.charset.StandardCharsets;
+import java.util.Objects;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
@@ -28,6 +29,8 @@ import javax.security.sasl.SaslServer;
import org.apache.thrift.EncodingUtils;
import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.transport.layered.TFramedTransport;
import org.apache.thrift.transport.sasl.NegotiationStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +39,7 @@ import org.slf4j.LoggerFactory;
* A superclass for SASL client/server thrift transports. A subclass need only
* implement the <code>open</code> method.
*/
-abstract class TSaslTransport extends TTransport {
+abstract class TSaslTransport extends TEndpointTransport {
private static final Logger LOGGER = LoggerFactory.getLogger(TSaslTransport.class);
@@ -83,7 +86,8 @@ abstract class TSaslTransport extends TTransport {
* @param underlyingTransport
* The thrift transport which this transport is wrapping.
*/
- protected TSaslTransport(TTransport underlyingTransport) {
+ protected TSaslTransport(TTransport underlyingTransport) throws TTransportException {
+ super(Objects.isNull(underlyingTransport.getConfiguration()) ? new TConfiguration() : underlyingTransport.getConfiguration());
this.underlyingTransport = underlyingTransport;
}
@@ -96,7 +100,8 @@ abstract class TSaslTransport extends TTransport {
* @param underlyingTransport
* The thrift transport which this transport is wrapping.
*/
- protected TSaslTransport(SaslClient saslClient, TTransport underlyingTransport) {
+ protected TSaslTransport(SaslClient saslClient, TTransport underlyingTransport) throws TTransportException {
+ super(Objects.isNull(underlyingTransport.getConfiguration()) ? new TConfiguration() : underlyingTransport.getConfiguration());
sasl = new SaslParticipant(saslClient);
this.underlyingTransport = underlyingTransport;
}
@@ -151,7 +156,7 @@ abstract class TSaslTransport extends TTransport {
}
int payloadBytes = EncodingUtils.decodeBigEndian(messageHeader, STATUS_BYTES);
- if (payloadBytes < 0 || payloadBytes > 104857600 /* 100 MB */) {
+ if (payloadBytes < 0 || payloadBytes > getConfiguration().getMaxMessageSize() /* 100 MB */) {
throw sendAndThrowMessage(
NegotiationStatus.ERROR, "Invalid payload header length: " + payloadBytes);
}
diff --git a/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java b/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java
index 42102d9e8..c1bbd4853 100644
--- a/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java
@@ -18,6 +18,8 @@
*/
package org.apache.thrift.transport;
+import org.apache.thrift.TConfiguration;
+
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -25,26 +27,43 @@ import java.io.RandomAccessFile;
/**
* Basic file support for the TTransport interface
*/
-public final class TSimpleFileTransport extends TTransport {
+public final class TSimpleFileTransport extends TEndpointTransport {
+
+ private RandomAccessFile file = null;
+ private boolean readable;
+ private boolean writable;
+ private String path_;
- private RandomAccessFile file = null;
- private boolean readable;
- private boolean writable;
- private String path_;
+ /**
+ * Create a transport backed by a simple file
+ *
+ * @param path the path to the file to open/create
+ * @param read true to support read operations
+ * @param write true to support write operations
+ * @param openFile true to open the file on construction
+ * @throws TTransportException if file open fails
+ */
+ public TSimpleFileTransport(String path, boolean read,
+ boolean write, boolean openFile)
+ throws TTransportException {
+ this(new TConfiguration(), path, read, write, openFile);
+ }
/**
- * Create a transport backed by a simple file
- *
+ * Create a transport backed by a simple file
+ *
+ * @param config
* @param path the path to the file to open/create
* @param read true to support read operations
* @param write true to support write operations
* @param openFile true to open the file on construction
* @throws TTransportException if file open fails
*/
- public TSimpleFileTransport(String path, boolean read,
+ public TSimpleFileTransport(TConfiguration config, String path, boolean read,
boolean write, boolean openFile)
throws TTransportException {
+ super(config);
if (path.length() <= 0) {
throw new TTransportException("No path specified");
}
@@ -58,11 +77,11 @@ public final class TSimpleFileTransport extends TTransport {
open();
}
}
-
+
/**
- * Create a transport backed by a simple file
+ * Create a transport backed by a simple file
* Implicitly opens file to conform to C++ behavior.
- *
+ *
* @param path the path to the file to open/create
* @param read true to support read operations
* @param write true to support write operations
@@ -72,7 +91,7 @@ public final class TSimpleFileTransport extends TTransport {
throws TTransportException {
this(path, read, write, true);
}
-
+
/**
* Create a transport backed by a simple read only disk file (implicitly opens
* file)
@@ -95,7 +114,7 @@ public final class TSimpleFileTransport extends TTransport {
}
/**
- * Open file if not previously opened.
+ * Open file if not previously opened.
*
* @throws TTransportException if open fails
*/
@@ -111,7 +130,7 @@ public final class TSimpleFileTransport extends TTransport {
} catch (IOException ioe) {
file = null;
throw new TTransportException(ioe.getMessage());
- }
+ }
}
}
@@ -131,7 +150,7 @@ public final class TSimpleFileTransport extends TTransport {
}
/**
- * Read up to len many bytes into buf at offset
+ * Read up to len many bytes into buf at offset
*
* @param buf houses bytes read
* @param off offset into buff to begin writing to
@@ -144,6 +163,7 @@ public final class TSimpleFileTransport extends TTransport {
if (!readable) {
throw new TTransportException("Read operation on write only file");
}
+ checkReadBytesAvailable(len);
int iBytesRead = 0;
try {
iBytesRead = file.read(buf, off, len);
@@ -155,7 +175,7 @@ public final class TSimpleFileTransport extends TTransport {
}
/**
- * Write len many bytes from buff starting at offset
+ * Write len many bytes from buff starting at offset
*
* @param buf buffer containing bytes to write
* @param off offset into buffer to begin writing from
@@ -213,4 +233,4 @@ public final class TSimpleFileTransport extends TTransport {
throw new TTransportException(ex.getMessage());
}
}
-} \ No newline at end of file
+}
diff --git a/lib/java/src/org/apache/thrift/transport/TSocket.java b/lib/java/src/org/apache/thrift/transport/TSocket.java
index b20b32b78..eb73e8e76 100644
--- a/lib/java/src/org/apache/thrift/transport/TSocket.java
+++ b/lib/java/src/org/apache/thrift/transport/TSocket.java
@@ -19,6 +19,7 @@
package org.apache.thrift.transport;
+import org.apache.thrift.TConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,6 +70,7 @@ public class TSocket extends TIOStreamTransport {
* @throws TTransportException if there is an error setting up the streams
*/
public TSocket(Socket socket) throws TTransportException {
+ super(new TConfiguration());
socket_ = socket;
try {
socket_.setSoLinger(false, 0);
@@ -93,23 +95,36 @@ public class TSocket extends TIOStreamTransport {
* Creates a new unconnected socket that will connect to the given host
* on the given port.
*
+ * @param config check config
* @param host Remote host
* @param port Remote port
*/
- public TSocket(String host, int port) {
- this(host, port, 0);
+ public TSocket(TConfiguration config, String host, int port) throws TTransportException {
+ this(config, host, port, 0);
}
/**
* Creates a new unconnected socket that will connect to the given host
* on the given port.
*
+ * @param host Remote host
+ * @param port Remote port
+ */
+ public TSocket(String host, int port) throws TTransportException {
+ this(new TConfiguration(), host, port, 0);
+ }
+
+ /**
+ * Creates a new unconnected socket that will connect to the given host
+ * on the given port.
+ *
+ * @param config check config
* @param host Remote host
* @param port Remote port
* @param timeout Socket timeout and connection timeout
*/
- public TSocket(String host, int port, int timeout) {
- this(host, port, timeout, timeout);
+ public TSocket(TConfiguration config, String host, int port, int timeout) throws TTransportException {
+ this(config, host, port, timeout, timeout);
}
/**
@@ -117,12 +132,14 @@ public class TSocket extends TIOStreamTransport {
* on the given port, with a specific connection timeout and a
* specific socket timeout.
*
+ * @param config check config
* @param host Remote host
* @param port Remote port
* @param socketTimeout Socket timeout
* @param connectTimeout Connection timeout
*/
- public TSocket(String host, int port, int socketTimeout, int connectTimeout) {
+ public TSocket(TConfiguration config, String host, int port, int socketTimeout, int connectTimeout) throws TTransportException {
+ super(config);
host_ = host;
port_ = port;
socketTimeout_ = socketTimeout;
diff --git a/lib/java/src/org/apache/thrift/transport/TTransport.java b/lib/java/src/org/apache/thrift/transport/TTransport.java
index 73ad730ce..5645f7fa1 100644
--- a/lib/java/src/org/apache/thrift/transport/TTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TTransport.java
@@ -19,6 +19,8 @@
package org.apache.thrift.transport;
+import org.apache.thrift.TConfiguration;
+
import java.io.Closeable;
/**
@@ -160,4 +162,10 @@ public abstract class TTransport implements Closeable {
* @param len
*/
public void consumeBuffer(int len) {}
+
+ public abstract TConfiguration getConfiguration();
+
+ public abstract void updateKnownMessageSize(long size) throws TTransportException;
+
+ public abstract void checkReadBytesAvailable(long numBytes) throws TTransportException;
}
diff --git a/lib/java/src/org/apache/thrift/transport/TTransportFactory.java b/lib/java/src/org/apache/thrift/transport/TTransportFactory.java
index 3e71630ae..e068b4beb 100644
--- a/lib/java/src/org/apache/thrift/transport/TTransportFactory.java
+++ b/lib/java/src/org/apache/thrift/transport/TTransportFactory.java
@@ -34,7 +34,7 @@ public class TTransportFactory {
* @param trans The base transport
* @return Wrapped Transport
*/
- public TTransport getTransport(TTransport trans) {
+ public TTransport getTransport(TTransport trans) throws TTransportException {
return trans;
}
diff --git a/lib/java/src/org/apache/thrift/transport/TZlibTransport.java b/lib/java/src/org/apache/thrift/transport/TZlibTransport.java
index e755aa532..73b21aa3f 100644
--- a/lib/java/src/org/apache/thrift/transport/TZlibTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TZlibTransport.java
@@ -18,9 +18,12 @@
*/
package org.apache.thrift.transport;
+import org.apache.thrift.TConfiguration;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Objects;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
@@ -38,7 +41,7 @@ public class TZlibTransport extends TIOStreamTransport {
}
@Override
- public TTransport getTransport(TTransport base) {
+ public TTransport getTransport(TTransport base) throws TTransportException {
return new TZlibTransport(base);
}
}
@@ -47,7 +50,7 @@ public class TZlibTransport extends TIOStreamTransport {
* Constructs a new TZlibTransport instance.
* @param transport the underlying transport to read from and write to
*/
- public TZlibTransport(TTransport transport) {
+ public TZlibTransport(TTransport transport) throws TTransportException {
this(transport, Deflater.BEST_COMPRESSION);
}
@@ -56,7 +59,8 @@ public class TZlibTransport extends TIOStreamTransport {
* @param transport the underlying transport to read from and write to
* @param compressionLevel 0 for no compression, 9 for maximum compression
*/
- public TZlibTransport(TTransport transport, int compressionLevel) {
+ public TZlibTransport(TTransport transport, int compressionLevel) throws TTransportException {
+ super(Objects.isNull(transport.getConfiguration()) ? new TConfiguration() : transport.getConfiguration());
transport_ = transport;
inputStream_ = new InflaterInputStream(new TTransportInputStream(transport_), new Inflater());
outputStream_ = new DeflaterOutputStream(new TTransportOutputStream(transport_), new Deflater(compressionLevel, false), true);
diff --git a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java b/lib/java/src/org/apache/thrift/transport/layered/TFastFramedTransport.java
index a1fd2490a..29bf39c14 100644
--- a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/layered/TFastFramedTransport.java
@@ -16,7 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.thrift.transport;
+package org.apache.thrift.transport.layered;
+
+
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.transport.*;
+
+import java.util.Objects;
/**
* This transport is wire compatible with {@link TFramedTransport}, but makes
@@ -27,18 +33,18 @@ package org.apache.thrift.transport;
*
* This implementation is NOT threadsafe.
*/
-public class TFastFramedTransport extends TTransport {
+public class TFastFramedTransport extends TLayeredTransport {
public static class Factory extends TTransportFactory {
private final int initialCapacity;
private final int maxLength;
public Factory() {
- this(DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
+ this(DEFAULT_BUF_CAPACITY, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
}
public Factory(int initialCapacity) {
- this(initialCapacity, DEFAULT_MAX_LENGTH);
+ this(initialCapacity, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
}
public Factory(int initialCapacity, int maxLength) {
@@ -47,7 +53,7 @@ public class TFastFramedTransport extends TTransport {
}
@Override
- public TTransport getTransport(TTransport trans) {
+ public TTransport getTransport(TTransport trans) throws TTransportException {
return new TFastFramedTransport(trans,
initialCapacity,
maxLength);
@@ -58,12 +64,7 @@ public class TFastFramedTransport extends TTransport {
* How big should the default read and write buffers be?
*/
public static final int DEFAULT_BUF_CAPACITY = 1024;
- /**
- * How big is the largest allowable frame? Defaults to 16MB.
- */
- public static final int DEFAULT_MAX_LENGTH = 16384000;
- private final TTransport underlying;
private final AutoExpandingBufferWriteTransport writeBuffer;
private AutoExpandingBufferReadTransport readBuffer;
private final int initialBufferCapacity;
@@ -75,8 +76,8 @@ public class TFastFramedTransport extends TTransport {
* for initial buffer size and max frame length.
* @param underlying Transport that real reads and writes will go through to.
*/
- public TFastFramedTransport(TTransport underlying) {
- this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
+ public TFastFramedTransport(TTransport underlying) throws TTransportException {
+ this(underlying, DEFAULT_BUF_CAPACITY, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
}
/**
@@ -87,8 +88,8 @@ public class TFastFramedTransport extends TTransport {
* In practice, it's not critical to set this unless you know in advance that
* your messages are going to be very large.
*/
- public TFastFramedTransport(TTransport underlying, int initialBufferCapacity) {
- this(underlying, initialBufferCapacity, DEFAULT_MAX_LENGTH);
+ public TFastFramedTransport(TTransport underlying, int initialBufferCapacity) throws TTransportException {
+ this(underlying, initialBufferCapacity, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
}
/**
@@ -102,27 +103,29 @@ public class TFastFramedTransport extends TTransport {
* @param maxLength The max frame size you are willing to read. You can use
* this parameter to limit how much memory can be allocated.
*/
- public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
- this.underlying = underlying;
+ public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) throws TTransportException {
+ super(underlying);
+ TConfiguration config = Objects.isNull(underlying.getConfiguration()) ? new TConfiguration() : underlying.getConfiguration();
this.maxLength = maxLength;
+ config.setMaxFrameSize(maxLength);
this.initialBufferCapacity = initialBufferCapacity;
- readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity);
- writeBuffer = new AutoExpandingBufferWriteTransport(initialBufferCapacity, 4);
+ readBuffer = new AutoExpandingBufferReadTransport(config, initialBufferCapacity);
+ writeBuffer = new AutoExpandingBufferWriteTransport(config, initialBufferCapacity, 4);
}
@Override
public void close() {
- underlying.close();
+ getInnerTransport().close();
}
@Override
public boolean isOpen() {
- return underlying.isOpen();
+ return getInnerTransport().isOpen();
}
@Override
public void open() throws TTransportException {
- underlying.open();
+ getInnerTransport().open();
}
@Override
@@ -139,7 +142,7 @@ public class TFastFramedTransport extends TTransport {
}
private void readFrame() throws TTransportException {
- underlying.readAll(i32buf , 0, 4);
+ getInnerTransport().readAll(i32buf , 0, 4);
int size = TFramedTransport.decodeFrameSize(i32buf);
if (size < 0) {
@@ -147,13 +150,13 @@ public class TFastFramedTransport extends TTransport {
throw new TTransportException(TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!");
}
- if (size > maxLength) {
+ if (size > getInnerTransport().getConfiguration().getMaxFrameSize()) {
close();
throw new TTransportException(TTransportException.CORRUPTED_DATA,
"Frame size (" + size + ") larger than max length (" + maxLength + ")!");
}
- readBuffer.fill(underlying, size);
+ readBuffer.fill(getInnerTransport(), size);
}
@Override
@@ -169,18 +172,18 @@ public class TFastFramedTransport extends TTransport {
/**
* Only clears the read buffer!
*/
- public void clear() {
- readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity);
+ public void clear() throws TTransportException {
+ readBuffer = new AutoExpandingBufferReadTransport(getConfiguration(), initialBufferCapacity);
}
@Override
public void flush() throws TTransportException {
- int payloadLength = writeBuffer.getLength() - 4;
+ int payloadLength = writeBuffer.getLength() - 4;
byte[] data = writeBuffer.getBuf().array();
TFramedTransport.encodeFrameSize(payloadLength, data);
- underlying.write(data, 0, payloadLength + 4);
+ getInnerTransport().write(data, 0, payloadLength + 4);
writeBuffer.reset();
- underlying.flush();
+ getInnerTransport().flush();
}
@Override
diff --git a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java b/lib/java/src/org/apache/thrift/transport/layered/TFramedTransport.java
index a006c3a6a..10a9a1c17 100644
--- a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/layered/TFramedTransport.java
@@ -17,24 +17,22 @@
* under the License.
*/
-package org.apache.thrift.transport;
+package org.apache.thrift.transport.layered;
import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.transport.TMemoryInputTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+import java.util.Objects;
/**
* TFramedTransport is a buffered TTransport that ensures a fully read message
* every time by preceding messages with a 4-byte frame size.
*/
-public class TFramedTransport extends TTransport {
-
- protected static final int DEFAULT_MAX_LENGTH = 16384000;
-
- private int maxLength_;
-
- /**
- * Underlying transport
- */
- private TTransport transport_ = null;
+public class TFramedTransport extends TLayeredTransport {
/**
* Buffer for output
@@ -45,14 +43,13 @@ public class TFramedTransport extends TTransport {
/**
* Buffer for input
*/
- private final TMemoryInputTransport readBuffer_ =
- new TMemoryInputTransport(new byte[0]);
+ private final TMemoryInputTransport readBuffer_;
public static class Factory extends TTransportFactory {
private int maxLength_;
public Factory() {
- maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ maxLength_ = TConfiguration.DEFAULT_MAX_FRAME_SIZE;
}
public Factory(int maxLength) {
@@ -60,7 +57,7 @@ public class TFramedTransport extends TTransport {
}
@Override
- public TTransport getTransport(TTransport base) {
+ public TTransport getTransport(TTransport base) throws TTransportException {
return new TFramedTransport(base, maxLength_);
}
}
@@ -75,28 +72,28 @@ public class TFramedTransport extends TTransport {
/**
* Constructor wraps around another transport
*/
- public TFramedTransport(TTransport transport, int maxLength) {
- transport_ = transport;
- maxLength_ = maxLength;
+ public TFramedTransport(TTransport transport, int maxLength) throws TTransportException {
+ super(transport);
+ TConfiguration _configuration = Objects.isNull(transport.getConfiguration()) ? new TConfiguration() : transport.getConfiguration();
+ _configuration.setMaxFrameSize(maxLength);
writeBuffer_.write(sizeFiller_, 0, 4);
+ readBuffer_= new TMemoryInputTransport(_configuration, new byte[0]);
}
- public TFramedTransport(TTransport transport) {
- transport_ = transport;
- maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
- writeBuffer_.write(sizeFiller_, 0, 4);
+ public TFramedTransport(TTransport transport) throws TTransportException {
+ this(transport, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
}
public void open() throws TTransportException {
- transport_.open();
+ getInnerTransport().open();
}
public boolean isOpen() {
- return transport_.isOpen();
+ return getInnerTransport().isOpen();
}
public void close() {
- transport_.close();
+ getInnerTransport().close();
}
public int read(byte[] buf, int off, int len) throws TTransportException {
@@ -138,7 +135,7 @@ public class TFramedTransport extends TTransport {
private final byte[] i32buf = new byte[4];
private void readFrame() throws TTransportException {
- transport_.readAll(i32buf, 0, 4);
+ getInnerTransport().readAll(i32buf, 0, 4);
int size = decodeFrameSize(i32buf);
if (size < 0) {
@@ -146,14 +143,14 @@ public class TFramedTransport extends TTransport {
throw new TTransportException(TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!");
}
- if (size > maxLength_) {
+ if (size > getInnerTransport().getConfiguration().getMaxFrameSize()) {
close();
throw new TTransportException(TTransportException.CORRUPTED_DATA,
- "Frame size (" + size + ") larger than max length (" + maxLength_ + ")!");
+ "Frame size (" + size + ") larger than max length (" + getInnerTransport().getConfiguration().getMaxFrameSize() + ")!");
}
byte[] buff = new byte[size];
- transport_.readAll(buff, 0, size);
+ getInnerTransport().readAll(buff, 0, size);
readBuffer_.reset(buff);
}
@@ -169,8 +166,8 @@ public class TFramedTransport extends TTransport {
writeBuffer_.write(sizeFiller_, 0, 4); // make room for the next frame's size data
encodeFrameSize(len, buf); // this is the frame length without the filler
- transport_.write(buf, 0, len + 4); // we have to write the frame size and frame data
- transport_.flush();
+ getInnerTransport().write(buf, 0, len + 4); // we have to write the frame size and frame data
+ getInnerTransport().flush();
}
public static final void encodeFrameSize(final int frameSize, final byte[] buf) {
diff --git a/lib/java/src/org/apache/thrift/transport/layered/TLayeredTransport.java b/lib/java/src/org/apache/thrift/transport/layered/TLayeredTransport.java
new file mode 100644
index 000000000..69ec824ee
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/transport/layered/TLayeredTransport.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport.layered;
+
+import org.apache.thrift.TConfiguration;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.util.Objects;
+
+public abstract class TLayeredTransport extends TTransport{
+
+ private TTransport innerTransport;
+
+ public TConfiguration getConfiguration() {
+ return innerTransport.getConfiguration();
+ }
+
+ public TLayeredTransport(TTransport transport)
+ {
+ Objects.requireNonNull(transport, "TTransport cannot be null.");
+ innerTransport = transport;
+ }
+
+ public void updateKnownMessageSize(long size) throws TTransportException {
+ innerTransport.updateKnownMessageSize(size);
+ }
+
+ public void checkReadBytesAvailable(long numBytes) throws TTransportException {
+ innerTransport.checkReadBytesAvailable(numBytes);
+ }
+
+ public TTransport getInnerTransport() {
+ return innerTransport;
+ }
+}