diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-09-06 16:49:22 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-09-06 16:49:22 +0000 |
| commit | 8452b75e19e470b080e4d663d1e5f017cbc57ad7 (patch) | |
| tree | e110f1863ba823d445f35c4fd58fec0c0ba86804 | |
| parent | f2dd65475a2d6244a5fb8ebfb618bb608710ace7 (diff) | |
| download | qpid-python-8452b75e19e470b080e4d663d1e5f017cbc57ad7.tar.gz | |
QPID-6078 : [JMS AMQP 1.0 Client] Allow Message and Delivery annotations to be read and modified through the JMS API
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1622888 13f79535-47bb-0310-9956-ffa450edef68
13 files changed, 1458 insertions, 128 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Message.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Message.java index 1e774abbd6..f640dc55f5 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Message.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/Message.java @@ -21,7 +21,12 @@ package org.apache.qpid.amqp_1_0.jms; -import org.apache.qpid.amqp_1_0.messaging.MessageAttributes; +import java.util.Date; +import java.util.List; +import java.util.Map; + +import javax.jms.JMSException; + import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.Symbol; import org.apache.qpid.amqp_1_0.type.UnsignedByte; @@ -29,11 +34,6 @@ import org.apache.qpid.amqp_1_0.type.UnsignedInteger; import org.apache.qpid.amqp_1_0.type.UnsignedLong; import org.apache.qpid.amqp_1_0.type.UnsignedShort; -import javax.jms.JMSException; -import java.util.Date; -import java.util.List; -import java.util.Map; - public interface Message extends javax.jms.Message { @@ -118,14 +118,6 @@ public interface Message extends javax.jms.Message void setDeliveryFailures(UnsignedInteger failures); - MessageAttributes getHeaderMessageAttrs(); - - void setHeaderMessageAttrs(MessageAttributes messageAttrs); - - MessageAttributes getHeaderDeliveryAttrs(); - - void setHeaderDeliveryAttrs(MessageAttributes deliveryAttrs); - Boolean getDurable(); void setDurable(Boolean durable); diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java index ed6f86a972..d780c67a76 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/AmqpMessageImpl.java @@ -18,30 +18,44 @@ */ package org.apache.qpid.amqp_1_0.jms.impl; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.ListIterator; + import org.apache.qpid.amqp_1_0.jms.AmqpMessage; import org.apache.qpid.amqp_1_0.type.Section; import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations; import org.apache.qpid.amqp_1_0.type.messaging.Footer; import org.apache.qpid.amqp_1_0.type.messaging.Header; import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import java.util.*; - public class AmqpMessageImpl extends MessageImpl implements AmqpMessage { private List<Section> _sections; - protected AmqpMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, List<Section> sections, - Footer footer, SessionImpl session) + protected AmqpMessageImpl(Header header, + DeliveryAnnotations deliveryAnnotations, + MessageAnnotations messageAnnotations, + Properties properties, + ApplicationProperties appProperties, + List<Section> sections, + Footer footer, + SessionImpl session) { - super(header, messageAnnotations, properties, appProperties, footer, session); + super(header, deliveryAnnotations, messageAnnotations, properties, appProperties, footer, session); _sections = sections; } protected AmqpMessageImpl(final SessionImpl session) { - super(new Header(), new MessageAnnotations(new HashMap()), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP), + super(new Header(), + new DeliveryAnnotations(new HashMap()), + new MessageAnnotations(new HashMap()), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP), session); _sections = new ArrayList<Section>(); } @@ -65,6 +79,10 @@ public class AmqpMessageImpl extends MessageImpl implements AmqpMessage { List<Section> sections = new ArrayList<Section>(); sections.add(getHeader()); + if(getDeliveryAnnotations() != null && getDeliveryAnnotations().getValue() != null && !getDeliveryAnnotations().getValue().isEmpty()) + { + sections.add(getDeliveryAnnotations()); + } if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty()) { sections.add(getMessageAnnotations()); diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java index e45c5a73fe..4fb9c511bf 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/BytesMessageImpl.java @@ -19,17 +19,32 @@ package org.apache.qpid.amqp_1_0.jms.impl; -import org.apache.qpid.amqp_1_0.jms.BytesMessage; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import javax.jms.JMSException; import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; -import java.io.*; -import java.util.*; + +import org.apache.qpid.amqp_1_0.jms.BytesMessage; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations; +import org.apache.qpid.amqp_1_0.type.messaging.Footer; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; public class BytesMessageImpl extends MessageImpl implements BytesMessage { @@ -39,10 +54,16 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage private Data _dataIn; // message created for reading - protected BytesMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Data data, - Footer footer, SessionImpl session) - { - super(header, messageAnnotations, properties, appProperties, footer, session); + protected BytesMessageImpl(Header header, + DeliveryAnnotations deliveryAnnotations, + MessageAnnotations messageAnnotations, + Properties properties, + ApplicationProperties appProperties, + Data data, + Footer footer, + SessionImpl session) + { + super(header, deliveryAnnotations, messageAnnotations, properties, appProperties, footer, session); _dataIn = data; final Binary dataBuffer = data.getValue(); _dataAsInput = new DataInputStream(new ByteArrayInputStream(dataBuffer.getArray(),dataBuffer.getArrayOffset(),dataBuffer.getLength())); @@ -53,7 +74,7 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage protected BytesMessageImpl(final SessionImpl session) { super(new Header(), - new MessageAnnotations(new HashMap()), + new DeliveryAnnotations(new HashMap()), new MessageAnnotations(new HashMap()), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP), @@ -524,6 +545,10 @@ public class BytesMessageImpl extends MessageImpl implements BytesMessage { List<Section> sections = new ArrayList<Section>(); sections.add(getHeader()); + if(getDeliveryAnnotations() != null && getDeliveryAnnotations().getValue() != null && !getDeliveryAnnotations().getValue().isEmpty()) + { + sections.add(getDeliveryAnnotations()); + } if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty()) { sections.add(getMessageAnnotations()); diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java index cb57936d3d..acdfecfeee 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MapMessageImpl.java @@ -19,31 +19,49 @@ package org.apache.qpid.amqp_1_0.jms.impl; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.jms.JMSException; +import javax.jms.MessageFormatException; + import org.apache.qpid.amqp_1_0.jms.MapMessage; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations; +import org.apache.qpid.amqp_1_0.type.messaging.Footer; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import javax.jms.JMSException; -import javax.jms.MessageFormatException; -import java.util.*; - public class MapMessageImpl extends MessageImpl implements MapMessage { private Map _map; - public MapMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, Map map, + public MapMessageImpl(Header header, + DeliveryAnnotations deliveryAnnotations, + MessageAnnotations messageAnnotations, + Properties properties, + ApplicationProperties appProperties, + Map map, Footer footer, SessionImpl session) { - super(header, messageAnnotations, properties, appProperties, footer, session); + super(header, deliveryAnnotations, messageAnnotations, properties, appProperties, footer, session); _map = map; } MapMessageImpl(final SessionImpl session) { - super(new Header(), new MessageAnnotations(new HashMap()), + super(new Header(), new DeliveryAnnotations(new HashMap()), new MessageAnnotations(new HashMap()), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP), session); _map = new HashMap(); @@ -431,6 +449,10 @@ public class MapMessageImpl extends MessageImpl implements MapMessage { List<Section> sections = new ArrayList<Section>(); sections.add(getHeader()); + if(getDeliveryAnnotations() != null && getDeliveryAnnotations().getValue() != null && !getDeliveryAnnotations().getValue().isEmpty()) + { + sections.add(getDeliveryAnnotations()); + } if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty()) { sections.add(getMessageAnnotations()); diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java index dcf531c6b5..c266f11e32 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageFactory.java @@ -19,18 +19,24 @@ package org.apache.qpid.amqp_1_0.jms.impl; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + import org.apache.qpid.amqp_1_0.client.Message; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.*; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations; +import org.apache.qpid.amqp_1_0.type.messaging.Footer; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.Serializable; -import java.util.*; - class MessageFactory { private final SessionImpl _session; @@ -47,6 +53,7 @@ class MessageFactory List<Section> payload = msg.getPayload(); Header header = null; MessageAnnotations messageAnnotations = null; + DeliveryAnnotations deliveryAnnotations = null; Properties properties = null; ApplicationProperties appProperties = null; @@ -65,6 +72,7 @@ class MessageFactory if(section instanceof DeliveryAnnotations) { + deliveryAnnotations = (DeliveryAnnotations) section; section = iter.hasNext() ? iter.next() : null; } @@ -99,23 +107,28 @@ class MessageFactory Section bodySection = body.get(0); if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Map) { - message = new MapMessageImpl(header, messageAnnotations, properties, appProperties, (Map) ((AmqpValue)bodySection).getValue(), footer, _session); + message = new MapMessageImpl(header, deliveryAnnotations, messageAnnotations, properties, appProperties, (Map) ((AmqpValue)bodySection).getValue(), footer, _session); } else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof List) { - message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties, - (List) ((AmqpValue)bodySection).getValue(), footer, _session); + message = new StreamMessageImpl(header, + deliveryAnnotations, + messageAnnotations, properties, appProperties, + (List) ((AmqpValue)bodySection).getValue(), footer, _session + ); } else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof String) { - message = new TextMessageImpl(header, messageAnnotations, properties, appProperties, + message = new TextMessageImpl(header, deliveryAnnotations, messageAnnotations, properties, appProperties, (String) ((AmqpValue)bodySection).getValue(), footer, _session); } else if(bodySection instanceof AmqpValue && ((AmqpValue)bodySection).getValue() instanceof Binary) { Binary value = (Binary) ((AmqpValue) bodySection).getValue(); - message = new BytesMessageImpl(header, messageAnnotations, properties, appProperties, + message = new BytesMessageImpl(header, + deliveryAnnotations, + messageAnnotations, properties, appProperties, new Data(value), footer, _session); } else if(bodySection instanceof Data) @@ -124,19 +137,26 @@ class MessageFactory { - message = new ObjectMessageImpl(header, messageAnnotations, properties, appProperties, + message = new ObjectMessageImpl(header, + deliveryAnnotations, + messageAnnotations, properties, appProperties, (Data) bodySection, footer, _session); } else { - message = new BytesMessageImpl(header, messageAnnotations, properties, appProperties, (Data) bodySection, footer, _session); + message = new BytesMessageImpl(header, + deliveryAnnotations, + messageAnnotations, properties, appProperties, (Data) bodySection, footer, _session); } } else if(bodySection instanceof AmqpSequence) { - message = new StreamMessageImpl(header, messageAnnotations, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session); + message = new StreamMessageImpl(header, + deliveryAnnotations, + messageAnnotations, properties, appProperties, ((AmqpSequence) bodySection).getValue(), footer, _session + ); } /*else if(bodySection instanceof AmqpDataSection) @@ -181,12 +201,16 @@ class MessageFactory }*/ else { - message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session); + message = new AmqpMessageImpl(header, + deliveryAnnotations, + messageAnnotations, properties,appProperties,body,footer, _session); } } else { - message = new AmqpMessageImpl(header,messageAnnotations, properties,appProperties,body,footer, _session); + message = new AmqpMessageImpl(header, + deliveryAnnotations, + messageAnnotations, properties,appProperties,body,footer, _session); } message.setReadOnly(); diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java index 9400619061..5e89f93397 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageImpl.java @@ -21,8 +21,30 @@ package org.apache.qpid.amqp_1_0.jms.impl; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageFormatException; +import javax.jms.MessageNotReadableException; +import javax.jms.MessageNotWriteableException; + import org.apache.qpid.amqp_1_0.jms.Message; -import org.apache.qpid.amqp_1_0.messaging.MessageAttributes; +import org.apache.qpid.amqp_1_0.jms.impl.util.AnnotationDecoder; +import org.apache.qpid.amqp_1_0.jms.impl.util.AnnotationEncoder; import org.apache.qpid.amqp_1_0.type.Binary; import org.apache.qpid.amqp_1_0.type.Section; import org.apache.qpid.amqp_1_0.type.Symbol; @@ -31,28 +53,24 @@ import org.apache.qpid.amqp_1_0.type.UnsignedInteger; import org.apache.qpid.amqp_1_0.type.UnsignedLong; import org.apache.qpid.amqp_1_0.type.UnsignedShort; import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations; import org.apache.qpid.amqp_1_0.type.messaging.Footer; import org.apache.qpid.amqp_1_0.type.messaging.Header; import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.MessageFormatException; -import javax.jms.MessageNotReadableException; -import javax.jms.MessageNotWriteableException; -import java.util.*; - public abstract class MessageImpl implements Message { static final Set<Class> _supportedClasses = new HashSet<Class>(Arrays.asList(Boolean.class, Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class, Character.class, String.class, byte[].class)); + static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type"); static final Symbol TO_TYPE = Symbol.valueOf("x-opt-to-type"); static final Symbol REPLY_TO_TYPE = Symbol.valueOf("x-opt-reply-type"); + static final Collection<Symbol> SYSTEM_MESSAGE_ANNOTATIONS = Arrays.asList(JMS_TYPE, TO_TYPE, REPLY_TO_TYPE); + static final String QUEUE_ATTRIBUTE = "queue"; static final String TOPIC_ATTRIBUTE = "topic"; static final String TEMPORARY_ATTRIBUTE = "temporary"; @@ -63,6 +81,8 @@ public abstract class MessageImpl implements Message static final Set<String> JMS_TEMP_TOPIC_ATTRIBUTES = set(TOPIC_ATTRIBUTE, TEMPORARY_ATTRIBUTE); private static final String JMSXGROUP_ID = "JMSXGroupID"; + public static final String JMS_AMQP_MESSAGE_ANNOTATIONS = "JMS_AMQP_MESSAGE_ANNOTATIONS"; + public static final String JMS_AMQP_DELIVERY_ANNOTATIONS = "JMS_AMQP_DELIVERY_ANNOTATIONS"; private Header _header; private Properties _properties; @@ -70,6 +90,7 @@ public abstract class MessageImpl implements Message private Footer _footer; private final SessionImpl _sessionImpl; private boolean _readOnly; + private DeliveryAnnotations _deliveryAnnotations; private MessageAnnotations _messageAnnotations; private boolean _isFromQueue; @@ -78,6 +99,7 @@ public abstract class MessageImpl implements Message private DestinationImpl _replyTo; protected MessageImpl(Header header, + DeliveryAnnotations deliveryAnnotations, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, @@ -87,6 +109,8 @@ public abstract class MessageImpl implements Message _header = header == null ? new Header() : header; _properties = properties == null ? new Properties() : properties; _messageAnnotations = messageAnnotations == null ? new MessageAnnotations(new HashMap()) : messageAnnotations; + _deliveryAnnotations = deliveryAnnotations == null ? new DeliveryAnnotations(new HashMap()) : deliveryAnnotations; + _footer = footer == null ? new Footer(Collections.EMPTY_MAP) : footer; _applicationProperties = appProperties == null ? new ApplicationProperties(new HashMap()) : appProperties; _sessionImpl = session; @@ -470,6 +494,49 @@ public abstract class MessageImpl implements Message { return _properties.getGroupId(); } + else if(JMS_AMQP_DELIVERY_ANNOTATIONS.equals(name)) + { + Map annotationsMap = deliveryAnnotationsMap(); + if(annotationsMap.isEmpty()) + { + return null; + } + else + { + try + { + return new AnnotationEncoder().encode(annotationsMap); + } + catch (IOException e) + { + throw new IllegalArgumentException(e); + } + } + } + else if(JMS_AMQP_MESSAGE_ANNOTATIONS.equals(name)) + { + Map annotationsMap = new LinkedHashMap(messageAnnotationMap()); + for(Symbol s : SYSTEM_MESSAGE_ANNOTATIONS) + { + annotationsMap.remove(s); + } + + if(annotationsMap.isEmpty()) + { + return null; + } + else + { + try + { + return new AnnotationEncoder().encode(annotationsMap); + } + catch (IOException e) + { + throw new IllegalArgumentException(e); + } + } + } return _applicationProperties.getValue().get(name); } @@ -740,6 +807,27 @@ public abstract class MessageImpl implements Message { names.add(JMSXGROUP_ID); } + if(!deliveryAnnotationsMap().isEmpty()) + { + names.add(JMS_AMQP_DELIVERY_ANNOTATIONS); + } + if(!messageAnnotationMap().isEmpty()) + { + boolean nonDefaultAnnotation = false; + for(Object key : messageAnnotationMap().keySet()) + { + if(!SYSTEM_MESSAGE_ANNOTATIONS.contains(key)) + { + nonDefaultAnnotation = true; + break; + } + } + + if(nonDefaultAnnotation) + { + names.add(JMS_AMQP_MESSAGE_ANNOTATIONS); + } + } return Collections.enumeration(names); } @@ -952,6 +1040,44 @@ public abstract class MessageImpl implements Message { _properties.setGroupId(value == null ? null : value.toString()); } + else if(JMS_AMQP_MESSAGE_ANNOTATIONS.equals(name)) + { + try + { + Map<Symbol, Object> annotationMap = new AnnotationDecoder().decode((String) value); + Map messageAnnotations = messageAnnotationMap(); + Map tmp = new LinkedHashMap(); + for(Symbol key : SYSTEM_MESSAGE_ANNOTATIONS) + { + if(messageAnnotations.containsKey(key)) + { + tmp.put(key, messageAnnotations.get(key)); + } + } + messageAnnotations.clear(); + messageAnnotations.putAll(annotationMap); + messageAnnotations.putAll(tmp); + } + catch (IOException e) + { + throw new IllegalArgumentException(e); + } + + } + else if(JMS_AMQP_DELIVERY_ANNOTATIONS.equals(name)) + { + try + { + Map<Symbol, Object> annotationMap = new AnnotationDecoder().decode((String) value); + Map deliveryAnnotations = deliveryAnnotationsMap(); + deliveryAnnotations.clear(); + deliveryAnnotations.putAll(annotationMap); + } + catch (IOException e) + { + throw new IllegalArgumentException(e); + } + } else { _applicationProperties.getValue().put(name, value); @@ -998,28 +1124,6 @@ public abstract class MessageImpl implements Message _header.setDeliveryCount(failures); } - public MessageAttributes getHeaderMessageAttrs() - { - // TODO - return null ; // _header.getMessageAttrs(); - } - - public void setHeaderMessageAttrs(final MessageAttributes messageAttrs) - { - // TODO - } - - public MessageAttributes getHeaderDeliveryAttrs() - { - // TODO - return null ; //_header.getDeliveryAttrs(); - } - - public void setHeaderDeliveryAttrs(final MessageAttributes deliveryAttrs) - { - //TODO - } - public Boolean getDurable() { return _header.getDurable(); @@ -1224,6 +1328,12 @@ public abstract class MessageImpl implements Message return _messageAnnotations; } + + DeliveryAnnotations getDeliveryAnnotations() + { + return _deliveryAnnotations; + } + public ApplicationProperties getApplicationProperties() { return _applicationProperties; @@ -1319,6 +1429,17 @@ public abstract class MessageImpl implements Message return messageAttrs; } + private Map deliveryAnnotationsMap() + { + Map deliveryAttrs = _deliveryAnnotations == null ? null : _deliveryAnnotations.getValue(); + if(deliveryAttrs == null) + { + deliveryAttrs = new HashMap(); + _deliveryAnnotations = new DeliveryAnnotations(deliveryAttrs); + } + return deliveryAttrs; + } + Set<String> splitCommaSeparateSet(String value) { if( value == null ) diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java index 5e180beb9e..a80af518c4 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ObjectMessageImpl.java @@ -19,22 +19,32 @@ package org.apache.qpid.amqp_1_0.jms.impl; -import org.apache.qpid.amqp_1_0.jms.ObjectMessage; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; - -import javax.jms.JMSException; -import javax.jms.MessageNotWriteableException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; + +import javax.jms.JMSException; +import javax.jms.MessageNotWriteableException; + +import org.apache.qpid.amqp_1_0.jms.ObjectMessage; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations; +import org.apache.qpid.amqp_1_0.type.messaging.Footer; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; public class ObjectMessageImpl extends MessageImpl implements ObjectMessage { @@ -61,6 +71,7 @@ public class ObjectMessageImpl extends MessageImpl implements ObjectMessage private Data _objectData = NULL_OBJECT_DATA; protected ObjectMessageImpl(Header header, + DeliveryAnnotations deliveryAnnotations, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, @@ -68,7 +79,7 @@ public class ObjectMessageImpl extends MessageImpl implements ObjectMessage Footer footer, SessionImpl session) { - super(header, messageAnnotations, properties, appProperties, footer, session); + super(header, deliveryAnnotations, messageAnnotations, properties, appProperties, footer, session); getProperties().setContentType(CONTENT_TYPE); Serializable serializable = null; _objectData = dataSection; @@ -77,7 +88,7 @@ public class ObjectMessageImpl extends MessageImpl implements ObjectMessage protected ObjectMessageImpl(final SessionImpl session) { - super(new Header(), new MessageAnnotations(new HashMap()), + super(new Header(), new DeliveryAnnotations(new HashMap()), new MessageAnnotations(new HashMap()), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP), session); getProperties().setContentType(CONTENT_TYPE); @@ -146,6 +157,10 @@ public class ObjectMessageImpl extends MessageImpl implements ObjectMessage { List<Section> sections = new ArrayList<Section>(); sections.add(getHeader()); + if(getDeliveryAnnotations() != null && getDeliveryAnnotations().getValue() != null && !getDeliveryAnnotations().getValue().isEmpty()) + { + sections.add(getDeliveryAnnotations()); + } if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty()) { sections.add(getMessageAnnotations()); diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java index 9aededca5e..76a614fbd2 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/StreamMessageImpl.java @@ -19,17 +19,26 @@ package org.apache.qpid.amqp_1_0.jms.impl; -import org.apache.qpid.amqp_1_0.jms.StreamMessage; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import javax.jms.JMSException; import javax.jms.MessageEOFException; import javax.jms.MessageFormatException; -import java.io.EOFException; -import java.util.*; + +import org.apache.qpid.amqp_1_0.jms.StreamMessage; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations; +import org.apache.qpid.amqp_1_0.type.messaging.Footer; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; public class StreamMessageImpl extends MessageImpl implements StreamMessage { @@ -40,31 +49,27 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage - protected StreamMessageImpl(Header header, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, List list, - Footer footer, SessionImpl session) + protected StreamMessageImpl(Header header, + DeliveryAnnotations deliveryAnnotations, + MessageAnnotations messageAnnotations, + Properties properties, + ApplicationProperties appProperties, + List list, + Footer footer, + SessionImpl session) { - super(header, messageAnnotations, properties, appProperties, footer, session); + super(header, deliveryAnnotations, messageAnnotations, properties, appProperties, footer, session); _list = list; } StreamMessageImpl(final SessionImpl session) { - super(new Header(), new MessageAnnotations(new HashMap()), new Properties(), + super(new Header(), new DeliveryAnnotations(new HashMap()), new MessageAnnotations(new HashMap()), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP), session); _list = new ArrayList(); } - public StreamMessageImpl(final Header header, - final MessageAnnotations messageAnnotations, - final Properties properties, - final ApplicationProperties appProperties, - final List amqpListSection, final Footer footer) - { - super(header, messageAnnotations, properties, appProperties, footer, null); - _list = amqpListSection; - } - public boolean readBoolean() throws JMSException { Object obj = readObject(); @@ -453,6 +458,10 @@ public class StreamMessageImpl extends MessageImpl implements StreamMessage { List<Section> sections = new ArrayList<Section>(); sections.add(getHeader()); + if(getDeliveryAnnotations() != null && getDeliveryAnnotations().getValue() != null && !getDeliveryAnnotations().getValue().isEmpty()) + { + sections.add(getDeliveryAnnotations()); + } if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty()) { sections.add(getMessageAnnotations()); diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java index de79defddb..6d061083fd 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TextMessageImpl.java @@ -19,20 +19,31 @@ package org.apache.qpid.amqp_1_0.jms.impl; -import org.apache.qpid.amqp_1_0.jms.TextMessage; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.messaging.*; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import javax.jms.JMSException; import javax.jms.MessageNotWriteableException; -import java.util.*; + +import org.apache.qpid.amqp_1_0.jms.TextMessage; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations; +import org.apache.qpid.amqp_1_0.type.messaging.Footer; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; public class TextMessageImpl extends MessageImpl implements TextMessage { private String _text; protected TextMessageImpl(Header header, + DeliveryAnnotations deliveryAnnotations, MessageAnnotations messageAnnotations, Properties properties, ApplicationProperties appProperties, @@ -40,13 +51,13 @@ public class TextMessageImpl extends MessageImpl implements TextMessage Footer footer, SessionImpl session) { - super(header, messageAnnotations, properties, appProperties, footer, session); + super(header, deliveryAnnotations, messageAnnotations, properties, appProperties, footer, session); _text = text; } protected TextMessageImpl(final SessionImpl session) { - super(new Header(), new MessageAnnotations(new HashMap()), + super(new Header(), new DeliveryAnnotations(new HashMap()), new MessageAnnotations(new HashMap()), new Properties(), new ApplicationProperties(new HashMap()), new Footer(Collections.EMPTY_MAP), session); } @@ -77,6 +88,10 @@ public class TextMessageImpl extends MessageImpl implements TextMessage { List<Section> sections = new ArrayList<Section>(); sections.add(getHeader()); + if(getDeliveryAnnotations() != null && getDeliveryAnnotations().getValue() != null && !getDeliveryAnnotations().getValue().isEmpty()) + { + sections.add(getDeliveryAnnotations()); + } if(getMessageAnnotations() != null && getMessageAnnotations().getValue() != null && !getMessageAnnotations().getValue().isEmpty()) { sections.add(getMessageAnnotations()); diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/util/AnnotationDecoder.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/util/AnnotationDecoder.java new file mode 100644 index 0000000000..4fa7a0940b --- /dev/null +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/util/AnnotationDecoder.java @@ -0,0 +1,395 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.jms.impl.util; + +import java.io.IOException; +import java.io.StringReader; +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.xml.bind.DatatypeConverter; + +import org.apache.qpid.amqp_1_0.codec.DescribedType; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.UnsignedByte; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.UnsignedShort; + +public class AnnotationDecoder +{ + private static final Map<String,Converter> CONVERTERS = new HashMap<>(); + + + private final JsonDecoder _decoder = new JsonDecoder(); + + public Map<Symbol,Object> decode(String value) throws IOException + { + Map<String,Object> map = (Map<String,Object>) _decoder.decode(new StringReader(value)); + Map<Symbol,Object> convertedMap = new LinkedHashMap<>(); + for(Map.Entry<String,Object> entry : map.entrySet()) + { + convertedMap.put(Symbol.valueOf(entry.getKey()), convertObject(entry.getValue())); + } + return convertedMap; + } + + private Object convertObject(final Object value) + { + if(value == null || value instanceof String || value instanceof Boolean) + { + return value; + } + else if(value instanceof Number) + { + return ((Number)value).intValue(); + } + else if(value instanceof Collection) + { + Collection<?> list = (Collection<?>)value; + List<Object> convertedList = new ArrayList<>(list.size()); + for(Object o : list) + { + convertedList.add(convertObject(o)); + } + return convertedList; + } + else if(value instanceof Map) + { + Map<String,Object> map = (Map<String,Object>)value; + if(map.size() != 1) + { + throw new IllegalArgumentException("Cannot parse map " + map + " as a value"); + } + Converter converter = CONVERTERS.get(map.keySet().iterator().next()); + return converter.convert(map.values().iterator().next(), this); + } + return null; + } + + private static abstract class Converter + { + Converter(String name) + { + CONVERTERS.put(name, this); + } + + abstract Object convert(Object value, final AnnotationDecoder decoder); + } + + private static Converter LONG_CONVERTER = new Converter("long") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + return ((Number)value).longValue(); + } + }; + + private static Converter SHORT_CONVERTER = new Converter("short") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + return ((Number)value).shortValue(); + } + }; + + private static Converter BYTE_CONVERTER = new Converter("byte") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + return ((Number)value).byteValue(); + } + }; + + private static Converter ULONG_CONVERTER = new Converter("ulong") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + Number number = (Number) value; + return UnsignedLong.valueOf(number.toString()); + } + }; + + private static Converter UINT_CONVERTER = new Converter("uint") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + return UnsignedInteger.valueOf(((Number) value).longValue()); + } + }; + + private static Converter USHORT_CONVERTER = new Converter("ushort") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + Number number = (Number) value; + return UnsignedShort.valueOf(number.toString()); + } + }; + + private static Converter UBYTE_CONVERTER = new Converter("ubyte") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + Number number = (Number) value; + + return UnsignedByte.valueOf(value.toString()); + } + }; + + private static Converter FLOAT_CONVERTER = new Converter("float") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + return ((Number) value).floatValue(); + } + }; + + private static Converter DOUBLE_CONVERTER = new Converter("double") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + return ((Number) value).doubleValue(); + } + }; + + + private static Converter SYMBOL_CONVERTER = new Converter("symbol") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + return Symbol.valueOf((String) value); + } + }; + + + private static Converter CHAR_CONVERTER = new Converter("char") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + String stringValue = (String) value; + if(stringValue.length() != 1) + { + throw new IllegalArgumentException("Cannot decode '"+stringValue+"' as a char"); + } + return stringValue.charAt(0); + } + }; + + + private static Converter TIMESTAMP_CONVERTER = new Converter("timestamp") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + return new Date(((Number) value).longValue()); + } + }; + + + private static Converter MAP_CONVERTER = new Converter("map") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + Map<?,?> map = (Map<?,?>) value; + Map<Object,Object> convertedMap = new LinkedHashMap<>(); + for(Map.Entry<?,?> entry : map.entrySet()) + { + convertedMap.put(decoder.convertObject(entry.getKey()), decoder.convertObject(entry.getValue())); + } + return convertedMap; + } + }; + + + private static Converter DESCRIBED_CONVERTER = new Converter("described") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + Map<?,?> map = (Map<?,?>) value; + if(map.size() != 1) + { + throw new IllegalArgumentException("Cannot convert described type from: " + map); + } + Object descriptor = decoder.convertObject(map.keySet().iterator().next()); + Object described = decoder.convertObject(map.values().iterator().next()); + return new DescribedType(descriptor, described); + } + }; + + private static Converter BINARY_CONVERTER = new Converter("binary") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + String valueString = (String)value; + byte[] bytes = DatatypeConverter.parseBase64Binary(valueString); + return new Binary(bytes); + } + }; + + private static Converter ARRAY_CONVERTER = new Converter("array") + { + @Override + Object convert(final Object value, final AnnotationDecoder decoder) + { + Collection<?> list = (Collection<?>)value; + List<Object> convertedList = new ArrayList<>(list.size()); + Set<Class> objClasses = new HashSet<>(); + for(Object o : list) + { + Object convertObject = decoder.convertObject(o); + objClasses.add(convertObject == null ? Void.class : convertObject.getClass()); + convertedList.add(convertObject); + } + if(objClasses.size() != 1) + { + throw new IllegalArgumentException("Cannot convert object to an array: " + value); + } + Class objClass = objClasses.iterator().next(); + if(objClass == Void.class) + { + return new Void[convertedList.size()]; + } + else if(objClass == Boolean.class) + { + boolean[] array = new boolean[convertedList.size()]; + for(int i = 0; i < convertedList.size(); i++) + { + array[i] = (Boolean) convertedList.get(i); + } + return array; + } + else if(objClass == Byte.class) + { + byte[] array = new byte[convertedList.size()]; + for(int i = 0; i < convertedList.size(); i++) + { + array[i] = (Byte) convertedList.get(i); + } + return array; + } + else if(objClass == Character.class) + { + char[] array = new char[convertedList.size()]; + for(int i = 0; i < convertedList.size(); i++) + { + array[i] = (Character) convertedList.get(i); + } + return array; + } + else if(objClass == Short.class) + { + short[] array = new short[convertedList.size()]; + for(int i = 0; i < convertedList.size(); i++) + { + array[i] = (Short) convertedList.get(i); + } + return array; + } + else if(objClass == Integer.class) + { + int[] array = new int[convertedList.size()]; + for(int i = 0; i < convertedList.size(); i++) + { + array[i] = (Integer) convertedList.get(i); + } + return array; + } + else if(objClass == Long.class) + { + long[] array = new long[convertedList.size()]; + for(int i = 0; i < convertedList.size(); i++) + { + array[i] = (Long) convertedList.get(i); + } + return array; + } + else if(objClass == Float.class) + { + float[] array = new float[convertedList.size()]; + for(int i = 0; i < convertedList.size(); i++) + { + array[i] = (Float) convertedList.get(i); + } + return array; + } + else if(objClass == Double.class) + { + double[] array = new double[convertedList.size()]; + for(int i = 0; i < convertedList.size(); i++) + { + array[i] = (Double) convertedList.get(i); + } + return array; + } + else + { + return convertedList.toArray((Object[])Array.newInstance(objClass, convertedList.size())); + } + } + }; + + + public static void main(String[] args) throws Exception + { + Map<Symbol, Object> foo = new LinkedHashMap<>(); + foo.put(Symbol.valueOf("ARG_1"), 2); + foo.put(Symbol.valueOf("ARG_2"), true); + foo.put(Symbol.valueOf("ARG_3"), "wibble"); + foo.put(Symbol.valueOf("ARG_4"), Arrays.asList("this", "is", "a", "test")); + foo.put(Symbol.valueOf("ARG_5"), Arrays.asList((Object)"this", 2l, Symbol.valueOf("a"), "test")); + foo.put(Symbol.valueOf("ARG_6"), Collections.singletonMap("wibble",0.3)); + + + + String encoded = new AnnotationEncoder().encode(foo); + System.err.println(encoded); + Object foo2 = new AnnotationDecoder().decode(encoded); + + System.out.println(foo2.equals(foo)); + } + +} diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/util/AnnotationEncoder.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/util/AnnotationEncoder.java new file mode 100644 index 0000000000..d8b90320e9 --- /dev/null +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/util/AnnotationEncoder.java @@ -0,0 +1,175 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.jms.impl.util; + +import java.io.IOException; +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import javax.xml.bind.DatatypeConverter; + +import org.apache.qpid.amqp_1_0.codec.DescribedType; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.UnsignedByte; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.UnsignedLong; +import org.apache.qpid.amqp_1_0.type.UnsignedShort; + +public class AnnotationEncoder +{ + private final JsonEncoder _encoder = new JsonEncoder(); + + public String encode(Map<Symbol,Object> annotations) throws IOException + { + Map<String,Object> convertedMap = convertMap(annotations); + return _encoder.encode(convertedMap); + } + + + private Map<String, Object> convertMap(final Map<Symbol,Object> value) + { + Map<String,Object> converted = new LinkedHashMap<>(); + for(Map.Entry<Symbol,Object> entry : value.entrySet()) + { + converted.put(entry.getKey().toString(), convert(entry.getValue())); + } + return converted; + } + + private Object convert(final Object value) + { + if(value == null || value instanceof String || value instanceof Boolean || value instanceof Integer) + { + return value; + } + else if(value instanceof Long) + { + return Collections.singletonMap("long", value); + } + else if(value instanceof Short) + { + return Collections.singletonMap("short", value); + } + else if(value instanceof Byte) + { + return Collections.singletonMap("byte", value); + } + else if(value instanceof UnsignedLong) + { + return Collections.singletonMap("ulong", ((UnsignedLong)value).bigIntegerValue()); + } + else if(value instanceof UnsignedInteger) + { + return Collections.singletonMap("uint", ((UnsignedInteger)value).longValue()); + } + else if(value instanceof UnsignedShort) + { + return Collections.singletonMap("ushort", ((UnsignedShort)value).intValue()); + } + else if(value instanceof UnsignedByte) + { + return Collections.singletonMap("ubyte", ((UnsignedByte)value).shortValue()); + } + else if(value instanceof Character) + { + return Collections.singletonMap("char", value.toString()); + } + else if(value instanceof Symbol) + { + return Collections.singletonMap("symbol", value.toString()); + } + else if(value instanceof Date) + { + return Collections.singletonMap("timestamp", ((Date)value).getTime()); + } + else if(value instanceof Float) + { + return Collections.singletonMap("float", value); + } + else if(value instanceof Double) + { + return Collections.singletonMap("double", value); + } + else if(value instanceof Binary) + { + Binary bin = (Binary) value; + byte[] bytes; + if(bin.getArrayOffset() != 0 || bin.getLength() != bin.getArray().length) + { + bytes = new byte[bin.getLength()]; + System.arraycopy(bin.getArray(), bin.getArrayOffset(),bytes, 0, bin.getLength()); + } + else + { + bytes = bin.getArray(); + } + return Collections.singletonMap("binary", DatatypeConverter.printBase64Binary(bytes)); + } + else if(value instanceof List) + { + List<?> list = (List) value; + List<Object> convertedList = new ArrayList<>(list.size()); + for(Object o : list) + { + convertedList.add(convert(o)); + } + return convertedList; + } + else if(value instanceof Map) + { + Map<?,?> map = (Map<?,?>) value; + Map<Object,Object> convertedMap = new LinkedHashMap<>(); + for(Map.Entry<?,?> entry : map.entrySet()) + { + convertedMap.put(convert(entry.getKey()), convert(entry.getValue())); + } + return Collections.singletonMap("map", convertedMap); + } + else if(value instanceof Object[]) + { + return Collections.singletonMap("array", convert(Arrays.asList((Object[])value))); + } + else if(value.getClass().isArray()) + { + int length = Array.getLength(value); + List<Object> list = new ArrayList<>(length); + for(int i = 0; i < length; i++) + { + list.add(Array.get(value, i)); + } + return Collections.singletonMap("array", convert(list)); + } + else if(value instanceof DescribedType) + { + DescribedType type = (DescribedType) value; + return Collections.singletonMap("described", Collections.singletonMap(convert(type.getDescriptor()), + convert(type.getDescribed()))); + } + throw new IllegalArgumentException("Cannot convert object of class: " + value.getClass().getName()); + } +} diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/util/JsonDecoder.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/util/JsonDecoder.java new file mode 100644 index 0000000000..2760492ca8 --- /dev/null +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/util/JsonDecoder.java @@ -0,0 +1,379 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.jms.impl.util; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.Reader; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Stack; + +public class JsonDecoder +{ + static enum TokenType + { + BEGIN_MAP, + END_MAP, + BEGIN_ARRAY, + END_ARRAY, + COMMA, + COLON, + STRING, + BOOLEAN, + NUMBER, + NULL + } + + static private class Token + { + private final TokenType _type; + private final Object _value; + + + private Token(final TokenType type, final Object value) + { + _type = type; + _value = value; + } + + public TokenType getType() + { + return _type; + } + + public Object getValue() + { + return _value; + } + } + + public Object decode(Reader reader) throws IOException + { + if(!reader.markSupported()) + { + return decode(new BufferedReader(reader)); + } + else + { + return readValue(reader, new Stack<Token>()); + } + } + + private Object readValue(final Reader reader, final Stack<Token> tokenStack) throws IOException + { + Token token = readToken(reader, tokenStack); + switch(token.getType()) + { + case BOOLEAN: + case NUMBER: + case STRING: + case NULL: + return token.getValue(); + case BEGIN_MAP: + Map<Object,Object> map = new LinkedHashMap<>(); + token = readToken(reader, tokenStack); + if(token.getType() != TokenType.END_MAP) + { + tokenStack.push(token); + do + { + Object key = readValue(reader, tokenStack); + token = readToken(reader, tokenStack); + if(token.getType() != TokenType.COLON) + { + throw new IllegalArgumentException("Cannot parse Json string"); + } + Object value = readValue(reader, tokenStack); + map.put(key, value); + token = readToken(reader, tokenStack); + if(!(token.getType() == TokenType.END_MAP + || token.getType() == TokenType.COMMA)) + { + throw new IllegalArgumentException("Cannot parse Json string"); + } + } + while(token.getType() != TokenType.END_MAP); + + } + return map; + case BEGIN_ARRAY: + List<Object> list = new ArrayList<>(); + token = readToken(reader, tokenStack); + if(token.getType() != TokenType.END_MAP) + { + tokenStack.push(token); + do + { + Object element = readValue(reader, tokenStack); + list.add(element); + token = readToken(reader, tokenStack); + if(!(token.getType() == TokenType.END_ARRAY + || token.getType() == TokenType.COMMA)) + { + throw new IllegalArgumentException("Cannot parse Json string"); + } + } + while(token.getType() != TokenType.END_ARRAY); + + } + + return list; + default: + throw new IllegalArgumentException("Could not parse Json String"); + + } + } + + static final Map<Character, Token> PUNCTUATION_TOKENS; + static + { + final Map<Character, Token> tokenMap = new HashMap<>(); + tokenMap.put('{', new Token(TokenType.BEGIN_MAP, null)); + tokenMap.put('}', new Token(TokenType.END_MAP, null)); + tokenMap.put('[', new Token(TokenType.BEGIN_ARRAY, null)); + tokenMap.put(']', new Token(TokenType.END_ARRAY, null)); + tokenMap.put(':', new Token(TokenType.COLON, null)); + tokenMap.put(',', new Token(TokenType.COMMA, null)); + PUNCTUATION_TOKENS = Collections.unmodifiableMap(tokenMap); + } + + private Token readToken(final Reader reader, final Stack<Token> tokenStack) throws IOException + { + if(!tokenStack.isEmpty()) + { + return tokenStack.pop(); + } + ignoreWhitespace(reader); + + + char[] cb = new char[1]; + reader.mark(2); + if(reader.read(cb) == 1) + { + final char c = cb[0]; + Token token = PUNCTUATION_TOKENS.get(c); + if(token != null) + { + return token; + } + if(c == '"') + { + reader.reset(); + return readString(reader); + } + else if(c == '-' || (c >= '0' && c <= '9')) + { + reader.reset(); + return readNumber(reader); + } + else if(c == 't') + { + reader.reset(); + readLiteral(reader, "true"); + return new Token(TokenType.BOOLEAN, true); + } + else if(c == 'f') + { + reader.reset(); + readLiteral(reader, "false"); + return new Token(TokenType.BOOLEAN, false); + } + else if(c == 'n') + { + reader.reset(); + readLiteral(reader, "null"); + return new Token(TokenType.NULL, null); + } + else + { + throw new IllegalArgumentException("Could not parse json string"); + } + } + else + { + throw new IllegalArgumentException("Insufficient data"); + } + + } + + private Token readNumber(final Reader reader) throws IOException + { + StringBuilder buffer = new StringBuilder(); + reader.mark(1); + char[] cb = new char[1]; + int read; + while((read = reader.read(cb)) == 1 && (Character.isDigit(cb[0]) || cb[0] == '-' || Character.isAlphabetic(cb[0]) || cb[0] == '.')) + { + buffer.append(cb[0]); + reader.mark(1); + } + if(read == 1) + { + reader.reset(); + } + + // todo - here + String numberString = buffer.toString(); + if(!numberString.matches("-?\\d+(\\.\\d+)?([eE][+\\-]?\\d+)?")) + { + throw new IllegalArgumentException("Cannot parse number from " + numberString); + } + BigDecimal number = new BigDecimal(numberString.toUpperCase()); + try + { + // TODO - doesn't cope with unsigned longs > Long.MAX_VALUE + BigInteger bigInteger = number.toBigIntegerExact(); + if(bigInteger.longValue() > Integer.MAX_VALUE + || bigInteger.longValue() < Integer.MIN_VALUE) + { + return new Token(TokenType.NUMBER, bigInteger.longValue()); + } + else + { + return new Token(TokenType.NUMBER, bigInteger.intValue()); + } + } + catch(ArithmeticException e) + { + return new Token(TokenType.NUMBER, number.doubleValue()); + } + } + + private Token readString(final Reader reader) throws IOException + { + StringBuilder builder = new StringBuilder(); + // ignore starting quote + reader.read(); + + + do + { + char c = readChar(reader); + if(c == '\\') + { + c = readChar(reader); + if(c == '\\' || c == '/' || c == '"') + { + builder.append(c); + } + else if(c == 't') + { + builder.append('\t'); + } + else if(c == 'n') + { + builder.append('\n'); + } + else if(c == 'r') + { + builder.append('\r'); + } + else if(c == 'f') + { + builder.append('\f'); + } + else if(c == 'b') + { + builder.append('\b'); + } + else if(c == 'u') + { + char[] point = new char[4]; + if(reader.read(point) != 4) + { + throw new IllegalArgumentException("Insufficient data"); + } + char codePoint = (char)(Integer.parseInt((new String(point)).toUpperCase(), 16) & 0xffff); + builder.append(codePoint); + } + else + { + throw new IllegalArgumentException("Invalid escaped character"); + } + } + else if(c == '"') + { + break; + } + else + { + builder.append(c); + } + } + while(true); + + + return new Token(TokenType.STRING, builder.toString()); + } + + private char readChar(final Reader reader) throws IOException + { + char[] cb = new char[1]; + if(reader.read(cb) != 1) + { + throw new IllegalArgumentException("Insufficient data"); + } + return cb[0]; + } + + + private void readLiteral(final Reader reader, CharSequence expected) throws IOException + { + char[] cbuf = new char[expected.length()]; + int read = reader.read(cbuf); + if(read != expected.length()) + { + throw new IllegalArgumentException("Could not parse literal"); + } + for(int i = 0; i < expected.length(); i++) + { + if(cbuf[i] != expected.charAt(i)) + { + throw new IllegalArgumentException("Could not parse literal"); + } + } + } + + private void ignoreWhitespace(Reader reader) throws IOException + { + char[] cb = new char[1]; + reader.mark(1); + while(reader.read(cb) == 1) + { + if(!Character.isWhitespace(cb[0])) + { + reader.reset(); + break; + } + else + { + reader.mark(1); + } + } + } +} diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/util/JsonEncoder.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/util/JsonEncoder.java new file mode 100644 index 0000000000..7d9a066b24 --- /dev/null +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/util/JsonEncoder.java @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.amqp_1_0.jms.impl.util; + +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; +import java.io.Writer; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; + +public class JsonEncoder +{ + public void encode(Object object, Writer writer) throws IOException + { + if(object == null) + { + writer.append("null"); + } + else if(object instanceof Boolean) + { + writer.append(((Boolean)object)? "true" : "false"); + } + else if(object instanceof Number) + { + writer.append(object.toString()); + } + else if(object instanceof Object[]) + { + writer.append("[ "); + boolean first = true; + for(Object element : ((Object[])object)) + { + if(first) + { + first = false; + } + else + { + writer.append(','); + } + encode(element, writer); + writer.append(" ]"); + } + } + else if(object instanceof Iterable) + { + writer.append("[ "); + boolean first = true; + for(Object element : ((Iterable)object)) + { + if(first) + { + first = false; + } + else + { + writer.append(", "); + } + encode(element, writer); + } + writer.append(" ]"); + } + else if(object instanceof Map) + { + writer.append("{ "); + boolean first = true; + for(Map.Entry element : ((Map<?,?>)object).entrySet()) + { + if(first) + { + first = false; + } + else + { + writer.append(", "); + } + encode(element.getKey(), writer); + writer.append(" : "); + encode(element.getValue(), writer); + } + writer.append(" }"); + } + else if(object instanceof CharSequence) + { + CharSequence string = (CharSequence)object; + writer.append('"'); + for(int i = 0; i < string.length(); i++) + { + char c = string.charAt(i); + if(c == '"' || c=='\\') + { + writer.append('\\'); + } + writer.append(c); + } + writer.append('"'); + } + else + { + throw new IllegalArgumentException("Don't know how to encode class " + object.getClass().getName()); + } + } + + public String encode(Object object) throws IOException + { + StringWriter writer = new StringWriter(); + encode(object, writer); + return writer.toString(); + } + + + public static void main(String[] args) throws Exception + { + JsonEncoder encoder = new JsonEncoder(); + String encoded = + encoder.encode(Collections.singletonMap("hello \\\" world", Arrays.asList(3, 7.2, false, null, 4))); + Object decoded = new JsonDecoder().decode(new StringReader(encoded)); + System.err.println(encoded); + } +} |
