diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-18 00:30:14 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-18 00:30:14 +0000 |
| commit | 37e2f371b04760bf0b9641e3969b070dda38d451 (patch) | |
| tree | 362b98d31e120140c7e6bdff7ea931391ac85ddf /qpid/java | |
| parent | 7723aaf6d5b48f9829a2b123a18cd6550f6aed3a (diff) | |
| download | qpid-python-37e2f371b04760bf0b9641e3969b070dda38d451.tar.gz | |
QPID-4000 : [Java Broker] add initial conversions from AMQP 1.0 messages to 0-8 and 0-10
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1569151 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
7 files changed, 655 insertions, 212 deletions
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java new file mode 100644 index 0000000000..3974ab0af6 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java @@ -0,0 +1,278 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl; +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.transport.codec.BBEncoder; +import org.apache.qpid.typedmessage.TypedBytesContentWriter; +import org.apache.qpid.typedmessage.TypedBytesFormatException; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; + +public class MessageConverter_from_1_0 +{ + private static final Charset UTF_8 = Charset.forName("UTF-8"); + + public static Object convertBodyToObject(final Message_1_0 serverMessage) + { + byte[] data = new byte[(int) serverMessage.getSize()]; + serverMessage.getStoredMessage().getContent(0, ByteBuffer.wrap(data)); + + SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(MessageConverter_v1_0_to_Internal.TYPE_REGISTRY); + + Object bodyObject; + try + { + List<Section> sections = sectionDecoder.parseAll(ByteBuffer.wrap(data)); + ListIterator<Section> iterator = sections.listIterator(); + Section previousSection = null; + while(iterator.hasNext()) + { + Section section = iterator.next(); + if(!(section instanceof AmqpValue || section instanceof Data || section instanceof AmqpSequence)) + { + iterator.remove(); + } + else + { + if(previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValue)) + { + throw new ConnectionScopedRuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence"); + } + else + { + previousSection = section; + } + } + } + + + if(sections.isEmpty()) + { + // should actually be illegal + bodyObject = new byte[0]; + } + else + { + Section firstBodySection = sections.get(0); + if(firstBodySection instanceof AmqpValue) + { + bodyObject = fixObject(((AmqpValue)firstBodySection).getValue()); + } + else if(firstBodySection instanceof Data) + { + int totalSize = 0; + for(Section section : sections) + { + totalSize += ((Data)section).getValue().getLength(); + } + byte[] bodyData = new byte[totalSize]; + ByteBuffer buf = ByteBuffer.wrap(bodyData); + for(Section section : sections) + { + buf.put(((Data)section).getValue().asByteBuffer()); + } + bodyObject = bodyData; + } + else + { + ArrayList totalSequence = new ArrayList(); + for(Section section : sections) + { + totalSequence.addAll(((AmqpSequence)section).getValue()); + } + bodyObject = fixObject(totalSequence); + } + } + + } + catch (AmqpErrorException e) + { + throw new ConnectionScopedRuntimeException(e); + } + return bodyObject; + } + + private static Object fixObject(final Object value) + { + if(value instanceof Binary) + { + final Binary binaryValue = (Binary) value; + byte[] data = new byte[binaryValue.getLength()]; + binaryValue.asByteBuffer().get(data); + return data; + } + else if(value instanceof List) + { + List listValue = (List) value; + List fixedValue = new ArrayList(listValue.size()); + for(Object o : listValue) + { + fixedValue.add(fixObject(o)); + } + return fixedValue; + } + else if(value instanceof Map) + { + Map<?,?> mapValue = (Map) value; + Map fixedValue = new LinkedHashMap(mapValue.size()); + for(Map.Entry<?,?> entry : mapValue.entrySet()) + { + fixedValue.put(fixObject(entry.getKey()),fixObject(entry.getValue())); + } + return fixedValue; + } + else + { + return value; + } + + } + + public static byte[] convertToBody(Object object) + { + if(object instanceof String) + { + return ((String)object).getBytes(UTF_8); + } + else if(object instanceof byte[]) + { + return (byte[]) object; + } + else if(object instanceof Map) + { + BBEncoder encoder = new BBEncoder(1024); + encoder.writeMap((Map)object); + ByteBuffer buf = encoder.segment(); + int remaining = buf.remaining(); + byte[] data = new byte[remaining]; + buf.get(data); + return data; + + } + else if(object instanceof List) + { + try + { + ByteBuffer buf; + if(onlyPrimitiveTypes((List)object)) + { + TypedBytesContentWriter writer = new TypedBytesContentWriter(); + for(Object value : (List)object) + { + writer.writeObject(value); + } + buf = writer.getData(); + + } + else + { + BBEncoder encoder = new BBEncoder(1024); + encoder.writeList((List) object); + buf = encoder.segment(); + } + int remaining = buf.remaining(); + byte[] data = new byte[remaining]; + buf.get(data); + return data; + } + catch (TypedBytesFormatException e) + { + throw new ConnectionScopedRuntimeException(e); + } + } + else + { + ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + try + { + ObjectOutputStream os = new ObjectOutputStream(bytesOut); + os.writeObject(object); + return bytesOut.toByteArray(); + } + catch (IOException e) + { + throw new ConnectionScopedRuntimeException(e); + } + } + } + + public static boolean onlyPrimitiveTypes(final List list) + { + for(Object value : list) + { + if(!(value instanceof String + || value instanceof Integer + || value instanceof Long + || value instanceof Double + || value instanceof Float + || value instanceof Byte + || value instanceof Short + || value instanceof Character + || value instanceof Boolean + || value instanceof byte[])) + { + return false; + } + } + return true; + } + + public static String getBodyMimeType(Object object) + { + if(object instanceof String) + { + return "text/plain"; + } + else if(object instanceof byte[]) + { + return "application/octet-stream"; + } + else if(object instanceof Map) + { + return "amqp/map"; + } + else if(object instanceof List) + { + return onlyPrimitiveTypes((List)object) ? "jms/stream-message" : "amqp/list"; + } + else + { + return "application/java-object-stream"; + } + } +} diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java index ec6d5a924c..bb01ddad7f 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java @@ -20,14 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.Section; import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.Data; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -41,7 +34,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; -import java.util.ListIterator; import java.util.Map; public class MessageConverter_v1_0_to_Internal implements MessageConverter<Message_1_0, InternalMessage> @@ -71,212 +63,14 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa @Override public InternalMessage convert(Message_1_0 serverMessage, VirtualHost vhost) { - final String mimeType = serverMessage.getMessageHeader().getMimeType(); + Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMessage); - - - - byte[] data = new byte[(int) serverMessage.getSize()]; - serverMessage.getStoredMessage().getContent(0,ByteBuffer.wrap(data)); - - SectionDecoderImpl sectionDecoder = new SectionDecoderImpl(TYPE_REGISTRY); - - try - { - List<Section> sections = sectionDecoder.parseAll(ByteBuffer.wrap(data)); - ListIterator<Section> iterator = sections.listIterator(); - Section previousSection = null; - while(iterator.hasNext()) - { - Section section = iterator.next(); - if(!(section instanceof AmqpValue || section instanceof Data || section instanceof AmqpSequence)) - { - iterator.remove(); - } - else - { - if(previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValue)) - { - throw new ConnectionScopedRuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence"); - } - else - { - previousSection = section; - } - } - } - - Object bodyObject; - - if(sections.isEmpty()) - { - // should actually be illegal - bodyObject = new byte[0]; - } - else - { - Section firstBodySection = sections.get(0); - if(firstBodySection instanceof AmqpValue) - { - bodyObject = fixObject(((AmqpValue)firstBodySection).getValue()); - } - else if(firstBodySection instanceof Data) - { - int totalSize = 0; - for(Section section : sections) - { - totalSize += ((Data)section).getValue().getLength(); - } - byte[] bodyData = new byte[totalSize]; - ByteBuffer buf = ByteBuffer.wrap(bodyData); - for(Section section : sections) - { - buf.put(((Data)section).getValue().asByteBuffer()); - } - bodyObject = bodyData; - } - else - { - ArrayList totalSequence = new ArrayList(); - for(Section section : sections) - { - totalSequence.addAll(((AmqpSequence)section).getValue()); - } - bodyObject = fixObject(totalSequence); - } - } - return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), bodyObject); - - } - catch (AmqpErrorException e) - { - throw new ConnectionScopedRuntimeException(e); - } - - - - - } - - private Object fixObject(final Object value) - { - if(value instanceof Binary) - { - final Binary binaryValue = (Binary) value; - byte[] data = new byte[binaryValue.getLength()]; - binaryValue.asByteBuffer().get(data); - return data; - } - else if(value instanceof List) - { - List listValue = (List) value; - List fixedValue = new ArrayList(listValue.size()); - for(Object o : listValue) - { - fixedValue.add(fixObject(o)); - } - return fixedValue; - } - else if(value instanceof Map) - { - Map<?,?> mapValue = (Map) value; - Map fixedValue = new LinkedHashMap(mapValue.size()); - for(Map.Entry<?,?> entry : mapValue.entrySet()) - { - fixedValue.put(fixObject(entry.getKey()),fixObject(entry.getValue())); - } - return fixedValue; - } - else - { - return value; - } - - } - - private static Object convertMessageBody(String mimeType, byte[] data) - { - if("text/plain".equals(mimeType) || "text/xml".equals(mimeType)) - { - String text = new String(data); - return text; - } - else if("jms/map-message".equals(mimeType)) - { - TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); - - LinkedHashMap map = new LinkedHashMap(); - final int entries = reader.readIntImpl(); - for (int i = 0; i < entries; i++) - { - try - { - String propName = reader.readStringImpl(); - Object value = reader.readObject(); - - map.put(propName, value); - } - catch (EOFException e) - { - throw new IllegalArgumentException(e); - } - catch (TypedBytesFormatException e) - { - throw new IllegalArgumentException(e); - } - - } - - return map; - - } - else if("amqp/map".equals(mimeType)) - { - BBDecoder decoder = new BBDecoder(); - decoder.init(ByteBuffer.wrap(data)); - final Map<String,Object> map = decoder.readMap(); - - return map; - - } - else if("amqp/list".equals(mimeType)) - { - BBDecoder decoder = new BBDecoder(); - decoder.init(ByteBuffer.wrap(data)); - return decoder.readList(); - } - else if("jms/stream-message".equals(mimeType)) - { - TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); - - List list = new ArrayList(); - while (reader.remaining() != 0) - { - try - { - list.add(reader.readObject()); - } - catch (TypedBytesFormatException e) - { - throw new ConnectionScopedRuntimeException(e); - } - catch (EOFException e) - { - throw new ConnectionScopedRuntimeException(e); - } - } - return list; - } - else - { - return data; - - } + return InternalMessage.convert(serverMessage.getMessageNumber(), serverMessage.isPersistent(), serverMessage.getMessageHeader(), bodyObject); } @Override public String getType() { - return "v0-8 to Internal"; + return "v1-0 to Internal"; } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java index f28e25e080..d5a349304c 100755 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java @@ -416,7 +416,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData } else { - return _properties.getMessageId().toString(); + return _properties.getCorrelationId().toString(); } } @@ -427,13 +427,13 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData public String getMessageId() { - if(_properties == null || _properties.getCorrelationId() == null) + if(_properties == null || _properties.getMessageId() == null) { return null; } else { - return _properties.getCorrelationId().toString(); + return _properties.getMessageId().toString(); } } @@ -558,6 +558,11 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData { return _properties == null ? null : _properties.getSubject(); } + + public String getTo() + { + return _properties == null ? null : _properties.getTo(); + } } } diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java new file mode 100644 index 0000000000..0f0197cb63 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10.java @@ -0,0 +1,155 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.converter.v0_10_v1_0; + +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; +import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; +import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0; +import org.apache.qpid.server.protocol.v1_0.Message_1_0; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageDeliveryPriority; +import org.apache.qpid.transport.MessageProperties; + +import java.nio.ByteBuffer; + +public class MessageConverter_1_0_to_v0_10 implements MessageConverter<Message_1_0, MessageTransferMessage> +{ + + public Class<Message_1_0> getInputClass() + { + return Message_1_0.class; + } + + @Override + public Class<MessageTransferMessage> getOutputClass() + { + return MessageTransferMessage.class; + } + + @Override + public MessageTransferMessage convert(Message_1_0 serverMsg, VirtualHost vhost) + { + return new MessageTransferMessage(convertToStoredMessage(serverMsg), null); + } + + private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final Message_1_0 serverMsg) + { + Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMsg); + + final byte[] messageContent = MessageConverter_from_1_0.convertToBody(bodyObject); + + final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(serverMsg, + MessageConverter_from_1_0.getBodyMimeType(bodyObject), + messageContent.length); + + return new StoredMessage<MessageMetaData_0_10>() + { + @Override + public MessageMetaData_0_10 getMetaData() + { + return messageMetaData_0_10; + } + + @Override + public long getMessageNumber() + { + return serverMsg.getMessageNumber(); + } + + @Override + public void addContent(int offsetInMessage, ByteBuffer src) + { + throw new UnsupportedOperationException(); + } + + @Override + public int getContent(int offsetInMessage, ByteBuffer dst) + { + int size = messageContent.length - offsetInMessage; + if(dst.remaining() < size) + { + size = dst.remaining(); + } + ByteBuffer buf = ByteBuffer.wrap(messageContent, offsetInMessage, size); + dst.put(buf); + return size; + } + + @Override + public ByteBuffer getContent(int offsetInMessage, int size) + { + return ByteBuffer.wrap(messageContent, offsetInMessage, size); + } + + @Override + public StoreFuture flushToStore() + { + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + + private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg, final String bodyMimeType, final int size) + { + DeliveryProperties deliveryProps = new DeliveryProperties(); + MessageProperties messageProps = new MessageProperties(); + + + + deliveryProps.setExpiration(serverMsg.getExpiration()); + deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority())); + deliveryProps.setRoutingKey(serverMsg.getRoutingKey()); + deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp()); + + messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding()); + messageProps.setContentLength(size); + messageProps.setContentType(bodyMimeType); + if(serverMsg.getMessageHeader().getCorrelationId() != null) + { + messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes()); + } + + Header header = new Header(deliveryProps, messageProps, null); + return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime()); + } + + + + @Override + public String getType() + { + return "v1-0 to v0-10"; + } + + +} diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter index 045eb72cb1..4c9e612f32 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter @@ -17,3 +17,4 @@ # under the License. # org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_0_10_to_1_0 +org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_1_0_to_v0_10 diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java new file mode 100644 index 0000000000..d9391d89a5 --- /dev/null +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8.java @@ -0,0 +1,209 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.converter.v0_8_v1_0; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.protocol.v0_8.AMQMessage; +import org.apache.qpid.server.protocol.v0_8.MessageMetaData; +import org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0; +import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0; +import org.apache.qpid.server.protocol.v1_0.Message_1_0; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import java.nio.ByteBuffer; +import java.util.LinkedHashMap; +import java.util.Map; + +public class MessageConverter_1_0_to_v0_8 implements MessageConverter<Message_1_0, AMQMessage> +{ + private static final int BASIC_CLASS_ID = 60; + + + public Class<Message_1_0> getInputClass() + { + return Message_1_0.class; + } + + @Override + public Class<AMQMessage> getOutputClass() + { + return AMQMessage.class; + } + + @Override + public AMQMessage convert(Message_1_0 serverMsg, VirtualHost vhost) + { + return new AMQMessage(convertToStoredMessage(serverMsg), null); + } + + private StoredMessage<MessageMetaData> convertToStoredMessage(final Message_1_0 serverMsg) + { + Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMsg); + + + + + final byte[] messageContent = MessageConverter_from_1_0.convertToBody(bodyObject); + final MessageMetaData messageMetaData_0_8 = convertMetaData(serverMsg, + MessageConverter_from_1_0.getBodyMimeType(bodyObject), + messageContent.length); + + return new StoredMessage<MessageMetaData>() + { + @Override + public MessageMetaData getMetaData() + { + return messageMetaData_0_8; + } + + @Override + public long getMessageNumber() + { + return serverMsg.getMessageNumber(); + } + + @Override + public void addContent(int offsetInMessage, ByteBuffer src) + { + throw new UnsupportedOperationException(); + } + + @Override + public int getContent(int offsetInMessage, ByteBuffer dst) + { + int size = messageContent.length - offsetInMessage; + if(dst.remaining() < size) + { + size = dst.remaining(); + } + ByteBuffer buf = ByteBuffer.wrap(messageContent, offsetInMessage, size); + dst.put(buf); + return size; + } + + @Override + public ByteBuffer getContent(int offsetInMessage, int size) + { + return ByteBuffer.wrap(messageContent, offsetInMessage, size); + } + + @Override + public StoreFuture flushToStore() + { + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + + private MessageMetaData convertMetaData(final Message_1_0 serverMsg, final String bodyMimeType, final int size) + { + + MessagePublishInfo publishInfo = new MessagePublishInfo() + { + @Override + public AMQShortString getExchange() + { + return null; + } + + @Override + public void setExchange(final AMQShortString amqShortString) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isImmediate() + { + return false; + } + + @Override + public boolean isMandatory() + { + return false; + } + + @Override + public AMQShortString getRoutingKey() + { + MessageMetaData_1_0.MessageHeader_1_0 header = + serverMsg.getMessageHeader(); + String key = header.getTo(); + if(key == null) + { + key = header.getSubject(); + } + + return AMQShortString.valueOf(key); + } + }; + + + final BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + props.setAppId(serverMsg.getMessageHeader().getAppId()); + props.setContentType(bodyMimeType); + props.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId()); + props.setDeliveryMode(serverMsg.isPersistent() ? BasicContentHeaderProperties.PERSISTENT : BasicContentHeaderProperties.NON_PERSISTENT); + props.setExpiration(serverMsg.getExpiration()); + props.setMessageId(serverMsg.getMessageHeader().getMessageId()); + props.setPriority(serverMsg.getMessageHeader().getPriority()); + props.setReplyTo(serverMsg.getMessageHeader().getReplyTo()); + props.setTimestamp(serverMsg.getMessageHeader().getTimestamp()); + props.setUserId(serverMsg.getMessageHeader().getUserId()); + + Map<String,Object> headerProps = new LinkedHashMap<String, Object>(); + + for(String headerName : serverMsg.getMessageHeader().getHeaderNames()) + { + headerProps.put(headerName, serverMsg.getMessageHeader().getHeader(headerName)); + } + + props.setHeaders(FieldTable.convertToFieldTable(headerProps)); + + final ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID); + chb.setBodySize(size); + + return new MessageMetaData(publishInfo, chb, serverMsg.getArrivalTime()); + } + + + @Override + public String getType() + { + return "v1-0 to v0-8"; + } + + +} diff --git a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter index cf4643f2b8..61259f870d 100644 --- a/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter +++ b/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter @@ -17,3 +17,4 @@ # under the License. # org.apache.qpid.server.protocol.converter.v0_8_v1_0.MessageConverter_0_8_to_1_0 +org.apache.qpid.server.protocol.converter.v0_8_v1_0.MessageConverter_1_0_to_v0_8 |
