diff options
Diffstat (limited to 'lib/java/src/org/apache')
52 files changed, 7460 insertions, 0 deletions
diff --git a/lib/java/src/org/apache/thrift/IntRangeSet.java b/lib/java/src/org/apache/thrift/IntRangeSet.java new file mode 100644 index 000000000..5430134de --- /dev/null +++ b/lib/java/src/org/apache/thrift/IntRangeSet.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +/** + * IntRangeSet is a specialized Set<Integer> implementation designed + * specifically to make the generated validate() method calls faster. It groups + * the set values into ranges, and in the contains() call, it does + * num ranges * 2 comparisons max. For the common case, which is a single, + * contiguous range, this approach is about 60% faster than using a HashSet. If + * you had a very ragged value set, like all the odd numbers, for instance, + * then you would end up with pretty poor running time. + */ +public class IntRangeSet implements Set<Integer> { + /** + * This array keeps the bounds of each extent in alternating cells, always + * increasing. Example: [0,5,10,15], which corresponds to 0-5, 10-15. + */ + private int[] extents; + + /** + * We'll keep a duplicate, real HashSet around internally to satisfy some of + * the other set operations. + */ + private Set<Integer> realSet = new HashSet<Integer>(); + + public IntRangeSet(int... values) { + Arrays.sort(values); + + List<Integer> extent_list = new ArrayList<Integer>(); + + int ext_start = values[0]; + int ext_end_so_far = values[0]; + for (int i = 1; i < values.length; i++) { + realSet.add(values[i]); + + if (values[i] == ext_end_so_far + 1) { + // advance the end so far + ext_end_so_far = values[i]; + } else { + // create an extent for everything we saw so far, move on to the next one + extent_list.add(ext_start); + extent_list.add(ext_end_so_far); + ext_start = values[i]; + ext_end_so_far = values[i]; + } + } + extent_list.add(ext_start); + extent_list.add(ext_end_so_far); + + extents = new int[extent_list.size()]; + for (int i = 0; i < extent_list.size(); i++) { + extents[i] = extent_list.get(i); + } + } + + public boolean add(Integer i) { + throw new UnsupportedOperationException(); + } + + public void clear() { + throw new UnsupportedOperationException(); + } + + public boolean addAll(Collection<? extends Integer> arg0) { + throw new UnsupportedOperationException(); + } + + /** + * While this method is here for Set interface compatibility, you should avoid + * using it. It incurs boxing overhead! Use the int method directly, instead. + */ + public boolean contains(Object arg0) { + return contains(((Integer)arg0).intValue()); + } + + /** + * This is much faster, since it doesn't stop at Integer on the way through. + * @param val the value you want to check set membership for + * @return true if val was found, false otherwise + */ + public boolean contains(int val) { + for (int i = 0; i < extents.length / 2; i++) { + if (val < extents[i*2]) { + return false; + } else if (val <= extents[i*2+1]) { + return true; + } + } + + return false; + } + + public boolean containsAll(Collection<?> arg0) { + for (Object o : arg0) { + if (!contains(o)) { + return false; + } + } + return true; + } + + public boolean isEmpty() { + return realSet.isEmpty(); + } + + public Iterator<Integer> iterator() { + return realSet.iterator(); + } + + public boolean remove(Object arg0) { + throw new UnsupportedOperationException(); + } + + public boolean removeAll(Collection<?> arg0) { + throw new UnsupportedOperationException(); + } + + public boolean retainAll(Collection<?> arg0) { + throw new UnsupportedOperationException(); + } + + public int size() { + return realSet.size(); + } + + public Object[] toArray() { + return realSet.toArray(); + } + + public <T> T[] toArray(T[] arg0) { + return realSet.toArray(arg0); + } + + @Override + public String toString() { + String buf = ""; + for (int i = 0; i < extents.length / 2; i++) { + if (i != 0) { + buf += ", "; + } + buf += "[" + extents[i*2] + "," + extents[i*2+1] + "]"; + } + return buf; + } +} diff --git a/lib/java/src/org/apache/thrift/TApplicationException.java b/lib/java/src/org/apache/thrift/TApplicationException.java new file mode 100644 index 000000000..a85e3705e --- /dev/null +++ b/lib/java/src/org/apache/thrift/TApplicationException.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift; + +import org.apache.thrift.protocol.TField; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolUtil; +import org.apache.thrift.protocol.TStruct; +import org.apache.thrift.protocol.TType; + +/** + * Application level exception + * + */ +public class TApplicationException extends TException { + + private static final TStruct TAPPLICATION_EXCEPTION_STRUCT = new TStruct("TApplicationException"); + private static final TField MESSAGE_FIELD = new TField("message", TType.STRING, (short)1); + private static final TField TYPE_FIELD = new TField("type", TType.I32, (short)2); + + private static final long serialVersionUID = 1L; + + public static final int UNKNOWN = 0; + public static final int UNKNOWN_METHOD = 1; + public static final int INVALID_MESSAGE_TYPE = 2; + public static final int WRONG_METHOD_NAME = 3; + public static final int BAD_SEQUENCE_ID = 4; + public static final int MISSING_RESULT = 5; + + protected int type_ = UNKNOWN; + + public TApplicationException() { + super(); + } + + public TApplicationException(int type) { + super(); + type_ = type; + } + + public TApplicationException(int type, String message) { + super(message); + type_ = type; + } + + public TApplicationException(String message) { + super(message); + } + + public int getType() { + return type_; + } + + public static TApplicationException read(TProtocol iprot) throws TException { + TField field; + iprot.readStructBegin(); + + String message = null; + int type = UNKNOWN; + + while (true) { + field = iprot.readFieldBegin(); + if (field.type == TType.STOP) { + break; + } + switch (field.id) { + case 1: + if (field.type == TType.STRING) { + message = iprot.readString(); + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + case 2: + if (field.type == TType.I32) { + type = iprot.readI32(); + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + default: + TProtocolUtil.skip(iprot, field.type); + break; + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + return new TApplicationException(type, message); + } + + public void write(TProtocol oprot) throws TException { + oprot.writeStructBegin(TAPPLICATION_EXCEPTION_STRUCT); + if (getMessage() != null) { + oprot.writeFieldBegin(MESSAGE_FIELD); + oprot.writeString(getMessage()); + oprot.writeFieldEnd(); + } + oprot.writeFieldBegin(TYPE_FIELD); + oprot.writeI32(type_); + oprot.writeFieldEnd(); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } +} diff --git a/lib/java/src/org/apache/thrift/TBase.java b/lib/java/src/org/apache/thrift/TBase.java new file mode 100644 index 000000000..7c8978a2b --- /dev/null +++ b/lib/java/src/org/apache/thrift/TBase.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift; + +import org.apache.thrift.protocol.TProtocol; + +/** + * Generic base interface for generated Thrift objects. + * + */ +public interface TBase extends Cloneable { + + /** + * Reads the TObject from the given input protocol. + * + * @param iprot Input protocol + */ + public void read(TProtocol iprot) throws TException; + + /** + * Writes the objects out to the protocol + * + * @param oprot Output protocol + */ + public void write(TProtocol oprot) throws TException; + + /** + * Check if a field is currently set or unset. + * + * @param fieldId The field's id tag as found in the IDL. + */ + public boolean isSet(int fieldId); + + /** + * Get a field's value by id. Primitive types will be wrapped in the + * appropriate "boxed" types. + * + * @param fieldId The field's id tag as found in the IDL. + */ + public Object getFieldValue(int fieldId); + + /** + * Set a field's value by id. Primitive types must be "boxed" in the + * appropriate object wrapper type. + * + * @param fieldId The field's id tag as found in the IDL. + */ + public void setFieldValue(int fieldId, Object value); +} diff --git a/lib/java/src/org/apache/thrift/TByteArrayOutputStream.java b/lib/java/src/org/apache/thrift/TByteArrayOutputStream.java new file mode 100644 index 000000000..e35fbcb73 --- /dev/null +++ b/lib/java/src/org/apache/thrift/TByteArrayOutputStream.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift; + +import java.io.ByteArrayOutputStream; + +/** + * Class that allows access to the underlying buf without doing deep + * copies on it. + * + */ +public class TByteArrayOutputStream extends ByteArrayOutputStream { + public TByteArrayOutputStream(int size) { + super(size); + } + + public TByteArrayOutputStream() { + super(); + } + + + public byte[] get() { + return buf; + } + + public int len() { + return count; + } +} diff --git a/lib/java/src/org/apache/thrift/TDeserializer.java b/lib/java/src/org/apache/thrift/TDeserializer.java new file mode 100644 index 000000000..d6dd5d4b1 --- /dev/null +++ b/lib/java/src/org/apache/thrift/TDeserializer.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift; + +import java.io.ByteArrayInputStream; +import java.io.UnsupportedEncodingException; + +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TIOStreamTransport; + +/** + * Generic utility for easily deserializing objects from a byte array or Java + * String. + * + */ +public class TDeserializer { + private final TProtocolFactory protocolFactory_; + + /** + * Create a new TDeserializer that uses the TBinaryProtocol by default. + */ + public TDeserializer() { + this(new TBinaryProtocol.Factory()); + } + + /** + * Create a new TDeserializer. It will use the TProtocol specified by the + * factory that is passed in. + * + * @param protocolFactory Factory to create a protocol + */ + public TDeserializer(TProtocolFactory protocolFactory) { + protocolFactory_ = protocolFactory; + } + + /** + * Deserialize the Thrift object from a byte array. + * + * @param base The object to read into + * @param bytes The array to read from + */ + public void deserialize(TBase base, byte[] bytes) throws TException { + base.read( + protocolFactory_.getProtocol( + new TIOStreamTransport( + new ByteArrayInputStream(bytes)))); + } + + /** + * Deserialize the Thrift object from a Java string, using a specified + * character set for decoding. + * + * @param base The object to read into + * @param data The string to read from + * @param charset Valid JVM charset + */ + public void deserialize(TBase base, String data, String charset) throws TException { + try { + deserialize(base, data.getBytes(charset)); + } catch (UnsupportedEncodingException uex) { + throw new TException("JVM DOES NOT SUPPORT ENCODING: " + charset); + } + } + + /** + * Deserialize the Thrift object from a Java string, using the default JVM + * charset encoding. + * + * @param base The object to read into + * @param data The string to read from + */ + public void toString(TBase base, String data) throws TException { + deserialize(base, data.getBytes()); + } +} + diff --git a/lib/java/src/org/apache/thrift/TException.java b/lib/java/src/org/apache/thrift/TException.java new file mode 100644 index 000000000..f84f4812e --- /dev/null +++ b/lib/java/src/org/apache/thrift/TException.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift; + +/** + * Generic exception class for Thrift. + * + */ +public class TException extends Exception { + + private static final long serialVersionUID = 1L; + + public TException() { + super(); + } + + public TException(String message) { + super(message); + } + + public TException(Throwable cause) { + super(cause); + } + + public TException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/lib/java/src/org/apache/thrift/TFieldRequirementType.java b/lib/java/src/org/apache/thrift/TFieldRequirementType.java new file mode 100644 index 000000000..74bac4eff --- /dev/null +++ b/lib/java/src/org/apache/thrift/TFieldRequirementType.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift; + +/** + * Requirement type constants. + * + */ +public final class TFieldRequirementType { + public static final byte REQUIRED = 1; + public static final byte OPTIONAL = 2; + public static final byte DEFAULT = 3; +} diff --git a/lib/java/src/org/apache/thrift/TProcessor.java b/lib/java/src/org/apache/thrift/TProcessor.java new file mode 100644 index 000000000..d79522c3e --- /dev/null +++ b/lib/java/src/org/apache/thrift/TProcessor.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift; + +import org.apache.thrift.protocol.TProtocol; + +/** + * A processor is a generic object which operates upon an input stream and + * writes to some output stream. + * + */ +public interface TProcessor { + public boolean process(TProtocol in, TProtocol out) + throws TException; +} diff --git a/lib/java/src/org/apache/thrift/TProcessorFactory.java b/lib/java/src/org/apache/thrift/TProcessorFactory.java new file mode 100644 index 000000000..bcd8a38fd --- /dev/null +++ b/lib/java/src/org/apache/thrift/TProcessorFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift; + +import org.apache.thrift.transport.TTransport; + +/** + * The default processor factory just returns a singleton + * instance. + */ +public class TProcessorFactory { + + private final TProcessor processor_; + + public TProcessorFactory(TProcessor processor) { + processor_ = processor; + } + + public TProcessor getProcessor(TTransport trans) { + return processor_; + } +} diff --git a/lib/java/src/org/apache/thrift/TSerializer.java b/lib/java/src/org/apache/thrift/TSerializer.java new file mode 100644 index 000000000..4e1ce6129 --- /dev/null +++ b/lib/java/src/org/apache/thrift/TSerializer.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift; + +import java.io.ByteArrayOutputStream; +import java.io.UnsupportedEncodingException; + +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TIOStreamTransport; + +/** + * Generic utility for easily serializing objects into a byte array or Java + * String. + * + */ +public class TSerializer { + + /** + * This is the byte array that data is actually serialized into + */ + private final ByteArrayOutputStream baos_ = new ByteArrayOutputStream(); + + /** + * This transport wraps that byte array + */ + private final TIOStreamTransport transport_ = new TIOStreamTransport(baos_); + + /** + * Internal protocol used for serializing objects. + */ + private TProtocol protocol_; + + /** + * Create a new TSerializer that uses the TBinaryProtocol by default. + */ + public TSerializer() { + this(new TBinaryProtocol.Factory()); + } + + /** + * Create a new TSerializer. It will use the TProtocol specified by the + * factory that is passed in. + * + * @param protocolFactory Factory to create a protocol + */ + public TSerializer(TProtocolFactory protocolFactory) { + protocol_ = protocolFactory.getProtocol(transport_); + } + + /** + * Serialize the Thrift object into a byte array. The process is simple, + * just clear the byte array output, write the object into it, and grab the + * raw bytes. + * + * @param base The object to serialize + * @return Serialized object in byte[] format + */ + public byte[] serialize(TBase base) throws TException { + baos_.reset(); + base.write(protocol_); + return baos_.toByteArray(); + } + + /** + * Serialize the Thrift object into a Java string, using a specified + * character set for encoding. + * + * @param base The object to serialize + * @param charset Valid JVM charset + * @return Serialized object as a String + */ + public String toString(TBase base, String charset) throws TException { + try { + return new String(serialize(base), charset); + } catch (UnsupportedEncodingException uex) { + throw new TException("JVM DOES NOT SUPPORT ENCODING: " + charset); + } + } + + /** + * Serialize the Thrift object into a Java string, using the default JVM + * charset encoding. + * + * @param base The object to serialize + * @return Serialized object as a String + */ + public String toString(TBase base) throws TException { + return new String(serialize(base)); + } +} + diff --git a/lib/java/src/org/apache/thrift/meta_data/FieldMetaData.java b/lib/java/src/org/apache/thrift/meta_data/FieldMetaData.java new file mode 100644 index 000000000..3e90a8b9e --- /dev/null +++ b/lib/java/src/org/apache/thrift/meta_data/FieldMetaData.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.meta_data; + +import java.util.HashMap; +import java.util.Map; +import org.apache.thrift.TBase; + +/** + * This class is used to store meta data about thrift fields. Every field in a + * a struct should have a corresponding instance of this class describing it. + * + */ +public class FieldMetaData implements java.io.Serializable { + public final String fieldName; + public final byte requirementType; + public final FieldValueMetaData valueMetaData; + private static Map<Class<? extends TBase>, Map<Integer, FieldMetaData>> structMap; + + static { + structMap = new HashMap<Class<? extends TBase>, Map<Integer, FieldMetaData>>(); + } + + public FieldMetaData(String name, byte req, FieldValueMetaData vMetaData){ + this.fieldName = name; + this.requirementType = req; + this.valueMetaData = vMetaData; + } + + public static void addStructMetaDataMap(Class<? extends TBase> sClass, Map<Integer, FieldMetaData> map){ + structMap.put(sClass, map); + } + + /** + * Returns a map with metadata (i.e. instances of FieldMetaData) that + * describe the fields of the given class. + * + * @param sClass The TBase class for which the metadata map is requested + */ + public static Map<Integer, FieldMetaData> getStructMetaDataMap(Class<? extends TBase> sClass){ + if (!structMap.containsKey(sClass)){ // Load class if it hasn't been loaded + try{ + sClass.newInstance(); + } catch (InstantiationException e){ + throw new RuntimeException("InstantiationException for TBase class: " + sClass.getName() + ", message: " + e.getMessage()); + } catch (IllegalAccessException e){ + throw new RuntimeException("IllegalAccessException for TBase class: " + sClass.getName() + ", message: " + e.getMessage()); + } + } + return structMap.get(sClass); + } +} diff --git a/lib/java/src/org/apache/thrift/meta_data/FieldValueMetaData.java b/lib/java/src/org/apache/thrift/meta_data/FieldValueMetaData.java new file mode 100644 index 000000000..f72da0cd2 --- /dev/null +++ b/lib/java/src/org/apache/thrift/meta_data/FieldValueMetaData.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.meta_data; + +import org.apache.thrift.protocol.TType; + +/** + * FieldValueMetaData and collection of subclasses to store metadata about + * the value(s) of a field + */ +public class FieldValueMetaData implements java.io.Serializable { + public final byte type; + + public FieldValueMetaData(byte type){ + this.type = type; + } + + public boolean isStruct() { + return type == TType.STRUCT; + } + + public boolean isContainer() { + return type == TType.LIST || type == TType.MAP || type == TType.SET; + } +} diff --git a/lib/java/src/org/apache/thrift/meta_data/ListMetaData.java b/lib/java/src/org/apache/thrift/meta_data/ListMetaData.java new file mode 100644 index 000000000..8e7073bf5 --- /dev/null +++ b/lib/java/src/org/apache/thrift/meta_data/ListMetaData.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.meta_data; + +public class ListMetaData extends FieldValueMetaData { + public final FieldValueMetaData elemMetaData; + + public ListMetaData(byte type, FieldValueMetaData eMetaData){ + super(type); + this.elemMetaData = eMetaData; + } +} diff --git a/lib/java/src/org/apache/thrift/meta_data/MapMetaData.java b/lib/java/src/org/apache/thrift/meta_data/MapMetaData.java new file mode 100644 index 000000000..e7c408c78 --- /dev/null +++ b/lib/java/src/org/apache/thrift/meta_data/MapMetaData.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.meta_data; + +public class MapMetaData extends FieldValueMetaData { + public final FieldValueMetaData keyMetaData; + public final FieldValueMetaData valueMetaData; + + public MapMetaData(byte type, FieldValueMetaData kMetaData, FieldValueMetaData vMetaData){ + super(type); + this.keyMetaData = kMetaData; + this.valueMetaData = vMetaData; + } +} diff --git a/lib/java/src/org/apache/thrift/meta_data/SetMetaData.java b/lib/java/src/org/apache/thrift/meta_data/SetMetaData.java new file mode 100644 index 000000000..cf4b96aab --- /dev/null +++ b/lib/java/src/org/apache/thrift/meta_data/SetMetaData.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.meta_data; + +public class SetMetaData extends FieldValueMetaData { + public final FieldValueMetaData elemMetaData; + + public SetMetaData(byte type, FieldValueMetaData eMetaData){ + super(type); + this.elemMetaData = eMetaData; + } +} diff --git a/lib/java/src/org/apache/thrift/meta_data/StructMetaData.java b/lib/java/src/org/apache/thrift/meta_data/StructMetaData.java new file mode 100644 index 000000000..b37d21dab --- /dev/null +++ b/lib/java/src/org/apache/thrift/meta_data/StructMetaData.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.meta_data; + +import org.apache.thrift.TBase; + +public class StructMetaData extends FieldValueMetaData { + public final Class<? extends TBase> structClass; + + public StructMetaData(byte type, Class<? extends TBase> sClass){ + super(type); + this.structClass = sClass; + } +} diff --git a/lib/java/src/org/apache/thrift/protocol/TBase64Utils.java b/lib/java/src/org/apache/thrift/protocol/TBase64Utils.java new file mode 100644 index 000000000..37a9fd9f9 --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TBase64Utils.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +/** + * Class for encoding and decoding Base64 data. + * + * This class is kept at package level because the interface does no input + * validation and is therefore too low-level for generalized reuse. + * + * Note also that the encoding does not pad with equal signs , as discussed in + * section 2.2 of the RFC (http://www.faqs.org/rfcs/rfc3548.html). Furthermore, + * bad data encountered when decoding is neither rejected or ignored but simply + * results in bad decoded data -- this is not in compliance with the RFC but is + * done in the interest of performance. + * + */ +class TBase64Utils { + + private static final String ENCODE_TABLE = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + + /** + * Encode len bytes of data in src at offset srcOff, storing the result into + * dst at offset dstOff. len must be 1, 2, or 3. dst must have at least len+1 + * bytes of space at dstOff. src and dst should not be the same object. This + * method does no validation of the input values in the interest of + * performance. + * + * @param src the source of bytes to encode + * @param srcOff the offset into the source to read the unencoded bytes + * @param len the number of bytes to encode (must be 1, 2, or 3). + * @param dst the destination for the encoding + * @param dstOff the offset into the destination to place the encoded bytes + */ + static final void encode(byte[] src, int srcOff, int len, byte[] dst, + int dstOff) { + dst[dstOff] = (byte)ENCODE_TABLE.charAt((src[srcOff] >> 2) & 0x3F); + if (len == 3) { + dst[dstOff + 1] = + (byte)ENCODE_TABLE.charAt( + ((src[srcOff] << 4) + (src[srcOff+1] >> 4)) & 0x3F); + dst[dstOff + 2] = + (byte)ENCODE_TABLE.charAt( + ((src[srcOff+1] << 2) + (src[srcOff+2] >> 6)) & 0x3F); + dst[dstOff + 3] = + (byte)ENCODE_TABLE.charAt(src[srcOff+2] & 0x3F); + } + else if (len == 2) { + dst[dstOff+1] = + (byte)ENCODE_TABLE.charAt( + ((src[srcOff] << 4) + (src[srcOff+1] >> 4)) & 0x3F); + dst[dstOff + 2] = + (byte)ENCODE_TABLE.charAt((src[srcOff+1] << 2) & 0x3F); + + } + else { // len == 1) { + dst[dstOff + 1] = + (byte)ENCODE_TABLE.charAt((src[srcOff] << 4) & 0x3F); + } + } + + private static final byte[] DECODE_TABLE = { + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,62,-1,-1,-1,63, + 52,53,54,55,56,57,58,59,60,61,-1,-1,-1,-1,-1,-1, + -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14, + 15,16,17,18,19,20,21,22,23,24,25,-1,-1,-1,-1,-1, + -1,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40, + 41,42,43,44,45,46,47,48,49,50,51,-1,-1,-1,-1,-1, + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, + -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, + }; + + /** + * Decode len bytes of data in src at offset srcOff, storing the result into + * dst at offset dstOff. len must be 2, 3, or 4. dst must have at least len-1 + * bytes of space at dstOff. src and dst may be the same object as long as + * dstoff <= srcOff. This method does no validation of the input values in + * the interest of performance. + * + * @param src the source of bytes to decode + * @param srcOff the offset into the source to read the encoded bytes + * @param len the number of bytes to decode (must be 2, 3, or 4) + * @param dst the destination for the decoding + * @param dstOff the offset into the destination to place the decoded bytes + */ + static final void decode(byte[] src, int srcOff, int len, byte[] dst, + int dstOff) { + dst[dstOff] = (byte) + ((DECODE_TABLE[src[srcOff] & 0x0FF] << 2) | + (DECODE_TABLE[src[srcOff+1] & 0x0FF] >> 4)); + if (len > 2) { + dst[dstOff+1] = (byte) + (((DECODE_TABLE[src[srcOff+1] & 0x0FF] << 4) & 0xF0) | + (DECODE_TABLE[src[srcOff+2] & 0x0FF] >> 2)); + if (len > 3) { + dst[dstOff+2] = (byte) + (((DECODE_TABLE[src[srcOff+2] & 0x0FF] << 6) & 0xC0) | + DECODE_TABLE[src[srcOff+3] & 0x0FF]); + } + } + } +} diff --git a/lib/java/src/org/apache/thrift/protocol/TBinaryProtocol.java b/lib/java/src/org/apache/thrift/protocol/TBinaryProtocol.java new file mode 100644 index 000000000..e9bd8b796 --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TBinaryProtocol.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +import java.io.UnsupportedEncodingException; + +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransport; + +/** + * Binary protocol implementation for thrift. + * + */ +public class TBinaryProtocol extends TProtocol { + private static final TStruct ANONYMOUS_STRUCT = new TStruct(); + + protected static final int VERSION_MASK = 0xffff0000; + protected static final int VERSION_1 = 0x80010000; + + protected boolean strictRead_ = false; + protected boolean strictWrite_ = true; + + protected int readLength_; + protected boolean checkReadLength_ = false; + + /** + * Factory + */ + public static class Factory implements TProtocolFactory { + protected boolean strictRead_ = false; + protected boolean strictWrite_ = true; + + public Factory() { + this(false, true); + } + + public Factory(boolean strictRead, boolean strictWrite) { + strictRead_ = strictRead; + strictWrite_ = strictWrite; + } + + public TProtocol getProtocol(TTransport trans) { + return new TBinaryProtocol(trans, strictRead_, strictWrite_); + } + } + + /** + * Constructor + */ + public TBinaryProtocol(TTransport trans) { + this(trans, false, true); + } + + public TBinaryProtocol(TTransport trans, boolean strictRead, boolean strictWrite) { + super(trans); + strictRead_ = strictRead; + strictWrite_ = strictWrite; + } + + public void writeMessageBegin(TMessage message) throws TException { + if (strictWrite_) { + int version = VERSION_1 | message.type; + writeI32(version); + writeString(message.name); + writeI32(message.seqid); + } else { + writeString(message.name); + writeByte(message.type); + writeI32(message.seqid); + } + } + + public void writeMessageEnd() {} + + public void writeStructBegin(TStruct struct) {} + + public void writeStructEnd() {} + + public void writeFieldBegin(TField field) throws TException { + writeByte(field.type); + writeI16(field.id); + } + + public void writeFieldEnd() {} + + public void writeFieldStop() throws TException { + writeByte(TType.STOP); + } + + public void writeMapBegin(TMap map) throws TException { + writeByte(map.keyType); + writeByte(map.valueType); + writeI32(map.size); + } + + public void writeMapEnd() {} + + public void writeListBegin(TList list) throws TException { + writeByte(list.elemType); + writeI32(list.size); + } + + public void writeListEnd() {} + + public void writeSetBegin(TSet set) throws TException { + writeByte(set.elemType); + writeI32(set.size); + } + + public void writeSetEnd() {} + + public void writeBool(boolean b) throws TException { + writeByte(b ? (byte)1 : (byte)0); + } + + private byte [] bout = new byte[1]; + public void writeByte(byte b) throws TException { + bout[0] = b; + trans_.write(bout, 0, 1); + } + + private byte[] i16out = new byte[2]; + public void writeI16(short i16) throws TException { + i16out[0] = (byte)(0xff & (i16 >> 8)); + i16out[1] = (byte)(0xff & (i16)); + trans_.write(i16out, 0, 2); + } + + private byte[] i32out = new byte[4]; + public void writeI32(int i32) throws TException { + i32out[0] = (byte)(0xff & (i32 >> 24)); + i32out[1] = (byte)(0xff & (i32 >> 16)); + i32out[2] = (byte)(0xff & (i32 >> 8)); + i32out[3] = (byte)(0xff & (i32)); + trans_.write(i32out, 0, 4); + } + + private byte[] i64out = new byte[8]; + public void writeI64(long i64) throws TException { + i64out[0] = (byte)(0xff & (i64 >> 56)); + i64out[1] = (byte)(0xff & (i64 >> 48)); + i64out[2] = (byte)(0xff & (i64 >> 40)); + i64out[3] = (byte)(0xff & (i64 >> 32)); + i64out[4] = (byte)(0xff & (i64 >> 24)); + i64out[5] = (byte)(0xff & (i64 >> 16)); + i64out[6] = (byte)(0xff & (i64 >> 8)); + i64out[7] = (byte)(0xff & (i64)); + trans_.write(i64out, 0, 8); + } + + public void writeDouble(double dub) throws TException { + writeI64(Double.doubleToLongBits(dub)); + } + + public void writeString(String str) throws TException { + try { + byte[] dat = str.getBytes("UTF-8"); + writeI32(dat.length); + trans_.write(dat, 0, dat.length); + } catch (UnsupportedEncodingException uex) { + throw new TException("JVM DOES NOT SUPPORT UTF-8"); + } + } + + public void writeBinary(byte[] bin) throws TException { + writeI32(bin.length); + trans_.write(bin, 0, bin.length); + } + + /** + * Reading methods. + */ + + public TMessage readMessageBegin() throws TException { + int size = readI32(); + if (size < 0) { + int version = size & VERSION_MASK; + if (version != VERSION_1) { + throw new TProtocolException(TProtocolException.BAD_VERSION, "Bad version in readMessageBegin"); + } + return new TMessage(readString(), (byte)(size & 0x000000ff), readI32()); + } else { + if (strictRead_) { + throw new TProtocolException(TProtocolException.BAD_VERSION, "Missing version in readMessageBegin, old client?"); + } + return new TMessage(readStringBody(size), readByte(), readI32()); + } + } + + public void readMessageEnd() {} + + public TStruct readStructBegin() { + return ANONYMOUS_STRUCT; + } + + public void readStructEnd() {} + + public TField readFieldBegin() throws TException { + byte type = readByte(); + short id = type == TType.STOP ? 0 : readI16(); + return new TField("", type, id); + } + + public void readFieldEnd() {} + + public TMap readMapBegin() throws TException { + return new TMap(readByte(), readByte(), readI32()); + } + + public void readMapEnd() {} + + public TList readListBegin() throws TException { + return new TList(readByte(), readI32()); + } + + public void readListEnd() {} + + public TSet readSetBegin() throws TException { + return new TSet(readByte(), readI32()); + } + + public void readSetEnd() {} + + public boolean readBool() throws TException { + return (readByte() == 1); + } + + private byte[] bin = new byte[1]; + public byte readByte() throws TException { + readAll(bin, 0, 1); + return bin[0]; + } + + private byte[] i16rd = new byte[2]; + public short readI16() throws TException { + readAll(i16rd, 0, 2); + return + (short) + (((i16rd[0] & 0xff) << 8) | + ((i16rd[1] & 0xff))); + } + + private byte[] i32rd = new byte[4]; + public int readI32() throws TException { + readAll(i32rd, 0, 4); + return + ((i32rd[0] & 0xff) << 24) | + ((i32rd[1] & 0xff) << 16) | + ((i32rd[2] & 0xff) << 8) | + ((i32rd[3] & 0xff)); + } + + private byte[] i64rd = new byte[8]; + public long readI64() throws TException { + readAll(i64rd, 0, 8); + return + ((long)(i64rd[0] & 0xff) << 56) | + ((long)(i64rd[1] & 0xff) << 48) | + ((long)(i64rd[2] & 0xff) << 40) | + ((long)(i64rd[3] & 0xff) << 32) | + ((long)(i64rd[4] & 0xff) << 24) | + ((long)(i64rd[5] & 0xff) << 16) | + ((long)(i64rd[6] & 0xff) << 8) | + ((long)(i64rd[7] & 0xff)); + } + + public double readDouble() throws TException { + return Double.longBitsToDouble(readI64()); + } + + public String readString() throws TException { + int size = readI32(); + return readStringBody(size); + } + + public String readStringBody(int size) throws TException { + try { + checkReadLength(size); + byte[] buf = new byte[size]; + trans_.readAll(buf, 0, size); + return new String(buf, "UTF-8"); + } catch (UnsupportedEncodingException uex) { + throw new TException("JVM DOES NOT SUPPORT UTF-8"); + } + } + + public byte[] readBinary() throws TException { + int size = readI32(); + checkReadLength(size); + byte[] buf = new byte[size]; + trans_.readAll(buf, 0, size); + return buf; + } + + private int readAll(byte[] buf, int off, int len) throws TException { + checkReadLength(len); + return trans_.readAll(buf, off, len); + } + + public void setReadLength(int readLength) { + readLength_ = readLength; + checkReadLength_ = true; + } + + protected void checkReadLength(int length) throws TException { + if (checkReadLength_) { + readLength_ -= length; + if (readLength_ < 0) { + throw new TException("Message length exceeded: " + length); + } + } + } + +} diff --git a/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java new file mode 100755 index 000000000..e2d0bfdc1 --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TCompactProtocol.java @@ -0,0 +1,741 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.thrift.protocol; + +import java.util.Stack; +import java.io.UnsupportedEncodingException; + +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.TException; + +/** + * TCompactProtocol2 is the Java implementation of the compact protocol specified + * in THRIFT-110. The fundamental approach to reducing the overhead of + * structures is a) use variable-length integers all over the place and b) make + * use of unused bits wherever possible. Your savings will obviously vary + * based on the specific makeup of your structs, but in general, the more + * fields, nested structures, short strings and collections, and low-value i32 + * and i64 fields you have, the more benefit you'll see. + */ +public final class TCompactProtocol extends TProtocol { + + private final static TStruct ANONYMOUS_STRUCT = new TStruct(""); + private final static TField TSTOP = new TField("", TType.STOP, (short)0); + + private final static byte[] ttypeToCompactType = new byte[16]; + + static { + ttypeToCompactType[TType.STOP] = TType.STOP; + ttypeToCompactType[TType.BOOL] = Types.BOOLEAN_TRUE; + ttypeToCompactType[TType.BYTE] = Types.BYTE; + ttypeToCompactType[TType.I16] = Types.I16; + ttypeToCompactType[TType.I32] = Types.I32; + ttypeToCompactType[TType.I64] = Types.I64; + ttypeToCompactType[TType.DOUBLE] = Types.DOUBLE; + ttypeToCompactType[TType.STRING] = Types.BINARY; + ttypeToCompactType[TType.LIST] = Types.LIST; + ttypeToCompactType[TType.SET] = Types.SET; + ttypeToCompactType[TType.MAP] = Types.MAP; + ttypeToCompactType[TType.STRUCT] = Types.STRUCT; + } + + /** + * TProtocolFactory that produces TCompactProtocols. + */ + public static class Factory implements TProtocolFactory { + public Factory() {} + + public TProtocol getProtocol(TTransport trans) { + return new TCompactProtocol(trans); + } + } + + private static final byte PROTOCOL_ID = (byte)0x82; + private static final byte VERSION = 1; + private static final byte VERSION_MASK = 0x1f; // 0001 1111 + private static final byte TYPE_MASK = (byte)0xE0; // 1110 0000 + private static final int TYPE_SHIFT_AMOUNT = 5; + + /** + * All of the on-wire type codes. + */ + private static class Types { + public static final byte BOOLEAN_TRUE = 0x01; + public static final byte BOOLEAN_FALSE = 0x02; + public static final byte BYTE = 0x03; + public static final byte I16 = 0x04; + public static final byte I32 = 0x05; + public static final byte I64 = 0x06; + public static final byte DOUBLE = 0x07; + public static final byte BINARY = 0x08; + public static final byte LIST = 0x09; + public static final byte SET = 0x0A; + public static final byte MAP = 0x0B; + public static final byte STRUCT = 0x0C; + } + + /** + * Used to keep track of the last field for the current and previous structs, + * so we can do the delta stuff. + */ + private Stack<Short> lastField_ = new Stack<Short>(); + + private short lastFieldId_ = 0; + + /** + * If we encounter a boolean field begin, save the TField here so it can + * have the value incorporated. + */ + private TField booleanField_ = null; + + /** + * If we read a field header, and it's a boolean field, save the boolean + * value here so that readBool can use it. + */ + private Boolean boolValue_ = null; + + /** + * Create a TCompactProtocol. + * + * @param transport the TTransport object to read from or write to. + */ + public TCompactProtocol(TTransport transport) { + super(transport); + } + + + // + // Public Writing methods. + // + + /** + * Write a message header to the wire. Compact Protocol messages contain the + * protocol version so we can migrate forwards in the future if need be. + */ + public void writeMessageBegin(TMessage message) throws TException { + writeByteDirect(PROTOCOL_ID); + writeByteDirect((VERSION & VERSION_MASK) | ((message.type << TYPE_SHIFT_AMOUNT) & TYPE_MASK)); + writeVarint32(message.seqid); + writeString(message.name); + } + + /** + * Write a struct begin. This doesn't actually put anything on the wire. We + * use it as an opportunity to put special placeholder markers on the field + * stack so we can get the field id deltas correct. + */ + public void writeStructBegin(TStruct struct) throws TException { + lastField_.push(lastFieldId_); + lastFieldId_ = 0; + } + + /** + * Write a struct end. This doesn't actually put anything on the wire. We use + * this as an opportunity to pop the last field from the current struct off + * of the field stack. + */ + public void writeStructEnd() throws TException { + lastFieldId_ = lastField_.pop(); + } + + /** + * Write a field header containing the field id and field type. If the + * difference between the current field id and the last one is small (< 15), + * then the field id will be encoded in the 4 MSB as a delta. Otherwise, the + * field id will follow the type header as a zigzag varint. + */ + public void writeFieldBegin(TField field) throws TException { + if (field.type == TType.BOOL) { + // we want to possibly include the value, so we'll wait. + booleanField_ = field; + } else { + writeFieldBeginInternal(field, (byte)-1); + } + } + + /** + * The workhorse of writeFieldBegin. It has the option of doing a + * 'type override' of the type header. This is used specifically in the + * boolean field case. + */ + private void writeFieldBeginInternal(TField field, byte typeOverride) throws TException { + // short lastField = lastField_.pop(); + + // if there's a type override, use that. + byte typeToWrite = typeOverride == -1 ? getCompactType(field.type) : typeOverride; + + // check if we can use delta encoding for the field id + if (field.id > lastFieldId_ && field.id - lastFieldId_ <= 15) { + // write them together + writeByteDirect((field.id - lastFieldId_) << 4 | typeToWrite); + } else { + // write them separate + writeByteDirect(typeToWrite); + writeI16(field.id); + } + + lastFieldId_ = field.id; + // lastField_.push(field.id); + } + + /** + * Write the STOP symbol so we know there are no more fields in this struct. + */ + public void writeFieldStop() throws TException { + writeByteDirect(TType.STOP); + } + + /** + * Write a map header. If the map is empty, omit the key and value type + * headers, as we don't need any additional information to skip it. + */ + public void writeMapBegin(TMap map) throws TException { + if (map.size == 0) { + writeByteDirect(0); + } else { + writeVarint32(map.size); + writeByteDirect(getCompactType(map.keyType) << 4 | getCompactType(map.valueType)); + } + } + + /** + * Write a list header. + */ + public void writeListBegin(TList list) throws TException { + writeCollectionBegin(list.elemType, list.size); + } + + /** + * Write a set header. + */ + public void writeSetBegin(TSet set) throws TException { + writeCollectionBegin(set.elemType, set.size); + } + + /** + * Write a boolean value. Potentially, this could be a boolean field, in + * which case the field header info isn't written yet. If so, decide what the + * right type header is for the value and then write the field header. + * Otherwise, write a single byte. + */ + public void writeBool(boolean b) throws TException { + if (booleanField_ != null) { + // we haven't written the field header yet + writeFieldBeginInternal(booleanField_, b ? Types.BOOLEAN_TRUE : Types.BOOLEAN_FALSE); + booleanField_ = null; + } else { + // we're not part of a field, so just write the value. + writeByteDirect(b ? Types.BOOLEAN_TRUE : Types.BOOLEAN_FALSE); + } + } + + /** + * Write a byte. Nothing to see here! + */ + public void writeByte(byte b) throws TException { + writeByteDirect(b); + } + + /** + * Write an I16 as a zigzag varint. + */ + public void writeI16(short i16) throws TException { + writeVarint32(intToZigZag(i16)); + } + + /** + * Write an i32 as a zigzag varint. + */ + public void writeI32(int i32) throws TException { + writeVarint32(intToZigZag(i32)); + } + + /** + * Write an i64 as a zigzag varint. + */ + public void writeI64(long i64) throws TException { + writeVarint64(longToZigzag(i64)); + } + + /** + * Write a double to the wire as 8 bytes. + */ + public void writeDouble(double dub) throws TException { + byte[] data = new byte[]{0, 0, 0, 0, 0, 0, 0, 0}; + fixedLongToBytes(Double.doubleToLongBits(dub), data, 0); + trans_.write(data); + } + + /** + * Write a string to the wire with a varint size preceeding. + */ + public void writeString(String str) throws TException { + try { + writeBinary(str.getBytes("UTF-8")); + } catch (UnsupportedEncodingException e) { + throw new TException("UTF-8 not supported!"); + } + } + + /** + * Write a byte array, using a varint for the size. + */ + public void writeBinary(byte[] bin) throws TException { + writeVarint32(bin.length); + trans_.write(bin); + } + + // + // These methods are called by structs, but don't actually have any wire + // output or purpose. + // + + public void writeMessageEnd() throws TException {} + public void writeMapEnd() throws TException {} + public void writeListEnd() throws TException {} + public void writeSetEnd() throws TException {} + public void writeFieldEnd() throws TException {} + + // + // Internal writing methods + // + + /** + * Abstract method for writing the start of lists and sets. List and sets on + * the wire differ only by the type indicator. + */ + protected void writeCollectionBegin(byte elemType, int size) throws TException { + if (size <= 14) { + writeByteDirect(size << 4 | getCompactType(elemType)); + } else { + writeByteDirect(0xf0 | getCompactType(elemType)); + writeVarint32(size); + } + } + + /** + * Write an i32 as a varint. Results in 1-5 bytes on the wire. + * TODO: make a permanent buffer like writeVarint64? + */ + byte[] i32buf = new byte[5]; + private void writeVarint32(int n) throws TException { + int idx = 0; + while (true) { + if ((n & ~0x7F) == 0) { + i32buf[idx++] = (byte)n; + // writeByteDirect((byte)n); + break; + // return; + } else { + i32buf[idx++] = (byte)((n & 0x7F) | 0x80); + // writeByteDirect((byte)((n & 0x7F) | 0x80)); + n >>>= 7; + } + } + trans_.write(i32buf, 0, idx); + } + + /** + * Write an i64 as a varint. Results in 1-10 bytes on the wire. + */ + byte[] varint64out = new byte[10]; + private void writeVarint64(long n) throws TException { + int idx = 0; + while (true) { + if ((n & ~0x7FL) == 0) { + varint64out[idx++] = (byte)n; + break; + } else { + varint64out[idx++] = ((byte)((n & 0x7F) | 0x80)); + n >>>= 7; + } + } + trans_.write(varint64out, 0, idx); + } + + /** + * Convert l into a zigzag long. This allows negative numbers to be + * represented compactly as a varint. + */ + private long longToZigzag(long l) { + return (l << 1) ^ (l >> 63); + } + + /** + * Convert n into a zigzag int. This allows negative numbers to be + * represented compactly as a varint. + */ + private int intToZigZag(int n) { + return (n << 1) ^ (n >> 31); + } + + /** + * Convert a long into little-endian bytes in buf starting at off and going + * until off+7. + */ + private void fixedLongToBytes(long n, byte[] buf, int off) { + buf[off+0] = (byte)( n & 0xff); + buf[off+1] = (byte)((n >> 8 ) & 0xff); + buf[off+2] = (byte)((n >> 16) & 0xff); + buf[off+3] = (byte)((n >> 24) & 0xff); + buf[off+4] = (byte)((n >> 32) & 0xff); + buf[off+5] = (byte)((n >> 40) & 0xff); + buf[off+6] = (byte)((n >> 48) & 0xff); + buf[off+7] = (byte)((n >> 56) & 0xff); + } + + /** + * Writes a byte without any possiblity of all that field header nonsense. + * Used internally by other writing methods that know they need to write a byte. + */ + private byte[] byteDirectBuffer = new byte[1]; + private void writeByteDirect(byte b) throws TException { + byteDirectBuffer[0] = b; + trans_.write(byteDirectBuffer); + } + + /** + * Writes a byte without any possiblity of all that field header nonsense. + */ + private void writeByteDirect(int n) throws TException { + writeByteDirect((byte)n); + } + + + // + // Reading methods. + // + + /** + * Read a message header. + */ + public TMessage readMessageBegin() throws TException { + byte protocolId = readByte(); + if (protocolId != PROTOCOL_ID) { + throw new TProtocolException("Expected protocol id " + Integer.toHexString(PROTOCOL_ID) + " but got " + Integer.toHexString(protocolId)); + } + byte versionAndType = readByte(); + byte version = (byte)(versionAndType & VERSION_MASK); + if (version != VERSION) { + throw new TProtocolException("Expected version " + VERSION + " but got " + version); + } + byte type = (byte)((versionAndType >> TYPE_SHIFT_AMOUNT) & 0x03); + int seqid = readVarint32(); + String messageName = readString(); + return new TMessage(messageName, type, seqid); + } + + /** + * Read a struct begin. There's nothing on the wire for this, but it is our + * opportunity to push a new struct begin marker onto the field stack. + */ + public TStruct readStructBegin() throws TException { + lastField_.push(lastFieldId_); + lastFieldId_ = 0; + return ANONYMOUS_STRUCT; + } + + /** + * Doesn't actually consume any wire data, just removes the last field for + * this struct from the field stack. + */ + public void readStructEnd() throws TException { + // consume the last field we read off the wire. + lastFieldId_ = lastField_.pop(); + } + + /** + * Read a field header off the wire. + */ + public TField readFieldBegin() throws TException { + byte type = readByte(); + + // if it's a stop, then we can return immediately, as the struct is over. + if ((type & 0x0f) == TType.STOP) { + return TSTOP; + } + + short fieldId; + + // mask off the 4 MSB of the type header. it could contain a field id delta. + short modifier = (short)((type & 0xf0) >> 4); + if (modifier == 0) { + // not a delta. look ahead for the zigzag varint field id. + fieldId = readI16(); + } else { + // has a delta. add the delta to the last read field id. + fieldId = (short)(lastFieldId_ + modifier); + } + + TField field = new TField("", getTType((byte)(type & 0x0f)), fieldId); + + // if this happens to be a boolean field, the value is encoded in the type + if (isBoolType(type)) { + // save the boolean value in a special instance variable. + boolValue_ = (byte)(type & 0x0f) == Types.BOOLEAN_TRUE ? Boolean.TRUE : Boolean.FALSE; + } + + // push the new field onto the field stack so we can keep the deltas going. + lastFieldId_ = field.id; + return field; + } + + /** + * Read a map header off the wire. If the size is zero, skip reading the key + * and value type. This means that 0-length maps will yield TMaps without the + * "correct" types. + */ + public TMap readMapBegin() throws TException { + int size = readVarint32(); + byte keyAndValueType = size == 0 ? 0 : readByte(); + return new TMap(getTType((byte)(keyAndValueType >> 4)), getTType((byte)(keyAndValueType & 0xf)), size); + } + + /** + * Read a list header off the wire. If the list size is 0-14, the size will + * be packed into the element type header. If it's a longer list, the 4 MSB + * of the element type header will be 0xF, and a varint will follow with the + * true size. + */ + public TList readListBegin() throws TException { + byte size_and_type = readByte(); + int size = (size_and_type >> 4) & 0x0f; + if (size == 15) { + size = readVarint32(); + } + byte type = getTType(size_and_type); + return new TList(type, size); + } + + /** + * Read a set header off the wire. If the set size is 0-14, the size will + * be packed into the element type header. If it's a longer set, the 4 MSB + * of the element type header will be 0xF, and a varint will follow with the + * true size. + */ + public TSet readSetBegin() throws TException { + return new TSet(readListBegin()); + } + + /** + * Read a boolean off the wire. If this is a boolean field, the value should + * already have been read during readFieldBegin, so we'll just consume the + * pre-stored value. Otherwise, read a byte. + */ + public boolean readBool() throws TException { + if (boolValue_ != null) { + boolean result = boolValue_.booleanValue(); + boolValue_ = null; + return result; + } + return readByte() == Types.BOOLEAN_TRUE; + } + + byte[] byteRawBuf = new byte[1]; + /** + * Read a single byte off the wire. Nothing interesting here. + */ + public byte readByte() throws TException { + trans_.read(byteRawBuf, 0, 1); + return byteRawBuf[0]; + } + + /** + * Read an i16 from the wire as a zigzag varint. + */ + public short readI16() throws TException { + return (short)zigzagToInt(readVarint32()); + } + + /** + * Read an i32 from the wire as a zigzag varint. + */ + public int readI32() throws TException { + return zigzagToInt(readVarint32()); + } + + /** + * Read an i64 from the wire as a zigzag varint. + */ + public long readI64() throws TException { + return zigzagToLong(readVarint64()); + } + + /** + * No magic here - just read a double off the wire. + */ + public double readDouble() throws TException { + byte[] longBits = new byte[8]; + trans_.read(longBits, 0, 8); + return Double.longBitsToDouble(bytesToLong(longBits)); + } + + /** + * Reads a byte[] (via readBinary), and then UTF-8 decodes it. + */ + public String readString() throws TException { + try { + return new String(readBinary(), "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new TException("UTF-8 not supported!"); + } + } + + /** + * Read a byte[] from the wire. + */ + public byte[] readBinary() throws TException { + int length = readVarint32(); + if (length == 0) return new byte[0]; + + byte[] buf = new byte[length]; + trans_.read(buf, 0, length); + return buf; + } + + + // + // These methods are here for the struct to call, but don't have any wire + // encoding. + // + public void readMessageEnd() throws TException {} + public void readFieldEnd() throws TException {} + public void readMapEnd() throws TException {} + public void readListEnd() throws TException {} + public void readSetEnd() throws TException {} + + // + // Internal reading methods + // + + /** + * Read an i32 from the wire as a varint. The MSB of each byte is set + * if there is another byte to follow. This can read up to 5 bytes. + */ + private int readVarint32() throws TException { + // if the wire contains the right stuff, this will just truncate the i64 we + // read and get us the right sign. + return (int)readVarint64(); + } + + /** + * Read an i64 from the wire as a proper varint. The MSB of each byte is set + * if there is another byte to follow. This can read up to 10 bytes. + */ + private long readVarint64() throws TException { + int shift = 0; + long result = 0; + while (true) { + byte b = readByte(); + result |= (long) (b & 0x7f) << shift; + if ((b & 0x80) != 0x80) break; + shift +=7; + } + return result; + } + + // + // encoding helpers + // + + /** + * Convert from zigzag int to int. + */ + private int zigzagToInt(int n) { + return (n >>> 1) ^ -(n & 1); + } + + /** + * Convert from zigzag long to long. + */ + private long zigzagToLong(long n) { + return (n >>> 1) ^ -(n & 1); + } + + /** + * Note that it's important that the mask bytes are long literals, + * otherwise they'll default to ints, and when you shift an int left 56 bits, + * you just get a messed up int. + */ + private long bytesToLong(byte[] bytes) { + return + ((bytes[7] & 0xffL) << 56) | + ((bytes[6] & 0xffL) << 48) | + ((bytes[5] & 0xffL) << 40) | + ((bytes[4] & 0xffL) << 32) | + ((bytes[3] & 0xffL) << 24) | + ((bytes[2] & 0xffL) << 16) | + ((bytes[1] & 0xffL) << 8) | + ((bytes[0] & 0xffL)); + } + + // + // type testing and converting + // + + private boolean isBoolType(byte b) { + return (b & 0x0f) == Types.BOOLEAN_TRUE || (b & 0x0f) == Types.BOOLEAN_FALSE; + } + + /** + * Given a TCompactProtocol.Types constant, convert it to its corresponding + * TType value. + */ + private byte getTType(byte type) { + switch ((byte)(type & 0x0f)) { + case TType.STOP: + return TType.STOP; + case Types.BOOLEAN_FALSE: + case Types.BOOLEAN_TRUE: + return TType.BOOL; + case Types.BYTE: + return TType.BYTE; + case Types.I16: + return TType.I16; + case Types.I32: + return TType.I32; + case Types.I64: + return TType.I64; + case Types.DOUBLE: + return TType.DOUBLE; + case Types.BINARY: + return TType.STRING; + case Types.LIST: + return TType.LIST; + case Types.SET: + return TType.SET; + case Types.MAP: + return TType.MAP; + case Types.STRUCT: + return TType.STRUCT; + default: + throw new RuntimeException("don't know what type: " + (byte)(type & 0x0f)); + } + } + + /** + * Given a TType value, find the appropriate TCompactProtocol.Types constant. + */ + private byte getCompactType(byte ttype) { + return ttypeToCompactType[ttype]; + } + +} diff --git a/lib/java/src/org/apache/thrift/protocol/TField.java b/lib/java/src/org/apache/thrift/protocol/TField.java new file mode 100644 index 000000000..03affdaa1 --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TField.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +/** + * Helper class that encapsulates field metadata. + * + */ +public class TField { + public TField() { + this("", TType.STOP, (short)0); + } + + public TField(String n, byte t, short i) { + name = n; + type = t; + id = i; + } + + public final String name; + public final byte type; + public final short id; + + public String toString() { + return "<TField name:'" + name + "' type:" + type + " field-id:" + id + ">"; + } + + public boolean equals(TField otherField) { + return type == otherField.type && id == otherField.id; + } +} diff --git a/lib/java/src/org/apache/thrift/protocol/TJSONProtocol.java b/lib/java/src/org/apache/thrift/protocol/TJSONProtocol.java new file mode 100644 index 000000000..631c6a5b5 --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TJSONProtocol.java @@ -0,0 +1,927 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +import org.apache.thrift.TException; +import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.transport.TTransport; +import java.io.UnsupportedEncodingException; +import java.util.Stack; + +/** + * JSON protocol implementation for thrift. + * + * This is a full-featured protocol supporting write and read. + * + * Please see the C++ class header for a detailed description of the + * protocol's wire format. + * + */ +public class TJSONProtocol extends TProtocol { + + /** + * Factory for JSON protocol objects + */ + public static class Factory implements TProtocolFactory { + + public TProtocol getProtocol(TTransport trans) { + return new TJSONProtocol(trans); + } + + } + + private static final byte[] COMMA = new byte[] {','}; + private static final byte[] COLON = new byte[] {':'}; + private static final byte[] LBRACE = new byte[] {'{'}; + private static final byte[] RBRACE = new byte[] {'}'}; + private static final byte[] LBRACKET = new byte[] {'['}; + private static final byte[] RBRACKET = new byte[] {']'}; + private static final byte[] QUOTE = new byte[] {'"'}; + private static final byte[] BACKSLASH = new byte[] {'\\'}; + private static final byte[] ZERO = new byte[] {'0'}; + + private static final byte[] ESCSEQ = new byte[] {'\\','u','0','0'}; + + private static final long VERSION = 1; + + private static final byte[] JSON_CHAR_TABLE = { + /* 0 1 2 3 4 5 6 7 8 9 A B C D E F */ + 0, 0, 0, 0, 0, 0, 0, 0,'b','t','n', 0,'f','r', 0, 0, // 0 + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // 1 + 1, 1,'"', 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, // 2 + }; + + private static final String ESCAPE_CHARS = "\"\\bfnrt"; + + private static final byte[] ESCAPE_CHAR_VALS = { + '"', '\\', '\b', '\f', '\n', '\r', '\t', + }; + + private static final int DEF_STRING_SIZE = 16; + + private static final byte[] NAME_BOOL = new byte[] {'t', 'f'}; + private static final byte[] NAME_BYTE = new byte[] {'i','8'}; + private static final byte[] NAME_I16 = new byte[] {'i','1','6'}; + private static final byte[] NAME_I32 = new byte[] {'i','3','2'}; + private static final byte[] NAME_I64 = new byte[] {'i','6','4'}; + private static final byte[] NAME_DOUBLE = new byte[] {'d','b','l'}; + private static final byte[] NAME_STRUCT = new byte[] {'r','e','c'}; + private static final byte[] NAME_STRING = new byte[] {'s','t','r'}; + private static final byte[] NAME_MAP = new byte[] {'m','a','p'}; + private static final byte[] NAME_LIST = new byte[] {'l','s','t'}; + private static final byte[] NAME_SET = new byte[] {'s','e','t'}; + + private static final TStruct ANONYMOUS_STRUCT = new TStruct(); + + private static final byte[] getTypeNameForTypeID(byte typeID) + throws TException { + switch (typeID) { + case TType.BOOL: + return NAME_BOOL; + case TType.BYTE: + return NAME_BYTE; + case TType.I16: + return NAME_I16; + case TType.I32: + return NAME_I32; + case TType.I64: + return NAME_I64; + case TType.DOUBLE: + return NAME_DOUBLE; + case TType.STRING: + return NAME_STRING; + case TType.STRUCT: + return NAME_STRUCT; + case TType.MAP: + return NAME_MAP; + case TType.SET: + return NAME_SET; + case TType.LIST: + return NAME_LIST; + default: + throw new TProtocolException(TProtocolException.NOT_IMPLEMENTED, + "Unrecognized type"); + } + } + + private static final byte getTypeIDForTypeName(byte[] name) + throws TException { + byte result = TType.STOP; + if (name.length > 1) { + switch (name[0]) { + case 'd': + result = TType.DOUBLE; + break; + case 'i': + switch (name[1]) { + case '8': + result = TType.BYTE; + break; + case '1': + result = TType.I16; + break; + case '3': + result = TType.I32; + break; + case '6': + result = TType.I64; + break; + } + break; + case 'l': + result = TType.LIST; + break; + case 'm': + result = TType.MAP; + break; + case 'r': + result = TType.STRUCT; + break; + case 's': + if (name[1] == 't') { + result = TType.STRING; + } + else if (name[1] == 'e') { + result = TType.SET; + } + break; + case 't': + result = TType.BOOL; + break; + } + } + if (result == TType.STOP) { + throw new TProtocolException(TProtocolException.NOT_IMPLEMENTED, + "Unrecognized type"); + } + return result; + } + + // Base class for tracking JSON contexts that may require inserting/reading + // additional JSON syntax characters + // This base context does nothing. + protected class JSONBaseContext { + protected void write() throws TException {} + + protected void read() throws TException {} + + protected boolean escapeNum() { return false; } + } + + // Context for JSON lists. Will insert/read commas before each item except + // for the first one + protected class JSONListContext extends JSONBaseContext { + private boolean first_ = true; + + @Override + protected void write() throws TException { + if (first_) { + first_ = false; + } else { + trans_.write(COMMA); + } + } + + @Override + protected void read() throws TException { + if (first_) { + first_ = false; + } else { + readJSONSyntaxChar(COMMA); + } + } + } + + // Context for JSON records. Will insert/read colons before the value portion + // of each record pair, and commas before each key except the first. In + // addition, will indicate that numbers in the key position need to be + // escaped in quotes (since JSON keys must be strings). + protected class JSONPairContext extends JSONBaseContext { + private boolean first_ = true; + private boolean colon_ = true; + + @Override + protected void write() throws TException { + if (first_) { + first_ = false; + colon_ = true; + } else { + trans_.write(colon_ ? COLON : COMMA); + colon_ = !colon_; + } + } + + @Override + protected void read() throws TException { + if (first_) { + first_ = false; + colon_ = true; + } else { + readJSONSyntaxChar(colon_ ? COLON : COMMA); + colon_ = !colon_; + } + } + + @Override + protected boolean escapeNum() { + return colon_; + } + } + + // Holds up to one byte from the transport + protected class LookaheadReader { + + private boolean hasData_; + private byte[] data_ = new byte[1]; + + // Return and consume the next byte to be read, either taking it from the + // data buffer if present or getting it from the transport otherwise. + protected byte read() throws TException { + if (hasData_) { + hasData_ = false; + } + else { + trans_.readAll(data_, 0, 1); + } + return data_[0]; + } + + // Return the next byte to be read without consuming, filling the data + // buffer if it has not been filled already. + protected byte peek() throws TException { + if (!hasData_) { + trans_.readAll(data_, 0, 1); + } + hasData_ = true; + return data_[0]; + } + } + + // Stack of nested contexts that we may be in + private Stack<JSONBaseContext> contextStack_ = new Stack<JSONBaseContext>(); + + // Current context that we are in + private JSONBaseContext context_ = new JSONBaseContext(); + + // Reader that manages a 1-byte buffer + private LookaheadReader reader_ = new LookaheadReader(); + + // Push a new JSON context onto the stack. + private void pushContext(JSONBaseContext c) { + contextStack_.push(context_); + context_ = c; + } + + // Pop the last JSON context off the stack + private void popContext() { + context_ = contextStack_.pop(); + } + + /** + * Constructor + */ + public TJSONProtocol(TTransport trans) { + super(trans); + } + + // Temporary buffer used by several methods + private byte[] tmpbuf_ = new byte[4]; + + // Read a byte that must match b[0]; otherwise an excpetion is thrown. + // Marked protected to avoid synthetic accessor in JSONListContext.read + // and JSONPairContext.read + protected void readJSONSyntaxChar(byte[] b) throws TException { + byte ch = reader_.read(); + if (ch != b[0]) { + throw new TProtocolException(TProtocolException.INVALID_DATA, + "Unexpected character:" + (char)ch); + } + } + + // Convert a byte containing a hex char ('0'-'9' or 'a'-'f') into its + // corresponding hex value + private static final byte hexVal(byte ch) throws TException { + if ((ch >= '0') && (ch <= '9')) { + return (byte)((char)ch - '0'); + } + else if ((ch >= 'a') && (ch <= 'f')) { + return (byte)((char)ch - 'a'); + } + else { + throw new TProtocolException(TProtocolException.INVALID_DATA, + "Expected hex character"); + } + } + + // Convert a byte containing a hex value to its corresponding hex character + private static final byte hexChar(byte val) { + val &= 0x0F; + if (val < 10) { + return (byte)((char)val + '0'); + } + else { + return (byte)((char)val + 'a'); + } + } + + // Write the bytes in array buf as a JSON characters, escaping as needed + private void writeJSONString(byte[] b) throws TException { + context_.write(); + trans_.write(QUOTE); + int len = b.length; + for (int i = 0; i < len; i++) { + if ((b[i] & 0x00FF) >= 0x30) { + if (b[i] == BACKSLASH[0]) { + trans_.write(BACKSLASH); + trans_.write(BACKSLASH); + } + else { + trans_.write(b, i, 1); + } + } + else { + tmpbuf_[0] = JSON_CHAR_TABLE[b[i]]; + if (tmpbuf_[0] == 1) { + trans_.write(b, i, 1); + } + else if (tmpbuf_[0] > 1) { + trans_.write(BACKSLASH); + trans_.write(tmpbuf_, 0, 1); + } + else { + trans_.write(ESCSEQ); + tmpbuf_[0] = hexChar((byte)(b[i] >> 4)); + tmpbuf_[1] = hexChar(b[i]); + trans_.write(tmpbuf_, 0, 2); + } + } + } + trans_.write(QUOTE); + } + + // Write out number as a JSON value. If the context dictates so, it will be + // wrapped in quotes to output as a JSON string. + private void writeJSONInteger(long num) throws TException { + context_.write(); + String str = Long.toString(num); + boolean escapeNum = context_.escapeNum(); + if (escapeNum) { + trans_.write(QUOTE); + } + try { + byte[] buf = str.getBytes("UTF-8"); + trans_.write(buf); + } catch (UnsupportedEncodingException uex) { + throw new TException("JVM DOES NOT SUPPORT UTF-8"); + } + if (escapeNum) { + trans_.write(QUOTE); + } + } + + // Write out a double as a JSON value. If it is NaN or infinity or if the + // context dictates escaping, write out as JSON string. + private void writeJSONDouble(double num) throws TException { + context_.write(); + String str = Double.toString(num); + boolean special = false; + switch (str.charAt(0)) { + case 'N': // NaN + case 'I': // Infinity + special = true; + break; + case '-': + if (str.charAt(1) == 'I') { // -Infinity + special = true; + } + break; + } + + boolean escapeNum = special || context_.escapeNum(); + if (escapeNum) { + trans_.write(QUOTE); + } + try { + byte[] b = str.getBytes("UTF-8"); + trans_.write(b, 0, b.length); + } catch (UnsupportedEncodingException uex) { + throw new TException("JVM DOES NOT SUPPORT UTF-8"); + } + if (escapeNum) { + trans_.write(QUOTE); + } + } + + // Write out contents of byte array b as a JSON string with base-64 encoded + // data + private void writeJSONBase64(byte[] b) throws TException { + context_.write(); + trans_.write(QUOTE); + int len = b.length; + int off = 0; + while (len >= 3) { + // Encode 3 bytes at a time + TBase64Utils.encode(b, off, 3, tmpbuf_, 0); + trans_.write(tmpbuf_, 0, 4); + off += 3; + len -= 3; + } + if (len > 0) { + // Encode remainder + TBase64Utils.encode(b, off, len, tmpbuf_, 0); + trans_.write(tmpbuf_, 0, len + 1); + } + trans_.write(QUOTE); + } + + private void writeJSONObjectStart() throws TException { + context_.write(); + trans_.write(LBRACE); + pushContext(new JSONPairContext()); + } + + private void writeJSONObjectEnd() throws TException { + popContext(); + trans_.write(RBRACE); + } + + private void writeJSONArrayStart() throws TException { + context_.write(); + trans_.write(LBRACKET); + pushContext(new JSONListContext()); + } + + private void writeJSONArrayEnd() throws TException { + popContext(); + trans_.write(RBRACKET); + } + + @Override + public void writeMessageBegin(TMessage message) throws TException { + writeJSONArrayStart(); + writeJSONInteger(VERSION); + try { + byte[] b = message.name.getBytes("UTF-8"); + writeJSONString(b); + } catch (UnsupportedEncodingException uex) { + throw new TException("JVM DOES NOT SUPPORT UTF-8"); + } + writeJSONInteger(message.type); + writeJSONInteger(message.seqid); + } + + @Override + public void writeMessageEnd() throws TException { + writeJSONArrayEnd(); + } + + @Override + public void writeStructBegin(TStruct struct) throws TException { + writeJSONObjectStart(); + } + + @Override + public void writeStructEnd() throws TException { + writeJSONObjectEnd(); + } + + @Override + public void writeFieldBegin(TField field) throws TException { + writeJSONInteger(field.id); + writeJSONObjectStart(); + writeJSONString(getTypeNameForTypeID(field.type)); + } + + @Override + public void writeFieldEnd() throws TException { + writeJSONObjectEnd(); + } + + @Override + public void writeFieldStop() {} + + @Override + public void writeMapBegin(TMap map) throws TException { + writeJSONArrayStart(); + writeJSONString(getTypeNameForTypeID(map.keyType)); + writeJSONString(getTypeNameForTypeID(map.valueType)); + writeJSONInteger(map.size); + writeJSONObjectStart(); + } + + @Override + public void writeMapEnd() throws TException { + writeJSONObjectEnd(); + writeJSONArrayEnd(); + } + + @Override + public void writeListBegin(TList list) throws TException { + writeJSONArrayStart(); + writeJSONString(getTypeNameForTypeID(list.elemType)); + writeJSONInteger(list.size); + } + + @Override + public void writeListEnd() throws TException { + writeJSONArrayEnd(); + } + + @Override + public void writeSetBegin(TSet set) throws TException { + writeJSONArrayStart(); + writeJSONString(getTypeNameForTypeID(set.elemType)); + writeJSONInteger(set.size); + } + + @Override + public void writeSetEnd() throws TException { + writeJSONArrayEnd(); + } + + @Override + public void writeBool(boolean b) throws TException { + writeJSONInteger(b ? (long)1 : (long)0); + } + + @Override + public void writeByte(byte b) throws TException { + writeJSONInteger((long)b); + } + + @Override + public void writeI16(short i16) throws TException { + writeJSONInteger((long)i16); + } + + @Override + public void writeI32(int i32) throws TException { + writeJSONInteger((long)i32); + } + + @Override + public void writeI64(long i64) throws TException { + writeJSONInteger(i64); + } + + @Override + public void writeDouble(double dub) throws TException { + writeJSONDouble(dub); + } + + @Override + public void writeString(String str) throws TException { + try { + byte[] b = str.getBytes("UTF-8"); + writeJSONString(b); + } catch (UnsupportedEncodingException uex) { + throw new TException("JVM DOES NOT SUPPORT UTF-8"); + } + } + + @Override + public void writeBinary(byte[] bin) throws TException { + writeJSONBase64(bin); + } + + /** + * Reading methods. + */ + + // Read in a JSON string, unescaping as appropriate.. Skip reading from the + // context if skipContext is true. + private TByteArrayOutputStream readJSONString(boolean skipContext) + throws TException { + TByteArrayOutputStream arr = new TByteArrayOutputStream(DEF_STRING_SIZE); + if (!skipContext) { + context_.read(); + } + readJSONSyntaxChar(QUOTE); + while (true) { + byte ch = reader_.read(); + if (ch == QUOTE[0]) { + break; + } + if (ch == ESCSEQ[0]) { + ch = reader_.read(); + if (ch == ESCSEQ[1]) { + readJSONSyntaxChar(ZERO); + readJSONSyntaxChar(ZERO); + trans_.readAll(tmpbuf_, 0, 2); + ch = (byte)((hexVal((byte)tmpbuf_[0]) << 4) + hexVal(tmpbuf_[1])); + } + else { + int off = ESCAPE_CHARS.indexOf(ch); + if (off == -1) { + throw new TProtocolException(TProtocolException.INVALID_DATA, + "Expected control char"); + } + ch = ESCAPE_CHAR_VALS[off]; + } + } + arr.write(ch); + } + return arr; + } + + // Return true if the given byte could be a valid part of a JSON number. + private boolean isJSONNumeric(byte b) { + switch (b) { + case '+': + case '-': + case '.': + case '0': + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + case '8': + case '9': + case 'E': + case 'e': + return true; + } + return false; + } + + // Read in a sequence of characters that are all valid in JSON numbers. Does + // not do a complete regex check to validate that this is actually a number. + private String readJSONNumericChars() throws TException { + StringBuilder strbld = new StringBuilder(); + while (true) { + byte ch = reader_.peek(); + if (!isJSONNumeric(ch)) { + break; + } + strbld.append((char)reader_.read()); + } + return strbld.toString(); + } + + // Read in a JSON number. If the context dictates, read in enclosing quotes. + private long readJSONInteger() throws TException { + context_.read(); + if (context_.escapeNum()) { + readJSONSyntaxChar(QUOTE); + } + String str = readJSONNumericChars(); + if (context_.escapeNum()) { + readJSONSyntaxChar(QUOTE); + } + try { + return Long.valueOf(str); + } + catch (NumberFormatException ex) { + throw new TProtocolException(TProtocolException.INVALID_DATA, + "Bad data encounted in numeric data"); + } + } + + // Read in a JSON double value. Throw if the value is not wrapped in quotes + // when expected or if wrapped in quotes when not expected. + private double readJSONDouble() throws TException { + context_.read(); + if (reader_.peek() == QUOTE[0]) { + TByteArrayOutputStream arr = readJSONString(true); + try { + double dub = Double.valueOf(arr.toString("UTF-8")); + if (!context_.escapeNum() && !Double.isNaN(dub) && + !Double.isInfinite(dub)) { + // Throw exception -- we should not be in a string in this case + throw new TProtocolException(TProtocolException.INVALID_DATA, + "Numeric data unexpectedly quoted"); + } + return dub; + } + catch (UnsupportedEncodingException ex) { + throw new TException("JVM DOES NOT SUPPORT UTF-8"); + } + } + else { + if (context_.escapeNum()) { + // This will throw - we should have had a quote if escapeNum == true + readJSONSyntaxChar(QUOTE); + } + try { + return Double.valueOf(readJSONNumericChars()); + } + catch (NumberFormatException ex) { + throw new TProtocolException(TProtocolException.INVALID_DATA, + "Bad data encounted in numeric data"); + } + } + } + + // Read in a JSON string containing base-64 encoded data and decode it. + private byte[] readJSONBase64() throws TException { + TByteArrayOutputStream arr = readJSONString(false); + byte[] b = arr.get(); + int len = arr.len(); + int off = 0; + int size = 0; + while (len >= 4) { + // Decode 4 bytes at a time + TBase64Utils.decode(b, off, 4, b, size); // NB: decoded in place + off += 4; + len -= 4; + size += 3; + } + // Don't decode if we hit the end or got a single leftover byte (invalid + // base64 but legal for skip of regular string type) + if (len > 1) { + // Decode remainder + TBase64Utils.decode(b, off, len, b, size); // NB: decoded in place + size += len - 1; + } + // Sadly we must copy the byte[] (any way around this?) + byte [] result = new byte[size]; + System.arraycopy(b, 0, result, 0, size); + return result; + } + + private void readJSONObjectStart() throws TException { + context_.read(); + readJSONSyntaxChar(LBRACE); + pushContext(new JSONPairContext()); + } + + private void readJSONObjectEnd() throws TException { + readJSONSyntaxChar(RBRACE); + popContext(); + } + + private void readJSONArrayStart() throws TException { + context_.read(); + readJSONSyntaxChar(LBRACKET); + pushContext(new JSONListContext()); + } + + private void readJSONArrayEnd() throws TException { + readJSONSyntaxChar(RBRACKET); + popContext(); + } + + @Override + public TMessage readMessageBegin() throws TException { + readJSONArrayStart(); + if (readJSONInteger() != VERSION) { + throw new TProtocolException(TProtocolException.BAD_VERSION, + "Message contained bad version."); + } + String name; + try { + name = readJSONString(false).toString("UTF-8"); + } + catch (UnsupportedEncodingException ex) { + throw new TException("JVM DOES NOT SUPPORT UTF-8"); + } + byte type = (byte) readJSONInteger(); + int seqid = (int) readJSONInteger(); + return new TMessage(name, type, seqid); + } + + @Override + public void readMessageEnd() throws TException { + readJSONArrayEnd(); + } + + @Override + public TStruct readStructBegin() throws TException { + readJSONObjectStart(); + return ANONYMOUS_STRUCT; + } + + @Override + public void readStructEnd() throws TException { + readJSONObjectEnd(); + } + + @Override + public TField readFieldBegin() throws TException { + byte ch = reader_.peek(); + byte type; + short id = 0; + if (ch == RBRACE[0]) { + type = TType.STOP; + } + else { + id = (short) readJSONInteger(); + readJSONObjectStart(); + type = getTypeIDForTypeName(readJSONString(false).get()); + } + return new TField("", type, id); + } + + @Override + public void readFieldEnd() throws TException { + readJSONObjectEnd(); + } + + @Override + public TMap readMapBegin() throws TException { + readJSONArrayStart(); + byte keyType = getTypeIDForTypeName(readJSONString(false).get()); + byte valueType = getTypeIDForTypeName(readJSONString(false).get()); + int size = (int)readJSONInteger(); + readJSONObjectStart(); + return new TMap(keyType, valueType, size); + } + + @Override + public void readMapEnd() throws TException { + readJSONObjectEnd(); + readJSONArrayEnd(); + } + + @Override + public TList readListBegin() throws TException { + readJSONArrayStart(); + byte elemType = getTypeIDForTypeName(readJSONString(false).get()); + int size = (int)readJSONInteger(); + return new TList(elemType, size); + } + + @Override + public void readListEnd() throws TException { + readJSONArrayEnd(); + } + + @Override + public TSet readSetBegin() throws TException { + readJSONArrayStart(); + byte elemType = getTypeIDForTypeName(readJSONString(false).get()); + int size = (int)readJSONInteger(); + return new TSet(elemType, size); + } + + @Override + public void readSetEnd() throws TException { + readJSONArrayEnd(); + } + + @Override + public boolean readBool() throws TException { + return (readJSONInteger() == 0 ? false : true); + } + + @Override + public byte readByte() throws TException { + return (byte) readJSONInteger(); + } + + @Override + public short readI16() throws TException { + return (short) readJSONInteger(); + } + + @Override + public int readI32() throws TException { + return (int) readJSONInteger(); + } + + @Override + public long readI64() throws TException { + return (long) readJSONInteger(); + } + + @Override + public double readDouble() throws TException { + return readJSONDouble(); + } + + @Override + public String readString() throws TException { + try { + return readJSONString(false).toString("UTF-8"); + } + catch (UnsupportedEncodingException ex) { + throw new TException("JVM DOES NOT SUPPORT UTF-8"); + } + } + + @Override + public byte[] readBinary() throws TException { + return readJSONBase64(); + } + +} diff --git a/lib/java/src/org/apache/thrift/protocol/TList.java b/lib/java/src/org/apache/thrift/protocol/TList.java new file mode 100644 index 000000000..0d36e83d9 --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TList.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +/** + * Helper class that encapsulates list metadata. + * + */ +public final class TList { + public TList() { + this(TType.STOP, 0); + } + + public TList(byte t, int s) { + elemType = t; + size = s; + } + + public final byte elemType; + public final int size; +} diff --git a/lib/java/src/org/apache/thrift/protocol/TMap.java b/lib/java/src/org/apache/thrift/protocol/TMap.java new file mode 100644 index 000000000..20881f7ac --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TMap.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +/** + * Helper class that encapsulates map metadata. + * + */ +public final class TMap { + public TMap() { + this(TType.STOP, TType.STOP, 0); + } + + public TMap(byte k, byte v, int s) { + keyType = k; + valueType = v; + size = s; + } + + public final byte keyType; + public final byte valueType; + public final int size; +} diff --git a/lib/java/src/org/apache/thrift/protocol/TMessage.java b/lib/java/src/org/apache/thrift/protocol/TMessage.java new file mode 100644 index 000000000..cd56964da --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TMessage.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +/** + * Helper class that encapsulates struct metadata. + * + */ +public final class TMessage { + public TMessage() { + this("", TType.STOP, 0); + } + + public TMessage(String n, byte t, int s) { + name = n; + type = t; + seqid = s; + } + + public final String name; + public final byte type; + public final int seqid; + + public String toString() { + return "<TMessage name:'" + name + "' type: " + type + " seqid:" + seqid + ">"; + } + + public boolean equals(TMessage other) { + return name.equals(other.name) && type == other.type && seqid == other.seqid; + } +} diff --git a/lib/java/src/org/apache/thrift/protocol/TMessageType.java b/lib/java/src/org/apache/thrift/protocol/TMessageType.java new file mode 100644 index 000000000..aa3f93177 --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TMessageType.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +/** + * Message type constants in the Thrift protocol. + * + */ +public final class TMessageType { + public static final byte CALL = 1; + public static final byte REPLY = 2; + public static final byte EXCEPTION = 3; + public static final byte ONEWAY = 4; +} diff --git a/lib/java/src/org/apache/thrift/protocol/TProtocol.java b/lib/java/src/org/apache/thrift/protocol/TProtocol.java new file mode 100644 index 000000000..50d6683df --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TProtocol.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransport; + +/** + * Protocol interface definition. + * + */ +public abstract class TProtocol { + + /** + * Prevent direct instantiation + */ + @SuppressWarnings("unused") + private TProtocol() {} + + /** + * Transport + */ + protected TTransport trans_; + + /** + * Constructor + */ + protected TProtocol(TTransport trans) { + trans_ = trans; + } + + /** + * Transport accessor + */ + public TTransport getTransport() { + return trans_; + } + + /** + * Writing methods. + */ + + public abstract void writeMessageBegin(TMessage message) throws TException; + + public abstract void writeMessageEnd() throws TException; + + public abstract void writeStructBegin(TStruct struct) throws TException; + + public abstract void writeStructEnd() throws TException; + + public abstract void writeFieldBegin(TField field) throws TException; + + public abstract void writeFieldEnd() throws TException; + + public abstract void writeFieldStop() throws TException; + + public abstract void writeMapBegin(TMap map) throws TException; + + public abstract void writeMapEnd() throws TException; + + public abstract void writeListBegin(TList list) throws TException; + + public abstract void writeListEnd() throws TException; + + public abstract void writeSetBegin(TSet set) throws TException; + + public abstract void writeSetEnd() throws TException; + + public abstract void writeBool(boolean b) throws TException; + + public abstract void writeByte(byte b) throws TException; + + public abstract void writeI16(short i16) throws TException; + + public abstract void writeI32(int i32) throws TException; + + public abstract void writeI64(long i64) throws TException; + + public abstract void writeDouble(double dub) throws TException; + + public abstract void writeString(String str) throws TException; + + public abstract void writeBinary(byte[] bin) throws TException; + + /** + * Reading methods. + */ + + public abstract TMessage readMessageBegin() throws TException; + + public abstract void readMessageEnd() throws TException; + + public abstract TStruct readStructBegin() throws TException; + + public abstract void readStructEnd() throws TException; + + public abstract TField readFieldBegin() throws TException; + + public abstract void readFieldEnd() throws TException; + + public abstract TMap readMapBegin() throws TException; + + public abstract void readMapEnd() throws TException; + + public abstract TList readListBegin() throws TException; + + public abstract void readListEnd() throws TException; + + public abstract TSet readSetBegin() throws TException; + + public abstract void readSetEnd() throws TException; + + public abstract boolean readBool() throws TException; + + public abstract byte readByte() throws TException; + + public abstract short readI16() throws TException; + + public abstract int readI32() throws TException; + + public abstract long readI64() throws TException; + + public abstract double readDouble() throws TException; + + public abstract String readString() throws TException; + + public abstract byte[] readBinary() throws TException; + +} diff --git a/lib/java/src/org/apache/thrift/protocol/TProtocolException.java b/lib/java/src/org/apache/thrift/protocol/TProtocolException.java new file mode 100644 index 000000000..248815bec --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TProtocolException.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +import org.apache.thrift.TException; + +/** + * Protocol exceptions. + * + */ +public class TProtocolException extends TException { + + + private static final long serialVersionUID = 1L; + public static final int UNKNOWN = 0; + public static final int INVALID_DATA = 1; + public static final int NEGATIVE_SIZE = 2; + public static final int SIZE_LIMIT = 3; + public static final int BAD_VERSION = 4; + public static final int NOT_IMPLEMENTED = 5; + + protected int type_ = UNKNOWN; + + public TProtocolException() { + super(); + } + + public TProtocolException(int type) { + super(); + type_ = type; + } + + public TProtocolException(int type, String message) { + super(message); + type_ = type; + } + + public TProtocolException(String message) { + super(message); + } + + public TProtocolException(int type, Throwable cause) { + super(cause); + type_ = type; + } + + public TProtocolException(Throwable cause) { + super(cause); + } + + public TProtocolException(String message, Throwable cause) { + super(message, cause); + } + + public TProtocolException(int type, String message, Throwable cause) { + super(message, cause); + type_ = type; + } + + public int getType() { + return type_; + } + +} diff --git a/lib/java/src/org/apache/thrift/protocol/TProtocolFactory.java b/lib/java/src/org/apache/thrift/protocol/TProtocolFactory.java new file mode 100644 index 000000000..afa502b70 --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TProtocolFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +import org.apache.thrift.transport.TTransport; + +/** + * Factory interface for constructing protocol instances. + * + */ +public interface TProtocolFactory { + public TProtocol getProtocol(TTransport trans); +} diff --git a/lib/java/src/org/apache/thrift/protocol/TProtocolUtil.java b/lib/java/src/org/apache/thrift/protocol/TProtocolUtil.java new file mode 100644 index 000000000..9bf10f67e --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TProtocolUtil.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +import org.apache.thrift.TException; + +/** + * Utility class with static methods for interacting with protocol data + * streams. + * + */ +public class TProtocolUtil { + + /** + * The maximum recursive depth the skip() function will traverse before + * throwing a TException. + */ + private static int maxSkipDepth = Integer.MAX_VALUE; + + /** + * Specifies the maximum recursive depth that the skip function will + * traverse before throwing a TException. This is a global setting, so + * any call to skip in this JVM will enforce this value. + * + * @param depth the maximum recursive depth. A value of 2 would allow + * the skip function to skip a structure or collection with basic children, + * but it would not permit skipping a struct that had a field containing + * a child struct. A value of 1 would only allow skipping of simple + * types and empty structs/collections. + */ + public static void setMaxSkipDepth(int depth) { + maxSkipDepth = depth; + } + + /** + * Skips over the next data element from the provided input TProtocol object. + * + * @param prot the protocol object to read from + * @param type the next value will be intepreted as this TType value. + */ + public static void skip(TProtocol prot, byte type) + throws TException { + skip(prot, type, maxSkipDepth); + } + + /** + * Skips over the next data element from the provided input TProtocol object. + * + * @param prot the protocol object to read from + * @param type the next value will be intepreted as this TType value. + * @param maxDepth this function will only skip complex objects to this + * recursive depth, to prevent Java stack overflow. + */ + public static void skip(TProtocol prot, byte type, int maxDepth) + throws TException { + if (maxDepth <= 0) { + throw new TException("Maximum skip depth exceeded"); + } + switch (type) { + case TType.BOOL: + { + prot.readBool(); + break; + } + case TType.BYTE: + { + prot.readByte(); + break; + } + case TType.I16: + { + prot.readI16(); + break; + } + case TType.I32: + { + prot.readI32(); + break; + } + case TType.I64: + { + prot.readI64(); + break; + } + case TType.DOUBLE: + { + prot.readDouble(); + break; + } + case TType.STRING: + { + prot.readBinary(); + break; + } + case TType.STRUCT: + { + prot.readStructBegin(); + while (true) { + TField field = prot.readFieldBegin(); + if (field.type == TType.STOP) { + break; + } + skip(prot, field.type, maxDepth - 1); + prot.readFieldEnd(); + } + prot.readStructEnd(); + break; + } + case TType.MAP: + { + TMap map = prot.readMapBegin(); + for (int i = 0; i < map.size; i++) { + skip(prot, map.keyType, maxDepth - 1); + skip(prot, map.valueType, maxDepth - 1); + } + prot.readMapEnd(); + break; + } + case TType.SET: + { + TSet set = prot.readSetBegin(); + for (int i = 0; i < set.size; i++) { + skip(prot, set.elemType, maxDepth - 1); + } + prot.readSetEnd(); + break; + } + case TType.LIST: + { + TList list = prot.readListBegin(); + for (int i = 0; i < list.size; i++) { + skip(prot, list.elemType, maxDepth - 1); + } + prot.readListEnd(); + break; + } + default: + break; + } + } +} diff --git a/lib/java/src/org/apache/thrift/protocol/TSet.java b/lib/java/src/org/apache/thrift/protocol/TSet.java new file mode 100644 index 000000000..38be9a991 --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TSet.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +/** + * Helper class that encapsulates set metadata. + * + */ +public final class TSet { + public TSet() { + this(TType.STOP, 0); + } + + public TSet(byte t, int s) { + elemType = t; + size = s; + } + + public TSet(TList list) { + this(list.elemType, list.size); + } + + public final byte elemType; + public final int size; +} diff --git a/lib/java/src/org/apache/thrift/protocol/TSimpleJSONProtocol.java b/lib/java/src/org/apache/thrift/protocol/TSimpleJSONProtocol.java new file mode 100644 index 000000000..a60bdf407 --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TSimpleJSONProtocol.java @@ -0,0 +1,384 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +import java.io.UnsupportedEncodingException; +import java.util.Stack; + +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransport; + +/** + * JSON protocol implementation for thrift. + * + * This protocol is write-only and produces a simple output format + * suitable for parsing by scripting languages. It should not be + * confused with the full-featured TJSONProtocol. + * + */ +public class TSimpleJSONProtocol extends TProtocol { + + /** + * Factory + */ + public static class Factory implements TProtocolFactory { + public TProtocol getProtocol(TTransport trans) { + return new TSimpleJSONProtocol(trans); + } + } + + public static final byte[] COMMA = new byte[] {','}; + public static final byte[] COLON = new byte[] {':'}; + public static final byte[] LBRACE = new byte[] {'{'}; + public static final byte[] RBRACE = new byte[] {'}'}; + public static final byte[] LBRACKET = new byte[] {'['}; + public static final byte[] RBRACKET = new byte[] {']'}; + public static final char QUOTE = '"'; + + private static final TStruct ANONYMOUS_STRUCT = new TStruct(); + private static final TField ANONYMOUS_FIELD = new TField(); + private static final TMessage EMPTY_MESSAGE = new TMessage(); + private static final TSet EMPTY_SET = new TSet(); + private static final TList EMPTY_LIST = new TList(); + private static final TMap EMPTY_MAP = new TMap(); + + protected class Context { + protected void write() throws TException {} + } + + protected class ListContext extends Context { + protected boolean first_ = true; + + protected void write() throws TException { + if (first_) { + first_ = false; + } else { + trans_.write(COMMA); + } + } + } + + protected class StructContext extends Context { + protected boolean first_ = true; + protected boolean colon_ = true; + + protected void write() throws TException { + if (first_) { + first_ = false; + colon_ = true; + } else { + trans_.write(colon_ ? COLON : COMMA); + colon_ = !colon_; + } + } + } + + protected final Context BASE_CONTEXT = new Context(); + + /** + * Stack of nested contexts that we may be in. + */ + protected Stack<Context> writeContextStack_ = new Stack<Context>(); + + /** + * Current context that we are in + */ + protected Context writeContext_ = BASE_CONTEXT; + + /** + * Push a new write context onto the stack. + */ + protected void pushWriteContext(Context c) { + writeContextStack_.push(writeContext_); + writeContext_ = c; + } + + /** + * Pop the last write context off the stack + */ + protected void popWriteContext() { + writeContext_ = writeContextStack_.pop(); + } + + /** + * Constructor + */ + public TSimpleJSONProtocol(TTransport trans) { + super(trans); + } + + public void writeMessageBegin(TMessage message) throws TException { + trans_.write(LBRACKET); + pushWriteContext(new ListContext()); + writeString(message.name); + writeByte(message.type); + writeI32(message.seqid); + } + + public void writeMessageEnd() throws TException { + popWriteContext(); + trans_.write(RBRACKET); + } + + public void writeStructBegin(TStruct struct) throws TException { + writeContext_.write(); + trans_.write(LBRACE); + pushWriteContext(new StructContext()); + } + + public void writeStructEnd() throws TException { + popWriteContext(); + trans_.write(RBRACE); + } + + public void writeFieldBegin(TField field) throws TException { + // Note that extra type information is omitted in JSON! + writeString(field.name); + } + + public void writeFieldEnd() {} + + public void writeFieldStop() {} + + public void writeMapBegin(TMap map) throws TException { + writeContext_.write(); + trans_.write(LBRACE); + pushWriteContext(new StructContext()); + // No metadata! + } + + public void writeMapEnd() throws TException { + popWriteContext(); + trans_.write(RBRACE); + } + + public void writeListBegin(TList list) throws TException { + writeContext_.write(); + trans_.write(LBRACKET); + pushWriteContext(new ListContext()); + // No metadata! + } + + public void writeListEnd() throws TException { + popWriteContext(); + trans_.write(RBRACKET); + } + + public void writeSetBegin(TSet set) throws TException { + writeContext_.write(); + trans_.write(LBRACKET); + pushWriteContext(new ListContext()); + // No metadata! + } + + public void writeSetEnd() throws TException { + popWriteContext(); + trans_.write(RBRACKET); + } + + public void writeBool(boolean b) throws TException { + writeByte(b ? (byte)1 : (byte)0); + } + + public void writeByte(byte b) throws TException { + writeI32(b); + } + + public void writeI16(short i16) throws TException { + writeI32(i16); + } + + public void writeI32(int i32) throws TException { + writeContext_.write(); + _writeStringData(Integer.toString(i32)); + } + + public void _writeStringData(String s) throws TException { + try { + byte[] b = s.getBytes("UTF-8"); + trans_.write(b); + } catch (UnsupportedEncodingException uex) { + throw new TException("JVM DOES NOT SUPPORT UTF-8"); + } + } + + public void writeI64(long i64) throws TException { + writeContext_.write(); + _writeStringData(Long.toString(i64)); + } + + public void writeDouble(double dub) throws TException { + writeContext_.write(); + _writeStringData(Double.toString(dub)); + } + + public void writeString(String str) throws TException { + writeContext_.write(); + int length = str.length(); + StringBuffer escape = new StringBuffer(length + 16); + escape.append(QUOTE); + for (int i = 0; i < length; ++i) { + char c = str.charAt(i); + switch (c) { + case '"': + case '\\': + escape.append('\\'); + escape.append(c); + break; + case '\b': + escape.append('\\'); + escape.append('b'); + break; + case '\f': + escape.append('\\'); + escape.append('f'); + break; + case '\n': + escape.append('\\'); + escape.append('n'); + break; + case '\r': + escape.append('\\'); + escape.append('r'); + break; + case '\t': + escape.append('\\'); + escape.append('t'); + break; + default: + // Control characeters! According to JSON RFC u0020 (space) + if (c < ' ') { + String hex = Integer.toHexString(c); + escape.append('\\'); + escape.append('u'); + for (int j = 4; j > hex.length(); --j) { + escape.append('0'); + } + escape.append(hex); + } else { + escape.append(c); + } + break; + } + } + escape.append(QUOTE); + _writeStringData(escape.toString()); + } + + public void writeBinary(byte[] bin) throws TException { + try { + // TODO(mcslee): Fix this + writeString(new String(bin, "UTF-8")); + } catch (UnsupportedEncodingException uex) { + throw new TException("JVM DOES NOT SUPPORT UTF-8"); + } + } + + /** + * Reading methods. + */ + + public TMessage readMessageBegin() throws TException { + // TODO(mcslee): implement + return EMPTY_MESSAGE; + } + + public void readMessageEnd() {} + + public TStruct readStructBegin() { + // TODO(mcslee): implement + return ANONYMOUS_STRUCT; + } + + public void readStructEnd() {} + + public TField readFieldBegin() throws TException { + // TODO(mcslee): implement + return ANONYMOUS_FIELD; + } + + public void readFieldEnd() {} + + public TMap readMapBegin() throws TException { + // TODO(mcslee): implement + return EMPTY_MAP; + } + + public void readMapEnd() {} + + public TList readListBegin() throws TException { + // TODO(mcslee): implement + return EMPTY_LIST; + } + + public void readListEnd() {} + + public TSet readSetBegin() throws TException { + // TODO(mcslee): implement + return EMPTY_SET; + } + + public void readSetEnd() {} + + public boolean readBool() throws TException { + return (readByte() == 1); + } + + public byte readByte() throws TException { + // TODO(mcslee): implement + return 0; + } + + public short readI16() throws TException { + // TODO(mcslee): implement + return 0; + } + + public int readI32() throws TException { + // TODO(mcslee): implement + return 0; + } + + public long readI64() throws TException { + // TODO(mcslee): implement + return 0; + } + + public double readDouble() throws TException { + // TODO(mcslee): implement + return 0; + } + + public String readString() throws TException { + // TODO(mcslee): implement + return ""; + } + + public String readStringBody(int size) throws TException { + // TODO(mcslee): implement + return ""; + } + + public byte[] readBinary() throws TException { + // TODO(mcslee): implement + return new byte[0]; + } + +} diff --git a/lib/java/src/org/apache/thrift/protocol/TStruct.java b/lib/java/src/org/apache/thrift/protocol/TStruct.java new file mode 100644 index 000000000..a0f79012a --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TStruct.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +/** + * Helper class that encapsulates struct metadata. + * + */ +public final class TStruct { + public TStruct() { + this(""); + } + + public TStruct(String n) { + name = n; + } + + public final String name; +} diff --git a/lib/java/src/org/apache/thrift/protocol/TType.java b/lib/java/src/org/apache/thrift/protocol/TType.java new file mode 100644 index 000000000..dbdc3caa8 --- /dev/null +++ b/lib/java/src/org/apache/thrift/protocol/TType.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.protocol; + +/** + * Type constants in the Thrift protocol. + * + */ +public final class TType { + public static final byte STOP = 0; + public static final byte VOID = 1; + public static final byte BOOL = 2; + public static final byte BYTE = 3; + public static final byte DOUBLE = 4; + public static final byte I16 = 6; + public static final byte I32 = 8; + public static final byte I64 = 10; + public static final byte STRING = 11; + public static final byte STRUCT = 12; + public static final byte MAP = 13; + public static final byte SET = 14; + public static final byte LIST = 15; +} diff --git a/lib/java/src/org/apache/thrift/server/THsHaServer.java b/lib/java/src/org/apache/thrift/server/THsHaServer.java new file mode 100644 index 000000000..8bf096ed6 --- /dev/null +++ b/lib/java/src/org/apache/thrift/server/THsHaServer.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.thrift.server; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.thrift.TProcessor; +import org.apache.thrift.TProcessorFactory; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TNonblockingServerTransport; + +/** + * An extension of the TNonblockingServer to a Half-Sync/Half-Async server. + * Like TNonblockingServer, it relies on the use of TFramedTransport. + */ +public class THsHaServer extends TNonblockingServer { + + // This wraps all the functionality of queueing and thread pool management + // for the passing of Invocations from the Selector to workers. + private ExecutorService invoker; + + protected final int MIN_WORKER_THREADS; + protected final int MAX_WORKER_THREADS; + protected final int STOP_TIMEOUT_VAL; + protected final TimeUnit STOP_TIMEOUT_UNIT; + + /** + * Create server with given processor, and server transport. Default server + * options, TBinaryProtocol for the protocol, and TFramedTransport.Factory on + * both input and output transports. A TProcessorFactory will be created that + * always returns the specified processor. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport) { + this(processor, serverTransport, new Options()); + } + + /** + * Create server with given processor, server transport, and server options + * using TBinaryProtocol for the protocol, and TFramedTransport.Factory on + * both input and output transports. A TProcessorFactory will be created that + * always returns the specified processor. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport, + Options options) { + this(new TProcessorFactory(processor), serverTransport, options); + } + + /** + * Create server with specified processor factory and server transport. Uses + * default options. TBinaryProtocol is assumed. TFramedTransport.Factory is + * used on both input and output transports. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport) { + this(processorFactory, serverTransport, new Options()); + } + + /** + * Create server with specified processor factory, server transport, and server + * options. TBinaryProtocol is assumed. TFramedTransport.Factory is used on + * both input and output transports. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + Options options) { + this(processorFactory, serverTransport, new TFramedTransport.Factory(), + new TBinaryProtocol.Factory(), options); + } + + /** + * Server with specified processor, server transport, and in/out protocol + * factory. Defaults will be used for in/out transport factory and server + * options. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport, + TProtocolFactory protocolFactory) { + this(processor, serverTransport, protocolFactory, new Options()); + } + + /** + * Server with specified processor, server transport, and in/out protocol + * factory. Defaults will be used for in/out transport factory and server + * options. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport, + TProtocolFactory protocolFactory, + Options options) { + this(processor, serverTransport, new TFramedTransport.Factory(), + protocolFactory); + } + + /** + * Create server with specified processor, server transport, in/out + * transport factory, in/out protocol factory, and default server options. A + * processor factory will be created that always returns the specified + * processor. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory transportFactory, + TProtocolFactory protocolFactory) { + this(new TProcessorFactory(processor), serverTransport, + transportFactory, protocolFactory); + } + + /** + * Create server with specified processor factory, server transport, in/out + * transport factory, in/out protocol factory, and default server options. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory transportFactory, + TProtocolFactory protocolFactory) { + this(processorFactory, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory, new Options()); + } + + /** + * Create server with specified processor factory, server transport, in/out + * transport factory, in/out protocol factory, and server options. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory transportFactory, + TProtocolFactory protocolFactory, + Options options) { + this(processorFactory, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory, + options); + } + + /** + * Create server with everything specified, except use default server options. + */ + public THsHaServer( TProcessor processor, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory inputTransportFactory, + TFramedTransport.Factory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory) { + this(new TProcessorFactory(processor), serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory); + } + + /** + * Create server with everything specified, except use default server options. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory inputTransportFactory, + TFramedTransport.Factory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory) + { + this(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, new Options()); + } + + /** + * Create server with every option fully specified. + */ + public THsHaServer( TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory inputTransportFactory, + TFramedTransport.Factory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + Options options) + { + super(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, + options); + + MIN_WORKER_THREADS = options.minWorkerThreads; + MAX_WORKER_THREADS = options.maxWorkerThreads; + STOP_TIMEOUT_VAL = options.stopTimeoutVal; + STOP_TIMEOUT_UNIT = options.stopTimeoutUnit; + } + + /** @inheritDoc */ + @Override + public void serve() { + if (!startInvokerPool()) { + return; + } + + // start listening, or exit + if (!startListening()) { + return; + } + + // start the selector, or exit + if (!startSelectorThread()) { + return; + } + + // this will block while we serve + joinSelector(); + + gracefullyShutdownInvokerPool(); + + // do a little cleanup + stopListening(); + + // ungracefully shut down the invoker pool? + } + + protected boolean startInvokerPool() { + // start the invoker pool + LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); + invoker = new ThreadPoolExecutor(MIN_WORKER_THREADS, MAX_WORKER_THREADS, + STOP_TIMEOUT_VAL, STOP_TIMEOUT_UNIT, queue); + + return true; + } + + protected void gracefullyShutdownInvokerPool() { + // try to gracefully shut down the executor service + invoker.shutdown(); + + // Loop until awaitTermination finally does return without a interrupted + // exception. If we don't do this, then we'll shut down prematurely. We want + // to let the executorService clear it's task queue, closing client sockets + // appropriately. + long timeoutMS = 10000; + long now = System.currentTimeMillis(); + while (timeoutMS >= 0) { + try { + invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); + break; + } catch (InterruptedException ix) { + long newnow = System.currentTimeMillis(); + timeoutMS -= (newnow - now); + now = newnow; + } + } + } + + /** + * We override the standard invoke method here to queue the invocation for + * invoker service instead of immediately invoking. The thread pool takes care of the rest. + */ + @Override + protected void requestInvoke(FrameBuffer frameBuffer) { + invoker.execute(new Invocation(frameBuffer)); + } + + /** + * An Invocation represents a method call that is prepared to execute, given + * an idle worker thread. It contains the input and output protocols the + * thread's processor should use to perform the usual Thrift invocation. + */ + private class Invocation implements Runnable { + + private final FrameBuffer frameBuffer; + + public Invocation(final FrameBuffer frameBuffer) { + this.frameBuffer = frameBuffer; + } + + public void run() { + frameBuffer.invoke(); + } + } + + public static class Options extends TNonblockingServer.Options { + public int minWorkerThreads = 5; + public int maxWorkerThreads = Integer.MAX_VALUE; + public int stopTimeoutVal = 60; + public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; + } +} diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java new file mode 100644 index 000000000..95d81e223 --- /dev/null +++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java @@ -0,0 +1,769 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.thrift.server; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.spi.SelectorProvider; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.log4j.Logger; + +import org.apache.thrift.TByteArrayOutputStream; +import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; +import org.apache.thrift.TProcessorFactory; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TIOStreamTransport; +import org.apache.thrift.transport.TNonblockingServerTransport; +import org.apache.thrift.transport.TNonblockingTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +/** + * A nonblocking TServer implementation. This allows for fairness amongst all + * connected clients in terms of invocations. + * + * This server is inherently single-threaded. If you want a limited thread pool + * coupled with invocation-fairness, see THsHaServer. + * + * To use this server, you MUST use a TFramedTransport at the outermost + * transport, otherwise this server will be unable to determine when a whole + * method call has been read off the wire. Clients must also use TFramedTransport. + */ +public class TNonblockingServer extends TServer { + private static final Logger LOGGER = + Logger.getLogger(TNonblockingServer.class.getName()); + + // Flag for stopping the server + private volatile boolean stopped_; + + private SelectThread selectThread_; + + /** + * The maximum amount of memory we will allocate to client IO buffers at a + * time. Without this limit, the server will gladly allocate client buffers + * right into an out of memory exception, rather than waiting. + */ + private final long MAX_READ_BUFFER_BYTES; + + protected final Options options_; + + /** + * How many bytes are currently allocated to read buffers. + */ + private long readBufferBytesAllocated = 0; + + /** + * Create server with given processor and server transport, using + * TBinaryProtocol for the protocol, TFramedTransport.Factory on both input + * and output transports. A TProcessorFactory will be created that always + * returns the specified processor. + */ + public TNonblockingServer(TProcessor processor, + TNonblockingServerTransport serverTransport) { + this(new TProcessorFactory(processor), serverTransport); + } + + /** + * Create server with specified processor factory and server transport. + * TBinaryProtocol is assumed. TFramedTransport.Factory is used on both input + * and output transports. + */ + public TNonblockingServer(TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport) { + this(processorFactory, serverTransport, + new TFramedTransport.Factory(), new TFramedTransport.Factory(), + new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory()); + } + + public TNonblockingServer(TProcessor processor, + TNonblockingServerTransport serverTransport, + TProtocolFactory protocolFactory) { + this(processor, serverTransport, + new TFramedTransport.Factory(), new TFramedTransport.Factory(), + protocolFactory, protocolFactory); + } + + public TNonblockingServer(TProcessor processor, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory transportFactory, + TProtocolFactory protocolFactory) { + this(processor, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory); + } + + public TNonblockingServer(TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory transportFactory, + TProtocolFactory protocolFactory) { + this(processorFactory, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory); + } + + public TNonblockingServer(TProcessor processor, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory inputTransportFactory, + TFramedTransport.Factory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory) { + this(new TProcessorFactory(processor), serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory); + } + + public TNonblockingServer(TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory inputTransportFactory, + TFramedTransport.Factory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory) { + this(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, + new Options()); + } + + public TNonblockingServer(TProcessorFactory processorFactory, + TNonblockingServerTransport serverTransport, + TFramedTransport.Factory inputTransportFactory, + TFramedTransport.Factory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + Options options) { + super(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory); + options_ = options; + options_.validate(); + MAX_READ_BUFFER_BYTES = options.maxReadBufferBytes; + } + + /** + * Begin accepting connections and processing invocations. + */ + public void serve() { + // start listening, or exit + if (!startListening()) { + return; + } + + // start the selector, or exit + if (!startSelectorThread()) { + return; + } + + // this will block while we serve + joinSelector(); + + // do a little cleanup + stopListening(); + } + + /** + * Have the server transport start accepting connections. + * + * @return true if we started listening successfully, false if something went + * wrong. + */ + protected boolean startListening() { + try { + serverTransport_.listen(); + return true; + } catch (TTransportException ttx) { + LOGGER.error("Failed to start listening on server socket!", ttx); + return false; + } + } + + /** + * Stop listening for conections. + */ + protected void stopListening() { + serverTransport_.close(); + } + + /** + * Start the selector thread running to deal with clients. + * + * @return true if everything went ok, false if we couldn't start for some + * reason. + */ + protected boolean startSelectorThread() { + // start the selector + try { + selectThread_ = new SelectThread((TNonblockingServerTransport)serverTransport_); + selectThread_.start(); + return true; + } catch (IOException e) { + LOGGER.error("Failed to start selector thread!", e); + return false; + } + } + + /** + * Block until the selector exits. + */ + protected void joinSelector() { + // wait until the selector thread exits + try { + selectThread_.join(); + } catch (InterruptedException e) { + // for now, just silently ignore. technically this means we'll have less of + // a graceful shutdown as a result. + } + } + + /** + * Stop serving and shut everything down. + */ + public void stop() { + stopped_ = true; + selectThread_.wakeupSelector(); + } + + /** + * Perform an invocation. This method could behave several different ways + * - invoke immediately inline, queue for separate execution, etc. + */ + protected void requestInvoke(FrameBuffer frameBuffer) { + frameBuffer.invoke(); + } + + /** + * A FrameBuffer wants to change its selection preferences, but might not be + * in the select thread. + */ + protected void requestSelectInterestChange(FrameBuffer frameBuffer) { + selectThread_.requestSelectInterestChange(frameBuffer); + } + + /** + * The thread that will be doing all the selecting, managing new connections + * and those that still need to be read. + */ + protected class SelectThread extends Thread { + + private final TNonblockingServerTransport serverTransport; + private final Selector selector; + + // List of FrameBuffers that want to change their selection interests. + private final Set<FrameBuffer> selectInterestChanges = + new HashSet<FrameBuffer>(); + + /** + * Set up the SelectorThread. + */ + public SelectThread(final TNonblockingServerTransport serverTransport) + throws IOException { + this.serverTransport = serverTransport; + this.selector = SelectorProvider.provider().openSelector(); + serverTransport.registerSelector(selector); + } + + /** + * The work loop. Handles both selecting (all IO operations) and managing + * the selection preferences of all existing connections. + */ + public void run() { + while (!stopped_) { + select(); + processInterestChanges(); + } + } + + /** + * If the selector is blocked, wake it up. + */ + public void wakeupSelector() { + selector.wakeup(); + } + + /** + * Add FrameBuffer to the list of select interest changes and wake up the + * selector if it's blocked. When the select() call exits, it'll give the + * FrameBuffer a chance to change its interests. + */ + public void requestSelectInterestChange(FrameBuffer frameBuffer) { + synchronized (selectInterestChanges) { + selectInterestChanges.add(frameBuffer); + } + // wakeup the selector, if it's currently blocked. + selector.wakeup(); + } + + /** + * Select and process IO events appropriately: + * If there are connections to be accepted, accept them. + * If there are existing connections with data waiting to be read, read it, + * bufferring until a whole frame has been read. + * If there are any pending responses, buffer them until their target client + * is available, and then send the data. + */ + private void select() { + try { + // wait for io events. + selector.select(); + + // process the io events we received + Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); + while (!stopped_ && selectedKeys.hasNext()) { + SelectionKey key = selectedKeys.next(); + selectedKeys.remove(); + + // skip if not valid + if (!key.isValid()) { + cleanupSelectionkey(key); + continue; + } + + // if the key is marked Accept, then it has to be the server + // transport. + if (key.isAcceptable()) { + handleAccept(); + } else if (key.isReadable()) { + // deal with reads + handleRead(key); + } else if (key.isWritable()) { + // deal with writes + handleWrite(key); + } else { + LOGGER.warn("Unexpected state in select! " + key.interestOps()); + } + } + } catch (IOException e) { + LOGGER.warn("Got an IOException while selecting!", e); + } + } + + /** + * Check to see if there are any FrameBuffers that have switched their + * interest type from read to write or vice versa. + */ + private void processInterestChanges() { + synchronized (selectInterestChanges) { + for (FrameBuffer fb : selectInterestChanges) { + fb.changeSelectInterests(); + } + selectInterestChanges.clear(); + } + } + + /** + * Accept a new connection. + */ + private void handleAccept() throws IOException { + SelectionKey clientKey = null; + TNonblockingTransport client = null; + try { + // accept the connection + client = (TNonblockingTransport)serverTransport.accept(); + clientKey = client.registerSelector(selector, SelectionKey.OP_READ); + + // add this key to the map + FrameBuffer frameBuffer = new FrameBuffer(client, clientKey); + clientKey.attach(frameBuffer); + } catch (TTransportException tte) { + // something went wrong accepting. + LOGGER.warn("Exception trying to accept!", tte); + tte.printStackTrace(); + if (clientKey != null) cleanupSelectionkey(clientKey); + if (client != null) client.close(); + } + } + + /** + * Do the work required to read from a readable client. If the frame is + * fully read, then invoke the method call. + */ + private void handleRead(SelectionKey key) { + FrameBuffer buffer = (FrameBuffer)key.attachment(); + if (buffer.read()) { + // if the buffer's frame read is complete, invoke the method. + if (buffer.isFrameFullyRead()) { + requestInvoke(buffer); + } + } else { + cleanupSelectionkey(key); + } + } + + /** + * Let a writable client get written, if there's data to be written. + */ + private void handleWrite(SelectionKey key) { + FrameBuffer buffer = (FrameBuffer)key.attachment(); + if (!buffer.write()) { + cleanupSelectionkey(key); + } + } + + /** + * Do connection-close cleanup on a given SelectionKey. + */ + private void cleanupSelectionkey(SelectionKey key) { + // remove the records from the two maps + FrameBuffer buffer = (FrameBuffer)key.attachment(); + if (buffer != null) { + // close the buffer + buffer.close(); + } + // cancel the selection key + key.cancel(); + } + } // SelectorThread + + /** + * Class that implements a sort of state machine around the interaction with + * a client and an invoker. It manages reading the frame size and frame data, + * getting it handed off as wrapped transports, and then the writing of + * reponse data back to the client. In the process it manages flipping the + * read and write bits on the selection key for its client. + */ + protected class FrameBuffer { + // + // Possible states for the FrameBuffer state machine. + // + // in the midst of reading the frame size off the wire + private static final int READING_FRAME_SIZE = 1; + // reading the actual frame data now, but not all the way done yet + private static final int READING_FRAME = 2; + // completely read the frame, so an invocation can now happen + private static final int READ_FRAME_COMPLETE = 3; + // waiting to get switched to listening for write events + private static final int AWAITING_REGISTER_WRITE = 4; + // started writing response data, not fully complete yet + private static final int WRITING = 6; + // another thread wants this framebuffer to go back to reading + private static final int AWAITING_REGISTER_READ = 7; + // we want our transport and selection key invalidated in the selector thread + private static final int AWAITING_CLOSE = 8; + + // + // Instance variables + // + + // the actual transport hooked up to the client. + private final TNonblockingTransport trans_; + + // the SelectionKey that corresponds to our transport + private final SelectionKey selectionKey_; + + // where in the process of reading/writing are we? + private int state_ = READING_FRAME_SIZE; + + // the ByteBuffer we'll be using to write and read, depending on the state + private ByteBuffer buffer_; + + private TByteArrayOutputStream response_; + + public FrameBuffer( final TNonblockingTransport trans, + final SelectionKey selectionKey) { + trans_ = trans; + selectionKey_ = selectionKey; + buffer_ = ByteBuffer.allocate(4); + } + + /** + * Give this FrameBuffer a chance to read. The selector loop should have + * received a read event for this FrameBuffer. + * + * @return true if the connection should live on, false if it should be + * closed + */ + public boolean read() { + if (state_ == READING_FRAME_SIZE) { + // try to read the frame size completely + if (!internalRead()) { + return false; + } + + // if the frame size has been read completely, then prepare to read the + // actual frame. + if (buffer_.remaining() == 0) { + // pull out the frame size as an integer. + int frameSize = buffer_.getInt(0); + if (frameSize <= 0) { + LOGGER.error("Read an invalid frame size of " + frameSize + + ". Are you using TFramedTransport on the client side?"); + return false; + } + + // if this frame will always be too large for this server, log the + // error and close the connection. + if (frameSize + 4 > MAX_READ_BUFFER_BYTES) { + LOGGER.error("Read a frame size of " + frameSize + + ", which is bigger than the maximum allowable buffer size for ALL connections."); + return false; + } + + // if this frame will push us over the memory limit, then return. + // with luck, more memory will free up the next time around. + if (readBufferBytesAllocated + frameSize + 4 > MAX_READ_BUFFER_BYTES) { + return true; + } + + // incremement the amount of memory allocated to read buffers + readBufferBytesAllocated += frameSize + 4; + + // reallocate the readbuffer as a frame-sized buffer + buffer_ = ByteBuffer.allocate(frameSize + 4); + // put the frame size at the head of the buffer + buffer_.putInt(frameSize); + + state_ = READING_FRAME; + } else { + // this skips the check of READING_FRAME state below, since we can't + // possibly go on to that state if there's data left to be read at + // this one. + return true; + } + } + + // it is possible to fall through from the READING_FRAME_SIZE section + // to READING_FRAME if there's already some frame data available once + // READING_FRAME_SIZE is complete. + + if (state_ == READING_FRAME) { + if (!internalRead()) { + return false; + } + + // since we're already in the select loop here for sure, we can just + // modify our selection key directly. + if (buffer_.remaining() == 0) { + // get rid of the read select interests + selectionKey_.interestOps(0); + state_ = READ_FRAME_COMPLETE; + } + + return true; + } + + // if we fall through to this point, then the state must be invalid. + LOGGER.error("Read was called but state is invalid (" + state_ + ")"); + return false; + } + + /** + * Give this FrameBuffer a chance to write its output to the final client. + */ + public boolean write() { + if (state_ == WRITING) { + try { + if (trans_.write(buffer_) < 0) { + return false; + } + } catch (IOException e) { + LOGGER.warn("Got an IOException during write!", e); + return false; + } + + // we're done writing. now we need to switch back to reading. + if (buffer_.remaining() == 0) { + prepareRead(); + } + return true; + } + + LOGGER.error("Write was called, but state is invalid (" + state_ + ")"); + return false; + } + + /** + * Give this FrameBuffer a chance to set its interest to write, once data + * has come in. + */ + public void changeSelectInterests() { + if (state_ == AWAITING_REGISTER_WRITE) { + // set the OP_WRITE interest + selectionKey_.interestOps(SelectionKey.OP_WRITE); + state_ = WRITING; + } else if (state_ == AWAITING_REGISTER_READ) { + prepareRead(); + } else if (state_ == AWAITING_CLOSE){ + close(); + selectionKey_.cancel(); + } else { + LOGGER.error( + "changeSelectInterest was called, but state is invalid (" + + state_ + ")"); + } + } + + /** + * Shut the connection down. + */ + public void close() { + // if we're being closed due to an error, we might have allocated a + // buffer that we need to subtract for our memory accounting. + if (state_ == READING_FRAME || state_ == READ_FRAME_COMPLETE) { + readBufferBytesAllocated -= buffer_.array().length; + } + trans_.close(); + } + + /** + * Check if this FrameBuffer has a full frame read. + */ + public boolean isFrameFullyRead() { + return state_ == READ_FRAME_COMPLETE; + } + + /** + * After the processor has processed the invocation, whatever thread is + * managing invocations should call this method on this FrameBuffer so we + * know it's time to start trying to write again. Also, if it turns out + * that there actually isn't any data in the response buffer, we'll skip + * trying to write and instead go back to reading. + */ + public void responseReady() { + // the read buffer is definitely no longer in use, so we will decrement + // our read buffer count. we do this here as well as in close because + // we'd like to free this read memory up as quickly as possible for other + // clients. + readBufferBytesAllocated -= buffer_.array().length; + + if (response_.len() == 0) { + // go straight to reading again. this was probably an oneway method + state_ = AWAITING_REGISTER_READ; + buffer_ = null; + } else { + buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len()); + + // set state that we're waiting to be switched to write. we do this + // asynchronously through requestSelectInterestChange() because there is a + // possibility that we're not in the main thread, and thus currently + // blocked in select(). (this functionality is in place for the sake of + // the HsHa server.) + state_ = AWAITING_REGISTER_WRITE; + } + requestSelectInterestChange(); + } + + /** + * Actually invoke the method signified by this FrameBuffer. + */ + public void invoke() { + TTransport inTrans = getInputTransport(); + TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans); + TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport()); + + try { + processorFactory_.getProcessor(inTrans).process(inProt, outProt); + responseReady(); + return; + } catch (TException te) { + LOGGER.warn("Exception while invoking!", te); + } catch (Exception e) { + LOGGER.error("Unexpected exception while invoking!", e); + } + // This will only be reached when there is an exception. + state_ = AWAITING_CLOSE; + requestSelectInterestChange(); + } + + /** + * Wrap the read buffer in a memory-based transport so a processor can read + * the data it needs to handle an invocation. + */ + private TTransport getInputTransport() { + return inputTransportFactory_.getTransport(new TIOStreamTransport( + new ByteArrayInputStream(buffer_.array()))); + } + + /** + * Get the transport that should be used by the invoker for responding. + */ + private TTransport getOutputTransport() { + response_ = new TByteArrayOutputStream(); + return outputTransportFactory_.getTransport(new TIOStreamTransport(response_)); + } + + /** + * Perform a read into buffer. + * + * @return true if the read succeeded, false if there was an error or the + * connection closed. + */ + private boolean internalRead() { + try { + if (trans_.read(buffer_) < 0) { + return false; + } + return true; + } catch (IOException e) { + LOGGER.warn("Got an IOException in internalRead!", e); + return false; + } + } + + /** + * We're done writing, so reset our interest ops and change state accordingly. + */ + private void prepareRead() { + // we can set our interest directly without using the queue because + // we're in the select thread. + selectionKey_.interestOps(SelectionKey.OP_READ); + // get ready for another go-around + buffer_ = ByteBuffer.allocate(4); + state_ = READING_FRAME_SIZE; + } + + /** + * When this FrameBuffer needs to change it's select interests and execution + * might not be in the select thread, then this method will make sure the + * interest change gets done when the select thread wakes back up. When the + * current thread is the select thread, then it just does the interest change + * immediately. + */ + private void requestSelectInterestChange() { + if (Thread.currentThread() == selectThread_) { + changeSelectInterests(); + } else { + TNonblockingServer.this.requestSelectInterestChange(this); + } + } + } // FrameBuffer + + + public static class Options { + public long maxReadBufferBytes = Long.MAX_VALUE; + + public Options() {} + + public void validate() { + if (maxReadBufferBytes <= 1024) { + throw new IllegalArgumentException("You must allocate at least 1KB to the read buffer."); + } + } + } +} diff --git a/lib/java/src/org/apache/thrift/server/TServer.java b/lib/java/src/org/apache/thrift/server/TServer.java new file mode 100644 index 000000000..eafe0c173 --- /dev/null +++ b/lib/java/src/org/apache/thrift/server/TServer.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.server; + +import org.apache.thrift.TProcessorFactory; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TTransportFactory; + +/** + * Generic interface for a Thrift server. + * + */ +public abstract class TServer { + + /** + * Core processor + */ + protected TProcessorFactory processorFactory_; + + /** + * Server transport + */ + protected TServerTransport serverTransport_; + + /** + * Input Transport Factory + */ + protected TTransportFactory inputTransportFactory_; + + /** + * Output Transport Factory + */ + protected TTransportFactory outputTransportFactory_; + + /** + * Input Protocol Factory + */ + protected TProtocolFactory inputProtocolFactory_; + + /** + * Output Protocol Factory + */ + protected TProtocolFactory outputProtocolFactory_; + + /** + * Default constructors. + */ + + protected TServer(TProcessorFactory processorFactory, + TServerTransport serverTransport) { + this(processorFactory, + serverTransport, + new TTransportFactory(), + new TTransportFactory(), + new TBinaryProtocol.Factory(), + new TBinaryProtocol.Factory()); + } + + protected TServer(TProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory transportFactory) { + this(processorFactory, + serverTransport, + transportFactory, + transportFactory, + new TBinaryProtocol.Factory(), + new TBinaryProtocol.Factory()); + } + + protected TServer(TProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory transportFactory, + TProtocolFactory protocolFactory) { + this(processorFactory, + serverTransport, + transportFactory, + transportFactory, + protocolFactory, + protocolFactory); + } + + protected TServer(TProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory) { + processorFactory_ = processorFactory; + serverTransport_ = serverTransport; + inputTransportFactory_ = inputTransportFactory; + outputTransportFactory_ = outputTransportFactory; + inputProtocolFactory_ = inputProtocolFactory; + outputProtocolFactory_ = outputProtocolFactory; + } + + /** + * The run method fires up the server and gets things going. + */ + public abstract void serve(); + + /** + * Stop the server. This is optional on a per-implementation basis. Not + * all servers are required to be cleanly stoppable. + */ + public void stop() {} + +} diff --git a/lib/java/src/org/apache/thrift/server/TSimpleServer.java b/lib/java/src/org/apache/thrift/server/TSimpleServer.java new file mode 100644 index 000000000..b3ee5ad6e --- /dev/null +++ b/lib/java/src/org/apache/thrift/server/TSimpleServer.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.server; + +import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; +import org.apache.thrift.TProcessorFactory; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportFactory; +import org.apache.thrift.transport.TTransportException; +import org.apache.log4j.Logger; + +/** + * Simple singlethreaded server for testing. + * + */ +public class TSimpleServer extends TServer { + + private static final Logger LOGGER = Logger.getLogger(TSimpleServer.class.getName()); + + private boolean stopped_ = false; + + public TSimpleServer(TProcessor processor, + TServerTransport serverTransport) { + super(new TProcessorFactory(processor), serverTransport); + } + + public TSimpleServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory transportFactory, + TProtocolFactory protocolFactory) { + super(new TProcessorFactory(processor), serverTransport, transportFactory, protocolFactory); + } + + public TSimpleServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory) { + super(new TProcessorFactory(processor), serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory); + } + + public TSimpleServer(TProcessorFactory processorFactory, + TServerTransport serverTransport) { + super(processorFactory, serverTransport); + } + + public TSimpleServer(TProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory transportFactory, + TProtocolFactory protocolFactory) { + super(processorFactory, serverTransport, transportFactory, protocolFactory); + } + + public TSimpleServer(TProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory) { + super(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory); + } + + + public void serve() { + stopped_ = false; + try { + serverTransport_.listen(); + } catch (TTransportException ttx) { + LOGGER.error("Error occurred during listening.", ttx); + return; + } + + while (!stopped_) { + TTransport client = null; + TProcessor processor = null; + TTransport inputTransport = null; + TTransport outputTransport = null; + TProtocol inputProtocol = null; + TProtocol outputProtocol = null; + try { + client = serverTransport_.accept(); + if (client != null) { + processor = processorFactory_.getProcessor(client); + inputTransport = inputTransportFactory_.getTransport(client); + outputTransport = outputTransportFactory_.getTransport(client); + inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); + outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); + while (processor.process(inputProtocol, outputProtocol)) {} + } + } catch (TTransportException ttx) { + // Client died, just move on + } catch (TException tx) { + if (!stopped_) { + LOGGER.error("Thrift error occurred during processing of message.", tx); + } + } catch (Exception x) { + if (!stopped_) { + LOGGER.error("Error occurred during processing of message.", x); + } + } + + if (inputTransport != null) { + inputTransport.close(); + } + + if (outputTransport != null) { + outputTransport.close(); + } + + } + } + + public void stop() { + stopped_ = true; + serverTransport_.interrupt(); + } +} diff --git a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java new file mode 100644 index 000000000..ebc5a9be6 --- /dev/null +++ b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.server; + +import org.apache.thrift.TException; +import org.apache.thrift.TProcessor; +import org.apache.thrift.TProcessorFactory; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.apache.thrift.transport.TTransportFactory; +import org.apache.log4j.Logger; +import org.apache.log4j.Level; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + + +/** + * Server which uses Java's built in ThreadPool management to spawn off + * a worker pool that + * + */ +public class TThreadPoolServer extends TServer { + + private static final Logger LOGGER = Logger.getLogger(TThreadPoolServer.class.getName()); + + // Executor service for handling client connections + private ExecutorService executorService_; + + // Flag for stopping the server + private volatile boolean stopped_; + + // Server options + private Options options_; + + // Customizable server options + public static class Options { + public int minWorkerThreads = 5; + public int maxWorkerThreads = Integer.MAX_VALUE; + public int stopTimeoutVal = 60; + public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS; + } + + public TThreadPoolServer(TProcessor processor, + TServerTransport serverTransport) { + this(processor, serverTransport, + new TTransportFactory(), new TTransportFactory(), + new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory()); + } + + public TThreadPoolServer(TProcessorFactory processorFactory, + TServerTransport serverTransport) { + this(processorFactory, serverTransport, + new TTransportFactory(), new TTransportFactory(), + new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory()); + } + + public TThreadPoolServer(TProcessor processor, + TServerTransport serverTransport, + TProtocolFactory protocolFactory) { + this(processor, serverTransport, + new TTransportFactory(), new TTransportFactory(), + protocolFactory, protocolFactory); + } + + public TThreadPoolServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory transportFactory, + TProtocolFactory protocolFactory) { + this(processor, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory); + } + + public TThreadPoolServer(TProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory transportFactory, + TProtocolFactory protocolFactory) { + this(processorFactory, serverTransport, + transportFactory, transportFactory, + protocolFactory, protocolFactory); + } + + public TThreadPoolServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory) { + this(new TProcessorFactory(processor), serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory); + } + + public TThreadPoolServer(TProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory) { + super(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory); + options_ = new Options(); + executorService_ = Executors.newCachedThreadPool(); + } + + public TThreadPoolServer(TProcessor processor, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + Options options) { + this(new TProcessorFactory(processor), serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory, + options); + } + + public TThreadPoolServer(TProcessorFactory processorFactory, + TServerTransport serverTransport, + TTransportFactory inputTransportFactory, + TTransportFactory outputTransportFactory, + TProtocolFactory inputProtocolFactory, + TProtocolFactory outputProtocolFactory, + Options options) { + super(processorFactory, serverTransport, + inputTransportFactory, outputTransportFactory, + inputProtocolFactory, outputProtocolFactory); + + executorService_ = null; + + SynchronousQueue<Runnable> executorQueue = + new SynchronousQueue<Runnable>(); + + executorService_ = new ThreadPoolExecutor(options.minWorkerThreads, + options.maxWorkerThreads, + 60, + TimeUnit.SECONDS, + executorQueue); + + options_ = options; + } + + + public void serve() { + try { + serverTransport_.listen(); + } catch (TTransportException ttx) { + LOGGER.error("Error occurred during listening.", ttx); + return; + } + + stopped_ = false; + while (!stopped_) { + int failureCount = 0; + try { + TTransport client = serverTransport_.accept(); + WorkerProcess wp = new WorkerProcess(client); + executorService_.execute(wp); + } catch (TTransportException ttx) { + if (!stopped_) { + ++failureCount; + LOGGER.warn("Transport error occurred during acceptance of message.", ttx); + } + } + } + + executorService_.shutdown(); + + // Loop until awaitTermination finally does return without a interrupted + // exception. If we don't do this, then we'll shut down prematurely. We want + // to let the executorService clear it's task queue, closing client sockets + // appropriately. + long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal); + long now = System.currentTimeMillis(); + while (timeoutMS >= 0) { + try { + executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS); + break; + } catch (InterruptedException ix) { + long newnow = System.currentTimeMillis(); + timeoutMS -= (newnow - now); + now = newnow; + } + } + } + + public void stop() { + stopped_ = true; + serverTransport_.interrupt(); + } + + private class WorkerProcess implements Runnable { + + /** + * Client that this services. + */ + private TTransport client_; + + /** + * Default constructor. + * + * @param client Transport to process + */ + private WorkerProcess(TTransport client) { + client_ = client; + } + + /** + * Loops on processing a client forever + */ + public void run() { + TProcessor processor = null; + TTransport inputTransport = null; + TTransport outputTransport = null; + TProtocol inputProtocol = null; + TProtocol outputProtocol = null; + try { + processor = processorFactory_.getProcessor(client_); + inputTransport = inputTransportFactory_.getTransport(client_); + outputTransport = outputTransportFactory_.getTransport(client_); + inputProtocol = inputProtocolFactory_.getProtocol(inputTransport); + outputProtocol = outputProtocolFactory_.getProtocol(outputTransport); + // we check stopped_ first to make sure we're not supposed to be shutting + // down. this is necessary for graceful shutdown. + while (!stopped_ && processor.process(inputProtocol, outputProtocol)) {} + } catch (TTransportException ttx) { + // Assume the client died and continue silently + } catch (TException tx) { + LOGGER.error("Thrift error occurred during processing of message.", tx); + } catch (Exception x) { + LOGGER.error("Error occurred during processing of message.", x); + } + + if (inputTransport != null) { + inputTransport.close(); + } + + if (outputTransport != null) { + outputTransport.close(); + } + } + } +} diff --git a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFramedTransport.java new file mode 100644 index 000000000..c83748ad2 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TFramedTransport.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +import java.io.ByteArrayInputStream; + +import org.apache.thrift.TByteArrayOutputStream; + +/** + * Socket implementation of the TTransport interface. To be commented soon! + * + */ +public class TFramedTransport extends TTransport { + + /** + * Underlying transport + */ + private TTransport transport_ = null; + + /** + * Buffer for output + */ + private final TByteArrayOutputStream writeBuffer_ = + new TByteArrayOutputStream(1024); + + /** + * Buffer for input + */ + private ByteArrayInputStream readBuffer_ = null; + + public static class Factory extends TTransportFactory { + public Factory() { + } + + public TTransport getTransport(TTransport base) { + return new TFramedTransport(base); + } + } + + /** + * Constructor wraps around another tranpsort + */ + public TFramedTransport(TTransport transport) { + transport_ = transport; + } + + public void open() throws TTransportException { + transport_.open(); + } + + public boolean isOpen() { + return transport_.isOpen(); + } + + public void close() { + transport_.close(); + } + + public int read(byte[] buf, int off, int len) throws TTransportException { + if (readBuffer_ != null) { + int got = readBuffer_.read(buf, off, len); + if (got > 0) { + return got; + } + } + + // Read another frame of data + readFrame(); + + return readBuffer_.read(buf, off, len); + } + + private void readFrame() throws TTransportException { + byte[] i32rd = new byte[4]; + transport_.readAll(i32rd, 0, 4); + int size = + ((i32rd[0] & 0xff) << 24) | + ((i32rd[1] & 0xff) << 16) | + ((i32rd[2] & 0xff) << 8) | + ((i32rd[3] & 0xff)); + + byte[] buff = new byte[size]; + transport_.readAll(buff, 0, size); + readBuffer_ = new ByteArrayInputStream(buff); + } + + public void write(byte[] buf, int off, int len) throws TTransportException { + writeBuffer_.write(buf, off, len); + } + + public void flush() throws TTransportException { + byte[] buf = writeBuffer_.get(); + int len = writeBuffer_.len(); + writeBuffer_.reset(); + + byte[] i32out = new byte[4]; + i32out[0] = (byte)(0xff & (len >> 24)); + i32out[1] = (byte)(0xff & (len >> 16)); + i32out[2] = (byte)(0xff & (len >> 8)); + i32out[3] = (byte)(0xff & (len)); + transport_.write(i32out, 0, 4); + transport_.write(buf, 0, len); + transport_.flush(); + } +} diff --git a/lib/java/src/org/apache/thrift/transport/THttpClient.java b/lib/java/src/org/apache/thrift/transport/THttpClient.java new file mode 100644 index 000000000..419235310 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/THttpClient.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.IOException; + +import java.net.URL; +import java.net.HttpURLConnection; +import java.util.HashMap; +import java.util.Map; + +/** + * HTTP implementation of the TTransport interface. Used for working with a + * Thrift web services implementation. + * + */ +public class THttpClient extends TTransport { + + private URL url_ = null; + + private final ByteArrayOutputStream requestBuffer_ = + new ByteArrayOutputStream(); + + private InputStream inputStream_ = null; + + private int connectTimeout_ = 0; + + private int readTimeout_ = 0; + + private Map<String,String> customHeaders_ = null; + + public THttpClient(String url) throws TTransportException { + try { + url_ = new URL(url); + } catch (IOException iox) { + throw new TTransportException(iox); + } + } + + public void setConnectTimeout(int timeout) { + connectTimeout_ = timeout; + } + + public void setReadTimeout(int timeout) { + readTimeout_ = timeout; + } + + public void setCustomHeaders(Map<String,String> headers) { + customHeaders_ = headers; + } + + public void setCustomHeader(String key, String value) { + if (customHeaders_ == null) { + customHeaders_ = new HashMap<String, String>(); + } + customHeaders_.put(key, value); + } + + public void open() {} + + public void close() { + if (null != inputStream_) { + try { + inputStream_.close(); + } catch (IOException ioe) { + ; + } + inputStream_ = null; + } + } + + public boolean isOpen() { + return true; + } + + public int read(byte[] buf, int off, int len) throws TTransportException { + if (inputStream_ == null) { + throw new TTransportException("Response buffer is empty, no request."); + } + try { + int ret = inputStream_.read(buf, off, len); + if (ret == -1) { + throw new TTransportException("No more data available."); + } + return ret; + } catch (IOException iox) { + throw new TTransportException(iox); + } + } + + public void write(byte[] buf, int off, int len) { + requestBuffer_.write(buf, off, len); + } + + public void flush() throws TTransportException { + // Extract request and reset buffer + byte[] data = requestBuffer_.toByteArray(); + requestBuffer_.reset(); + + try { + // Create connection object + HttpURLConnection connection = (HttpURLConnection)url_.openConnection(); + + // Timeouts, only if explicitly set + if (connectTimeout_ > 0) { + connection.setConnectTimeout(connectTimeout_); + } + if (readTimeout_ > 0) { + connection.setReadTimeout(readTimeout_); + } + + // Make the request + connection.setRequestMethod("POST"); + connection.setRequestProperty("Content-Type", "application/x-thrift"); + connection.setRequestProperty("Accept", "application/x-thrift"); + connection.setRequestProperty("User-Agent", "Java/THttpClient"); + if (customHeaders_ != null) { + for (Map.Entry<String, String> header : customHeaders_.entrySet()) { + connection.setRequestProperty(header.getKey(), header.getValue()); + } + } + connection.setDoOutput(true); + connection.connect(); + connection.getOutputStream().write(data); + + int responseCode = connection.getResponseCode(); + if (responseCode != HttpURLConnection.HTTP_OK) { + throw new TTransportException("HTTP Response code: " + responseCode); + } + + // Read the responses + inputStream_ = connection.getInputStream(); + + } catch (IOException iox) { + throw new TTransportException(iox); + } + } +} diff --git a/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java b/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java new file mode 100644 index 000000000..89cdb5828 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TIOStreamTransport.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** + * This is the most commonly used base transport. It takes an InputStream + * and an OutputStream and uses those to perform all transport operations. + * This allows for compatibility with all the nice constructs Java already + * has to provide a variety of types of streams. + * + */ +public class TIOStreamTransport extends TTransport { + + private static final Logger LOGGER = Logger.getLogger(TIOStreamTransport.class.getName()); + + /** Underlying inputStream */ + protected InputStream inputStream_ = null; + + /** Underlying outputStream */ + protected OutputStream outputStream_ = null; + + /** + * Subclasses can invoke the default constructor and then assign the input + * streams in the open method. + */ + protected TIOStreamTransport() {} + + /** + * Input stream constructor. + * + * @param is Input stream to read from + */ + public TIOStreamTransport(InputStream is) { + inputStream_ = is; + } + + /** + * Output stream constructor. + * + * @param os Output stream to read from + */ + public TIOStreamTransport(OutputStream os) { + outputStream_ = os; + } + + /** + * Two-way stream constructor. + * + * @param is Input stream to read from + * @param os Output stream to read from + */ + public TIOStreamTransport(InputStream is, OutputStream os) { + inputStream_ = is; + outputStream_ = os; + } + + /** + * The streams must already be open at construction time, so this should + * always return true. + * + * @return true + */ + public boolean isOpen() { + return true; + } + + /** + * The streams must already be open. This method does nothing. + */ + public void open() throws TTransportException {} + + /** + * Closes both the input and output streams. + */ + public void close() { + if (inputStream_ != null) { + try { + inputStream_.close(); + } catch (IOException iox) { + LOGGER.warn("Error closing input stream.", iox); + } + inputStream_ = null; + } + if (outputStream_ != null) { + try { + outputStream_.close(); + } catch (IOException iox) { + LOGGER.warn("Error closing output stream.", iox); + } + outputStream_ = null; + } + } + + /** + * Reads from the underlying input stream if not null. + */ + public int read(byte[] buf, int off, int len) throws TTransportException { + if (inputStream_ == null) { + throw new TTransportException(TTransportException.NOT_OPEN, "Cannot read from null inputStream"); + } + try { + return inputStream_.read(buf, off, len); + } catch (IOException iox) { + throw new TTransportException(TTransportException.UNKNOWN, iox); + } + } + + /** + * Writes to the underlying output stream if not null. + */ + public void write(byte[] buf, int off, int len) throws TTransportException { + if (outputStream_ == null) { + throw new TTransportException(TTransportException.NOT_OPEN, "Cannot write to null outputStream"); + } + try { + outputStream_.write(buf, off, len); + } catch (IOException iox) { + throw new TTransportException(TTransportException.UNKNOWN, iox); + } + } + + /** + * Flushes the underlying output stream if not null. + */ + public void flush() throws TTransportException { + if (outputStream_ == null) { + throw new TTransportException(TTransportException.NOT_OPEN, "Cannot flush null outputStream"); + } + try { + outputStream_.flush(); + } catch (IOException iox) { + throw new TTransportException(TTransportException.UNKNOWN, iox); + } + } +} diff --git a/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java b/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java new file mode 100644 index 000000000..886fcbf62 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TMemoryBuffer.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +import org.apache.thrift.TByteArrayOutputStream; +import java.io.UnsupportedEncodingException; + +/** + * Memory buffer-based implementation of the TTransport interface. + * + */ +public class TMemoryBuffer extends TTransport { + + /** + * + */ + public TMemoryBuffer(int size) { + arr_ = new TByteArrayOutputStream(size); + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void open() { + /* Do nothing */ + } + + @Override + public void close() { + /* Do nothing */ + } + + @Override + public int read(byte[] buf, int off, int len) { + byte[] src = arr_.get(); + int amtToRead = (len > arr_.len() - pos_ ? arr_.len() - pos_ : len); + if (amtToRead > 0) { + System.arraycopy(src, pos_, buf, off, amtToRead); + pos_ += amtToRead; + } + return amtToRead; + } + + @Override + public void write(byte[] buf, int off, int len) { + arr_.write(buf, off, len); + } + + /** + * Output the contents of the memory buffer as a String, using the supplied + * encoding + * @param enc the encoding to use + * @return the contents of the memory buffer as a String + */ + public String toString(String enc) throws UnsupportedEncodingException { + return arr_.toString(enc); + } + + public String inspect() { + String buf = ""; + byte[] bytes = arr_.toByteArray(); + for (int i = 0; i < bytes.length; i++) { + buf += (pos_ == i ? "==>" : "" ) + Integer.toHexString(bytes[i] & 0xff) + " "; + } + return buf; + } + + // The contents of the buffer + private TByteArrayOutputStream arr_; + + // Position to read next byte from + private int pos_; + + public int length() { + return arr_.size(); + } +} + diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java new file mode 100644 index 000000000..571adbff3 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.thrift.transport; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.SocketException; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +/** + * Wrapper around ServerSocketChannel + */ +public class TNonblockingServerSocket extends TNonblockingServerTransport { + + /** + * This channel is where all the nonblocking magic happens. + */ + private ServerSocketChannel serverSocketChannel = null; + + /** + * Underlying serversocket object + */ + private ServerSocket serverSocket_ = null; + + /** + * Port to listen on + */ + private int port_ = 0; + + /** + * Timeout for client sockets from accept + */ + private int clientTimeout_ = 0; + + /** + * Creates a server socket from underlying socket object + */ + // public TNonblockingServerSocket(ServerSocket serverSocket) { + // this(serverSocket, 0); + // } + + /** + * Creates a server socket from underlying socket object + */ + // public TNonblockingServerSocket(ServerSocket serverSocket, int clientTimeout) { + // serverSocket_ = serverSocket; + // clientTimeout_ = clientTimeout; + // } + + /** + * Creates just a port listening server socket + */ + public TNonblockingServerSocket(int port) throws TTransportException { + this(port, 0); + } + + /** + * Creates just a port listening server socket + */ + public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException { + port_ = port; + clientTimeout_ = clientTimeout; + try { + serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.configureBlocking(false); + + // Make server socket + serverSocket_ = serverSocketChannel.socket(); + // Prevent 2MSL delay problem on server restarts + serverSocket_.setReuseAddress(true); + // Bind to listening port + serverSocket_.bind(new InetSocketAddress(port_)); + } catch (IOException ioe) { + serverSocket_ = null; + throw new TTransportException("Could not create ServerSocket on port " + port + "."); + } + } + + public void listen() throws TTransportException { + // Make sure not to block on accept + if (serverSocket_ != null) { + try { + serverSocket_.setSoTimeout(0); + } catch (SocketException sx) { + sx.printStackTrace(); + } + } + } + + protected TNonblockingSocket acceptImpl() throws TTransportException { + if (serverSocket_ == null) { + throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket."); + } + try { + SocketChannel socketChannel = serverSocketChannel.accept(); + if (socketChannel == null) { + return null; + } + + TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel); + tsocket.setTimeout(clientTimeout_); + return tsocket; + } catch (IOException iox) { + throw new TTransportException(iox); + } + } + + public void registerSelector(Selector selector) { + try { + // Register the server socket channel, indicating an interest in + // accepting new connections + serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); + } catch (ClosedChannelException e) { + // this shouldn't happen, ideally... + // TODO: decide what to do with this. + } + } + + public void close() { + if (serverSocket_ != null) { + try { + serverSocket_.close(); + } catch (IOException iox) { + System.err.println("WARNING: Could not close server socket: " + + iox.getMessage()); + } + serverSocket_ = null; + } + } + + public void interrupt() { + // The thread-safeness of this is dubious, but Java documentation suggests + // that it is safe to do this from a different thread context + close(); + } + +} diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingServerTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingServerTransport.java new file mode 100644 index 000000000..ba45b09dc --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingServerTransport.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.thrift.transport; + +import java.nio.channels.Selector; + +/** + * Server transport that can be operated in a nonblocking fashion. + */ +public abstract class TNonblockingServerTransport extends TServerTransport { + + public abstract void registerSelector(Selector selector); +} diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java new file mode 100644 index 000000000..bc2d53969 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.thrift.transport; + +import java.io.IOException; +import java.net.Socket; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; + +/** + * Socket implementation of the TTransport interface. To be commented soon! + */ +public class TNonblockingSocket extends TNonblockingTransport { + + private SocketChannel socketChannel = null; + + /** + * Wrapped Socket object + */ + private Socket socket_ = null; + + /** + * Remote host + */ + private String host_ = null; + + /** + * Remote port + */ + private int port_ = 0; + + /** + * Socket timeout + */ + private int timeout_ = 0; + + /** + * Constructor that takes an already created socket. + * + * @param socketChannel Already created SocketChannel object + * @throws TTransportException if there is an error setting up the streams + */ + public TNonblockingSocket(SocketChannel socketChannel) throws TTransportException { + try { + // make it a nonblocking channel + socketChannel.configureBlocking(false); + } catch (IOException e) { + throw new TTransportException(e); + } + + this.socketChannel = socketChannel; + this.socket_ = socketChannel.socket(); + try { + socket_.setSoLinger(false, 0); + socket_.setTcpNoDelay(true); + } catch (SocketException sx) { + sx.printStackTrace(); + } + } + + /** + * Register this socket with the specified selector for both read and write + * operations. + * + * @param selector + * @return the selection key for this socket. + */ + public SelectionKey registerSelector(Selector selector, int interests) throws IOException { + // Register the new SocketChannel with our Selector, indicating + // we'd like to be notified when there's data waiting to be read + return socketChannel.register(selector, interests); + } + + /** + * Initializes the socket object + */ + private void initSocket() { + socket_ = new Socket(); + try { + socket_.setSoLinger(false, 0); + socket_.setTcpNoDelay(true); + socket_.setSoTimeout(timeout_); + } catch (SocketException sx) { + sx.printStackTrace(); + } + } + + /** + * Sets the socket timeout + * + * @param timeout Milliseconds timeout + */ + public void setTimeout(int timeout) { + timeout_ = timeout; + try { + socket_.setSoTimeout(timeout); + } catch (SocketException sx) { + sx.printStackTrace(); + } + } + + /** + * Returns a reference to the underlying socket. + */ + public Socket getSocket() { + if (socket_ == null) { + initSocket(); + } + return socket_; + } + + /** + * Checks whether the socket is connected. + */ + public boolean isOpen() { + if (socket_ == null) { + return false; + } + return socket_.isConnected(); + } + + /** + * Connects the socket, creating a new socket object if necessary. + */ + public void open() throws TTransportException { + throw new RuntimeException("Not implemented yet"); + } + + /** + * Perform a nonblocking read into buffer. + */ + public int read(ByteBuffer buffer) throws IOException { + return socketChannel.read(buffer); + } + + + /** + * Reads from the underlying input stream if not null. + */ + public int read(byte[] buf, int off, int len) throws TTransportException { + if ((socketChannel.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) { + throw new TTransportException(TTransportException.NOT_OPEN, + "Cannot read from write-only socket channel"); + } + try { + return socketChannel.read(ByteBuffer.wrap(buf, off, len)); + } catch (IOException iox) { + throw new TTransportException(TTransportException.UNKNOWN, iox); + } + } + + /** + * Perform a nonblocking write of the data in buffer; + */ + public int write(ByteBuffer buffer) throws IOException { + return socketChannel.write(buffer); + } + + /** + * Writes to the underlying output stream if not null. + */ + public void write(byte[] buf, int off, int len) throws TTransportException { + if ((socketChannel.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) { + throw new TTransportException(TTransportException.NOT_OPEN, + "Cannot write to write-only socket channel"); + } + try { + socketChannel.write(ByteBuffer.wrap(buf, off, len)); + } catch (IOException iox) { + throw new TTransportException(TTransportException.UNKNOWN, iox); + } + } + + /** + * Flushes the underlying output stream if not null. + */ + public void flush() throws TTransportException { + // Not supported by SocketChannel. + } + + /** + * Closes the socket. + */ + public void close() { + try { + socketChannel.close(); + } catch (IOException e) { + // silently ignore. + } + } + +} diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java new file mode 100644 index 000000000..517eacb74 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +import java.io.IOException; +import java.nio.channels.Selector; +import java.nio.channels.SelectionKey; +import java.nio.ByteBuffer; + +public abstract class TNonblockingTransport extends TTransport { + public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException; + public abstract int read(ByteBuffer buffer) throws IOException; + public abstract int write(ByteBuffer buffer) throws IOException; +} diff --git a/lib/java/src/org/apache/thrift/transport/TServerSocket.java b/lib/java/src/org/apache/thrift/transport/TServerSocket.java new file mode 100644 index 000000000..796cd659c --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TServerSocket.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; + +/** + * Wrapper around ServerSocket for Thrift. + * + */ +public class TServerSocket extends TServerTransport { + + private static final Logger LOGGER = Logger.getLogger(TServerSocket.class.getName()); + + /** + * Underlying serversocket object + */ + private ServerSocket serverSocket_ = null; + + /** + * Port to listen on + */ + private int port_ = 0; + + /** + * Timeout for client sockets from accept + */ + private int clientTimeout_ = 0; + + /** + * Creates a server socket from underlying socket object + */ + public TServerSocket(ServerSocket serverSocket) { + this(serverSocket, 0); + } + + /** + * Creates a server socket from underlying socket object + */ + public TServerSocket(ServerSocket serverSocket, int clientTimeout) { + serverSocket_ = serverSocket; + clientTimeout_ = clientTimeout; + } + + /** + * Creates just a port listening server socket + */ + public TServerSocket(int port) throws TTransportException { + this(port, 0); + } + + /** + * Creates just a port listening server socket + */ + public TServerSocket(int port, int clientTimeout) throws TTransportException { + this(new InetSocketAddress(port), clientTimeout); + port_ = port; + } + + public TServerSocket(InetSocketAddress bindAddr) throws TTransportException { + this(bindAddr, 0); + } + + public TServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException { + clientTimeout_ = clientTimeout; + try { + // Make server socket + serverSocket_ = new ServerSocket(); + // Prevent 2MSL delay problem on server restarts + serverSocket_.setReuseAddress(true); + // Bind to listening port + serverSocket_.bind(bindAddr); + } catch (IOException ioe) { + serverSocket_ = null; + throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + "."); + } + } + + public void listen() throws TTransportException { + // Make sure not to block on accept + if (serverSocket_ != null) { + try { + serverSocket_.setSoTimeout(0); + } catch (SocketException sx) { + LOGGER.error("Could not set socket timeout.", sx); + } + } + } + + protected TSocket acceptImpl() throws TTransportException { + if (serverSocket_ == null) { + throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket."); + } + try { + Socket result = serverSocket_.accept(); + TSocket result2 = new TSocket(result); + result2.setTimeout(clientTimeout_); + return result2; + } catch (IOException iox) { + throw new TTransportException(iox); + } + } + + public void close() { + if (serverSocket_ != null) { + try { + serverSocket_.close(); + } catch (IOException iox) { + LOGGER.warn("Could not close server socket.", iox); + } + serverSocket_ = null; + } + } + + public void interrupt() { + // The thread-safeness of this is dubious, but Java documentation suggests + // that it is safe to do this from a different thread context + close(); + } + +} diff --git a/lib/java/src/org/apache/thrift/transport/TServerTransport.java b/lib/java/src/org/apache/thrift/transport/TServerTransport.java new file mode 100644 index 000000000..17ff86bec --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TServerTransport.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +/** + * Server transport. Object which provides client transports. + * + */ +public abstract class TServerTransport { + + public abstract void listen() throws TTransportException; + + public final TTransport accept() throws TTransportException { + TTransport transport = acceptImpl(); + if (transport == null) { + throw new TTransportException("accept() may not return NULL"); + } + return transport; + } + + public abstract void close(); + + protected abstract TTransport acceptImpl() throws TTransportException; + + /** + * Optional method implementation. This signals to the server transport + * that it should break out of any accept() or listen() that it is currently + * blocked on. This method, if implemented, MUST be thread safe, as it may + * be called from a different thread context than the other TServerTransport + * methods. + */ + public void interrupt() {} + +} diff --git a/lib/java/src/org/apache/thrift/transport/TSocket.java b/lib/java/src/org/apache/thrift/transport/TSocket.java new file mode 100644 index 000000000..cdf1bcc4b --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TSocket.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +import org.apache.log4j.Logger; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketException; + +/** + * Socket implementation of the TTransport interface. To be commented soon! + * + */ +public class TSocket extends TIOStreamTransport { + + private static final Logger LOGGER = Logger.getLogger(TSocket.class.getName()); + + /** + * Wrapped Socket object + */ + private Socket socket_ = null; + + /** + * Remote host + */ + private String host_ = null; + + /** + * Remote port + */ + private int port_ = 0; + + /** + * Socket timeout + */ + private int timeout_ = 0; + + /** + * Constructor that takes an already created socket. + * + * @param socket Already created socket object + * @throws TTransportException if there is an error setting up the streams + */ + public TSocket(Socket socket) throws TTransportException { + socket_ = socket; + try { + socket_.setSoLinger(false, 0); + socket_.setTcpNoDelay(true); + } catch (SocketException sx) { + LOGGER.warn("Could not configure socket.", sx); + } + + if (isOpen()) { + try { + inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024); + outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024); + } catch (IOException iox) { + close(); + throw new TTransportException(TTransportException.NOT_OPEN, iox); + } + } + } + + /** + * Creates a new unconnected socket that will connect to the given host + * on the given port. + * + * @param host Remote host + * @param port Remote port + */ + public TSocket(String host, int port) { + this(host, port, 0); + } + + /** + * Creates a new unconnected socket that will connect to the given host + * on the given port. + * + * @param host Remote host + * @param port Remote port + * @param timeout Socket timeout + */ + public TSocket(String host, int port, int timeout) { + host_ = host; + port_ = port; + timeout_ = timeout; + initSocket(); + } + + /** + * Initializes the socket object + */ + private void initSocket() { + socket_ = new Socket(); + try { + socket_.setSoLinger(false, 0); + socket_.setTcpNoDelay(true); + socket_.setSoTimeout(timeout_); + } catch (SocketException sx) { + LOGGER.error("Could not configure socket.", sx); + } + } + + /** + * Sets the socket timeout + * + * @param timeout Milliseconds timeout + */ + public void setTimeout(int timeout) { + timeout_ = timeout; + try { + socket_.setSoTimeout(timeout); + } catch (SocketException sx) { + LOGGER.warn("Could not set socket timeout.", sx); + } + } + + /** + * Returns a reference to the underlying socket. + */ + public Socket getSocket() { + if (socket_ == null) { + initSocket(); + } + return socket_; + } + + /** + * Checks whether the socket is connected. + */ + public boolean isOpen() { + if (socket_ == null) { + return false; + } + return socket_.isConnected(); + } + + /** + * Connects the socket, creating a new socket object if necessary. + */ + public void open() throws TTransportException { + if (isOpen()) { + throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected."); + } + + if (host_.length() == 0) { + throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host."); + } + if (port_ <= 0) { + throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open without port."); + } + + if (socket_ == null) { + initSocket(); + } + + try { + socket_.connect(new InetSocketAddress(host_, port_)); + inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024); + outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024); + } catch (IOException iox) { + close(); + throw new TTransportException(TTransportException.NOT_OPEN, iox); + } + } + + /** + * Closes the socket. + */ + public void close() { + // Close the underlying streams + super.close(); + + // Close the socket + if (socket_ != null) { + try { + socket_.close(); + } catch (IOException iox) { + LOGGER.warn("Could not close socket.", iox); + } + socket_ = null; + } + } + +} diff --git a/lib/java/src/org/apache/thrift/transport/TTransport.java b/lib/java/src/org/apache/thrift/transport/TTransport.java new file mode 100644 index 000000000..a6c047bb5 --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TTransport.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +/** + * Generic class that encapsulates the I/O layer. This is basically a thin + * wrapper around the combined functionality of Java input/output streams. + * + */ +public abstract class TTransport { + + /** + * Queries whether the transport is open. + * + * @return True if the transport is open. + */ + public abstract boolean isOpen(); + + /** + * Is there more data to be read? + * + * @return True if the remote side is still alive and feeding us + */ + public boolean peek() { + return isOpen(); + } + + /** + * Opens the transport for reading/writing. + * + * @throws TTransportException if the transport could not be opened + */ + public abstract void open() + throws TTransportException; + + /** + * Closes the transport. + */ + public abstract void close(); + + /** + * Reads up to len bytes into buffer buf, starting att offset off. + * + * @param buf Array to read into + * @param off Index to start reading at + * @param len Maximum number of bytes to read + * @return The number of bytes actually read + * @throws TTransportException if there was an error reading data + */ + public abstract int read(byte[] buf, int off, int len) + throws TTransportException; + + /** + * Guarantees that all of len bytes are actually read off the transport. + * + * @param buf Array to read into + * @param off Index to start reading at + * @param len Maximum number of bytes to read + * @return The number of bytes actually read, which must be equal to len + * @throws TTransportException if there was an error reading data + */ + public int readAll(byte[] buf, int off, int len) + throws TTransportException { + int got = 0; + int ret = 0; + while (got < len) { + ret = read(buf, off+got, len-got); + if (ret <= 0) { + throw new TTransportException("Cannot read. Remote side has closed. Tried to read " + len + " bytes, but only got " + got + " bytes."); + } + got += ret; + } + return got; + } + + /** + * Writes the buffer to the output + * + * @param buf The output data buffer + * @throws TTransportException if an error occurs writing data + */ + public void write(byte[] buf) throws TTransportException { + write(buf, 0, buf.length); + } + + /** + * Writes up to len bytes from the buffer. + * + * @param buf The output data buffer + * @param off The offset to start writing from + * @param len The number of bytes to write + * @throws TTransportException if there was an error writing data + */ + public abstract void write(byte[] buf, int off, int len) + throws TTransportException; + + /** + * Flush any pending data out of a transport buffer. + * + * @throws TTransportException if there was an error writing out data. + */ + public void flush() + throws TTransportException {} +} diff --git a/lib/java/src/org/apache/thrift/transport/TTransportException.java b/lib/java/src/org/apache/thrift/transport/TTransportException.java new file mode 100644 index 000000000..d08f3b02b --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TTransportException.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +import org.apache.thrift.TException; + +/** + * Transport exceptions. + * + */ +public class TTransportException extends TException { + + private static final long serialVersionUID = 1L; + + public static final int UNKNOWN = 0; + public static final int NOT_OPEN = 1; + public static final int ALREADY_OPEN = 2; + public static final int TIMED_OUT = 3; + public static final int END_OF_FILE = 4; + + protected int type_ = UNKNOWN; + + public TTransportException() { + super(); + } + + public TTransportException(int type) { + super(); + type_ = type; + } + + public TTransportException(int type, String message) { + super(message); + type_ = type; + } + + public TTransportException(String message) { + super(message); + } + + public TTransportException(int type, Throwable cause) { + super(cause); + type_ = type; + } + + public TTransportException(Throwable cause) { + super(cause); + } + + public TTransportException(String message, Throwable cause) { + super(message, cause); + } + + public TTransportException(int type, String message, Throwable cause) { + super(message, cause); + type_ = type; + } + + public int getType() { + return type_; + } + +} diff --git a/lib/java/src/org/apache/thrift/transport/TTransportFactory.java b/lib/java/src/org/apache/thrift/transport/TTransportFactory.java new file mode 100644 index 000000000..3e71630ae --- /dev/null +++ b/lib/java/src/org/apache/thrift/transport/TTransportFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.thrift.transport; + +/** + * Factory class used to create wrapped instance of Transports. + * This is used primarily in servers, which get Transports from + * a ServerTransport and then may want to mutate them (i.e. create + * a BufferedTransport from the underlying base transport) + * + */ +public class TTransportFactory { + + /** + * Return a wrapped instance of the base Transport. + * + * @param trans The base transport + * @return Wrapped Transport + */ + public TTransport getTransport(TTransport trans) { + return trans; + } + +} |