summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-10-18 11:20:53 +0000
committerKeith Wall <kwall@apache.org>2011-10-18 11:20:53 +0000
commit5a5e04545e7e45977efaf077da21b64f870caf04 (patch)
tree86c6788bc7f1db19fb4191eafe5aa4398a449b04 /java
parentb7bc31df25274935c2c00f9afcc1aad37bd93354 (diff)
downloadqpid-python-5a5e04545e7e45977efaf077da21b64f870caf04.tar.gz
QPID-3542: ensure session complete sent for filtered out messages
Applied patch from Andrew MacBean <andymacbean@gmail.com> and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1185580 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java27
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java23
-rw-r--r--java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java12
3 files changed, 46 insertions, 16 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 3812e612aa..c6a64ec894 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -294,23 +294,34 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- void messageAcknowledge(RangeSet ranges, boolean accept)
+ void messageAcknowledge(final RangeSet ranges, final boolean accept)
{
messageAcknowledge(ranges,accept,false);
}
- void messageAcknowledge(RangeSet ranges, boolean accept,boolean setSyncBit)
+ void messageAcknowledge(final RangeSet ranges, final boolean accept, final boolean setSyncBit)
{
- Session ssn = getQpidSession();
- for (Range range : ranges)
+ final Session ssn = getQpidSession();
+ flushProcessed(ranges,accept);
+ if (accept)
{
- ssn.processed(range);
+ ssn.messageAccept(ranges, UNRELIABLE, setSyncBit ? SYNC : NONE);
}
- ssn.flushProcessed(accept ? BATCH : NONE);
- if (accept)
+ }
+
+ /**
+ * Flush any outstanding commands. This causes session complete to be sent.
+ * @param ranges the range of command ids.
+ * @param batch true if batched.
+ */
+ void flushProcessed(final RangeSet ranges, final boolean batch)
+ {
+ final Session ssn = getQpidSession();
+ for (final Range range : ranges)
{
- ssn.messageAccept(ranges, UNRELIABLE,setSyncBit? SYNC : NONE);
+ ssn.processed(range);
}
+ ssn.flushProcessed(batch ? BATCH : NONE);
}
/**
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
index 5fba351d8a..548e274571 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
@@ -269,8 +269,9 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
{
if (_logger.isDebugEnabled())
{
- _logger.debug("filterMessage - not ack'ing messaage as not aquired");
+ _logger.debug("filterMessage - not ack'ing message as not acquired");
}
+ flushUnwantedMessage(message);
}
}
@@ -312,6 +313,26 @@ public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<UnprocessedM
}
/**
+ * Flush an unwanted message. For 0-10 we need to ensure that all messages are indicated
+ * processed to ensure their AMQP command-id is marked completed.
+ *
+ * @param message The unwanted message to be flushed
+ * @throws AMQException If the unwanted message cannot be flushed due to some internal error.
+ */
+ private void flushUnwantedMessage(final AbstractJMSMessage message) throws AMQException
+ {
+ final RangeSet ranges = new RangeSet();
+ ranges.add((int) message.getDeliveryTag());
+ _0_10session.flushProcessed(ranges,false);
+
+ final AMQException amqe = _0_10session.getCurrentException();
+ if (amqe != null)
+ {
+ throw amqe;
+ }
+ }
+
+ /**
* Acquire a message
*
* @param message The message to be acquired
diff --git a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
index a7a06a357a..826545a23d 100644
--- a/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
@@ -33,6 +33,7 @@ import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -320,6 +321,7 @@ public class TopicSessionTest extends QpidBrokerTestCase
final Connection con1 = getConnection();
final Session session1 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Topic topic1 = session1.createTopic(topicName);
+ final AMQQueue internalNameOnBroker = new AMQQueue("amq.topic", "clientid" + ":" + clientId);
// Setup subscriber with selector
final TopicSubscriber subscriberWithSelector = session1.createDurableSubscriber(topic1, clientId, "Selector = 'select'", false);
@@ -339,13 +341,9 @@ public class TopicSessionTest extends QpidBrokerTestCase
session1.close();
- // Now recreate the session and subscriber (same clientid) but without selector and check that the message still
- // is not received. This defect meant that such a message would be received.
+ // Now verify queue depth on broker.
final Session session2 = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Topic topic2 = session2.createTopic(topicName);
-
- final TopicSubscriber sameSubscriberWithoutSelector = session2.createDurableSubscriber(topic2, clientId, null, false);
- final Message message2 = sameSubscriberWithoutSelector.receive(1000);
- assertNull("still should not have received message", message2);
+ final long depth = ((AMQSession) session2).getQueueDepth(internalNameOnBroker);
+ assertEquals("Expected queue depth of zero", 0, depth);
}
}