diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2012-09-28 17:27:50 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2012-09-28 17:27:50 +0000 |
| commit | fb6cfba978c4d175f66a0355c0e3c21aba318235 (patch) | |
| tree | 1677e998e7c464c66f4eb6b107a9c1d900cb10d9 /qpid/java | |
| parent | 8d69b55ef5375fbdddb5f1ea76936e2304a89220 (diff) | |
| download | qpid-python-fb6cfba978c4d175f66a0355c0e3c21aba318235.tar.gz | |
QPID-3906 List Message support for JMS.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1391565 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
12 files changed, 1288 insertions, 10 deletions
diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/ListReceiver.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/ListReceiver.java new file mode 100644 index 0000000000..b12cfab9de --- /dev/null +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/ListReceiver.java @@ -0,0 +1,101 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.example; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MapMessage; +import javax.jms.StreamMessage; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.jms.MessageEOFException; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; + +import org.apache.qpid.jms.ListMessage; + +import java.util.Enumeration; +import java.util.Iterator; + +public class ListReceiver { + + public static void main(String[] args) throws Exception + { + if (args.length != 1) { + System.out.println("Usage: java org.apache.qpid.example.ListReceiver <-l | -m | -s>"); + System.out.println("where:"); + System.out.println("\t-l\tAccept ListMessage and print it"); + System.out.println("\t-m\tAccept ListMessage as a MapMessage"); + System.out.println("\t-s\tAccept ListMessage as a StreamMessage"); + return; + } + + Connection connection = + new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'"); + + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}"); + MessageConsumer consumer = session.createConsumer(queue); + + if (args[0].equals("-l")) { + System.out.println("Receiving as ListMessage"); + ListMessage m = (ListMessage)consumer.receive(); + System.out.println(m); + System.out.println("=========================================="); + System.out.println("Printing list contents:"); + Iterator i = m.iterator(); + while(i.hasNext()) + System.out.println(i.next()); + } + else if (args[0].equals("-m")) { + System.out.println("Receiving as MapMessage"); + MapMessage m = (MapMessage)consumer.receive(); + System.out.println(m); + System.out.println("=========================================="); + System.out.println("Printing map contents:"); + Enumeration keys = m.getMapNames(); + while(keys.hasMoreElements()) { + String key = (String)keys.nextElement(); + System.out.println(key + " => " + m.getObject(key)); + } + } + else if (args[0].equals("-s")) { + System.out.println("Receiving as StreamMessage"); + StreamMessage m = (StreamMessage)consumer.receive(); + System.out.println(m); + System.out.println("=========================================="); + System.out.println("Printing stream contents:"); + try { + while(true) + System.out.println(m.readObject()); + } + catch (MessageEOFException e) { + // DONE + } + } + + connection.close(); + } +} diff --git a/qpid/java/client/example/src/main/java/org/apache/qpid/example/ListSender.java b/qpid/java/client/example/src/main/java/org/apache/qpid/example/ListSender.java new file mode 100644 index 0000000000..fe2c1ec472 --- /dev/null +++ b/qpid/java/client/example/src/main/java/org/apache/qpid/example/ListSender.java @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.example; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.qpid.client.AMQAnyDestination; +import org.apache.qpid.client.AMQConnection; + +import org.apache.qpid.jms.ListMessage; + + +public class ListSender { + + public static void main(String[] args) throws Exception + { + Connection connection = + new AMQConnection("amqp://guest:guest@test/?brokerlist='tcp://localhost:5672'"); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination queue = new AMQAnyDestination("ADDR:message_queue; {create: always}"); + MessageProducer producer = session.createProducer(queue); + + ListMessage m = ((org.apache.qpid.jms.Session)session).createListMessage(); + m.setIntProperty("Id", 987654321); + m.setStringProperty("name", "Widget"); + m.setDoubleProperty("price", 0.99); + + List<String> colors = new ArrayList<String>(); + colors.add("red"); + colors.add("green"); + colors.add("white"); + m.add(colors); + + Map<String,Double> dimensions = new HashMap<String,Double>(); + dimensions.put("length",10.2); + dimensions.put("width",5.1); + dimensions.put("depth",2.0); + m.add(dimensions); + + List<List<Integer>> parts = new ArrayList<List<Integer>>(); + parts.add(Arrays.asList(new Integer[] {1,2,5})); + parts.add(Arrays.asList(new Integer[] {8,2,5})); + m.add(parts); + + Map<String,Object> specs = new HashMap<String,Object>(); + specs.put("colours", colors); + specs.put("dimensions", dimensions); + specs.put("parts", parts); + m.add(specs); + + producer.send((Message)m); + System.out.println("Sent: " + m); + connection.close(); + } + +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java index d1c1554705..6758c2bf1e 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -179,6 +179,10 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect // new amqp-0-10 encoded format. private boolean _useLegacyMapMessageFormat; + // Indicates whether to use the old stream message format or the + // new amqp-0-10 list encoded format. + private boolean _useLegacyStreamMessageFormat; + //used to track the last failover time for //Address resolution purposes private volatile long _lastFailoverTime = 0; @@ -294,6 +298,17 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect _useLegacyMapMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_MAP_MESSAGE_FORMAT); } + if (connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT) != null) + { + _useLegacyStreamMessageFormat = Boolean.parseBoolean( + connectionURL.getOption(ConnectionURL.OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT)); + } + else + { + // use the default value set for all connections + _useLegacyStreamMessageFormat = Boolean.getBoolean(ClientProperties.USE_LEGACY_STREAM_MESSAGE_FORMAT); + } + String amqpVersion = System.getProperty((ClientProperties.AMQP_VERSION), "0-10"); if (_logger.isDebugEnabled()) { @@ -1498,6 +1513,11 @@ public class AMQConnection extends Closeable implements Connection, QueueConnect return _useLegacyMapMessageFormat; } + public boolean isUseLegacyStreamMessageFormat() + { + return _useLegacyStreamMessageFormat; + } + private void verifyClientID() throws AMQException { if (Boolean.getBoolean(ClientProperties.QPID_VERIFY_CLIENT_ID)) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 9af06eeaf4..2aef0625bf 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -35,6 +35,7 @@ import org.apache.qpid.client.failover.FailoverProtectedOperation; import org.apache.qpid.client.failover.FailoverRetrySupport; import org.apache.qpid.client.message.AMQMessageDelegateFactory; import org.apache.qpid.client.message.AMQPEncodedMapMessage; +import org.apache.qpid.client.message.AMQPEncodedListMessage; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.CloseConsumerMessage; import org.apache.qpid.client.message.JMSBytesMessage; @@ -50,6 +51,7 @@ import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.jms.Session; +import org.apache.qpid.jms.ListMessage; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.thread.Threading; import org.apache.qpid.transport.SessionException; @@ -123,6 +125,8 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic private final boolean _useAMQPEncodedMapMessage; + private final boolean _useAMQPEncodedStreamMessage; + /** * Flag indicating to start dispatcher as a daemon thread */ @@ -398,6 +402,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) { _useAMQPEncodedMapMessage = con == null ? true : !con.isUseLegacyMapMessageFormat(); + _useAMQPEncodedStreamMessage = con == null ? true : !con.isUseLegacyStreamMessageFormat(); _strictAMQP = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP, STRICT_AMQP_DEFAULT)); _strictAMQPFATAL = Boolean.parseBoolean(System.getProperties().getProperty(STRICT_AMQP_FATAL, STRICT_AMQP_FATAL_DEFAULT)); @@ -1111,6 +1116,14 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic } } + public ListMessage createListMessage() throws JMSException + { + checkNotClosed(); + AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory()); + msg.setAMQSession(this); + return msg; + } + public MapMessage createMapMessage() throws JMSException { checkNotClosed(); @@ -1353,17 +1366,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public StreamMessage createStreamMessage() throws JMSException { - // This method needs to be improved. Throwables only arrive here from the mina : exceptionRecived - // calls through connection.closeAllSessions which is also called by the public connection.close() - // with a null cause - // When we are closing the Session due to a protocol session error we simply create a new AMQException - // with the correct error code and text this is cleary WRONG as the instanceof check below will fail. - // We need to determin here if the connection should be - - synchronized (getFailoverMutex()) + checkNotClosed(); + if (_useAMQPEncodedMapMessage) + { + AMQPEncodedListMessage msg = new AMQPEncodedListMessage(getMessageDelegateFactory()); + msg.setAMQSession(this); + return msg; + } + else { - checkNotClosed(); - JMSStreamMessage msg = new JMSStreamMessage(getMessageDelegateFactory()); msg.setAMQSession(this); return msg; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessage.java new file mode 100644 index 0000000000..a6802c8dec --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessage.java @@ -0,0 +1,949 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.client.message; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.qpid.AMQException; + +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.transport.codec.BBEncoder; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import javax.jms.MessageEOFException; +import java.lang.NumberFormatException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Enumeration; +import java.util.Iterator; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +public class AMQPEncodedListMessage extends JMSStreamMessage implements + org.apache.qpid.jms.ListMessage, javax.jms.MapMessage +{ + private static final Logger _logger = LoggerFactory + .getLogger(AMQPEncodedListMessage.class); + + public static final String MIME_TYPE = "amqp/list"; + + private List<Object> _list = new ArrayList<Object>(); + + public AMQPEncodedListMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException + { + super(delegateFactory); + currentIndex = 0; + } + + AMQPEncodedListMessage(AMQMessageDelegate delegate, ByteBuffer data) + throws AMQException + { + super(delegate, data); + if (data != null) + { + try + { + populateListFromData(data); + } + catch (JMSException je) + { + throw new AMQException(null, + "Error populating ListMessage from ByteBuffer", je); + } + } + currentIndex = 0; + } + + public String toBodyString() throws JMSException + { + return _list == null ? "" : _list.toString(); + } + + protected String getMimeType() + { + return MIME_TYPE; + } + + /* ListMessage Implementation. */ + public boolean add(Object a) throws JMSException + { + checkWritable(); + checkAllowedValue(a); + checkIfElementIsOfCorrectType(a); + try + { + return _list.add(a); + } + catch (Exception e) + { + MessageFormatException ex = new MessageFormatException("Error adding to ListMessage"); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + + } + } + + public void add(int index, Object element) throws JMSException + { + checkWritable(); + checkAllowedValue(element); + try + { + _list.add(index, element); + } + catch (Exception e) + { + MessageFormatException ex = new MessageFormatException("Error adding to ListMessage at " + + index); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } + } + + public boolean contains(Object o) throws JMSException + { + try + { + return _list.contains(o); + } + catch (Exception e) + { + JMSException ex = new JMSException("Error when looking up object"); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } + } + + public Object get(int index) throws JMSException + { + try + { + return _list.get(index); + } + catch (IndexOutOfBoundsException e) + { + MessageFormatException ex = new MessageFormatException( + "Error getting ListMessage element at " + index); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } + } + + public int indexOf(Object o) + { + return _list.indexOf(o); + } + + public Iterator iterator() + { + return _list.iterator(); + } + + public Object remove(int index) throws JMSException + { + checkWritable(); + try + { + return _list.remove(index); + } + catch (IndexOutOfBoundsException e) + { + MessageFormatException ex = new MessageFormatException( + "Error removing ListMessage element at " + index); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } + } + + public boolean remove(Object o) throws JMSException + { + checkWritable(); + return _list.remove(o); + } + + public Object set(int index, Object element) throws JMSException + { + checkWritable(); + checkAllowedValue(element); + try + { + return _list.set(index, element); + } + catch (Exception e) + { + MessageFormatException ex = new MessageFormatException( + "Error setting ListMessage element at " + index); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } + } + + public int size() + { + return _list.size(); + } + + public Object[] toArray() + { + return _list.toArray(); + } + + /* MapMessage Implementation */ + private boolean isValidIndex(int index) + { + if (index >= 0 && index < size()) + return true; + + return false; + } + + private int getValidIndex(String indexStr) throws JMSException + { + if ((indexStr == null) || indexStr.equals("")) + { + throw new IllegalArgumentException( + "Property name cannot be null, or the empty String."); + } + + int index = 0; + try + { + index = Integer.parseInt(indexStr); + } + catch (NumberFormatException e) + { + JMSException ex = new JMSException("Invalid index string"); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } + if (isValidIndex(index)) + return index; + + throw new MessageFormatException("Property " + indexStr + + " should be a valid index into the list of size " + size()); + } + + private void setGenericForMap(String propName, Object o) + throws JMSException + { + checkWritable(); + int index = 0; + try + { + index = Integer.parseInt(propName); + } + catch (NumberFormatException e) + { + JMSException ex = new JMSException("The property name should be a valid index"); + ex.initCause(e); + ex.setLinkedException(e); + throw ex; + } + + if (isValidIndex(index)) + remove(index); + add(index, o); + } + + public boolean getBoolean(String propName) throws JMSException + { + return getBooleanImpl(getValidIndex(propName)); + } + + public byte getByte(String propName) throws JMSException + { + return getByteImpl(getValidIndex(propName)); + } + + public short getShort(String propName) throws JMSException + { + return getShortImpl(getValidIndex(propName)); + } + + public int getInt(String propName) throws JMSException + { + return getIntImpl(getValidIndex(propName)); + } + + public long getLong(String propName) throws JMSException + { + return getLongImpl(getValidIndex(propName)); + } + + public char getChar(String propName) throws JMSException + { + return getCharImpl(getValidIndex(propName)); + + } + + public float getFloat(String propName) throws JMSException + { + return getFloatImpl(getValidIndex(propName)); + } + + public double getDouble(String propName) throws JMSException + { + return getDoubleImpl(getValidIndex(propName)); + } + + public String getString(String propName) throws JMSException + { + return getStringImpl(getValidIndex(propName)); + } + + public byte[] getBytes(String propName) throws JMSException + { + return getBytesImpl(getValidIndex(propName)); + } + + public Object getObject(String propName) throws JMSException + { + return get(getValidIndex(propName)); + } + + public Enumeration getMapNames() throws JMSException + { + List<String> names = new ArrayList<String>(); + int i = 0; + + while (i < size()) + names.add(Integer.toString(i++)); + + return Collections.enumeration(names); + } + + public void setBoolean(String propName, boolean b) throws JMSException + { + setGenericForMap(propName, b); + } + + public void setByte(String propName, byte b) throws JMSException + { + setGenericForMap(propName, b); + } + + public void setShort(String propName, short i) throws JMSException + { + setGenericForMap(propName, i); + } + + public void setChar(String propName, char c) throws JMSException + { + setGenericForMap(propName, c); + } + + public void setInt(String propName, int i) throws JMSException + { + setGenericForMap(propName, i); + } + + public void setLong(String propName, long l) throws JMSException + { + setGenericForMap(propName, l); + } + + public void setFloat(String propName, float v) throws JMSException + { + setGenericForMap(propName, v); + } + + public void setDouble(String propName, double v) throws JMSException + { + setGenericForMap(propName, v); + } + + public void setString(String propName, String string1) throws JMSException + { + setGenericForMap(propName, string1); + } + + public void setBytes(String propName, byte[] bytes) throws JMSException + { + setGenericForMap(propName, bytes); + } + + public void setBytes(String propName, byte[] bytes, int offset, int length) + throws JMSException + { + if ((offset == 0) && (length == bytes.length)) + { + setBytes(propName, bytes); + } + else + { + byte[] newBytes = new byte[length]; + System.arraycopy(bytes, offset, newBytes, 0, length); + setBytes(propName, newBytes); + } + } + + public void setObject(String propName, Object value) throws JMSException + { + checkAllowedValue(value); + setGenericForMap(propName, value); + } + + public boolean itemExists(String propName) throws JMSException + { + return isValidIndex(Integer.parseInt(propName)); + } + + // StreamMessage methods + + private int currentIndex; + + private static final String MESSAGE_EOF_EXCEPTION = "End of Stream (ListMessage) at index: "; + + private void setGenericForStream(Object o) throws JMSException + { + checkWritable(); + add(o); + currentIndex++; + } + + @Override + public boolean readBoolean() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getBooleanImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public byte readByte() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getByteImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public int readBytes(byte[] value) throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + { + ByteBuffer res = ByteBuffer.wrap(getBytesImpl(currentIndex++)); + res.get(value); + return value.length; + } + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public char readChar() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getCharImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public double readDouble() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getDoubleImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public float readFloat() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getFloatImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public int readInt() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getIntImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public long readLong() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getLongImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public Object readObject() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return get(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public short readShort() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getShortImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public String readString() throws JMSException + { + checkReadable(); + if (isValidIndex(currentIndex)) + return getStringImpl(currentIndex++); + + throw new MessageEOFException(MESSAGE_EOF_EXCEPTION + currentIndex); + } + + @Override + public void writeBoolean(boolean value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeByte(byte value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeBytes(byte[] value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeBytes(byte[] value, int offset, int length) + throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeChar(char value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeDouble(double value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeFloat(float value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeInt(int value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeLong(long value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeObject(Object value) throws JMSException + { + checkAllowedValue(value); + setGenericForStream(value); + } + + @Override + public void writeShort(short value) throws JMSException + { + setGenericForStream(value); + } + + @Override + public void writeString(String value) throws JMSException + { + setGenericForStream(value); + } + + // Common methods + + private void checkAllowedValue(Object value) throws MessageFormatException + { + if (((value instanceof Boolean) || (value instanceof Byte) + || (value instanceof Short) || (value instanceof Integer) + || (value instanceof Long) || (value instanceof Character) + || (value instanceof Float) || (value instanceof Double) + || (value instanceof String) || (value instanceof byte[]) + || (value instanceof List) || (value instanceof Map) + || (value instanceof UUID) || (value == null)) == false) + { + throw new MessageFormatException("Invalid value " + value + + "of type " + value.getClass().getName() + "."); + } + } + + @Override + public void reset() + { + currentIndex = 0; + setReadable(true); + } + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + _list.clear(); + currentIndex = 0; + setReadable(false); + } + + private boolean getBooleanImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Boolean) + { + return ((Boolean) value).booleanValue(); + } + else if ((value instanceof String) || (value == null)) + { + try + { + return Boolean.valueOf((String) value); + } + catch (NumberFormatException e) + { + // FALLTHROUGH to exception + } + } + + throw new MessageFormatException("Property at " + index + " of type " + + value.getClass().getName() + + " cannot be converted to boolean."); + } + + private byte getByteImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Byte) + { + return ((Byte) value).byteValue(); + } + else if ((value instanceof String) || (value == null)) + { + try + { + return Byte.valueOf((String) value).byteValue(); + } catch (NumberFormatException e) + { + // FALLTHROUGH to exception + } + } + + throw new MessageFormatException("Property at " + index + " of type " + + value.getClass().getName() + " cannot be converted to byte."); + } + + private short getShortImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Short) + { + return ((Short) value).shortValue(); + } + else if (value instanceof Byte) + { + return ((Byte) value).shortValue(); + } + else if ((value instanceof String) || (value == null)) + { + try + { + return Short.valueOf((String) value).shortValue(); + } catch (NumberFormatException e) + { + // FALLTHROUGH to exception + } + } + + throw new MessageFormatException("Property at " + index + " of type " + + value.getClass().getName() + " cannot be converted to short."); + } + + private int getIntImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Integer) + { + return ((Integer) value).intValue(); + } + + if (value instanceof Short) + { + return ((Short) value).intValue(); + } + + if (value instanceof Byte) + { + return ((Byte) value).intValue(); + } + + if ((value instanceof String) || (value == null)) + { + try + { + return Integer.valueOf((String) value).intValue(); + } + catch (NumberFormatException e) + { + // FALLTHROUGH to exception + } + } + + throw new MessageFormatException("Property at " + index + " of type " + + value.getClass().getName() + " cannot be converted to int."); + } + + private long getLongImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Long) + { + return ((Long) value).longValue(); + } else if (value instanceof Integer) + { + return ((Integer) value).longValue(); + } + + if (value instanceof Short) + { + return ((Short) value).longValue(); + } + + if (value instanceof Byte) + { + return ((Byte) value).longValue(); + } else if ((value instanceof String) || (value == null)) + { + try + { + return Long.valueOf((String) value).longValue(); + } catch (NumberFormatException e) + { + // FALLTHROUGH to exception + } + } + + throw new MessageFormatException("Property at " + index + " of type " + + value.getClass().getName() + " cannot be converted to long."); + } + + private char getCharImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Character) + { + return ((Character) value).charValue(); + } else if (value == null) + { + throw new NullPointerException("Property at " + index + + " has null value and therefore cannot " + + "be converted to char."); + } else + { + throw new MessageFormatException("Property at " + index + + " of type " + value.getClass().getName() + + " cannot be converted to a char."); + } + } + + private float getFloatImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Float) + { + return ((Float) value).floatValue(); + } + else if ((value instanceof String) || (value == null)) + { + try + { + return Float.valueOf((String) value).floatValue(); + } + catch (NumberFormatException e) + { + // FALLTHROUGH to exception + } + } + + throw new MessageFormatException("Property at " + index + " of type " + + value.getClass().getName() + " cannot be converted to float."); + } + + private double getDoubleImpl(int index) throws JMSException + { + Object value = get(index); + + if (value instanceof Double) + { + return ((Double) value).doubleValue(); + } + else if (value instanceof Float) + { + return ((Float) value).doubleValue(); + } + else if ((value instanceof String) || (value == null)) + { + try + { + return Double.valueOf((String) value).doubleValue(); + } + catch (NumberFormatException e) + { + // FALLTHROUGH to exception + } + } + + throw new MessageFormatException("Property at " + index + " of type " + + value.getClass().getName() + + " cannot be converted to double."); + } + + private String getStringImpl(int index) throws JMSException + { + Object value = get(index); + + if ((value instanceof String) || (value == null)) + { + return (String) value; + } else if (value instanceof byte[]) + { + throw new MessageFormatException("Property at " + index + + " of type byte[] " + "cannot be converted to String."); + } else + { + return value.toString(); + } + } + + private byte[] getBytesImpl(int index) throws JMSException + { + Object value = get(index); + + if ((value instanceof byte[]) || (value == null)) + { + return (byte[]) value; + } + else + { + throw new MessageFormatException("Property at " + index + + " of type " + value.getClass().getName() + + " cannot be converted to byte[]."); + } + } + + protected void populateListFromData(ByteBuffer data) throws JMSException + { + if (data != null) + { + data.rewind(); + BBDecoder decoder = new BBDecoder(); + decoder.init(data); + _list = decoder.readList(); + } + else + { + _list.clear(); + } + } + + public ByteBuffer getData() throws JMSException + { + BBEncoder encoder = new BBEncoder(1024); + encoder.writeList(_list); + return encoder.segment(); + } + + public void setList(List<Object> l) + { + _list = l; + } + + public List<Object> asList() + { + return _list; + } + + private void checkIfElementIsOfCorrectType(Object obj) throws JMSException + { + if (!_list.isEmpty()) + { + if (obj.getClass() != _list.get(0).getClass()) + { + throw new MessageFormatException("List can only contain elements of the same type."+ + " The first element of the list is of type " + _list.get(0).getClass() + + " ,while the element suplied here is of type " + obj.getClass()); + } + } + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java new file mode 100644 index 0000000000..b503dccb91 --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedListMessageFactory.java @@ -0,0 +1,44 @@ +package org.apache.qpid.client.message; +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +import org.apache.qpid.AMQException; + +import javax.jms.JMSException; +import java.nio.ByteBuffer; + +public class AMQPEncodedListMessageFactory extends AbstractJMSMessageFactory +{ + @Override + protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, + ByteBuffer data) throws AMQException + { + return new AMQPEncodedListMessage(delegate,data); + } + + + public AbstractJMSMessage createMessage( + AMQMessageDelegateFactory delegateFactory) throws JMSException + { + return new AMQPEncodedListMessage(delegateFactory); + } +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java index b958d89515..b1af262580 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java @@ -44,7 +44,12 @@ public class JMSStreamMessage extends AbstractBytesTypedMessage implements Strea } + JMSStreamMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws AMQException + { + super(delegateFactory, data!=null); + _typedBytesContentWriter = new TypedBytesContentWriter(); + } JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java index fa39b4c93c..4154003b23 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java @@ -66,6 +66,7 @@ public class MessageFactoryRegistry mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory()); mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory()); mf.registerFactory(AMQPEncodedMapMessage.MIME_TYPE, new AMQPEncodedMapMessageFactory()); + mf.registerFactory(AMQPEncodedListMessage.MIME_TYPE, new AMQPEncodedListMessageFactory()); mf.registerFactory(null, mf._default); return mf; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java index 8fd6ff6d33..af79787f94 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java @@ -38,6 +38,7 @@ public interface ConnectionURL public static final String OPTIONS_SYNC_ACK = "sync_ack"; public static final String OPTIONS_SYNC_PUBLISH = "sync_publish"; public static final String OPTIONS_USE_LEGACY_MAP_MESSAGE_FORMAT = "use_legacy_map_msg_format"; + public static final String OPTIONS_USE_LEGACY_STREAM_MESSAGE_FORMAT = "use_legacy_stream_msg_format"; public static final String OPTIONS_BROKERLIST = "brokerlist"; public static final String OPTIONS_FAILOVER = "failover"; public static final String OPTIONS_FAILOVER_CYCLE = "cyclecount"; diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/ListMessage.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/ListMessage.java new file mode 100644 index 0000000000..21dd2a89ee --- /dev/null +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/ListMessage.java @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jms; + +import javax.jms.JMSException; + +import java.util.Iterator; +import java.util.List; + +public interface ListMessage extends javax.jms.StreamMessage +{ + boolean add(Object e) throws JMSException; + + void add(int index, Object e) throws JMSException; + + boolean contains(Object e) throws JMSException; + + Object get(int index) throws JMSException; + + int indexOf(Object e) throws JMSException; + + Iterator<Object> iterator() throws JMSException; + + Object remove(int index) throws JMSException; + + boolean remove(Object e)throws JMSException; + + Object set(int index, Object e) throws JMSException; + + int size() throws JMSException; + + Object[] toArray() throws JMSException; + + List<Object> asList() throws JMSException; + + void setList(List<Object> l) throws JMSException; +} diff --git a/qpid/java/client/src/main/java/org/apache/qpid/jms/Session.java b/qpid/java/client/src/main/java/org/apache/qpid/jms/Session.java index b4bf2d1d85..4801f87295 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/jms/Session.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/jms/Session.java @@ -21,6 +21,7 @@ package org.apache.qpid.jms; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.jms.ListMessage; import javax.jms.Destination; import javax.jms.JMSException; @@ -100,4 +101,6 @@ public interface Session extends TopicSession, QueueSession AMQShortString getDefaultTopicExchangeName(); AMQShortString getTemporaryQueueExchangeName(); + + ListMessage createListMessage() throws JMSException; } diff --git a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java index 20e523ca97..7594d87b93 100644 --- a/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java +++ b/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java @@ -87,6 +87,8 @@ public class ClientProperties public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message"; + public static final String USE_LEGACY_STREAM_MESSAGE_FORMAT = "qpid.use_legacy_stream_message"; + public static final String AMQP_VERSION = "qpid.amqp.version"; public static final String QPID_VERIFY_CLIENT_ID = "qpid.verify_client_id"; |
