From 3e4d1f2f56ef296ea5132511faaa8689867c499c Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sat, 1 Jun 2013 19:24:36 +0000 Subject: QPID-4897 : [Java Broker] Allow selectors on bindings fro non-topic exchanges git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1488561 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/client/AMQHeadersExchange.java | 2 +- .../java/org/apache/qpid/client/AMQSession.java | 115 ++++++++++------ .../org/apache/qpid/client/AMQSession_0_10.java | 146 +++++++++++---------- .../org/apache/qpid/client/AMQSession_0_8.java | 62 ++++++--- .../main/java/org/apache/qpid/client/AMQTopic.java | 20 ++- 5 files changed, 206 insertions(+), 139 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java index b9e9a33cd6..922cc1e2a7 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java @@ -31,7 +31,7 @@ public class AMQHeadersExchange extends AMQDestination { public AMQHeadersExchange(BindingURL binding) { - this(binding.getExchangeName()); + super(binding); } public AMQHeadersExchange(String name) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index e784e903fa..018a1ec851 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -440,7 +440,7 @@ public abstract class AMQSession subscriber = _subscriptions.get(name); - + // Not subscribed to this name in the current session if (subscriber == null) { // After the address is resolved routing key will not be null. AMQShortString topicName = dest.getRoutingKey(); - + if (_strictAMQP) { if (_strictAMQPFATAL) @@ -1046,8 +1046,8 @@ public abstract class AMQSession args = new HashMap(); - - // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a + + // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise // possible to determine when querying the broker whether there are no arguments or just a non-matching selector // argument, as specifying null for the arguments when querying means they should not be checked at all @@ -1060,16 +1060,28 @@ public abstract class AMQSession subscriber; - + _subscriberDetails.lock(); try { @@ -1896,11 +1916,11 @@ public abstract class AMQSession args) throws JMSException; /** @@ -2844,14 +2865,19 @@ public abstract class AMQSession bindings = new ArrayList(); bindings.addAll(destination.getNode().getBindings()); - + String defaultExchange = destination.getAddressType() == AMQDestination.TOPIC_TYPE ? destination.getAddressName(): "amq.topic"; - + for (Binding binding: bindings) { // Currently there is a bug (QPID-3317) with setting up and tearing down x-bindings for link. @@ -386,22 +386,22 @@ public class AMQSession_0_10 extends AMQSession0) { rk = bindingKeys[0].toString(); @@ -583,10 +581,10 @@ public class AMQSession_0_10 extends AMQSession args) { boolean res; @@ -598,21 +596,27 @@ public class AMQSession_0_10 extends AMQSession arguments = new HashMap(); if (noLocal) - { + { arguments.put(AddressHelper.NO_LOCAL, true); - } + } getQpidSession().queueDeclare(queueName.toString(), "" , arguments, amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE, @@ -790,7 +794,7 @@ public class AMQSession_0_10 extends AMQSession false @@ -837,7 +841,7 @@ public class AMQSession_0_10 extends AMQSession target,Map source) { boolean match = true; for (String key: source.keySet()) { - match = target.containsKey(key) && + match = target.containsKey(key) && target.get(key).equals(source.get(key)); - - if (!match) - { + + if (!match) + { StringBuffer buf = new StringBuffer(); buf.append("Property given in address did not match with the args sent by the broker."); buf.append(" Expected { ").append(key).append(" : ").append(source.get(key)).append(" }, "); @@ -1184,22 +1188,22 @@ public class AMQSession_0_10 extends AMQSession tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); getPrefetchedMessageTags().addAll(tags); } - + RangeSet delivered = gatherRangeSet(getUnacknowledgedMessageTags()); RangeSet prefetched = gatherRangeSet(getPrefetchedMessageTags()); RangeSet all = RangeSetFactory.createRangeSet(delivered.size() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 3097b33da3..9a9da62f2a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -184,7 +184,7 @@ public class AMQSession_0_8 extends AMQSession( + new FailoverProtectedOperation() + { + public AMQMethodEvent execute() throws AMQException, FailoverException + { + return sendExchangeBound(exchangeName, routingKey, queueName); + + } + }, getAMQConnection()).execute(); + + // Extract and return the response code from the query. + ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody) response.getMethod(); + + return (responseBody.getReplyCode() == 0); + } + + private AMQMethodEvent sendExchangeBound(AMQShortString exchangeName, + AMQShortString routingKey, + AMQShortString queueName) throws AMQException, FailoverException + { + AMQFrame boundFrame = getProtocolHandler().getMethodRegistry().createExchangeBoundBody + (exchangeName, routingKey, queueName).generateFrame(getChannelId()); + + return getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class); + } @Override public void sendConsume(BasicMessageConsumer_0_8 consumer, @@ -527,7 +555,7 @@ public class AMQSession_0_8 extends AMQSession