diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-12 17:40:29 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-12 17:40:29 +0000 |
| commit | 1fa76894ffd73c85c31ce161cb23155c0d42e138 (patch) | |
| tree | 03d9b0dafc6ebf83539e250a3311b9c4eb9b77b5 /qpid/java/broker | |
| parent | 6118d8d221424cd49e311d75b8495cad08502200 (diff) | |
| download | qpid-python-1fa76894ffd73c85c31ce161cb23155c0d42e138.tar.gz | |
QPID-3753 : [Java Broker] Improve automatic conversion of messages between 0-8/9/9-1 and 0-10
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1230661 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
6 files changed, 88 insertions, 27 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 82ac01cea8..3f5c204f43 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -266,7 +266,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException { - if (!getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), info.getRoutingKey().asString(), e.getName())) + String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString(); + if (!getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), routingKey, e.getName())) { throw new AMQSecurityException("Permission denied: " + e.getName()); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java index 483bca894e..fa06a99204 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java @@ -20,22 +20,23 @@ */ package org.apache.qpid.server.output; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.*; import org.apache.qpid.AMQPInvalidClassException; +import java.util.HashMap; import java.util.Map; public class HeaderPropertiesConverter { - public static BasicContentHeaderProperties convert(MessageTransferMessage messageTransferMessage) + public static BasicContentHeaderProperties convert(MessageTransferMessage messageTransferMessage, VirtualHost vhost) { BasicContentHeaderProperties props = new BasicContentHeaderProperties(); @@ -82,11 +83,23 @@ public class HeaderPropertiesConverter } if(messageProps.hasMessageId()) { - props.setMessageId(messageProps.getMessageId().toString()); + props.setMessageId("ID:" + messageProps.getMessageId().toString()); } + if(messageProps.hasReplyTo()) + { + ReplyTo replyTo = messageProps.getReplyTo(); + String exchangeName = replyTo.getExchange(); + String routingKey = replyTo.getRoutingKey(); + if(exchangeName == null) + { + exchangeName = ""; + } - // TODO Reply-to + Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName); + String exchangeClass = exchange == null ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString() : exchange.getType().getName().asString(); + props.setReplyTo(exchangeClass + "://"+exchangeName+"//?routingkey='"+(routingKey==null ? "" : routingKey+"'")); + } if(messageProps.hasUserId()) { props.setUserId(new AMQShortString(messageProps.getUserId())); @@ -94,7 +107,12 @@ public class HeaderPropertiesConverter if(messageProps.hasApplicationHeaders()) { - Map<String, Object> appHeaders = messageProps.getApplicationHeaders(); + Map<String, Object> appHeaders = new HashMap<String, Object>(messageProps.getApplicationHeaders()); + if(messageProps.getApplicationHeaders().containsKey("x-jms-type")) + { + props.setType(String.valueOf(appHeaders.remove("x-jms-type"))); + } + FieldTable ft = new FieldTable(); for(Map.Entry<String, Object> entry : appHeaders.entrySet()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java index efd904f6aa..1e62e5e9ca 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java @@ -91,7 +91,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter else { final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message); + BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost()); ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID); chb.bodySize = message.getSize(); return chb; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java index 010afcb1a9..78507b0cf2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java @@ -86,7 +86,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter else { final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message); + BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost()); ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID); chb.bodySize = message.getSize(); return chb; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java index 5e2b3e4556..9102b6c651 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java @@ -85,7 +85,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter else { final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message); + BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost()); ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID); chb.bodySize = message.getSize(); return chb; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index 3246dc0275..7aac7a3f7f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -51,23 +51,15 @@ import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.transport.ServerSession; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.transport.DeliveryProperties; -import org.apache.qpid.transport.Header; -import org.apache.qpid.transport.MessageAcceptMode; -import org.apache.qpid.transport.MessageAcquireMode; -import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageDeliveryPriority; -import org.apache.qpid.transport.MessageFlowMode; -import org.apache.qpid.transport.MessageProperties; -import org.apache.qpid.transport.MessageTransfer; -import org.apache.qpid.transport.Method; -import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.Struct; +import org.apache.qpid.transport.*; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.AMQException; +import org.apache.qpid.transport.util.Logger; +import org.apache.qpid.url.AMQBindingURL; +import java.net.URISyntaxException; import java.text.MessageFormat; import java.util.Arrays; import java.util.Collections; @@ -85,7 +77,6 @@ import java.nio.ByteBuffer; public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject { - private final long _subscriptionID; private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); @@ -461,7 +452,53 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr messageProps.setCorrelationId(properties.getCorrelationId().getBytes()); } - // TODO - ReplyTo + if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0) + { + String origReplyToString = properties.getReplyTo().asString(); + ReplyTo replyTo = new ReplyTo(); + // if the string looks like a binding URL, then attempt to parse it... + try + { + AMQBindingURL burl = new AMQBindingURL(origReplyToString); + AMQShortString routingKey = burl.getRoutingKey(); + if(routingKey != null) + { + replyTo.setRoutingKey(routingKey.asString()); + } + + AMQShortString exchangeName = burl.getExchangeName(); + if(exchangeName != null) + { + replyTo.setExchange(exchangeName.asString()); + } + } + catch (URISyntaxException e) + { + replyTo.setRoutingKey(origReplyToString); + } + messageProps.setReplyTo(replyTo); + + } + + if(properties.getMessageId() != null) + { + try + { + String messageIdAsString = properties.getMessageIdAsString(); + if(messageIdAsString.startsWith("ID:")) + { + messageIdAsString = messageIdAsString.substring(3); + } + UUID uuid = UUID.fromString(messageIdAsString); + messageProps.setMessageId(uuid); + } + catch(IllegalArgumentException e) + { + // ignore - can't parse + } + } + + if(properties.getUserId() != null) { @@ -470,7 +507,12 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr FieldTable fieldTable = properties.getHeaders(); - final Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable); + Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable); + + if(properties.getType() != null) + { + appHeaders.put("x-jms-type", properties.getTypeAsString()); + } messageProps.setApplicationHeaders(appHeaders); |
