From 03485e0e416d74c74c305bfbcb0864628c8075fe Mon Sep 17 00:00:00 2001 From: Keith Wall Date: Wed, 9 Nov 2011 08:59:26 +0000 Subject: QPID-3519: refactor consumer argument handling Applied patch from Oleksandr Rudyy and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1199664 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/client/AMQSession.java | 20 +------------------- .../java/org/apache/qpid/client/AMQSession_0_10.java | 4 ++-- .../java/org/apache/qpid/client/AMQSession_0_8.java | 17 +---------------- .../org/apache/qpid/client/BasicMessageConsumer.java | 19 +++++++++++++++++-- .../qpid/client/BasicMessageConsumer_0_10.java | 4 ++-- .../apache/qpid/client/BasicMessageConsumer_0_8.java | 16 ++++++++++++++-- .../org/apache/qpid/client/AMQSession_0_10Test.java | 17 ++++++++--------- 7 files changed, 45 insertions(+), 52 deletions(-) (limited to 'java/client/src') diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 4c4d2c75b1..ef44221ec1 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -92,7 +92,6 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.MessageFilter; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.jms.Session; import org.apache.qpid.protocol.AMQConstant; @@ -2011,28 +2010,11 @@ public abstract class AMQSession extends Closeable implements Messa protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, - FieldTable arguments, int prefetchHigh, int prefetchLow, + FieldTable rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { _channelId = channelId; @@ -160,7 +161,6 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa _messageFactory = messageFactory; _session = session; _protocolHandler = protocolHandler; - _arguments = arguments; _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; @@ -196,6 +196,21 @@ public abstract class BasicMessageConsumer extends Closeable implements Messa { _acknowledgeMode = acknowledgeMode; } + + final FieldTable ft = FieldTableFactory.newFieldTable(); + // rawSelector is used by HeadersExchange and is not a JMS Selector + if (rawSelector != null) + { + ft.addAll(rawSelector); + } + + // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a + // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise + // possible to determine when querying the broker whether there are no arguments or just a non-matching selector + // argument, as specifying null for the arguments when querying means they should not be checked at all + ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector); + + _arguments = ft; } public AMQDestination getDestination() diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index ae2068b75b..47c20b683c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -72,12 +72,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer session, AMQProtocolHandler protocolHandler, - FieldTable arguments, int prefetchHigh, int prefetchLow, + FieldTable rawSelector, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException { super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, - arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); + rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose); _0_10session = (AMQSession_0_10) session; _preAcquire = evaluatePreAcquire(browseOnly, destination); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index cc061e35cb..cf1d7cedeb 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -27,6 +27,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.*; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,12 +38,23 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer