diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2012-02-13 22:30:47 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2012-02-13 22:30:47 +0000 |
commit | a3542fa13e0096b53319216532b2a79fe1d3f0f5 (patch) | |
tree | f095e3c0b136afebb6aad14d7d599feb0e0f6d75 /java/client | |
parent | 938df01f68a303b8d54167093452e0756befdf0b (diff) | |
download | qpid-python-a3542fa13e0096b53319216532b2a79fe1d3f0f5.tar.gz |
QPID-3836 Modified the address handling code to pass the noLocal
argument to queue-declare method.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1243719 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
4 files changed, 19 insertions, 10 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index ac728e78eb..e7e937b689 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1045,7 +1045,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { try { - handleAddressBasedDestination(dest,false,true); + handleAddressBasedDestination(dest,false,noLocal,true); if (dest.getAddressType() != AMQDestination.TOPIC_TYPE) { throw new JMSException("Durable subscribers can only be created for Topics"); @@ -2905,7 +2905,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic if (amqd.getDestSyntax() == DestSyntax.ADDR) { - handleAddressBasedDestination(amqd,true,nowait); + handleAddressBasedDestination(amqd,true,consumer.isNoLocal(),nowait); } else { @@ -2966,6 +2966,7 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public abstract void handleAddressBasedDestination(AMQDestination dest, boolean isConsumer, + boolean noLocal, boolean noWait) throws AMQException; private void registerProducer(long producerId, MessageProducer producer) diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index a27c52c686..816ad1f222 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -766,8 +766,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic else { QueueNode node = (QueueNode)amqd.getSourceNode(); + Map<String,Object> arguments = new HashMap<String,Object>(); + arguments.putAll((Map<? extends String, ? extends Object>) node.getDeclareArgs()); + if (arguments == null || arguments.get(AddressHelper.NO_LOCAL) == null) + { + arguments.put(AddressHelper.NO_LOCAL, noLocal); + } getQpidSession().queueDeclare(queueName.toString(), node.getAlternateExchange() , - node.getDeclareArgs(), + arguments, node.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, node.isDurable() ? Option.DURABLE : Option.NONE, node.isExclusive() ? Option.EXCLUSIVE : Option.NONE); @@ -1167,13 +1173,14 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic @SuppressWarnings("deprecation") public void handleAddressBasedDestination(AMQDestination dest, boolean isConsumer, + boolean noLocal, boolean noWait) throws AMQException { if (dest.isAddressResolved() && dest.isResolvedAfter(getAMQConnection().getLastFailoverTime())) { if (isConsumer && AMQDestination.TOPIC_TYPE == dest.getAddressType()) { - createSubscriptionQueue(dest); + createSubscriptionQueue(dest,noLocal); } } else @@ -1202,7 +1209,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic else if(createNode) { setLegacyFiledsForQueueType(dest); - send0_10QueueDeclare(dest,null,false,noWait); + send0_10QueueDeclare(dest,null,noLocal,noWait); sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(), null,dest.getExchangeName(),dest, false); break; @@ -1217,7 +1224,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic verifySubject(dest); if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) { - createSubscriptionQueue(dest); + createSubscriptionQueue(dest, noLocal); } break; } @@ -1232,7 +1239,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic false); if (isConsumer && !isQueueExist(dest,(QueueNode)dest.getSourceNode(),true)) { - createSubscriptionQueue(dest); + createSubscriptionQueue(dest,noLocal); } break; } @@ -1295,7 +1302,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - private void createSubscriptionQueue(AMQDestination dest) throws AMQException + private void createSubscriptionQueue(AMQDestination dest, boolean noLocal) throws AMQException { QueueNode node = (QueueNode)dest.getSourceNode(); // source node is never null @@ -1308,7 +1315,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } node.setExclusive(true); node.setAutoDelete(!node.isDurable()); - send0_10QueueDeclare(dest,null,false,true); + send0_10QueueDeclare(dest,null,noLocal,true); getQpidSession().exchangeBind(dest.getQueueName(), dest.getAddressName(), dest.getSubject(), diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index a49fb256a7..29f1925cbc 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -624,6 +624,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe public void handleAddressBasedDestination(AMQDestination dest, boolean isConsumer, + boolean noLocal, boolean noWait) throws AMQException { throw new UnsupportedOperationException("The new addressing based sytanx is " diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 91811ccf98..024219cfd6 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -86,7 +86,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { try { - getSession().handleAddressBasedDestination(destination,false,false); + getSession().handleAddressBasedDestination(destination,false,false,false); } catch(Exception e) { |