summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-09 08:59:26 +0000
committerKeith Wall <kwall@apache.org>2011-11-09 08:59:26 +0000
commit03485e0e416d74c74c305bfbcb0864628c8075fe (patch)
tree2a8face28024b691002a2bc13b3ce9d6490fc3cb /java/client/src
parentcc6176cfb9c22ae2cdf56b03d33aca8975b7cd71 (diff)
downloadqpid-python-03485e0e416d74c74c305bfbcb0864628c8075fe.tar.gz
QPID-3519: refactor consumer argument handling
Applied patch from Oleksandr Rudyy<orudyy@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1199664 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java20
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java17
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java19
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java4
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java16
-rw-r--r--java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java17
7 files changed, 45 insertions, 52 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 4c4d2c75b1..ef44221ec1 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -92,7 +92,6 @@ import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.MessageFilter;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
@@ -2011,28 +2010,11 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
AMQDestination amqd = (AMQDestination) destination;
- // TODO: Define selectors in AMQP
- // TODO: construct the rawSelector from the selector string if rawSelector == null
- final FieldTable ft = FieldTableFactory.newFieldTable();
- // if (rawSelector != null)
- // ft.put("headers", rawSelector.getDataAsBytes());
- // rawSelector is used by HeadersExchange and is not a JMS Selector
- if (rawSelector != null)
- {
- ft.addAll(rawSelector);
- }
-
- // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
- // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
- // possible to determine when querying the broker whether there are no arguments or just a non-matching selector
- // argument, as specifying null for the arguments when querying means they should not be checked at all
- ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
-
C consumer;
try
{
consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
- noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
+ noLocal, exclusive, messageSelector, rawSelector, noConsume, autoClose);
}
catch(TransportException e)
{
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 6bab715e4b..7e5edef38d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -509,13 +509,13 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
public BasicMessageConsumer_0_10 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
final int prefetchLow, final boolean noLocal,
final boolean exclusive, String messageSelector,
- final FieldTable ft, final boolean noConsume,
+ final FieldTable rawSelector, final boolean noConsume,
final boolean autoClose) throws JMSException
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new BasicMessageConsumer_0_10(_channelId, _connection, destination, messageSelector, noLocal,
- _messageFactoryRegistry, this, protocolHandler, ft, prefetchHigh,
+ _messageFactoryRegistry, this, protocolHandler, rawSelector, prefetchHigh,
prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose);
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 85fc857014..e33410f5fe 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -337,21 +337,6 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
MessageFilter messageSelector,
int tag) throws AMQException, FailoverException
{
- FieldTable arguments = FieldTableFactory.newFieldTable();
- if (messageSelector != null)
- {
- arguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector.getSelector());
- }
-
- if (consumer.isAutoClose())
- {
- arguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
- }
-
- if (consumer.isBrowseOnly())
- {
- arguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
- }
BasicConsumeBody body = getMethodRegistry().createBasicConsumeBody(getTicket(),
queueName,
@@ -360,7 +345,7 @@ public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, B
consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
consumer.isExclusive(),
nowait,
- arguments);
+ consumer.getArguments());
AMQFrame jmsConsume = body.generateFrame(_channelId);
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index b1975338b7..7bb400fada 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -27,6 +27,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
@@ -150,7 +151,7 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
- FieldTable arguments, int prefetchHigh, int prefetchLow,
+ FieldTable rawSelector, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
_channelId = channelId;
@@ -160,7 +161,6 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
_messageFactory = messageFactory;
_session = session;
_protocolHandler = protocolHandler;
- _arguments = arguments;
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
@@ -196,6 +196,21 @@ public abstract class BasicMessageConsumer<U> extends Closeable implements Messa
{
_acknowledgeMode = acknowledgeMode;
}
+
+ final FieldTable ft = FieldTableFactory.newFieldTable();
+ // rawSelector is used by HeadersExchange and is not a JMS Selector
+ if (rawSelector != null)
+ {
+ ft.addAll(rawSelector);
+ }
+
+ // We must always send the selector argument even if empty, so that we can tell when a selector is removed from a
+ // durable topic subscription that the broker arguments don't match any more. This is because it is not otherwise
+ // possible to determine when querying the broker whether there are no arguments or just a non-matching selector
+ // argument, as specifying null for the arguments when querying means they should not be checked at all
+ ft.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector == null ? "" : messageSelector);
+
+ _arguments = ft;
}
public AMQDestination getDestination()
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index ae2068b75b..47c20b683c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -72,12 +72,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession<?,?> session, AMQProtocolHandler protocolHandler,
- FieldTable arguments, int prefetchHigh, int prefetchLow,
+ FieldTable rawSelector, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose)
throws JMSException
{
super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
- arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
+ rawSelector, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, browseOnly, autoClose);
_0_10session = (AMQSession_0_10) session;
_preAcquire = evaluatePreAcquire(browseOnly, destination);
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index cc061e35cb..cf1d7cedeb 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -27,6 +27,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.*;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,12 +38,23 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<UnprocessedMe
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
+ AMQProtocolHandler protocolHandler, FieldTable rawSelector, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean browseOnly, boolean autoClose) throws JMSException
{
super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
- protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
+ protocolHandler, rawSelector, prefetchHigh, prefetchLow, exclusive,
acknowledgeMode, browseOnly, autoClose);
+ final FieldTable consumerArguments = getArguments();
+ if (isAutoClose())
+ {
+ consumerArguments.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
+ }
+
+ if (isBrowseOnly())
+ {
+ consumerArguments.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
+ }
+
}
void sendCancel() throws AMQException, FailoverException
diff --git a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
index 849827216c..68531eee84 100644
--- a/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
+++ b/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
@@ -29,7 +29,6 @@ import javax.jms.MessageProducer;
import junit.framework.TestCase;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.Connection.SessionFactory;
@@ -334,7 +333,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.sendConsume(consumer, new AMQShortString("test"), null, true, null, 1);
}
catch (Exception e)
@@ -383,7 +382,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.start();
consumer.receive(1);
fail("JMSException should be thrown");
@@ -401,7 +400,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.start();
consumer.receive(1);
}
@@ -419,7 +418,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
session.start();
consumer.receiveNoWait();
fail("JMSException should be thrown");
@@ -437,7 +436,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.setMessageListener(new MockMessageListener());
fail("JMSException should be thrown");
}
@@ -454,7 +453,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.setMessageListener(new MockMessageListener());
}
catch (Exception e)
@@ -471,7 +470,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.close();
}
catch (Exception e)
@@ -488,7 +487,7 @@ public class AMQSession_0_10Test extends TestCase
try
{
BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false,
- null, new FieldTable(), false, true);
+ null, null, false, true);
consumer.close();
fail("JMSException should be thrown");
}