diff options
Diffstat (limited to 'lib/java/src/org/apache')
41 files changed, 930 insertions, 247 deletions
diff --git a/lib/java/src/org/apache/thrift/TAsyncProcessor.java b/lib/java/src/org/apache/thrift/TAsyncProcessor.java index 66f768896..5e287d5c9 100644 --- a/lib/java/src/org/apache/thrift/TAsyncProcessor.java +++ b/lib/java/src/org/apache/thrift/TAsyncProcessor.java @@ -23,7 +23,7 @@ import org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer; public interface TAsyncProcessor { /** * Process a single frame. - + * <b>Note:</b> Implementations must call fb.responseReady() once processing * is complete * diff --git a/lib/java/src/org/apache/thrift/TConfiguration.java b/lib/java/src/org/apache/thrift/TConfiguration.java new file mode 100644 index 000000000..b98274aad --- /dev/null +++ b/lib/java/src/org/apache/thrift/TConfiguration.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift; + + +public class TConfiguration { + public static final int DEFAULT_MAX_MESSAGE_SIZE = 100 * 1024 * 1024; + public static final int DEFAULT_MAX_FRAME_SIZE = 16384000; // this value is used consistently across all Thrift libraries + public static final int DEFAULT_RECURSION_DEPTH = 64; + + private int maxMessageSize; + private int maxFrameSize; + private int recursionLimit; + + public TConfiguration() { + this(DEFAULT_MAX_MESSAGE_SIZE, DEFAULT_MAX_FRAME_SIZE, DEFAULT_RECURSION_DEPTH); + } + public TConfiguration(int maxMessageSize, int maxFrameSize, int recursionLimit) { + this.maxFrameSize = maxFrameSize; + this.maxMessageSize = maxMessageSize; + this.recursionLimit = recursionLimit; + } + + public int getMaxMessageSize() { + return maxMessageSize; + } + + public int getMaxFrameSize() { + return maxFrameSize; + } + + public int getRecursionLimit() { + return recursionLimit; + } + + public void setMaxMessageSize(int maxMessageSize) { + this.maxMessageSize = maxMessageSize; + } + + public void setMaxFrameSize(int maxFrameSize) { + this.maxFrameSize = maxFrameSize; + } + + public void setRecursionLimit(int recursionLimit) { + this.recursionLimit = recursionLimit; + } + + public static final TConfiguration DEFAULT = new Builder().build(); + + public static TConfiguration.Builder custom() { + return new Builder(); + } + + public static class Builder { + private int maxMessageSize ; + private int maxFrameSize; + private int recursionLimit ; + + Builder() { + super(); + this.maxFrameSize = DEFAULT_MAX_FRAME_SIZE; + this.maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE; + this.recursionLimit = DEFAULT_RECURSION_DEPTH; + } + + public Builder setMaxMessageSize(int maxMessageSize) { + this.maxMessageSize = maxMessageSize; + return this; + } + + public Builder setMaxFrameSize(int maxFrameSize) { + this.maxFrameSize = maxFrameSize; + return this; + } + + public Builder setRecursionLimit(int recursionLimit) { + this.recursionLimit = recursionLimit; + return this; + } + + public TConfiguration build() { + return new TConfiguration(maxMessageSize, maxFrameSize, recursionLimit); + } + } +} diff --git a/lib/java/src/org/apache/thrift/TDeserializer.java b/lib/java/src/org/apache/thrift/TDeserializer.java index d1d396609..29be5578f 100644 --- a/lib/java/src/org/apache/thrift/TDeserializer.java +++ b/lib/java/src/org/apache/thrift/TDeserializer.java @@ -29,6 +29,7 @@ import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.protocol.TProtocolUtil; import org.apache.thrift.protocol.TType; import org.apache.thrift.transport.TMemoryInputTransport; +import org.apache.thrift.transport.TTransportException; /** * Generic utility for easily deserializing objects from a byte array or Java @@ -42,7 +43,7 @@ public class TDeserializer { /** * Create a new TDeserializer that uses the TBinaryProtocol by default. */ - public TDeserializer() { + public TDeserializer() throws TTransportException { this(new TBinaryProtocol.Factory()); } @@ -52,8 +53,8 @@ public class TDeserializer { * * @param protocolFactory Factory to create a protocol */ - public TDeserializer(TProtocolFactory protocolFactory) { - trans_ = new TMemoryInputTransport(); + public TDeserializer(TProtocolFactory protocolFactory) throws TTransportException { + trans_ = new TMemoryInputTransport(new TConfiguration()); protocol_ = protocolFactory.getProtocol(trans_); } @@ -105,19 +106,19 @@ public class TDeserializer { /** * Deserialize only a single Thrift object (addressed by recursively using field id) - * from a byte record. + * from a byte record. * @param tb The object to read into * @param bytes The serialized object to read from * @param fieldIdPathFirst First of the FieldId's that define a path tb * @param fieldIdPathRest The rest FieldId's that define a path tb - * @throws TException + * @throws TException */ public void partialDeserialize(TBase tb, byte[] bytes, TFieldIdEnum fieldIdPathFirst, TFieldIdEnum ... fieldIdPathRest) throws TException { try { if (locateField(bytes, fieldIdPathFirst, fieldIdPathRest) != null) { // if this line is reached, iprot will be positioned at the start of tb. tb.read(protocol_); - } + } } catch (Exception e) { throw new TException(e); } finally { diff --git a/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java b/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java index 9cf873cac..78f3a571b 100644 --- a/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java +++ b/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java @@ -75,7 +75,7 @@ import java.util.concurrent.TimeoutException; * */ public class TNonblockingMultiFetchClient { - + private static final Logger LOGGER = LoggerFactory.getLogger( TNonblockingMultiFetchClient.class); @@ -86,7 +86,7 @@ public class TNonblockingMultiFetchClient { // time limit for fetching data from all servers (in second) private int fetchTimeoutSeconds; - // store request that will be sent to servers + // store request that will be sent to servers private ByteBuffer requestBuf; private ByteBuffer requestBufDuplication; @@ -104,7 +104,7 @@ public class TNonblockingMultiFetchClient { this.fetchTimeoutSeconds = fetchTimeoutSeconds; this.requestBuf = requestBuf; this.servers = servers; - + stats = new TNonblockingMultiFetchStats(); recvBuf = null; } @@ -128,7 +128,7 @@ public class TNonblockingMultiFetchClient { if (requestBufDuplication == null) { requestBufDuplication = requestBuf.duplicate(); } - return requestBufDuplication; + return requestBufDuplication; } } @@ -171,7 +171,7 @@ public class TNonblockingMultiFetchClient { task.cancel(true); LOGGER.error("Exception during fetch", ee); } catch (TimeoutException te) { - // attempt to cancel execution of the task. + // attempt to cancel execution of the task. task.cancel(true); LOGGER.error("Timeout for fetch", te); } @@ -207,10 +207,10 @@ public class TNonblockingMultiFetchClient { // buffer for receiving response from servers recvBuf = new ByteBuffer[numTotalServers]; // buffer for sending request - ByteBuffer sendBuf[] = new ByteBuffer[numTotalServers]; - long numBytesRead[] = new long[numTotalServers]; - int frameSize[] = new int[numTotalServers]; - boolean hasReadFrameSize[] = new boolean[numTotalServers]; + ByteBuffer[] sendBuf = new ByteBuffer[numTotalServers]; + long[] numBytesRead = new long[numTotalServers]; + int[] frameSize = new int[numTotalServers]; + boolean[] hasReadFrameSize = new boolean[numTotalServers]; try { selector = Selector.open(); @@ -240,10 +240,11 @@ public class TNonblockingMultiFetchClient { } catch (Exception e) { stats.incNumConnectErrorServers(); LOGGER.error("Set up socket to server {} error", server, e); + // free resource if (s != null) { try {s.close();} catch (Exception ex) {} - } + } if (key != null) { key.cancel(); } @@ -253,7 +254,7 @@ public class TNonblockingMultiFetchClient { // wait for events while (stats.getNumReadCompletedServers() + stats.getNumConnectErrorServers() < stats.getNumTotalServers()) { - // if the thread is interrupted (e.g., task is cancelled) + // if the thread is interrupted (e.g., task is cancelled) if (Thread.currentThread().isInterrupted()) { return; } @@ -380,4 +381,4 @@ public class TNonblockingMultiFetchClient { } } } -}
\ No newline at end of file +} diff --git a/lib/java/src/org/apache/thrift/TSerializer.java b/lib/java/src/org/apache/thrift/TSerializer.java index 4e1ce6129..4bf057d77 100644 --- a/lib/java/src/org/apache/thrift/TSerializer.java +++ b/lib/java/src/org/apache/thrift/TSerializer.java @@ -26,6 +26,7 @@ import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TProtocolFactory; import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TTransportException; /** * Generic utility for easily serializing objects into a byte array or Java @@ -42,7 +43,7 @@ public class TSerializer { /** * This transport wraps that byte array */ - private final TIOStreamTransport transport_ = new TIOStreamTransport(baos_); + private final TIOStreamTransport transport_; /** * Internal protocol used for serializing objects. @@ -52,7 +53,7 @@ public class TSerializer { /** * Create a new TSerializer that uses the TBinaryProtocol by default. */ - public TSerializer() { + public TSerializer() throws TTransportException { this(new TBinaryProtocol.Factory()); } @@ -62,7 +63,8 @@ public class TSerializer { * * @param protocolFactory Factory to create a protocol */ - public TSerializer(TProtocolFactory protocolFactory) { + public TSerializer(TProtocolFactory protocolFactory) throws TTransportException { + transport_ = new TIOStreamTransport(new TConfiguration(), baos_); protocol_ = protocolFactory.getProtocol(transport_); } diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java index 3bf1747f4..d5c608d87 100644 --- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java +++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java @@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.protocol.TProtocolFactory; -import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.layered.TFramedTransport; import org.apache.thrift.transport.TMemoryBuffer; import org.apache.thrift.transport.TNonblockingTransport; import org.apache.thrift.transport.TTransportException; @@ -98,7 +98,7 @@ public abstract class TAsyncMethodCall<T> { protected long getStartTime() { return startTime; } - + protected long getSequenceId() { return sequenceId; } @@ -106,11 +106,11 @@ public abstract class TAsyncMethodCall<T> { public TAsyncClient getClient() { return client; } - + public boolean hasTimeout() { return timeout > 0; } - + public long getTimeoutTimestamp() { return timeout + startTime; } diff --git a/lib/java/src/org/apache/thrift/protocol/TBinaryProtocol.java b/lib/java/src/org/apache/thrift/protocol/TBinaryProtocol.java index 7924e2fe6..fc46f7c6f 100644 --- a/lib/java/src/org/apache/thrift/protocol/TBinaryProtocol.java +++ b/lib/java/src/org/apache/thrift/protocol/TBinaryProtocol.java @@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; /** * Binary protocol implementation for thrift. @@ -279,6 +280,8 @@ public class TBinaryProtocol extends TProtocol { @Override public TMap readMapBegin() throws TException { TMap map = new TMap(readByte(), readByte(), readI32()); + + checkReadBytesAvailable(map); checkContainerReadLength(map.size); return map; } @@ -289,6 +292,8 @@ public class TBinaryProtocol extends TProtocol { @Override public TList readListBegin() throws TException { TList list = new TList(readByte(), readI32()); + + checkReadBytesAvailable(list); checkContainerReadLength(list.size); return list; } @@ -299,6 +304,8 @@ public class TBinaryProtocol extends TProtocol { @Override public TSet readSetBegin() throws TException { TSet set = new TSet(readByte(), readI32()); + + checkReadBytesAvailable(set); checkContainerReadLength(set.size); return set; } @@ -393,8 +400,6 @@ public class TBinaryProtocol extends TProtocol { public String readString() throws TException { int size = readI32(); - checkStringReadLength(size); - if (trans_.getBytesRemainingInBuffer() >= size) { String s = new String(trans_.getBuffer(), trans_.getBufferPosition(), size, StandardCharsets.UTF_8); @@ -429,11 +434,14 @@ public class TBinaryProtocol extends TProtocol { return ByteBuffer.wrap(buf); } - private void checkStringReadLength(int length) throws TProtocolException { + private void checkStringReadLength(int length) throws TException { if (length < 0) { throw new TProtocolException(TProtocolException.NEGATIVE_SIZE, "Negative length: " + length); } + + getTransport().checkReadBytesAvailable(length); + if (stringLengthLimit_ != NO_LENGTH_LIMIT && length > stringLengthLimit_) { throw new TProtocolException(TProtocolException.SIZE_LIMIT, "Length exceeded max allowed: " + length); @@ -454,4 +462,28 @@ public class TBinaryProtocol extends TProtocol { private int readAll(byte[] buf, int off, int len) throws TException { return trans_.readAll(buf, off, len); } + + /** + * + * Return the minimum number of bytes a type will consume on the wire + */ + public int getMinSerializedSize(byte type) throws TTransportException { + switch (type) + { + case 0: return 0; // Stop + case 1: return 0; // Void + case 2: return 1; // Bool sizeof(byte) + case 3: return 1; // Byte sizeof(byte) + case 4: return 8; // Double sizeof(double) + case 6: return 2; // I16 sizeof(short) + case 8: return 4; // I32 sizeof(int) + case 10: return 8;// I64 sizeof(long) + case 11: return 4; // string length sizeof(int) + case 12: return 0; // empty struct + case 13: return 4; // element count Map sizeof(int) + case 14: return 4; // element count Set sizeof(int) + case 15: return 4; // element count List sizeof(int) + default: throw new TTransportException(TTransportException.UNKNOWN, "unrecognized type code"); + } + } } diff --git a/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java index ee0586945..0dfcf25d1 100644 --- a/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java +++ b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java @@ -26,6 +26,7 @@ import java.nio.charset.StandardCharsets; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; /** * TCompactProtocol2 is the Java implementation of the compact protocol specified @@ -579,7 +580,9 @@ public class TCompactProtocol extends TProtocol { int size = readVarint32(); checkContainerReadLength(size); byte keyAndValueType = size == 0 ? 0 : readByte(); - return new TMap(getTType((byte)(keyAndValueType >> 4)), getTType((byte)(keyAndValueType & 0xf)), size); + TMap map = new TMap(getTType((byte)(keyAndValueType >> 4)), getTType((byte)(keyAndValueType & 0xf)), size); + checkReadBytesAvailable(map); + return map; } /** @@ -595,8 +598,9 @@ public class TCompactProtocol extends TProtocol { size = readVarint32(); } checkContainerReadLength(size); - byte type = getTType(size_and_type); - return new TList(type, size); + TList list = new TList(getTType(size_and_type), size); + checkReadBytesAvailable(list); + return list; } /** @@ -694,9 +698,9 @@ public class TCompactProtocol extends TProtocol { */ public ByteBuffer readBinary() throws TException { int length = readVarint32(); - checkStringReadLength(length); + if (length == 0) return EMPTY_BUFFER; - + getTransport().checkReadBytesAvailable(length); if (trans_.getBytesRemainingInBuffer() >= length) { ByteBuffer bb = ByteBuffer.wrap(trans_.getBuffer(), trans_.getBufferPosition(), length); trans_.consumeBuffer(length); @@ -719,11 +723,14 @@ public class TCompactProtocol extends TProtocol { return buf; } - private void checkStringReadLength(int length) throws TProtocolException { + private void checkStringReadLength(int length) throws TException { if (length < 0) { throw new TProtocolException(TProtocolException.NEGATIVE_SIZE, "Negative length: " + length); } + + getTransport().checkReadBytesAvailable(length); + if (stringLengthLimit_ != NO_LENGTH_LIMIT && length > stringLengthLimit_) { throw new TProtocolException(TProtocolException.SIZE_LIMIT, "Length exceeded max allowed: " + length); @@ -901,4 +908,40 @@ public class TCompactProtocol extends TProtocol { private byte getCompactType(byte ttype) { return ttypeToCompactType[ttype]; } + + /** + * Return the minimum number of bytes a type will consume on the wire + */ + public int getMinSerializedSize(byte type) throws TTransportException { + switch (type) { + case 0: + return 0; // Stop + case 1: + return 0; // Void + case 2: + return 1; // Bool sizeof(byte) + case 3: + return 1; // Byte sizeof(byte) + case 4: + return 8; // Double sizeof(double) + case 6: + return 1; // I16 sizeof(byte) + case 8: + return 1; // I32 sizeof(byte) + case 10: + return 1;// I64 sizeof(byte) + case 11: + return 1; // string length sizeof(byte) + case 12: + return 0; // empty struct + case 13: + return 1; // element count Map sizeof(byte) + case 14: + return 1; // element count Set sizeof(byte) + case 15: + return 1; // element count List sizeof(byte) + default: + throw new TTransportException(TTransportException.UNKNOWN, "unrecognized type code"); + } + } } diff --git a/lib/java/src/org/apache/thrift/protocol/TJSONProtocol.java b/lib/java/src/org/apache/thrift/protocol/TJSONProtocol.java index d37c4937f..6bb49cb2f 100644 --- a/lib/java/src/org/apache/thrift/protocol/TJSONProtocol.java +++ b/lib/java/src/org/apache/thrift/protocol/TJSONProtocol.java @@ -28,6 +28,7 @@ import java.util.Stack; import org.apache.thrift.TByteArrayOutputStream; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; /** * JSON protocol implementation for thrift. @@ -591,17 +592,17 @@ public class TJSONProtocol extends TProtocol { @Override public void writeByte(byte b) throws TException { - writeJSONInteger((long)b); + writeJSONInteger(b); } @Override public void writeI16(short i16) throws TException { - writeJSONInteger((long)i16); + writeJSONInteger(i16); } @Override public void writeI32(int i32) throws TException { - writeJSONInteger((long)i32); + writeJSONInteger(i32); } @Override @@ -895,7 +896,10 @@ public class TJSONProtocol extends TProtocol { byte valueType = getTypeIDForTypeName(readJSONString(false).get()); int size = (int)readJSONInteger(); readJSONObjectStart(); - return new TMap(keyType, valueType, size); + TMap map = new TMap(keyType, valueType, size); + + checkReadBytesAvailable(map); + return map; } @Override @@ -909,7 +913,10 @@ public class TJSONProtocol extends TProtocol { readJSONArrayStart(); byte elemType = getTypeIDForTypeName(readJSONString(false).get()); int size = (int)readJSONInteger(); - return new TList(elemType, size); + TList list = new TList(elemType, size); + + checkReadBytesAvailable(list); + return list; } @Override @@ -922,7 +929,10 @@ public class TJSONProtocol extends TProtocol { readJSONArrayStart(); byte elemType = getTypeIDForTypeName(readJSONString(false).get()); int size = (int)readJSONInteger(); - return new TSet(elemType, size); + TSet set = new TSet(elemType, size); + + checkReadBytesAvailable(set); + return set; } @Override @@ -932,7 +942,7 @@ public class TJSONProtocol extends TProtocol { @Override public boolean readBool() throws TException { - return (readJSONInteger() == 0 ? false : true); + return (readJSONInteger() != 0); } @Override @@ -952,7 +962,7 @@ public class TJSONProtocol extends TProtocol { @Override public long readI64() throws TException { - return (long) readJSONInteger(); + return readJSONInteger(); } @Override @@ -962,7 +972,9 @@ public class TJSONProtocol extends TProtocol { @Override public String readString() throws TException { - return readJSONString(false).toString(StandardCharsets.UTF_8); + String str = readJSONString(false).toString(StandardCharsets.UTF_8); + getTransport().checkReadBytesAvailable(str.length() * getMinSerializedSize(TType.STRING)); + return str; } @Override @@ -970,4 +982,28 @@ public class TJSONProtocol extends TProtocol { return ByteBuffer.wrap(readJSONBase64()); } + /** + * + * Return the minimum number of bytes a type will consume on the wire + */ + public int getMinSerializedSize(byte type) throws TTransportException { + switch (type) + { + case 0: return 0; // Stop + case 1: return 0; // Void + case 2: return 1; // Bool + case 3: return 1; // Byte + case 4: return 1; // Double + case 6: return 1; // I16 + case 8: return 1; // I32 + case 10: return 1;// I64 + case 11: return 2; // string length + case 12: return 2; // empty struct + case 13: return 2; // element count Map + case 14: return 2; // element count Set + case 15: return 2; // element count List + default: throw new TTransportException(TTransportException.UNKNOWN, "unrecognized type code"); + } + } + } diff --git a/lib/java/src/org/apache/thrift/protocol/TProtocol.java b/lib/java/src/org/apache/thrift/protocol/TProtocol.java index 0e96368d4..38c030e73 100644 --- a/lib/java/src/org/apache/thrift/protocol/TProtocol.java +++ b/lib/java/src/org/apache/thrift/protocol/TProtocol.java @@ -57,6 +57,27 @@ public abstract class TProtocol { return trans_; } + protected void checkReadBytesAvailable(TMap map) throws TException { + long elemSize = getMinSerializedSize(map.keyType) + getMinSerializedSize(map.valueType); + trans_.checkReadBytesAvailable(map.size * elemSize); + } + + protected void checkReadBytesAvailable(TList list) throws TException { + trans_.checkReadBytesAvailable(list.size * getMinSerializedSize(list.elemType)); + } + + protected void checkReadBytesAvailable(TSet set) throws TException { + trans_.checkReadBytesAvailable(set.size * getMinSerializedSize(set.elemType)); + } + + /** + * Return + * @param type Returns the minimum amount of bytes needed to store the smallest possible instance of TType. + * @return + * @throws TException + */ + public abstract int getMinSerializedSize(byte type) throws TException; + /** * Writing methods. */ @@ -152,7 +173,7 @@ public abstract class TProtocol { * be implemented for stateful protocols. */ public void reset() {} - + /** * Scheme accessor */ diff --git a/lib/java/src/org/apache/thrift/protocol/TProtocolDecorator.java b/lib/java/src/org/apache/thrift/protocol/TProtocolDecorator.java index 2d29cd231..9d109622f 100644 --- a/lib/java/src/org/apache/thrift/protocol/TProtocolDecorator.java +++ b/lib/java/src/org/apache/thrift/protocol/TProtocolDecorator.java @@ -31,7 +31,7 @@ import java.nio.ByteBuffer; * the behaviour of the enclosed <code>TProtocol</code>. * * <p>See p.175 of Design Patterns (by Gamma et al.)</p> - * + * * @see org.apache.thrift.protocol.TMultiplexedProtocol */ public abstract class TProtocolDecorator extends TProtocol { @@ -210,4 +210,14 @@ public abstract class TProtocolDecorator extends TProtocol { public ByteBuffer readBinary() throws TException { return concreteProtocol.readBinary(); } + + /** + * + * @param type Returns the minimum amount of bytes needed to store the smallest possible instance of TType. + * @return + * @throws TException + */ + public int getMinSerializedSize(byte type) throws TException { + return concreteProtocol.getMinSerializedSize(type); + } } diff --git a/lib/java/src/org/apache/thrift/protocol/TSimpleJSONProtocol.java b/lib/java/src/org/apache/thrift/protocol/TSimpleJSONProtocol.java index eb7e23bf9..9413f619c 100644 --- a/lib/java/src/org/apache/thrift/protocol/TSimpleJSONProtocol.java +++ b/lib/java/src/org/apache/thrift/protocol/TSimpleJSONProtocol.java @@ -25,6 +25,7 @@ import java.util.Stack; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; /** * JSON protocol implementation for thrift. @@ -480,4 +481,28 @@ public class TSimpleJSONProtocol extends TProtocol { super(message); } } + + /** + * + * Return the minimum number of bytes a type will consume on the wire + */ + public int getMinSerializedSize(byte type) throws TException { + switch (type) + { + case 0: return 0; // Stop + case 1: return 0; // Void + case 2: return 1; // Bool + case 3: return 1; // Byte + case 4: return 1; // Double + case 6: return 1; // I16 + case 8: return 1; // I32 + case 10: return 1;// I64 + case 11: return 2; // string length + case 12: return 2; // empty struct + case 13: return 2; // element count Map + case 14: return 2; // element count Set + case 15: return 2; // element count List + default: throw new TTransportException(TTransportException.UNKNOWN, "unrecognized type code"); + } + } } diff --git a/lib/java/src/org/apache/thrift/protocol/TTupleProtocol.java b/lib/java/src/org/apache/thrift/protocol/TTupleProtocol.java index 74f5226c8..67d00edb7 100644 --- a/lib/java/src/org/apache/thrift/protocol/TTupleProtocol.java +++ b/lib/java/src/org/apache/thrift/protocol/TTupleProtocol.java @@ -80,9 +80,9 @@ public final class TTupleProtocol extends TCompactProtocol { * extension). The byte-ordering of the result is big-endian which means the * most significant bit is in element 0. The bit at index 0 of the bit set is * assumed to be the least significant bit. - * + * * @param bits - * @param vectorWidth + * @param vectorWidth * @return a byte array of at least length 1 */ public static byte[] toByteArray(BitSet bits, int vectorWidth) { @@ -95,4 +95,27 @@ public final class TTupleProtocol extends TCompactProtocol { return bytes; } + public TMap readMapBegin(byte keyType, byte valTyep) throws TException { + int size = super.readI32(); + TMap map = new TMap(keyType, valTyep, size); + + checkReadBytesAvailable(map); + return map; + } + + public TList readListBegin(byte type) throws TException { + int size = super.readI32(); + TList list = new TList(type, size); + + checkReadBytesAvailable(list); + return list; + } + + public TSet readSetBegin(byte type) throws TException { + return new TSet(readListBegin(type)); + } + + public void readMapEnd() throws TException {} + public void readListEnd() throws TException {} + public void readSetEnd() throws TException {} } diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java index 8c206e427..4aae803f2 100644 --- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java @@ -23,7 +23,7 @@ import org.apache.thrift.TAsyncProcessor; import org.apache.thrift.TByteArrayOutputStream; import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.layered.TFramedTransport; import org.apache.thrift.transport.TIOStreamTransport; import org.apache.thrift.transport.TMemoryInputTransport; import org.apache.thrift.transport.TNonblockingServerTransport; @@ -305,7 +305,7 @@ public abstract class AbstractNonblockingServer extends TServer { public FrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, - final AbstractSelectThread selectThread) { + final AbstractSelectThread selectThread) throws TTransportException { trans_ = trans; selectionKey_ = selectionKey; selectThread_ = selectThread; @@ -542,10 +542,7 @@ public abstract class AbstractNonblockingServer extends TServer { */ private boolean internalRead() { try { - if (trans_.read(buffer_) < 0) { - return false; - } - return true; + return trans_.read(buffer_) >= 0; } catch (IOException e) { LOGGER.warn("Got an IOException in internalRead!", e); return false; @@ -582,7 +579,7 @@ public abstract class AbstractNonblockingServer extends TServer { } // FrameBuffer public class AsyncFrameBuffer extends FrameBuffer { - public AsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) { + public AsyncFrameBuffer(TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) throws TTransportException { super(trans, selectionKey, selectThread); } diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java index 79610b0f7..eac05a876 100644 --- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java +++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java @@ -215,7 +215,7 @@ public class TNonblockingServer extends AbstractNonblockingServer { protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, - final AbstractSelectThread selectThread) { + final AbstractSelectThread selectThread) throws TTransportException { return processorFactory_.isAsyncProcessor() ? new AsyncFrameBuffer(trans, selectionKey, selectThread) : new FrameBuffer(trans, selectionKey, selectThread); @@ -229,7 +229,7 @@ public class TNonblockingServer extends AbstractNonblockingServer { TNonblockingTransport client = null; try { // accept the connection - client = (TNonblockingTransport)serverTransport.accept(); + client = serverTransport.accept(); clientKey = client.registerSelector(selector, SelectionKey.OP_READ); // add this key to the map diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java index 038507e9c..095aacbc5 100644 --- a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java +++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java @@ -457,7 +457,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer { private TNonblockingTransport doAccept() { try { - return (TNonblockingTransport) serverTransport.accept(); + return serverTransport.accept(); } catch (TTransportException tte) { // something went wrong accepting. LOGGER.warn("Exception trying to accept!", tte); @@ -685,7 +685,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer { protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans, final SelectionKey selectionKey, - final AbstractSelectThread selectThread) { + final AbstractSelectThread selectThread) throws TTransportException { return processorFactory_.isAsyncProcessor() ? new AsyncFrameBuffer(trans, selectionKey, selectThread) : new FrameBuffer(trans, selectionKey, selectThread); @@ -699,7 +699,7 @@ public class TThreadedSelectorServer extends AbstractNonblockingServer { FrameBuffer frameBuffer = createFrameBuffer(accepted, clientKey, SelectorThread.this); clientKey.attach(frameBuffer); - } catch (IOException e) { + } catch (IOException | TTransportException e) { LOGGER.warn("Failed to register accepted connection to selector!", e); if (clientKey != null) { cleanupSelectionKey(clientKey); diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java index fc3aa92df..b355d11ca 100644 --- a/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java +++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java @@ -27,7 +27,7 @@ import java.util.Arrays; * rate slightly faster than the requested capacity with the (untested) * objective of avoiding expensive buffer allocations and copies. */ -class AutoExpandingBuffer { +public class AutoExpandingBuffer { private byte[] array; public AutoExpandingBuffer(int initialCapacity) { diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java index d06fec78c..6fd4075b9 100644 --- a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java +++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java @@ -18,17 +18,20 @@ */ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + /** * TTransport for reading from an AutoExpandingBuffer. */ -public class AutoExpandingBufferReadTransport extends TTransport { +public class AutoExpandingBufferReadTransport extends TEndpointTransport { private final AutoExpandingBuffer buf; private int pos = 0; private int limit = 0; - public AutoExpandingBufferReadTransport(int initialCapacity) { + public AutoExpandingBufferReadTransport(TConfiguration config, int initialCapacity) throws TTransportException { + super(config); this.buf = new AutoExpandingBuffer(initialCapacity); } diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java index ec7e7d45a..25f974a73 100644 --- a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java +++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java @@ -18,10 +18,12 @@ */ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + /** * TTransport for writing to an AutoExpandingBuffer. */ -public final class AutoExpandingBufferWriteTransport extends TTransport { +public final class AutoExpandingBufferWriteTransport extends TEndpointTransport { private final AutoExpandingBuffer buf; private int pos; @@ -38,7 +40,8 @@ public final class AutoExpandingBufferWriteTransport extends TTransport { * @throws IllegalArgumentException if frontReserve is less than zero * @throws IllegalArgumentException if frontReserve is greater than initialCapacity */ - public AutoExpandingBufferWriteTransport(int initialCapacity, int frontReserve) { + public AutoExpandingBufferWriteTransport(TConfiguration config, int initialCapacity, int frontReserve) throws TTransportException { + super(config); if (initialCapacity < 1) { throw new IllegalArgumentException("initialCapacity"); } diff --git a/lib/java/src/org/apache/thrift/transport/TByteBuffer.java b/lib/java/src/org/apache/thrift/transport/TByteBuffer.java index b6b065748..c792f3b1c 100644 --- a/lib/java/src/org/apache/thrift/transport/TByteBuffer.java +++ b/lib/java/src/org/apache/thrift/transport/TByteBuffer.java @@ -1,5 +1,7 @@ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + import java.nio.BufferOverflowException; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; @@ -7,14 +9,16 @@ import java.nio.ByteBuffer; /** * ByteBuffer-backed implementation of TTransport. */ -public final class TByteBuffer extends TTransport { +public final class TByteBuffer extends TEndpointTransport { private final ByteBuffer byteBuffer; /** * Creates a new TByteBuffer wrapping a given NIO ByteBuffer. */ - public TByteBuffer(ByteBuffer byteBuffer) { + public TByteBuffer(ByteBuffer byteBuffer) throws TTransportException { + super(new TConfiguration()); this.byteBuffer = byteBuffer; + updateKnownMessageSize(byteBuffer.capacity()); } @Override @@ -32,6 +36,9 @@ public final class TByteBuffer extends TTransport { @Override public int read(byte[] buf, int off, int len) throws TTransportException { + // + checkReadBytesAvailable(len); + final int n = Math.min(byteBuffer.remaining(), len); if (n > 0) { try { diff --git a/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java b/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java new file mode 100644 index 000000000..f32efae90 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport; + +import org.apache.thrift.TConfiguration; + +import java.util.Objects; + +public abstract class TEndpointTransport extends TTransport{ + + protected long getMaxMessageSize() { return getConfiguration().getMaxMessageSize(); } + + protected long knownMessageSize; + protected long remainingMessageSize; + + private TConfiguration _configuration; + public TConfiguration getConfiguration() { + return _configuration; + } + + public TEndpointTransport( TConfiguration config) throws TTransportException { + _configuration = Objects.isNull(config) ? new TConfiguration() : config; + + resetConsumedMessageSize(-1); + } + + /** + * Resets RemainingMessageSize to the configured maximum + * @param newSize + */ + protected void resetConsumedMessageSize(long newSize) throws TTransportException { + // full reset + if (newSize < 0) + { + knownMessageSize = getMaxMessageSize(); + remainingMessageSize = getMaxMessageSize(); + return; + } + + // update only: message size can shrink, but not grow + if (newSize > knownMessageSize) + throw new TTransportException(TTransportException.END_OF_FILE, "MaxMessageSize reached"); + + knownMessageSize = newSize; + remainingMessageSize = newSize; + } + + /** + * Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport). + * Will throw if we already consumed too many bytes or if the new size is larger than allowed. + * @param size + */ + public void updateKnownMessageSize(long size) throws TTransportException { + long consumed = knownMessageSize - remainingMessageSize; + resetConsumedMessageSize(size == 0 ? -1 : size); + countConsumedMessageBytes(consumed); + } + + /** + * Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data + * @param numBytes + */ + public void checkReadBytesAvailable(long numBytes) throws TTransportException { + if (remainingMessageSize < numBytes) + throw new TTransportException(TTransportException.END_OF_FILE, "MaxMessageSize reached"); + } + + /** + * Consumes numBytes from the RemainingMessageSize. + * @param numBytes + */ + protected void countConsumedMessageBytes(long numBytes) throws TTransportException { + if (remainingMessageSize >= numBytes) + { + remainingMessageSize -= numBytes; + } + else + { + remainingMessageSize = 0; + throw new TTransportException(TTransportException.END_OF_FILE, "MaxMessageSize reached"); + } + } + +} diff --git a/lib/java/src/org/apache/thrift/transport/TFileTransport.java b/lib/java/src/org/apache/thrift/transport/TFileTransport.java index 88b73e54d..85f97084c 100644 --- a/lib/java/src/org/apache/thrift/transport/TFileTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TFileTransport.java @@ -26,13 +26,14 @@ import java.io.OutputStream; import java.io.IOException; import java.util.Random; +import org.apache.thrift.TConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * FileTransport implementation of the TTransport interface. * Currently this is a straightforward port of the cpp implementation - * + * * It may make better sense to provide a basic stream access on top of the framed file format * The FileTransport can then be a user of this framed file format with some additional logic * for chunking. @@ -44,7 +45,7 @@ public class TFileTransport extends TTransport { public static class TruncableBufferedInputStream extends BufferedInputStream { public void trunc() { pos = count = 0; - } + } public TruncableBufferedInputStream(InputStream in) { super(in); } @@ -62,7 +63,7 @@ public class TFileTransport extends TTransport { /** * Initialize an event. Initially, it has no valid contents * - * @param buf byte array buffer to store event + * @param buf byte array buffer to store event */ public Event(byte[] buf) { buf_ = buf; @@ -88,9 +89,9 @@ public class TFileTransport extends TTransport { return(ndesired); } - }; + } - public static class ChunkState { + public static class ChunkState { /** * Chunk Size. Must be same across all implementations */ @@ -111,7 +112,7 @@ public class TFileTransport extends TTransport { public long getOffset() { return (offset_);} } - public static enum TailPolicy { + public enum TailPolicy { NOWAIT(0, 0), WAIT_FOREVER(500, -1); @@ -148,13 +149,13 @@ public class TFileTransport extends TTransport { TailPolicy currentPolicy_ = TailPolicy.NOWAIT; - /** + /** * Underlying file being read */ protected TSeekableFile inputFile_ = null; - /** - * Underlying outputStream + /** + * Underlying outputStream */ protected OutputStream outputStream_ = null; @@ -181,7 +182,7 @@ public class TFileTransport extends TTransport { /** * Get File Tailing Policy - * + * * @return current read policy */ public TailPolicy getTailPolicy() { @@ -190,7 +191,7 @@ public class TFileTransport extends TTransport { /** * Set file Tailing Policy - * + * * @param policy New policy to set * @return Old policy */ @@ -203,7 +204,7 @@ public class TFileTransport extends TTransport { /** * Initialize read input stream - * + * * @return input stream to read from file */ private InputStream createInputStream() throws TTransportException { @@ -223,7 +224,7 @@ public class TFileTransport extends TTransport { /** * Read (potentially tailing) an input stream - * + * * @param is InputStream to read from * @param buf Buffer to read into * @param off Offset in buffer to read into @@ -232,7 +233,7 @@ public class TFileTransport extends TTransport { * * @return number of bytes read */ - private int tailRead(InputStream is, byte[] buf, + private int tailRead(InputStream is, byte[] buf, int off, int len, TailPolicy tp) throws TTransportException { int orig_len = len; try { @@ -322,7 +323,7 @@ public class TFileTransport extends TTransport { // check if event is corrupted and do recovery as required if(esize > cs.getRemaining()) { throw new TTransportException("FileTransport error: bad event size"); - /* + /* if(performRecovery()) { esize=0; } else { @@ -361,7 +362,7 @@ public class TFileTransport extends TTransport { * Files are not opened in ctor - but in explicit open call */ public void open() throws TTransportException { - if (isOpen()) + if (isOpen()) throw new TTransportException(TTransportException.ALREADY_OPEN); try { @@ -406,7 +407,7 @@ public class TFileTransport extends TTransport { * * @param path File path to read and write from * @param readOnly Whether this is a read-only transport - */ + */ public TFileTransport(final String path, boolean readOnly) throws IOException { inputFile_ = new TStandardFile(path); readOnly_ = readOnly; @@ -457,8 +458,8 @@ public class TFileTransport extends TTransport { * @throws TTransportException if there was an error reading data */ public int read(byte[] buf, int off, int len) throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before reading"); if(currentEvent_.getRemaining() == 0) { @@ -471,14 +472,14 @@ public class TFileTransport extends TTransport { } public int getNumChunks() throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before getNumChunks"); try { long len = inputFile_.length(); if(len == 0) return 0; - else + else return (((int)(len/cs.getChunkSize())) + 1); } catch (IOException iox) { @@ -487,8 +488,8 @@ public class TFileTransport extends TTransport { } public int getCurChunk() throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before getCurChunk"); return (cs.getChunkNum()); @@ -496,8 +497,8 @@ public class TFileTransport extends TTransport { public void seekToChunk(int chunk) throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before seeking"); int numChunks = getNumChunks(); @@ -527,7 +528,7 @@ public class TFileTransport extends TTransport { } if(chunk*cs.getChunkSize() != cs.getOffset()) { - try { inputFile_.seek((long)chunk*cs.getChunkSize()); } + try { inputFile_.seek((long)chunk*cs.getChunkSize()); } catch (IOException iox) { throw new TTransportException("Seek to chunk " + chunk + " " +iox.getMessage(), iox); @@ -549,8 +550,8 @@ public class TFileTransport extends TTransport { } public void seekToEnd() throws TTransportException { - if(!isOpen()) - throw new TTransportException(TTransportException.NOT_OPEN, + if(!isOpen()) + throw new TTransportException(TTransportException.NOT_OPEN, "Must open before seeking"); seekToChunk(getNumChunks()); } @@ -577,9 +578,25 @@ public class TFileTransport extends TTransport { throw new TTransportException("Not Supported"); } + + @Override + public TConfiguration getConfiguration() { + return null; + } + + @Override + public void updateKnownMessageSize(long size) throws TTransportException { + + } + + @Override + public void checkReadBytesAvailable(long numBytes) throws TTransportException { + + } + /** * test program - * + * */ public static void main(String[] args) throws Exception { @@ -594,7 +611,7 @@ public class TFileTransport extends TTransport { try { num_chunks = Integer.parseInt(args[1]); } catch (Exception e) { - LOGGER.error("Cannot parse " + args[1]); + LOGGER.error("Cannot parse " + args[1]); printUsage(); } } diff --git a/lib/java/src/org/apache/thrift/transport/THttpClient.java b/lib/java/src/org/apache/thrift/transport/THttpClient.java index c3063fe43..7d61b5c8e 100644 --- a/lib/java/src/org/apache/thrift/transport/THttpClient.java +++ b/lib/java/src/org/apache/thrift/transport/THttpClient.java @@ -37,6 +37,7 @@ import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.params.CoreConnectionPNames; +import org.apache.thrift.TConfiguration; /** * HTTP implementation of the TTransport interface. Used for working with a @@ -51,7 +52,7 @@ import org.apache.http.params.CoreConnectionPNames; * HttpClient to THttpClient(String url, HttpClient client) will create an * instance which will use HttpURLConnection. * - * When using HttpClient, the following configuration leads to 5-15% + * When using HttpClient, the following configuration leads to 5-15% * better performance than the HttpURLConnection implementation: * * http.protocol.version=HttpVersion.HTTP_1_1 @@ -65,7 +66,7 @@ import org.apache.http.params.CoreConnectionPNames; * @see <a href="https://issues.apache.org/jira/browse/THRIFT-970">THRIFT-970</a> */ -public class THttpClient extends TTransport { +public class THttpClient extends TEndpointTransport { private URL url_ = null; @@ -80,14 +81,14 @@ public class THttpClient extends TTransport { private Map<String,String> customHeaders_ = null; private final HttpHost host; - + private final HttpClient client; - + public static class Factory extends TTransportFactory { - + private final String url; private final HttpClient client; - + public Factory(String url) { this.url = url; this.client = null; @@ -97,14 +98,14 @@ public class THttpClient extends TTransport { this.url = url; this.client = client; } - + @Override public TTransport getTransport(TTransport trans) { try { if (null != client) { - return new THttpClient(url, client); + return new THttpClient(trans.getConfiguration(), url, client); } else { - return new THttpClient(url); + return new THttpClient(trans.getConfiguration(), url); } } catch (TTransportException tte) { return null; @@ -112,7 +113,19 @@ public class THttpClient extends TTransport { } } + public THttpClient(TConfiguration config, String url) throws TTransportException { + super(config); + try { + url_ = new URL(url); + this.client = null; + this.host = null; + } catch (IOException iox) { + throw new TTransportException(iox); + } + } + public THttpClient(String url) throws TTransportException { + super(new TConfiguration()); try { url_ = new URL(url); this.client = null; @@ -122,7 +135,19 @@ public class THttpClient extends TTransport { } } + public THttpClient(TConfiguration config, String url, HttpClient client) throws TTransportException { + super(config); + try { + url_ = new URL(url); + this.client = client; + this.host = new HttpHost(url_.getHost(), -1 == url_.getPort() ? url_.getDefaultPort() : url_.getPort(), url_.getProtocol()); + } catch (IOException iox) { + throw new TTransportException(iox); + } + } + public THttpClient(String url, HttpClient client) throws TTransportException { + super(new TConfiguration()); try { url_ = new URL(url); this.client = client; @@ -168,7 +193,6 @@ public class THttpClient extends TTransport { try { inputStream_.close(); } catch (IOException ioe) { - ; } inputStream_ = null; } @@ -182,11 +206,16 @@ public class THttpClient extends TTransport { if (inputStream_ == null) { throw new TTransportException("Response buffer is empty, no request."); } + + checkReadBytesAvailable(len); + try { int ret = inputStream_.read(buf, off, len); if (ret == -1) { throw new TTransportException("No more data available."); } + countConsumedMessageBytes(ret); + return ret; } catch (IOException iox) { throw new TTransportException(iox); @@ -214,7 +243,7 @@ public class THttpClient extends TTransport { } private void flushUsingHttpClient() throws TTransportException { - + if (null == this.client) { throw new TTransportException("Null HttpClient, aborting."); } @@ -224,22 +253,22 @@ public class THttpClient extends TTransport { requestBuffer_.reset(); HttpPost post = null; - + InputStream is = null; - - try { + + try { // Set request to path + query string post = new HttpPost(this.url_.getFile()); - + // // Headers are added to the HttpPost instance, not // to HttpClient. // - + post.setHeader("Content-Type", "application/x-thrift"); post.setHeader("Accept", "application/x-thrift"); post.setHeader("User-Agent", "Java/THttpClient/HC"); - + if (null != customHeaders_) { for (Map.Entry<String, String> header : customHeaders_.entrySet()) { post.setHeader(header.getKey(), header.getValue()); @@ -247,17 +276,17 @@ public class THttpClient extends TTransport { } post.setEntity(new ByteArrayEntity(data)); - + HttpResponse response = this.client.execute(this.host, post); int responseCode = response.getStatusLine().getStatusCode(); - // + // // Retrieve the inputstream BEFORE checking the status code so // resources get freed in the finally clause. // is = response.getEntity().getContent(); - + if (responseCode != HttpStatus.SC_OK) { throw new TTransportException("HTTP Response code: " + responseCode); } @@ -268,10 +297,10 @@ public class THttpClient extends TTransport { // thrift struct is being read up the chain). // Proceeding differently might lead to exhaustion of connections and thus // to app failure. - + byte[] buf = new byte[1024]; ByteArrayOutputStream baos = new ByteArrayOutputStream(); - + int len = 0; do { len = is.read(buf); @@ -279,7 +308,7 @@ public class THttpClient extends TTransport { baos.write(buf, 0, len); } } while (-1 != len); - + try { // Indicate we're done with the content. consume(response.getEntity()); @@ -287,7 +316,7 @@ public class THttpClient extends TTransport { // We ignore this exception, it might only mean the server has no // keep-alive capability. } - + inputStream_ = new ByteArrayInputStream(baos.toByteArray()); } catch (IOException ioe) { // Abort method so the connection gets released back to the connection manager @@ -296,6 +325,7 @@ public class THttpClient extends TTransport { } throw new TTransportException(ioe); } finally { + resetConsumedMessageSize(-1); if (null != is) { // Close the entity's input stream, this will release the underlying connection try { @@ -357,6 +387,8 @@ public class THttpClient extends TTransport { } catch (IOException iox) { throw new TTransportException(iox); + } finally { + resetConsumedMessageSize(-1); } } } diff --git a/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java b/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java index d97d5063e..763e66ad6 100644 --- a/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java @@ -19,6 +19,7 @@ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,7 +34,7 @@ import java.io.OutputStream; * has to provide a variety of types of streams. * */ -public class TIOStreamTransport extends TTransport { +public class TIOStreamTransport extends TEndpointTransport { private static final Logger LOGGER = LoggerFactory.getLogger(TIOStreamTransport.class.getName()); @@ -47,33 +48,80 @@ public class TIOStreamTransport extends TTransport { * Subclasses can invoke the default constructor and then assign the input * streams in the open method. */ - protected TIOStreamTransport() {} + protected TIOStreamTransport(TConfiguration config) throws TTransportException { + super(config); + } + + /** + * Subclasses can invoke the default constructor and then assign the input + * streams in the open method. + */ + protected TIOStreamTransport() throws TTransportException { + super(new TConfiguration()); + } /** * Input stream constructor, constructs an input only transport. * + * @param config * @param is Input stream to read from */ - public TIOStreamTransport(InputStream is) { + public TIOStreamTransport(TConfiguration config, InputStream is) throws TTransportException { + super(config); inputStream_ = is; } + /** + * Input stream constructor, constructs an input only transport. + * + * @param is Input stream to read from + */ + public TIOStreamTransport(InputStream is) throws TTransportException { + super(new TConfiguration()); + inputStream_ = is; + } + + /** + * Output stream constructor, constructs an output only transport. + * + * @param config + * @param os Output stream to write to + */ + public TIOStreamTransport(TConfiguration config, OutputStream os) throws TTransportException { + super(config); + outputStream_ = os; + } /** * Output stream constructor, constructs an output only transport. * * @param os Output stream to write to */ - public TIOStreamTransport(OutputStream os) { + public TIOStreamTransport(OutputStream os) throws TTransportException { + super(new TConfiguration()); outputStream_ = os; } /** * Two-way stream constructor. * + * @param config * @param is Input stream to read from * @param os Output stream to read from */ - public TIOStreamTransport(InputStream is, OutputStream os) { + public TIOStreamTransport(TConfiguration config, InputStream is, OutputStream os) throws TTransportException { + super(config); + inputStream_ = is; + outputStream_ = os; + } + + /** + * Two-way stream constructor. + * + * @param is Input stream to read from + * @param os Output stream to read from + */ + public TIOStreamTransport(InputStream is, OutputStream os) throws TTransportException { + super(new TConfiguration()); inputStream_ = is; outputStream_ = os; } @@ -158,6 +206,9 @@ public class TIOStreamTransport extends TTransport { } try { outputStream_.flush(); + + resetConsumedMessageSize(-1); + } catch (IOException iox) { throw new TTransportException(TTransportException.UNKNOWN, iox); } diff --git a/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java b/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java index b19ac86d2..c3a3eb4e1 100644 --- a/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java +++ b/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java @@ -20,21 +20,39 @@ package org.apache.thrift.transport; import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TConfiguration; + import java.nio.charset.Charset; /** * Memory buffer-based implementation of the TTransport interface. */ -public class TMemoryBuffer extends TTransport { +public class TMemoryBuffer extends TEndpointTransport { + /** + * Create a TMemoryBuffer with an initial buffer size of <i>size</i>. The + * internal buffer will grow as necessary to accommodate the size of the data + * being written to it. + * + * @param size the initial size of the buffer + */ + public TMemoryBuffer(int size) throws TTransportException { + super(new TConfiguration()); + arr_ = new TByteArrayOutputStream(size); + updateKnownMessageSize(size); + } + /** * Create a TMemoryBuffer with an initial buffer size of <i>size</i>. The * internal buffer will grow as necessary to accommodate the size of the data * being written to it. * + * @param config * @param size the initial size of the buffer */ - public TMemoryBuffer(int size) { + public TMemoryBuffer(TConfiguration config, int size) throws TTransportException { + super(config); arr_ = new TByteArrayOutputStream(size); + updateKnownMessageSize(size); } @Override @@ -53,9 +71,11 @@ public class TMemoryBuffer extends TTransport { } @Override - public int read(byte[] buf, int off, int len) { + public int read(byte[] buf, int off, int len) throws TTransportException { + checkReadBytesAvailable(len); byte[] src = arr_.get(); int amtToRead = (len > arr_.len() - pos_ ? arr_.len() - pos_ : len); + if (amtToRead > 0) { System.arraycopy(src, pos_, buf, off, amtToRead); pos_ += amtToRead; diff --git a/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java b/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java index 2530dcc36..6cb06fc37 100644 --- a/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TMemoryInputTransport.java @@ -18,21 +18,38 @@ */ package org.apache.thrift.transport; -public final class TMemoryInputTransport extends TTransport { +import org.apache.thrift.TConfiguration; + +public final class TMemoryInputTransport extends TEndpointTransport { private byte[] buf_; private int pos_; private int endPos_; - public TMemoryInputTransport() { + public TMemoryInputTransport() throws TTransportException { + this(new TConfiguration()); + } + + public TMemoryInputTransport(TConfiguration _configuration) throws TTransportException { + this(_configuration, new byte[0]); + } + + public TMemoryInputTransport(byte[] buf) throws TTransportException { + this(new TConfiguration(), buf); } - public TMemoryInputTransport(byte[] buf) { - reset(buf); + public TMemoryInputTransport(TConfiguration _configuration, byte[] buf) throws TTransportException { + this(_configuration, buf, 0, buf.length); } - public TMemoryInputTransport(byte[] buf, int offset, int length) { + public TMemoryInputTransport(byte[] buf, int offset, int length) throws TTransportException { + this(new TConfiguration(), buf, offset, length); + } + + public TMemoryInputTransport(TConfiguration _configuration, byte[] buf, int offset, int length) throws TTransportException { + super(_configuration); reset(buf, offset, length); + updateKnownMessageSize(length); } public void reset(byte[] buf) { @@ -43,10 +60,20 @@ public final class TMemoryInputTransport extends TTransport { buf_ = buf; pos_ = offset; endPos_ = offset + length; + try { + resetConsumedMessageSize(-1); + } catch (TTransportException e) { + // ignore + } } public void clear() { buf_ = null; + try { + resetConsumedMessageSize(-1); + } catch (TTransportException e) { + // ignore + } } @Override @@ -67,6 +94,7 @@ public final class TMemoryInputTransport extends TTransport { if (amtToRead > 0) { System.arraycopy(buf_, pos_, buf, off, amtToRead); consumeBuffer(amtToRead); + countConsumedMessageBytes(amtToRead); } return amtToRead; } diff --git a/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java b/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java index f41bc09c8..0172ca816 100644 --- a/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TMemoryTransport.java @@ -22,18 +22,28 @@ package org.apache.thrift.transport; import java.nio.ByteBuffer; import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TConfiguration; /** * In memory transport with separate buffers for input and output. */ -public class TMemoryTransport extends TTransport { +public class TMemoryTransport extends TEndpointTransport { private final ByteBuffer inputBuffer; private final TByteArrayOutputStream outputBuffer; - public TMemoryTransport(byte[] input) { + public TMemoryTransport(byte[] input) throws TTransportException { + super(new TConfiguration()); inputBuffer = ByteBuffer.wrap(input); outputBuffer = new TByteArrayOutputStream(1024); + updateKnownMessageSize(input.length); + } + + public TMemoryTransport(TConfiguration config, byte[] input) throws TTransportException { + super(config); + inputBuffer = ByteBuffer.wrap(input); + outputBuffer = new TByteArrayOutputStream(1024); + updateKnownMessageSize(input.length); } @Override @@ -56,6 +66,7 @@ public class TMemoryTransport extends TTransport { @Override public int read(byte[] buf, int off, int len) throws TTransportException { + checkReadBytesAvailable(len); int remaining = inputBuffer.remaining(); if (remaining < len) { throw new TTransportException(TTransportException.END_OF_FILE, diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java index 37a66d614..76ed02cbb 100644 --- a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java @@ -30,6 +30,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; +import org.apache.thrift.TConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +48,7 @@ public class TNonblockingSocket extends TNonblockingTransport { private final SocketChannel socketChannel_; - public TNonblockingSocket(String host, int port) throws IOException { + public TNonblockingSocket(String host, int port) throws IOException, TTransportException { this(host, port, 0); } @@ -57,7 +58,7 @@ public class TNonblockingSocket extends TNonblockingTransport { * @param port * @throws IOException */ - public TNonblockingSocket(String host, int port, int timeout) throws IOException { + public TNonblockingSocket(String host, int port, int timeout) throws IOException, TTransportException { this(SocketChannel.open(), timeout, new InetSocketAddress(host, port)); } @@ -67,13 +68,19 @@ public class TNonblockingSocket extends TNonblockingTransport { * @param socketChannel Already created SocketChannel object * @throws IOException if there is an error setting up the streams */ - public TNonblockingSocket(SocketChannel socketChannel) throws IOException { + public TNonblockingSocket(SocketChannel socketChannel) throws IOException, TTransportException { this(socketChannel, 0, null); if (!socketChannel.isConnected()) throw new IOException("Socket must already be connected"); } private TNonblockingSocket(SocketChannel socketChannel, int timeout, SocketAddress socketAddress) - throws IOException { + throws IOException, TTransportException { + this(new TConfiguration(), socketChannel, timeout, socketAddress); + } + + private TNonblockingSocket(TConfiguration config, SocketChannel socketChannel, int timeout, SocketAddress socketAddress) + throws IOException, TTransportException { + super(config); socketChannel_ = socketChannel; socketAddress_ = socketAddress; diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java index 43c130688..255595d6c 100644 --- a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java @@ -19,13 +19,19 @@ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -public abstract class TNonblockingTransport extends TTransport { +public abstract class TNonblockingTransport extends TEndpointTransport { + + public TNonblockingTransport(TConfiguration config) throws TTransportException { + super(config); + } /** * Non-blocking connection initialization. diff --git a/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java b/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java index 570f53373..3389e4d2a 100644 --- a/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java +++ b/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java @@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory; * TSocket and TServerSocket */ public class TSSLTransportFactory { - + private static final Logger LOGGER = LoggerFactory.getLogger(TSSLTransportFactory.class); @@ -350,7 +350,7 @@ public class TSSLTransportFactory { } isKeyStoreSet = true; } - + /** * Set the keystore, password, certificate type and the store type * @@ -363,7 +363,7 @@ public class TSSLTransportFactory { this.keyStoreStream = keyStoreStream; setKeyStore("", keyPass, keyManagerType, keyStoreType); } - + /** * Set the keystore and password * @@ -373,7 +373,7 @@ public class TSSLTransportFactory { public void setKeyStore(String keyStore, String keyPass) { setKeyStore(keyStore, keyPass, null, null); } - + /** * Set the keystore and password * @@ -383,7 +383,7 @@ public class TSSLTransportFactory { public void setKeyStore(InputStream keyStoreStream, String keyPass) { setKeyStore(keyStoreStream, keyPass, null, null); } - + /** * Set the truststore, password, certificate type and the store type * @@ -403,7 +403,7 @@ public class TSSLTransportFactory { } isTrustStoreSet = true; } - + /** * Set the truststore, password, certificate type and the store type * @@ -426,7 +426,7 @@ public class TSSLTransportFactory { public void setTrustStore(String trustStore, String trustPass) { setTrustStore(trustStore, trustPass, null, null); } - + /** * Set the truststore and password * diff --git a/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java b/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java index 5fc7cff9b..e5ca41831 100644 --- a/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TSaslClientTransport.java @@ -47,14 +47,14 @@ public class TSaslClientTransport extends TSaslTransport { /** * Uses the given <code>SaslClient</code>. - * + * * @param saslClient * The <code>SaslClient</code> to use for the subsequent SASL * negotiation. * @param transport * Transport underlying this one. */ - public TSaslClientTransport(SaslClient saslClient, TTransport transport) { + public TSaslClientTransport(SaslClient saslClient, TTransport transport) throws TTransportException { super(saslClient, transport); mechanism = saslClient.getMechanismName(); } @@ -63,14 +63,14 @@ public class TSaslClientTransport extends TSaslTransport { * Creates a <code>SaslClient</code> using the given SASL-specific parameters. * See the Java documentation for <code>Sasl.createSaslClient</code> for the * details of the parameters. - * + * * @param transport * The underlying Thrift transport. * @throws SaslException */ public TSaslClientTransport(String mechanism, String authorizationId, String protocol, String serverName, Map<String, String> props, CallbackHandler cbh, TTransport transport) - throws SaslException { + throws SaslException, TTransportException { super(Sasl.createSaslClient(new String[] { mechanism }, authorizationId, protocol, serverName, props, cbh), transport); this.mechanism = mechanism; diff --git a/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java b/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java index 31f309ef8..9111712a4 100644 --- a/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TSaslServerTransport.java @@ -58,7 +58,7 @@ public class TSaslServerTransport extends TSaslTransport { * @param transport * Transport underlying this one. */ - public TSaslServerTransport(TTransport transport) { + public TSaslServerTransport(TTransport transport) throws TTransportException { super(transport); } @@ -71,12 +71,12 @@ public class TSaslServerTransport extends TSaslTransport { * The underlying Thrift transport. */ public TSaslServerTransport(String mechanism, String protocol, String serverName, - Map<String, String> props, CallbackHandler cbh, TTransport transport) { + Map<String, String> props, CallbackHandler cbh, TTransport transport) throws TTransportException { super(transport); addServerDefinition(mechanism, protocol, serverName, props, cbh); } - private TSaslServerTransport(Map<String, TSaslServerDefinition> serverDefinitionMap, TTransport transport) { + private TSaslServerTransport(Map<String, TSaslServerDefinition> serverDefinitionMap, TTransport transport) throws TTransportException { super(transport); this.serverDefinitionMap.putAll(serverDefinitionMap); } @@ -190,7 +190,7 @@ public class TSaslServerTransport extends TSaslTransport { * receives the same <code>TSaslServerTransport</code>. */ @Override - public TTransport getTransport(TTransport base) { + public TTransport getTransport(TTransport base) throws TTransportException { WeakReference<TSaslServerTransport> ret = transportMap.get(base); if (ret == null || ret.get() == null) { LOGGER.debug("transport map does not contain key", base); diff --git a/lib/java/src/org/apache/thrift/transport/TSaslTransport.java b/lib/java/src/org/apache/thrift/transport/TSaslTransport.java index d1a3d3115..b106c7004 100644 --- a/lib/java/src/org/apache/thrift/transport/TSaslTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TSaslTransport.java @@ -20,6 +20,7 @@ package org.apache.thrift.transport; import java.nio.charset.StandardCharsets; +import java.util.Objects; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; @@ -28,6 +29,8 @@ import javax.security.sasl.SaslServer; import org.apache.thrift.EncodingUtils; import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TConfiguration; +import org.apache.thrift.transport.layered.TFramedTransport; import org.apache.thrift.transport.sasl.NegotiationStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +39,7 @@ import org.slf4j.LoggerFactory; * A superclass for SASL client/server thrift transports. A subclass need only * implement the <code>open</code> method. */ -abstract class TSaslTransport extends TTransport { +abstract class TSaslTransport extends TEndpointTransport { private static final Logger LOGGER = LoggerFactory.getLogger(TSaslTransport.class); @@ -83,7 +86,8 @@ abstract class TSaslTransport extends TTransport { * @param underlyingTransport * The thrift transport which this transport is wrapping. */ - protected TSaslTransport(TTransport underlyingTransport) { + protected TSaslTransport(TTransport underlyingTransport) throws TTransportException { + super(Objects.isNull(underlyingTransport.getConfiguration()) ? new TConfiguration() : underlyingTransport.getConfiguration()); this.underlyingTransport = underlyingTransport; } @@ -96,7 +100,8 @@ abstract class TSaslTransport extends TTransport { * @param underlyingTransport * The thrift transport which this transport is wrapping. */ - protected TSaslTransport(SaslClient saslClient, TTransport underlyingTransport) { + protected TSaslTransport(SaslClient saslClient, TTransport underlyingTransport) throws TTransportException { + super(Objects.isNull(underlyingTransport.getConfiguration()) ? new TConfiguration() : underlyingTransport.getConfiguration()); sasl = new SaslParticipant(saslClient); this.underlyingTransport = underlyingTransport; } @@ -151,7 +156,7 @@ abstract class TSaslTransport extends TTransport { } int payloadBytes = EncodingUtils.decodeBigEndian(messageHeader, STATUS_BYTES); - if (payloadBytes < 0 || payloadBytes > 104857600 /* 100 MB */) { + if (payloadBytes < 0 || payloadBytes > getConfiguration().getMaxMessageSize() /* 100 MB */) { throw sendAndThrowMessage( NegotiationStatus.ERROR, "Invalid payload header length: " + payloadBytes); } diff --git a/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java b/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java index 42102d9e8..c1bbd4853 100644 --- a/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TSimpleFileTransport.java @@ -18,6 +18,8 @@ */ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + import java.io.IOException; import java.io.RandomAccessFile; @@ -25,26 +27,43 @@ import java.io.RandomAccessFile; /** * Basic file support for the TTransport interface */ -public final class TSimpleFileTransport extends TTransport { +public final class TSimpleFileTransport extends TEndpointTransport { + + private RandomAccessFile file = null; + private boolean readable; + private boolean writable; + private String path_; - private RandomAccessFile file = null; - private boolean readable; - private boolean writable; - private String path_; + /** + * Create a transport backed by a simple file + * + * @param path the path to the file to open/create + * @param read true to support read operations + * @param write true to support write operations + * @param openFile true to open the file on construction + * @throws TTransportException if file open fails + */ + public TSimpleFileTransport(String path, boolean read, + boolean write, boolean openFile) + throws TTransportException { + this(new TConfiguration(), path, read, write, openFile); + } /** - * Create a transport backed by a simple file - * + * Create a transport backed by a simple file + * + * @param config * @param path the path to the file to open/create * @param read true to support read operations * @param write true to support write operations * @param openFile true to open the file on construction * @throws TTransportException if file open fails */ - public TSimpleFileTransport(String path, boolean read, + public TSimpleFileTransport(TConfiguration config, String path, boolean read, boolean write, boolean openFile) throws TTransportException { + super(config); if (path.length() <= 0) { throw new TTransportException("No path specified"); } @@ -58,11 +77,11 @@ public final class TSimpleFileTransport extends TTransport { open(); } } - + /** - * Create a transport backed by a simple file + * Create a transport backed by a simple file * Implicitly opens file to conform to C++ behavior. - * + * * @param path the path to the file to open/create * @param read true to support read operations * @param write true to support write operations @@ -72,7 +91,7 @@ public final class TSimpleFileTransport extends TTransport { throws TTransportException { this(path, read, write, true); } - + /** * Create a transport backed by a simple read only disk file (implicitly opens * file) @@ -95,7 +114,7 @@ public final class TSimpleFileTransport extends TTransport { } /** - * Open file if not previously opened. + * Open file if not previously opened. * * @throws TTransportException if open fails */ @@ -111,7 +130,7 @@ public final class TSimpleFileTransport extends TTransport { } catch (IOException ioe) { file = null; throw new TTransportException(ioe.getMessage()); - } + } } } @@ -131,7 +150,7 @@ public final class TSimpleFileTransport extends TTransport { } /** - * Read up to len many bytes into buf at offset + * Read up to len many bytes into buf at offset * * @param buf houses bytes read * @param off offset into buff to begin writing to @@ -144,6 +163,7 @@ public final class TSimpleFileTransport extends TTransport { if (!readable) { throw new TTransportException("Read operation on write only file"); } + checkReadBytesAvailable(len); int iBytesRead = 0; try { iBytesRead = file.read(buf, off, len); @@ -155,7 +175,7 @@ public final class TSimpleFileTransport extends TTransport { } /** - * Write len many bytes from buff starting at offset + * Write len many bytes from buff starting at offset * * @param buf buffer containing bytes to write * @param off offset into buffer to begin writing from @@ -213,4 +233,4 @@ public final class TSimpleFileTransport extends TTransport { throw new TTransportException(ex.getMessage()); } } -}
\ No newline at end of file +} diff --git a/lib/java/src/org/apache/thrift/transport/TSocket.java b/lib/java/src/org/apache/thrift/transport/TSocket.java index b20b32b78..eb73e8e76 100644 --- a/lib/java/src/org/apache/thrift/transport/TSocket.java +++ b/lib/java/src/org/apache/thrift/transport/TSocket.java @@ -19,6 +19,7 @@ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +70,7 @@ public class TSocket extends TIOStreamTransport { * @throws TTransportException if there is an error setting up the streams */ public TSocket(Socket socket) throws TTransportException { + super(new TConfiguration()); socket_ = socket; try { socket_.setSoLinger(false, 0); @@ -93,23 +95,36 @@ public class TSocket extends TIOStreamTransport { * Creates a new unconnected socket that will connect to the given host * on the given port. * + * @param config check config * @param host Remote host * @param port Remote port */ - public TSocket(String host, int port) { - this(host, port, 0); + public TSocket(TConfiguration config, String host, int port) throws TTransportException { + this(config, host, port, 0); } /** * Creates a new unconnected socket that will connect to the given host * on the given port. * + * @param host Remote host + * @param port Remote port + */ + public TSocket(String host, int port) throws TTransportException { + this(new TConfiguration(), host, port, 0); + } + + /** + * Creates a new unconnected socket that will connect to the given host + * on the given port. + * + * @param config check config * @param host Remote host * @param port Remote port * @param timeout Socket timeout and connection timeout */ - public TSocket(String host, int port, int timeout) { - this(host, port, timeout, timeout); + public TSocket(TConfiguration config, String host, int port, int timeout) throws TTransportException { + this(config, host, port, timeout, timeout); } /** @@ -117,12 +132,14 @@ public class TSocket extends TIOStreamTransport { * on the given port, with a specific connection timeout and a * specific socket timeout. * + * @param config check config * @param host Remote host * @param port Remote port * @param socketTimeout Socket timeout * @param connectTimeout Connection timeout */ - public TSocket(String host, int port, int socketTimeout, int connectTimeout) { + public TSocket(TConfiguration config, String host, int port, int socketTimeout, int connectTimeout) throws TTransportException { + super(config); host_ = host; port_ = port; socketTimeout_ = socketTimeout; diff --git a/lib/java/src/org/apache/thrift/transport/TTransport.java b/lib/java/src/org/apache/thrift/transport/TTransport.java index 73ad730ce..5645f7fa1 100644 --- a/lib/java/src/org/apache/thrift/transport/TTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TTransport.java @@ -19,6 +19,8 @@ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + import java.io.Closeable; /** @@ -160,4 +162,10 @@ public abstract class TTransport implements Closeable { * @param len */ public void consumeBuffer(int len) {} + + public abstract TConfiguration getConfiguration(); + + public abstract void updateKnownMessageSize(long size) throws TTransportException; + + public abstract void checkReadBytesAvailable(long numBytes) throws TTransportException; } diff --git a/lib/java/src/org/apache/thrift/transport/TTransportFactory.java b/lib/java/src/org/apache/thrift/transport/TTransportFactory.java index 3e71630ae..e068b4beb 100644 --- a/lib/java/src/org/apache/thrift/transport/TTransportFactory.java +++ b/lib/java/src/org/apache/thrift/transport/TTransportFactory.java @@ -34,7 +34,7 @@ public class TTransportFactory { * @param trans The base transport * @return Wrapped Transport */ - public TTransport getTransport(TTransport trans) { + public TTransport getTransport(TTransport trans) throws TTransportException { return trans; } diff --git a/lib/java/src/org/apache/thrift/transport/TZlibTransport.java b/lib/java/src/org/apache/thrift/transport/TZlibTransport.java index e755aa532..73b21aa3f 100644 --- a/lib/java/src/org/apache/thrift/transport/TZlibTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TZlibTransport.java @@ -18,9 +18,12 @@ */ package org.apache.thrift.transport; +import org.apache.thrift.TConfiguration; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Objects; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.Inflater; @@ -38,7 +41,7 @@ public class TZlibTransport extends TIOStreamTransport { } @Override - public TTransport getTransport(TTransport base) { + public TTransport getTransport(TTransport base) throws TTransportException { return new TZlibTransport(base); } } @@ -47,7 +50,7 @@ public class TZlibTransport extends TIOStreamTransport { * Constructs a new TZlibTransport instance. * @param transport the underlying transport to read from and write to */ - public TZlibTransport(TTransport transport) { + public TZlibTransport(TTransport transport) throws TTransportException { this(transport, Deflater.BEST_COMPRESSION); } @@ -56,7 +59,8 @@ public class TZlibTransport extends TIOStreamTransport { * @param transport the underlying transport to read from and write to * @param compressionLevel 0 for no compression, 9 for maximum compression */ - public TZlibTransport(TTransport transport, int compressionLevel) { + public TZlibTransport(TTransport transport, int compressionLevel) throws TTransportException { + super(Objects.isNull(transport.getConfiguration()) ? new TConfiguration() : transport.getConfiguration()); transport_ = transport; inputStream_ = new InflaterInputStream(new TTransportInputStream(transport_), new Inflater()); outputStream_ = new DeflaterOutputStream(new TTransportOutputStream(transport_), new Deflater(compressionLevel, false), true); diff --git a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java b/lib/java/src/org/apache/thrift/transport/layered/TFastFramedTransport.java index a1fd2490a..29bf39c14 100644 --- a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java +++ b/lib/java/src/org/apache/thrift/transport/layered/TFastFramedTransport.java @@ -16,7 +16,13 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.thrift.transport; +package org.apache.thrift.transport.layered; + + +import org.apache.thrift.TConfiguration; +import org.apache.thrift.transport.*; + +import java.util.Objects; /** * This transport is wire compatible with {@link TFramedTransport}, but makes @@ -27,18 +33,18 @@ package org.apache.thrift.transport; * * This implementation is NOT threadsafe. */ -public class TFastFramedTransport extends TTransport { +public class TFastFramedTransport extends TLayeredTransport { public static class Factory extends TTransportFactory { private final int initialCapacity; private final int maxLength; public Factory() { - this(DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH); + this(DEFAULT_BUF_CAPACITY, TConfiguration.DEFAULT_MAX_FRAME_SIZE); } public Factory(int initialCapacity) { - this(initialCapacity, DEFAULT_MAX_LENGTH); + this(initialCapacity, TConfiguration.DEFAULT_MAX_FRAME_SIZE); } public Factory(int initialCapacity, int maxLength) { @@ -47,7 +53,7 @@ public class TFastFramedTransport extends TTransport { } @Override - public TTransport getTransport(TTransport trans) { + public TTransport getTransport(TTransport trans) throws TTransportException { return new TFastFramedTransport(trans, initialCapacity, maxLength); @@ -58,12 +64,7 @@ public class TFastFramedTransport extends TTransport { * How big should the default read and write buffers be? */ public static final int DEFAULT_BUF_CAPACITY = 1024; - /** - * How big is the largest allowable frame? Defaults to 16MB. - */ - public static final int DEFAULT_MAX_LENGTH = 16384000; - private final TTransport underlying; private final AutoExpandingBufferWriteTransport writeBuffer; private AutoExpandingBufferReadTransport readBuffer; private final int initialBufferCapacity; @@ -75,8 +76,8 @@ public class TFastFramedTransport extends TTransport { * for initial buffer size and max frame length. * @param underlying Transport that real reads and writes will go through to. */ - public TFastFramedTransport(TTransport underlying) { - this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH); + public TFastFramedTransport(TTransport underlying) throws TTransportException { + this(underlying, DEFAULT_BUF_CAPACITY, TConfiguration.DEFAULT_MAX_FRAME_SIZE); } /** @@ -87,8 +88,8 @@ public class TFastFramedTransport extends TTransport { * In practice, it's not critical to set this unless you know in advance that * your messages are going to be very large. */ - public TFastFramedTransport(TTransport underlying, int initialBufferCapacity) { - this(underlying, initialBufferCapacity, DEFAULT_MAX_LENGTH); + public TFastFramedTransport(TTransport underlying, int initialBufferCapacity) throws TTransportException { + this(underlying, initialBufferCapacity, TConfiguration.DEFAULT_MAX_FRAME_SIZE); } /** @@ -102,27 +103,29 @@ public class TFastFramedTransport extends TTransport { * @param maxLength The max frame size you are willing to read. You can use * this parameter to limit how much memory can be allocated. */ - public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) { - this.underlying = underlying; + public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) throws TTransportException { + super(underlying); + TConfiguration config = Objects.isNull(underlying.getConfiguration()) ? new TConfiguration() : underlying.getConfiguration(); this.maxLength = maxLength; + config.setMaxFrameSize(maxLength); this.initialBufferCapacity = initialBufferCapacity; - readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity); - writeBuffer = new AutoExpandingBufferWriteTransport(initialBufferCapacity, 4); + readBuffer = new AutoExpandingBufferReadTransport(config, initialBufferCapacity); + writeBuffer = new AutoExpandingBufferWriteTransport(config, initialBufferCapacity, 4); } @Override public void close() { - underlying.close(); + getInnerTransport().close(); } @Override public boolean isOpen() { - return underlying.isOpen(); + return getInnerTransport().isOpen(); } @Override public void open() throws TTransportException { - underlying.open(); + getInnerTransport().open(); } @Override @@ -139,7 +142,7 @@ public class TFastFramedTransport extends TTransport { } private void readFrame() throws TTransportException { - underlying.readAll(i32buf , 0, 4); + getInnerTransport().readAll(i32buf , 0, 4); int size = TFramedTransport.decodeFrameSize(i32buf); if (size < 0) { @@ -147,13 +150,13 @@ public class TFastFramedTransport extends TTransport { throw new TTransportException(TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!"); } - if (size > maxLength) { + if (size > getInnerTransport().getConfiguration().getMaxFrameSize()) { close(); throw new TTransportException(TTransportException.CORRUPTED_DATA, "Frame size (" + size + ") larger than max length (" + maxLength + ")!"); } - readBuffer.fill(underlying, size); + readBuffer.fill(getInnerTransport(), size); } @Override @@ -169,18 +172,18 @@ public class TFastFramedTransport extends TTransport { /** * Only clears the read buffer! */ - public void clear() { - readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity); + public void clear() throws TTransportException { + readBuffer = new AutoExpandingBufferReadTransport(getConfiguration(), initialBufferCapacity); } @Override public void flush() throws TTransportException { - int payloadLength = writeBuffer.getLength() - 4; + int payloadLength = writeBuffer.getLength() - 4; byte[] data = writeBuffer.getBuf().array(); TFramedTransport.encodeFrameSize(payloadLength, data); - underlying.write(data, 0, payloadLength + 4); + getInnerTransport().write(data, 0, payloadLength + 4); writeBuffer.reset(); - underlying.flush(); + getInnerTransport().flush(); } @Override diff --git a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java b/lib/java/src/org/apache/thrift/transport/layered/TFramedTransport.java index a006c3a6a..10a9a1c17 100644 --- a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java +++ b/lib/java/src/org/apache/thrift/transport/layered/TFramedTransport.java @@ -17,24 +17,22 @@ * under the License. */ -package org.apache.thrift.transport; +package org.apache.thrift.transport.layered; import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TConfiguration; +import org.apache.thrift.transport.TMemoryInputTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.TTransportFactory; + +import java.util.Objects; /** * TFramedTransport is a buffered TTransport that ensures a fully read message * every time by preceding messages with a 4-byte frame size. */ -public class TFramedTransport extends TTransport { - - protected static final int DEFAULT_MAX_LENGTH = 16384000; - - private int maxLength_; - - /** - * Underlying transport - */ - private TTransport transport_ = null; +public class TFramedTransport extends TLayeredTransport { /** * Buffer for output @@ -45,14 +43,13 @@ public class TFramedTransport extends TTransport { /** * Buffer for input */ - private final TMemoryInputTransport readBuffer_ = - new TMemoryInputTransport(new byte[0]); + private final TMemoryInputTransport readBuffer_; public static class Factory extends TTransportFactory { private int maxLength_; public Factory() { - maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH; + maxLength_ = TConfiguration.DEFAULT_MAX_FRAME_SIZE; } public Factory(int maxLength) { @@ -60,7 +57,7 @@ public class TFramedTransport extends TTransport { } @Override - public TTransport getTransport(TTransport base) { + public TTransport getTransport(TTransport base) throws TTransportException { return new TFramedTransport(base, maxLength_); } } @@ -75,28 +72,28 @@ public class TFramedTransport extends TTransport { /** * Constructor wraps around another transport */ - public TFramedTransport(TTransport transport, int maxLength) { - transport_ = transport; - maxLength_ = maxLength; + public TFramedTransport(TTransport transport, int maxLength) throws TTransportException { + super(transport); + TConfiguration _configuration = Objects.isNull(transport.getConfiguration()) ? new TConfiguration() : transport.getConfiguration(); + _configuration.setMaxFrameSize(maxLength); writeBuffer_.write(sizeFiller_, 0, 4); + readBuffer_= new TMemoryInputTransport(_configuration, new byte[0]); } - public TFramedTransport(TTransport transport) { - transport_ = transport; - maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH; - writeBuffer_.write(sizeFiller_, 0, 4); + public TFramedTransport(TTransport transport) throws TTransportException { + this(transport, TConfiguration.DEFAULT_MAX_FRAME_SIZE); } public void open() throws TTransportException { - transport_.open(); + getInnerTransport().open(); } public boolean isOpen() { - return transport_.isOpen(); + return getInnerTransport().isOpen(); } public void close() { - transport_.close(); + getInnerTransport().close(); } public int read(byte[] buf, int off, int len) throws TTransportException { @@ -138,7 +135,7 @@ public class TFramedTransport extends TTransport { private final byte[] i32buf = new byte[4]; private void readFrame() throws TTransportException { - transport_.readAll(i32buf, 0, 4); + getInnerTransport().readAll(i32buf, 0, 4); int size = decodeFrameSize(i32buf); if (size < 0) { @@ -146,14 +143,14 @@ public class TFramedTransport extends TTransport { throw new TTransportException(TTransportException.CORRUPTED_DATA, "Read a negative frame size (" + size + ")!"); } - if (size > maxLength_) { + if (size > getInnerTransport().getConfiguration().getMaxFrameSize()) { close(); throw new TTransportException(TTransportException.CORRUPTED_DATA, - "Frame size (" + size + ") larger than max length (" + maxLength_ + ")!"); + "Frame size (" + size + ") larger than max length (" + getInnerTransport().getConfiguration().getMaxFrameSize() + ")!"); } byte[] buff = new byte[size]; - transport_.readAll(buff, 0, size); + getInnerTransport().readAll(buff, 0, size); readBuffer_.reset(buff); } @@ -169,8 +166,8 @@ public class TFramedTransport extends TTransport { writeBuffer_.write(sizeFiller_, 0, 4); // make room for the next frame's size data encodeFrameSize(len, buf); // this is the frame length without the filler - transport_.write(buf, 0, len + 4); // we have to write the frame size and frame data - transport_.flush(); + getInnerTransport().write(buf, 0, len + 4); // we have to write the frame size and frame data + getInnerTransport().flush(); } public static final void encodeFrameSize(final int frameSize, final byte[] buf) { diff --git a/lib/java/src/org/apache/thrift/transport/layered/TLayeredTransport.java b/lib/java/src/org/apache/thrift/transport/layered/TLayeredTransport.java new file mode 100644 index 000000000..69ec824ee --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/layered/TLayeredTransport.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.thrift.transport.layered; + +import org.apache.thrift.TConfiguration; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import java.util.Objects; + +public abstract class TLayeredTransport extends TTransport{ + + private TTransport innerTransport; + + public TConfiguration getConfiguration() { + return innerTransport.getConfiguration(); + } + + public TLayeredTransport(TTransport transport) + { + Objects.requireNonNull(transport, "TTransport cannot be null."); + innerTransport = transport; + } + + public void updateKnownMessageSize(long size) throws TTransportException { + innerTransport.updateKnownMessageSize(size); + } + + public void checkReadBytesAvailable(long numBytes) throws TTransportException { + innerTransport.checkReadBytesAvailable(numBytes); + } + + public TTransport getInnerTransport() { + return innerTransport; + } +} |