summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
authorAidan Skinner <aidan@apache.org>2008-07-30 16:47:41 +0000
committerAidan Skinner <aidan@apache.org>2008-07-30 16:47:41 +0000
commit5f2c1ce06597827d24428bc2479a6e79990d854b (patch)
tree4c434aa27e3f4422dfd6abed6c5bf335ed91c2d4 /qpid/java/client
parentfc3f14b551d86d617699ccb07b72f3bacf500f9b (diff)
downloadqpid-python-5f2c1ce06597827d24428bc2479a6e79990d854b.tar.gz
QPID-1192: Make consumer send Selector as part of binding.
QPID-1191: Add test to exhibit leak Change DurableSubscriptionTest to validate exception type recieved Make BasicMessageConsumer validate the Selector before attempting creation git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@681117 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java13
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java10
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java4
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java23
5 files changed, 38 insertions, 16 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 544c8014b4..801cf53d83 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -1650,10 +1650,16 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
final FieldTable ft = FieldTableFactory.newFieldTable();
// if (rawSelector != null)
// ft.put("headers", rawSelector.getDataAsBytes());
- if (rawSelector != null)
+ // rawSelector is used by HeadersExchange and is not a JMS Selector
+ if (rawSelector != null)
{
ft.addAll(rawSelector);
}
+
+ if (messageSelector != null)
+ {
+ ft.put(new AMQShortString("x-filter-jms-selector"), messageSelector);
+ }
BasicMessageConsumer consumer = createMessageConsumer(amqd, prefetchHigh, prefetchLow,
noLocal, exclusive, messageSelector, ft, noConsume, autoClose);
@@ -1700,7 +1706,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
}
public abstract BasicMessageConsumer createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
- final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable rawSelector,
+ final int prefetchLow, final boolean noLocal, final boolean exclusive, String selector, final FieldTable arguments,
final boolean noConsume, final boolean autoClose) throws JMSException;
/**
@@ -2357,8 +2363,7 @@ public abstract class AMQSession extends Closeable implements Session, QueueSess
// store the consumer queue name
consumer.setQueuename(queueName);
- // bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName(), amqd);
+ bindQueue(queueName, amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd);
// If IMMEDIATE_PREFETCH is not required then suspsend the channel to delay prefetch
if (!_immediatePrefetch)
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 82bff1dda7..3ccff410eb 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -325,13 +325,13 @@ public final class AMQSession_0_8 extends AMQSession
}
public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
- final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable ft,
+ final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector, final FieldTable arguments,
final boolean noConsume, final boolean autoClose) throws JMSException
{
final AMQProtocolHandler protocolHandler = getProtocolHandler();
return new BasicMessageConsumer_0_8(_channelId, _connection, destination, messageSelector, noLocal,
- _messageFactoryRegistry,this, protocolHandler, ft, prefetchHigh, prefetchLow,
+ _messageFactoryRegistry,this, protocolHandler, arguments, prefetchHigh, prefetchLow,
exclusive, _acknowledgeMode, noConsume, autoClose);
}
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
index 41880ee11e..9acc69ec55 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
@@ -91,7 +91,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
/**
* We need to store the "raw" field table so that we can resubscribe in the event of failover being required
*/
- private final FieldTable _rawSelectorFieldTable;
+ private final FieldTable _arguments;
/**
* We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
@@ -168,7 +168,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
- FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ FieldTable arguments, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
{
_channelId = channelId;
@@ -179,7 +179,7 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
_messageFactory = messageFactory;
_session = session;
_protocolHandler = protocolHandler;
- _rawSelectorFieldTable = rawSelectorFieldTable;
+ _arguments = arguments;
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
@@ -343,9 +343,9 @@ public abstract class BasicMessageConsumer<H, B> extends Closeable implements Me
_receivingThread = null;
}
- public FieldTable getRawSelectorFieldTable()
+ public FieldTable getArguments()
{
- return _rawSelectorFieldTable;
+ return _arguments;
}
public int getPrefetch()
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 255b38aa10..ab9e1d285f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -83,12 +83,12 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], By
protected BasicMessageConsumer_0_10(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory,
AMQSession session, AMQProtocolHandler protocolHandler,
- FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
+ FieldTable arguments, int prefetchHigh, int prefetchLow,
boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
throws JMSException
{
super(channelId, connection, destination, messageSelector, noLocal, messageFactory, session, protocolHandler,
- rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
+ arguments, prefetchHigh, prefetchLow, exclusive, acknowledgeMode, noConsume, autoClose);
_0_10session = (AMQSession_0_10) session;
if (messageSelector != null && !messageSelector.equals(""))
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
index 5414e25539..816bc430b2 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
@@ -22,12 +22,17 @@ package org.apache.qpid.client;
import java.util.concurrent.TimeUnit;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+
import org.apache.qpid.AMQException;
+import org.apache.qpid.QpidException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.filter.JMSSelectorFilter;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicCancelBody;
import org.apache.qpid.framing.BasicCancelOkBody;
@@ -43,12 +48,24 @@ public class BasicMessageConsumer_0_8 extends BasicMessageConsumer<ContentHeader
protected BasicMessageConsumer_0_8(int channelId, AMQConnection connection, AMQDestination destination,
String messageSelector, boolean noLocal, MessageFactoryRegistry messageFactory, AMQSession session,
- AMQProtocolHandler protocolHandler, FieldTable rawSelectorFieldTable, int prefetchHigh, int prefetchLow,
- boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose)
+ AMQProtocolHandler protocolHandler, FieldTable arguments, int prefetchHigh, int prefetchLow,
+ boolean exclusive, int acknowledgeMode, boolean noConsume, boolean autoClose) throws JMSException
{
super(channelId, connection, destination,messageSelector,noLocal,messageFactory,session,
- protocolHandler, rawSelectorFieldTable, prefetchHigh, prefetchLow, exclusive,
+ protocolHandler, arguments, prefetchHigh, prefetchLow, exclusive,
acknowledgeMode, noConsume, autoClose);
+ try
+ {
+
+ if (messageSelector != null && messageSelector.length() > 0)
+ {
+ JMSSelectorFilter _filter = new JMSSelectorFilter(messageSelector);
+ }
+ }
+ catch (QpidException e)
+ {
+ throw new InvalidSelectorException("cannot create consumer because of selector issue");
+ }
}
void sendCancel() throws AMQException, FailoverException