summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-18 00:30:14 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-18 00:30:14 +0000
commit37e2f371b04760bf0b9641e3969b070dda38d451 (patch)
tree362b98d31e120140c7e6bdff7ea931391ac85ddf /qpid/java
parent7723aaf6d5b48f9829a2b123a18cd6550f6aed3a (diff)
downloadqpid-python-37e2f371b04760bf0b9641e3969b070dda38d451.tar.gz
QPID-4000 : [Java Broker] add initial conversions from AMQP 1.0 messages to 0-8 and 0-10
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1569151 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java278
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java212
-rwxr-xr-xqpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java11
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java155
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter1
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java209
-rw-r--r--qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter1
7 files changed, 655 insertions, 212 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
new file mode 100644
index 0000000000..3974ab0af6
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java
@@ -0,0 +1,278 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
+import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.typedmessage.TypedBytesContentWriter;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+
+public class MessageConverter_from_1_0
+{
+ private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+ public static Object convertBodyToObject(final Message_1_0 serverMessage)
+ {
+ byte[] data = new byte[(int) serverMessage.getSize()];
+ serverMessage.getStoredMessage().getContent(0, ByteBuffer.wrap(data));
+
+ SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY);
+
+ Object bodyObject;
+ try
+ {
+ List<Section> sections = sectionDecoder.parseAll(ByteBuffer.wrap(data));
+ ListIterator<Section> iterator = sections.listIterator();
+ Section previousSection = null;
+ while(iterator.hasNext())
+ {
+ Section section = iterator.next();
+ if(!(section instanceof AmqpValue || section instanceof Data || section instanceof AmqpSequence))
+ {
+ iterator.remove();
+ }
+ else
+ {
+ if(previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValue))
+ {
+ throw new ConnectionScopedRuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence");
+ }
+ else
+ {
+ previousSection = section;
+ }
+ }
+ }
+
+
+ if(sections.isEmpty())
+ {
+ // should actually be illegal
+ bodyObject = new byte[0];
+ }
+ else
+ {
+ Section firstBodySection = sections.get(0);
+ if(firstBodySection instanceof AmqpValue)
+ {
+ bodyObject = fixObject(((AmqpValue)firstBodySection).getValue());
+ }
+ else if(firstBodySection instanceof Data)
+ {
+ int totalSize = 0;
+ for(Section section : sections)
+ {
+ totalSize += ((Data)section).getValue().getLength();
+ }
+ byte[] bodyData = new byte[totalSize];
+ ByteBuffer buf = ByteBuffer.wrap(bodyData);
+ for(Section section : sections)
+ {
+ buf.put(((Data)section).getValue().asByteBuffer());
+ }
+ bodyObject = bodyData;
+ }
+ else
+ {
+ ArrayList totalSequence = new ArrayList();
+ for(Section section : sections)
+ {
+ totalSequence.addAll(((AmqpSequence)section).getValue());
+ }
+ bodyObject = fixObject(totalSequence);
+ }
+ }
+
+ }
+ catch (AmqpErrorException e)
+ {
+ throw new ConnectionScopedRuntimeException(e);
+ }
+ return bodyObject;
+ }
+
+ private static Object fixObject(final Object value)
+ {
+ if(value instanceof Binary)
+ {
+ final Binary binaryValue = (Binary) value;
+ byte[] data = new byte[binaryValue.getLength()];
+ binaryValue.asByteBuffer().get(data);
+ return data;
+ }
+ else if(value instanceof List)
+ {
+ List listValue = (List) value;
+ List fixedValue = new ArrayList(listValue.size());
+ for(Object o : listValue)
+ {
+ fixedValue.add(fixObject(o));
+ }
+ return fixedValue;
+ }
+ else if(value instanceof Map)
+ {
+ Map<?,?> mapValue = (Map) value;
+ Map fixedValue = new LinkedHashMap(mapValue.size());
+ for(Map.Entry<?,?> entry : mapValue.entrySet())
+ {
+ fixedValue.put(fixObject(entry.getKey()),fixObject(entry.getValue()));
+ }
+ return fixedValue;
+ }
+ else
+ {
+ return value;
+ }
+
+ }
+
+ public static byte[] convertToBody(Object object)
+ {
+ if(object instanceof String)
+ {
+ return ((String)object).getBytes(UTF_8);
+ }
+ else if(object instanceof byte[])
+ {
+ return (byte[]) object;
+ }
+ else if(object instanceof Map)
+ {
+ BBEncoder encoder = new BBEncoder(1024);
+ encoder.writeMap((Map)object);
+ ByteBuffer buf = encoder.segment();
+ int remaining = buf.remaining();
+ byte[] data = new byte[remaining];
+ buf.get(data);
+ return data;
+
+ }
+ else if(object instanceof List)
+ {
+ try
+ {
+ ByteBuffer buf;
+ if(onlyPrimitiveTypes((List)object))
+ {
+ TypedBytesContentWriter writer = new TypedBytesContentWriter();
+ for(Object value : (List)object)
+ {
+ writer.writeObject(value);
+ }
+ buf = writer.getData();
+
+ }
+ else
+ {
+ BBEncoder encoder = new BBEncoder(1024);
+ encoder.writeList((List) object);
+ buf = encoder.segment();
+ }
+ int remaining = buf.remaining();
+ byte[] data = new byte[remaining];
+ buf.get(data);
+ return data;
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new ConnectionScopedRuntimeException(e);
+ }
+ }
+ else
+ {
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ try
+ {
+ ObjectOutputStream os = new ObjectOutputStream(bytesOut);
+ os.writeObject(object);
+ return bytesOut.toByteArray();
+ }
+ catch (IOException e)
+ {
+ throw new ConnectionScopedRuntimeException(e);
+ }
+ }
+ }
+
+ public static boolean onlyPrimitiveTypes(final List list)
+ {
+ for(Object value : list)
+ {
+ if(!(value instanceof String
+ || value instanceof Integer
+ || value instanceof Long
+ || value instanceof Double
+ || value instanceof Float
+ || value instanceof Byte
+ || value instanceof Short
+ || value instanceof Character
+ || value instanceof Boolean
+ || value instanceof byte[]))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static String getBodyMimeType(Object object)
+ {
+ if(object instanceof String)
+ {
+ return "text/plain";
+ }
+ else if(object instanceof byte[])
+ {
+ return "application/octet-stream";
+ }
+ else if(object instanceof Map)
+ {
+ return "amqp/map";
+ }
+ else if(object instanceof List)
+ {
+ return onlyPrimitiveTypes((List)object) ? "jms/stream-message" : "amqp/list";
+ }
+ else
+ {
+ return "application/java-object-stream";
+ }
+ }
+}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
index ec6d5a924c..bb01ddad7f 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
@@ -20,14 +20,7 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl;
-import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
-import org.apache.qpid.amqp_1_0.type.Binary;
-import org.apache.qpid.amqp_1_0.type.Section;
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence;
-import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
-import org.apache.qpid.amqp_1_0.type.messaging.Data;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -41,7 +34,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
-import java.util.ListIterator;
import java.util.Map;
public class MessageConverter_v1_0_to_Internal implements MessageConverter<Message_1_0, InternalMessage>
@@ -71,212 +63,14 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
@Override
public InternalMessage convert(Message_1_0 serverMessage, VirtualHost vhost)
{
- final String mimeType = serverMessage.getMessageHeader().getMimeType();
+ Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMessage);
-
-
-
- byte[] data = new byte[(int) serverMessage.getSize()];
- serverMessage.getStoredMessage().getContent(0,ByteBuffer.wrap(data));
-
- SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(TYPE_REGISTRY);
-
- try
- {
- List<Section> sections = sectionDecoder.parseAll(ByteBuffer.wrap(data));
- ListIterator<Section> iterator = sections.listIterator();
- Section previousSection = null;
- while(iterator.hasNext())
- {
- Section section = iterator.next();
- if(!(section instanceof AmqpValue || section instanceof Data || section instanceof AmqpSequence))
- {
- iterator.remove();
- }
- else
- {
- if(previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValue))
- {
- throw new ConnectionScopedRuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence");
- }
- else
- {
- previousSection = section;
- }
- }
- }
-
- Object bodyObject;
-
- if(sections.isEmpty())
- {
- // should actually be illegal
- bodyObject = new byte[0];
- }
- else
- {
- Section firstBodySection = sections.get(0);
- if(firstBodySection instanceof AmqpValue)
- {
- bodyObject = fixObject(((AmqpValue)firstBodySection).getValue());
- }
- else if(firstBodySection instanceof Data)
- {
- int totalSize = 0;
- for(Section section : sections)
- {
- totalSize += ((Data)section).getValue().getLength();
- }
- byte[] bodyData = new byte[totalSize];
- ByteBuffer buf = ByteBuffer.wrap(bodyData);
- for(Section section : sections)
- {
- buf.put(((Data)section).getValue().asByteBuffer());
- }
- bodyObject = bodyData;
- }
- else
- {
- ArrayList totalSequence = new ArrayList();
- for(Section section : sections)
- {
- totalSequence.addAll(((AmqpSequence)section).getValue());
- }
- bodyObject = fixObject(totalSequence);
- }
- }
- return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), bodyObject);
-
- }
- catch (AmqpErrorException e)
- {
- throw new ConnectionScopedRuntimeException(e);
- }
-
-
-
-
- }
-
- private Object fixObject(final Object value)
- {
- if(value instanceof Binary)
- {
- final Binary binaryValue = (Binary) value;
- byte[] data = new byte[binaryValue.getLength()];
- binaryValue.asByteBuffer().get(data);
- return data;
- }
- else if(value instanceof List)
- {
- List listValue = (List) value;
- List fixedValue = new ArrayList(listValue.size());
- for(Object o : listValue)
- {
- fixedValue.add(fixObject(o));
- }
- return fixedValue;
- }
- else if(value instanceof Map)
- {
- Map<?,?> mapValue = (Map) value;
- Map fixedValue = new LinkedHashMap(mapValue.size());
- for(Map.Entry<?,?> entry : mapValue.entrySet())
- {
- fixedValue.put(fixObject(entry.getKey()),fixObject(entry.getValue()));
- }
- return fixedValue;
- }
- else
- {
- return value;
- }
-
- }
-
- private static Object convertMessageBody(String mimeType, byte[] data)
- {
- if("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
- {
- String text = new String(data);
- return text;
- }
- else if("jms/map-message".equals(mimeType))
- {
- TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
-
- LinkedHashMap map = new LinkedHashMap();
- final int entries = reader.readIntImpl();
- for (int i = 0; i < entries; i++)
- {
- try
- {
- String propName = reader.readStringImpl();
- Object value = reader.readObject();
-
- map.put(propName, value);
- }
- catch (EOFException e)
- {
- throw new IllegalArgumentException(e);
- }
- catch (TypedBytesFormatException e)
- {
- throw new IllegalArgumentException(e);
- }
-
- }
-
- return map;
-
- }
- else if("amqp/map".equals(mimeType))
- {
- BBDecoder decoder = new BBDecoder();
- decoder.init(ByteBuffer.wrap(data));
- final Map<String,Object> map = decoder.readMap();
-
- return map;
-
- }
- else if("amqp/list".equals(mimeType))
- {
- BBDecoder decoder = new BBDecoder();
- decoder.init(ByteBuffer.wrap(data));
- return decoder.readList();
- }
- else if("jms/stream-message".equals(mimeType))
- {
- TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
-
- List list = new ArrayList();
- while (reader.remaining() != 0)
- {
- try
- {
- list.add(reader.readObject());
- }
- catch (TypedBytesFormatException e)
- {
- throw new ConnectionScopedRuntimeException(e);
- }
- catch (EOFException e)
- {
- throw new ConnectionScopedRuntimeException(e);
- }
- }
- return list;
- }
- else
- {
- return data;
-
- }
+ return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), bodyObject);
}
@Override
public String getType()
{
- return "v0-8 to Internal";
+ return "v1-0 to Internal";
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index f28e25e080..d5a349304c 100755
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -416,7 +416,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
}
else
{
- return _properties.getMessageId().toString();
+ return _properties.getCorrelationId().toString();
}
}
@@ -427,13 +427,13 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
public String getMessageId()
{
- if(_properties == null || _properties.getCorrelationId() == null)
+ if(_properties == null || _properties.getMessageId() == null)
{
return null;
}
else
{
- return _properties.getCorrelationId().toString();
+ return _properties.getMessageId().toString();
}
}
@@ -558,6 +558,11 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
{
return _properties == null ? null : _properties.getSubject();
}
+
+ public String getTo()
+ {
+ return _properties == null ? null : _properties.getTo();
+ }
}
}
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
new file mode 100644
index 0000000000..0f0197cb63
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.converter.v0_10_v1_0;
+
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
+import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
+import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0;
+import org.apache.qpid.server.protocol.v1_0.Message_1_0;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+
+import java.nio.ByteBuffer;
+
+public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1_0, MessageTransferMessage>
+{
+
+ public Class<Message_1_0> getInputClass()
+ {
+ return Message_1_0.class;
+ }
+
+ @Override
+ public Class<MessageTransferMessage> getOutputClass()
+ {
+ return MessageTransferMessage.class;
+ }
+
+ @Override
+ public MessageTransferMessage convert(Message_1_0 serverMsg, VirtualHost vhost)
+ {
+ return new MessageTransferMessage(convertToStoredMessage(serverMsg), null);
+ }
+
+ private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final Message_1_0 serverMsg)
+ {
+ Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMsg);
+
+ final byte[] messageContent = MessageConverter_from_1_0.convertToBody(bodyObject);
+
+ final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(serverMsg,
+ MessageConverter_from_1_0.getBodyMimeType(bodyObject),
+ messageContent.length);
+
+ return new StoredMessage<MessageMetaData_0_10>()
+ {
+ @Override
+ public MessageMetaData_0_10 getMetaData()
+ {
+ return messageMetaData_0_10;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return serverMsg.getMessageNumber();
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ int size = messageContent.length - offsetInMessage;
+ if(dst.remaining() < size)
+ {
+ size = dst.remaining();
+ }
+ ByteBuffer buf = ByteBuffer.wrap(messageContent, offsetInMessage, size);
+ dst.put(buf);
+ return size;
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ return ByteBuffer.wrap(messageContent, offsetInMessage, size);
+ }
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg, final String bodyMimeType, final int size)
+ {
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ MessageProperties messageProps = new MessageProperties();
+
+
+
+ deliveryProps.setExpiration(serverMsg.getExpiration());
+ deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
+ deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
+ deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
+
+ messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
+ messageProps.setContentLength(size);
+ messageProps.setContentType(bodyMimeType);
+ if(serverMsg.getMessageHeader().getCorrelationId() != null)
+ {
+ messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
+ }
+
+ Header header = new Header(deliveryProps, messageProps, null);
+ return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime());
+ }
+
+
+
+ @Override
+ public String getType()
+ {
+ return "v1-0 to v0-10";
+ }
+
+
+}
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
index 045eb72cb1..4c9e612f32 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
@@ -17,3 +17,4 @@
# under the License.
#
org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_0_10_to_1_0
+org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
new file mode 100644
index 0000000000..d9391d89a5
--- /dev/null
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java
@@ -0,0 +1,209 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
+import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0;
+import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
+import org.apache.qpid.server.protocol.v1_0.Message_1_0;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_0, AMQMessage>
+{
+ private static final int BASIC_CLASS_ID = 60;
+
+
+ public Class<Message_1_0> getInputClass()
+ {
+ return Message_1_0.class;
+ }
+
+ @Override
+ public Class<AMQMessage> getOutputClass()
+ {
+ return AMQMessage.class;
+ }
+
+ @Override
+ public AMQMessage convert(Message_1_0 serverMsg, VirtualHost vhost)
+ {
+ return new AMQMessage(convertToStoredMessage(serverMsg), null);
+ }
+
+ private StoredMessage<MessageMetaData> convertToStoredMessage(final Message_1_0 serverMsg)
+ {
+ Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMsg);
+
+
+
+
+ final byte[] messageContent = MessageConverter_from_1_0.convertToBody(bodyObject);
+ final MessageMetaData messageMetaData_0_8 = convertMetaData(serverMsg,
+ MessageConverter_from_1_0.getBodyMimeType(bodyObject),
+ messageContent.length);
+
+ return new StoredMessage<MessageMetaData>()
+ {
+ @Override
+ public MessageMetaData getMetaData()
+ {
+ return messageMetaData_0_8;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return serverMsg.getMessageNumber();
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ int size = messageContent.length - offsetInMessage;
+ if(dst.remaining() < size)
+ {
+ size = dst.remaining();
+ }
+ ByteBuffer buf = ByteBuffer.wrap(messageContent, offsetInMessage, size);
+ dst.put(buf);
+ return size;
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ return ByteBuffer.wrap(messageContent, offsetInMessage, size);
+ }
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ private MessageMetaData convertMetaData(final Message_1_0 serverMsg, final String bodyMimeType, final int size)
+ {
+
+ MessagePublishInfo publishInfo = new MessagePublishInfo()
+ {
+ @Override
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ @Override
+ public void setExchange(final AMQShortString amqShortString)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ @Override
+ public AMQShortString getRoutingKey()
+ {
+ MessageMetaData_1_0.MessageHeader_1_0 header =
+ serverMsg.getMessageHeader();
+ String key = header.getTo();
+ if(key == null)
+ {
+ key = header.getSubject();
+ }
+
+ return AMQShortString.valueOf(key);
+ }
+ };
+
+
+ final BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ props.setAppId(serverMsg.getMessageHeader().getAppId());
+ props.setContentType(bodyMimeType);
+ props.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId());
+ props.setDeliveryMode(serverMsg.isPersistent() ? BasicContentHeaderProperties.PERSISTENT : BasicContentHeaderProperties.NON_PERSISTENT);
+ props.setExpiration(serverMsg.getExpiration());
+ props.setMessageId(serverMsg.getMessageHeader().getMessageId());
+ props.setPriority(serverMsg.getMessageHeader().getPriority());
+ props.setReplyTo(serverMsg.getMessageHeader().getReplyTo());
+ props.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
+ props.setUserId(serverMsg.getMessageHeader().getUserId());
+
+ Map<String,Object> headerProps = new LinkedHashMap<String, Object>();
+
+ for(String headerName : serverMsg.getMessageHeader().getHeaderNames())
+ {
+ headerProps.put(headerName, serverMsg.getMessageHeader().getHeader(headerName));
+ }
+
+ props.setHeaders(FieldTable.convertToFieldTable(headerProps));
+
+ final ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID);
+ chb.setBodySize(size);
+
+ return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime());
+ }
+
+
+ @Override
+ public String getType()
+ {
+ return "v1-0 to v0-8";
+ }
+
+
+}
diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
index cf4643f2b8..61259f870d 100644
--- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
+++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
@@ -17,3 +17,4 @@
# under the License.
#
org.apache.qpid.server.protocol.converter.v0_8_v1_0.MessageConverter_0_8_to_1_0
+org.apache.qpid.server.protocol.converter.v0_8_v1_0.MessageConverter_1_0_to_v0_8