summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-12 17:40:29 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-12 17:40:29 +0000
commit1fa76894ffd73c85c31ce161cb23155c0d42e138 (patch)
tree03d9b0dafc6ebf83539e250a3311b9c4eb9b77b5 /qpid/java/broker
parent6118d8d221424cd49e311d75b8495cad08502200 (diff)
downloadqpid-python-1fa76894ffd73c85c31ce161cb23155c0d42e138.tar.gz
QPID-3753 : [Java Broker] Improve automatic conversion of messages between 0-8/9/9-1 and 0-10
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1230661 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java3
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java34
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java72
6 files changed, 88 insertions, 27 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 82ac01cea8..3f5c204f43 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -266,7 +266,8 @@ public class AMQChannel implements SessionConfig, AMQSessionModel
public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQSecurityException
{
- if (!getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), info.getRoutingKey().asString(), e.getName()))
+ String routingKey = info.getRoutingKey() == null ? null : info.getRoutingKey().asString();
+ if (!getVirtualHost().getSecurityManager().authorisePublish(info.isImmediate(), routingKey, e.getName()))
{
throw new AMQSecurityException("Permission denied: " + e.getName());
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
index 483bca894e..fa06a99204 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
@@ -20,22 +20,23 @@
*/
package org.apache.qpid.server.output;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.*;
import org.apache.qpid.AMQPInvalidClassException;
+import java.util.HashMap;
import java.util.Map;
public class HeaderPropertiesConverter
{
- public static BasicContentHeaderProperties convert(MessageTransferMessage messageTransferMessage)
+ public static BasicContentHeaderProperties convert(MessageTransferMessage messageTransferMessage, VirtualHost vhost)
{
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
@@ -82,11 +83,23 @@ public class HeaderPropertiesConverter
}
if(messageProps.hasMessageId())
{
- props.setMessageId(messageProps.getMessageId().toString());
+ props.setMessageId("ID:" + messageProps.getMessageId().toString());
}
+ if(messageProps.hasReplyTo())
+ {
+ ReplyTo replyTo = messageProps.getReplyTo();
+ String exchangeName = replyTo.getExchange();
+ String routingKey = replyTo.getRoutingKey();
+ if(exchangeName == null)
+ {
+ exchangeName = "";
+ }
- // TODO Reply-to
+ Exchange exchange = vhost.getExchangeRegistry().getExchange(exchangeName);
+ String exchangeClass = exchange == null ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString() : exchange.getType().getName().asString();
+ props.setReplyTo(exchangeClass + "://"+exchangeName+"//?routingkey='"+(routingKey==null ? "" : routingKey+"'"));
+ }
if(messageProps.hasUserId())
{
props.setUserId(new AMQShortString(messageProps.getUserId()));
@@ -94,7 +107,12 @@ public class HeaderPropertiesConverter
if(messageProps.hasApplicationHeaders())
{
- Map<String, Object> appHeaders = messageProps.getApplicationHeaders();
+ Map<String, Object> appHeaders = new HashMap<String, Object>(messageProps.getApplicationHeaders());
+ if(messageProps.getApplicationHeaders().containsKey("x-jms-type"))
+ {
+ props.setType(String.valueOf(appHeaders.remove("x-jms-type")));
+ }
+
FieldTable ft = new FieldTable();
for(Map.Entry<String, Object> entry : appHeaders.entrySet())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
index efd904f6aa..1e62e5e9ca 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
@@ -91,7 +91,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
else
{
final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost());
ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
chb.bodySize = message.getSize();
return chb;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
index 010afcb1a9..78507b0cf2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
@@ -86,7 +86,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
else
{
final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost());
ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
chb.bodySize = message.getSize();
return chb;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
index 5e2b3e4556..9102b6c651 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
@@ -85,7 +85,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
else
{
final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost());
ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
chb.bodySize = message.getSize();
return chb;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index 3246dc0275..7aac7a3f7f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -51,23 +51,15 @@ import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.transport.ServerSession;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Header;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageDeliveryPriority;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.*;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.url.AMQBindingURL;
+import java.net.URISyntaxException;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collections;
@@ -85,7 +77,6 @@ import java.nio.ByteBuffer;
public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
{
-
private final long _subscriptionID;
private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
@@ -461,7 +452,53 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
messageProps.setCorrelationId(properties.getCorrelationId().getBytes());
}
- // TODO - ReplyTo
+ if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0)
+ {
+ String origReplyToString = properties.getReplyTo().asString();
+ ReplyTo replyTo = new ReplyTo();
+ // if the string looks like a binding URL, then attempt to parse it...
+ try
+ {
+ AMQBindingURL burl = new AMQBindingURL(origReplyToString);
+ AMQShortString routingKey = burl.getRoutingKey();
+ if(routingKey != null)
+ {
+ replyTo.setRoutingKey(routingKey.asString());
+ }
+
+ AMQShortString exchangeName = burl.getExchangeName();
+ if(exchangeName != null)
+ {
+ replyTo.setExchange(exchangeName.asString());
+ }
+ }
+ catch (URISyntaxException e)
+ {
+ replyTo.setRoutingKey(origReplyToString);
+ }
+ messageProps.setReplyTo(replyTo);
+
+ }
+
+ if(properties.getMessageId() != null)
+ {
+ try
+ {
+ String messageIdAsString = properties.getMessageIdAsString();
+ if(messageIdAsString.startsWith("ID:"))
+ {
+ messageIdAsString = messageIdAsString.substring(3);
+ }
+ UUID uuid = UUID.fromString(messageIdAsString);
+ messageProps.setMessageId(uuid);
+ }
+ catch(IllegalArgumentException e)
+ {
+ // ignore - can't parse
+ }
+ }
+
+
if(properties.getUserId() != null)
{
@@ -470,7 +507,12 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
FieldTable fieldTable = properties.getHeaders();
- final Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
+ Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
+
+ if(properties.getType() != null)
+ {
+ appHeaders.put("x-jms-type", properties.getTypeAsString());
+ }
messageProps.setApplicationHeaders(appHeaders);