diff options
| author | Aidan Skinner <aidan@apache.org> | 2008-07-30 16:47:41 +0000 |
|---|---|---|
| committer | Aidan Skinner <aidan@apache.org> | 2008-07-30 16:47:41 +0000 |
| commit | 5f2c1ce06597827d24428bc2479a6e79990d854b (patch) | |
| tree | 4c434aa27e3f4422dfd6abed6c5bf335ed91c2d4 /qpid/java/client | |
| parent | fc3f14b551d86d617699ccb07b72f3bacf500f9b (diff) | |
| download | qpid-python-5f2c1ce06597827d24428bc2479a6e79990d854b.tar.gz | |
QPID-1192: Make consumer send Selector as part of binding.
QPID-1191: Add test to exhibit leak
Change DurableSubscriptionTest to validate exception type recieved
Make BasicMessageConsumer validate the Selector before attempting creation
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@681117 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
5 files changed, 38 insertions, 16 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 544c8014b4..801cf53d83 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -1650,10 +1650,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess final FieldTable ft = FieldTableFactory.newFieldTable(); // if (rawSelector != null) // ft.put("headers", rawSelector.getDataAsBytes()); - if (rawSelector != null) + // rawSelector is used by HeadersExchange and is not a JMS Selector + if (rawSelector != null) { ft.addAll(rawSelector); } + + if (messageSelector != null) + { + ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector); + } BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow, noLocal, exclusive, messageSelector, ft, noConsume, autoClose); @@ -1700,7 +1706,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess } public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh, - final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector, + final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments, final boolean noConsume, final boolean autoClose) throws JMSException; /** @@ -2357,8 +2363,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess // store the consumer queue name consumer.setQueuename(queueName); - // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable()); - bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(), amqd); + bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd); // If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch if (!_immediatePrefetch) diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java index 82bff1dda7..3ccff410eb 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java @@ -325,13 +325,13 @@ public final class AMQSession_0_8 extends AMQSession } public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh, - final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft, + final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments, final boolean noConsume, final boolean autoClose) throws JMSException { final AMQProtocolHandler protocolHandler = getProtocolHandler(); return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal, - _messageFactoryRegistry,this, protocolHandler, ft, prefetchHigh, prefetchLow, + _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive, _acknowledgeMode, noConsume, autoClose); } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java index 41880ee11e..9acc69ec55 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java @@ -91,7 +91,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me /** * We need to store the "raw" field table so that we can resubscribe in the event of failover being required */ - private final FieldTable _rawSelectorFieldTable; + private final FieldTable _arguments; /** * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of @@ -168,7 +168,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, - FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, + FieldTable arguments, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) { _channelId = channelId; @@ -179,7 +179,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me _messageFactory = messageFactory; _session = session; _protocolHandler = protocolHandler; - _rawSelectorFieldTable = rawSelectorFieldTable; + _arguments = arguments; _prefetchHigh = prefetchHigh; _prefetchLow = prefetchLow; _exclusive = exclusive; @@ -343,9 +343,9 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me _receivingThread = null; } - public FieldTable getRawSelectorFieldTable() + public FieldTable getArguments() { - return _rawSelectorFieldTable; + return _arguments; } public int getPrefetch() diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java index 255b38aa10..ab9e1d285f 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java @@ -83,12 +83,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, AMQProtocolHandler protocolHandler, - FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, + FieldTable arguments, int prefetchHigh, int prefetchLow, boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException { super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler, - rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose); + arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose); _0_10session = (AMQSession_0_10) session; if (messageSelector != null && !messageSelector.equals("")) { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java index 5414e25539..816bc430b2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java @@ -22,12 +22,17 @@ package org.apache.qpid.client; import java.util.concurrent.TimeUnit; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; + import org.apache.qpid.AMQException; +import org.apache.qpid.QpidException; import org.apache.qpid.client.failover.FailoverException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.MessageFactoryRegistry; import org.apache.qpid.client.message.UnprocessedMessage; import org.apache.qpid.client.protocol.AMQProtocolHandler; +import org.apache.qpid.filter.JMSSelectorFilter; import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.BasicCancelBody; import org.apache.qpid.framing.BasicCancelOkBody; @@ -43,12 +48,24 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination, String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session, - AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow, - boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) + AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow, + boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException { super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session, - protocolHandler, rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, + protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose); + try + { + + if (messageSelector != null && messageSelector.length() > 0) + { + JMSSelectorFilter _filter = new JMSSelectorFilter(messageSelector); + } + } + catch (QpidException e) + { + throw new InvalidSelectorException("cannot create consumer because of selector issue"); + } } void sendCancel() throws AMQException, FailoverException |
