From 1635ca5849b7c765d5d7be9cd01d46b06349f320 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 26 Aug 2014 17:01:07 +0000 Subject: QPID-6040 : [Java Broker] [Java Client] add the ability to create a single consumer that is consuming across a collection of queues git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1620659 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/client/AMQSession.java | 29 +++++++++-------- .../org/apache/qpid/client/AMQSession_0_10.java | 10 +++--- .../org/apache/qpid/client/AMQSession_0_8.java | 12 ++++++- .../client/messaging/address/AddressHelper.java | 38 +++++++++++++++------- .../apache/qpid/client/messaging/address/Link.java | 2 +- 5 files changed, 59 insertions(+), 32 deletions(-) (limited to 'qpid/java/client/src') diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 0183c30276..9bdcb9e83f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -683,47 +683,48 @@ public abstract class AMQSession) link.getSubscription().getArgs()); + arguments.putAll(link.getSubscription().getArgs()); } boolean acceptModeNone = getAcknowledgeMode() == NO_ACKNOWLEDGE; + String queue = queueName == null ? destination.getAddressName() : queueName.toString(); getQpidSession().messageSubscribe - (queueName.toString(), String.valueOf(tag), + (queue, String.valueOf(tag), acceptModeNone ? MessageAcceptMode.NONE : MessageAcceptMode.EXPLICIT, preAcquire ? MessageAcquireMode.PRE_ACQUIRED : MessageAcquireMode.NOT_ACQUIRED, null, 0, arguments, consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE); - String consumerTag = ((BasicMessageConsumer_0_10)consumer).getConsumerTagString(); + String consumerTag = (consumer).getConsumerTagString(); if (capacity == 0) { @@ -1175,7 +1176,8 @@ public class AMQSession_0_10 extends AMQSession arguments = FieldTable.convertToMap(consumer.getArguments()); + + Link link = destination.getLink(); + if (link != null && link.getSubscription() != null && link.getSubscription().getArgs() != null) + { + arguments.putAll(link.getSubscription().getArgs()); + } + BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(), queueName, new AMQShortString(String.valueOf(tag)), @@ -521,7 +531,7 @@ public class AMQSession_0_8 extends AMQSession)x_subscribe.get(ARGUMENTS)); + x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBE); + } + else if(linkMap.containsKey(X_SUBSCRIBES)) + { + // left in for backwards compatibility with old broken constant + x_subscribe = (Map)((Map) _address.getOptions().get(LINK)).get(X_SUBSCRIBES); + } + + if(x_subscribe != null) + { + if (x_subscribe.containsKey(ARGUMENTS)) + { + link.getSubscription().setArgs((Map) x_subscribe.get(ARGUMENTS)); + } + + boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ? + Boolean.parseBoolean((String) x_subscribe.get(EXCLUSIVE)) : false; + + link.getSubscription().setExclusive(exclusive); } - - boolean exclusive = x_subscribe.containsKey(EXCLUSIVE) ? - Boolean.parseBoolean((String)x_subscribe.get(EXCLUSIVE)): false; - - link.getSubscription().setExclusive(exclusive); } link.setBindings(getBindings(linkMap)); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java index a614690f83..7e9cb3072a 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/messaging/address/Link.java @@ -28,7 +28,7 @@ import java.util.Map; import org.apache.qpid.client.AMQDestination.Binding; public class Link -{ +{ public enum FilterType { SQL92, XQUERY, SUBJECT } public enum Reliability { UNRELIABLE, AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE } -- cgit v1.2.1