summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-10-02 10:37:52 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-10-02 10:37:52 +0000
commit6ba4bf02207048c78eca475cc752df979b274cbc (patch)
tree893e589947110e48145765306898c54e0d2690ae /java
parent3fa4aa627242cfae62752ba4bf6cc0653e59ee22 (diff)
downloadqpid-python-6ba4bf02207048c78eca475cc752df979b274cbc.tar.gz
Merged revisions 580992-580993 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1 ........ r580992 | ritchiem | 2007-10-01 16:27:54 +0100 (Mon, 01 Oct 2007) | 1 line QPID-595 CommitRollbackTest Patch provided by Aidan Skinner to address intermittent test failure. ........ r580993 | ritchiem | 2007-10-01 16:31:00 +0100 (Mon, 01 Oct 2007) | 1 line QPID-611 QPID-620. DurableSubscriptionTest was failing due to a race condition when using NO_ACK. This is due to the Queue Total Size being updated after the send, but after the send and NO_ACK the msg data is purged and so there is no size to retrieve. Changed all references to msg.dequeue to queue.dequeue where appropriate so we can use that single point in the future for updating the Queue Total Size. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@581186 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java31
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java43
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java7
6 files changed, 66 insertions, 33 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index b8c5e821f7..7088c704ed 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -60,7 +60,7 @@ public class UnacknowledgedMessage
{
if (queue != null)
{
- message.dequeue(storeContext, queue);
+ queue.dequeue(storeContext, message);
}
//if the queue is null then the message is waiting to be acked, but has been removed.
message.decrementReference(storeContext);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index afa581f0c5..cd8c0198f3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -592,7 +592,19 @@ public class AMQMessage
_transientMessageData.addDestinationQueue(queue);
}
- public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
+ /**
+ * NOTE: Think about why you are using this method. Normal usages would want to do
+ * AMQQueue.dequeue(StoreContext, AMQMessage)
+ * This will keep the queue statistics up-to-date.
+ * Currently this method is only called _correctly_ from AMQQueue dequeue.
+ * Ideally we would have a better way for the queue to dequeue the message.
+ * Especially since enqueue isn't the recipriocal of this method.
+ * @deprecated
+ * @param storeContext
+ * @param queue
+ * @throws AMQException
+ */
+ void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
{
_messageHandle.dequeue(storeContext, _messageId, queue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 4a336ef71c..2ea0b6d3d8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -809,7 +809,7 @@ public class AMQQueue implements Managable, Comparable
}
}
- void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
+ public void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
{
try
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index ad02f477e0..8e72e995d0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -413,14 +413,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
AMQMessage message = _messages.poll();
- message.dequeue(storeContext, queue);
-
- message.decrementReference(storeContext);
-
if (message != null)
{
+ queue.dequeue(storeContext, message);
+
_totalMessageSize.addAndGet(-message.getSize());
- }
+
+ //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
+ message.decrementReference(storeContext);
+
+ }
_lock.unlock();
}
@@ -485,7 +487,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_totalMessageSize.addAndGet(-message.getSize());
// Use the reapingStoreContext as any sub(if we have one) may be in a tx.
- message.dequeue(_reapingStoreContext, _queue);
+ _queue.dequeue(_reapingStoreContext, message);
message.decrementReference(_reapingStoreContext);
@@ -511,13 +513,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * This method will return true if the message is to be purged from the queue.
+ * This method will return true if the message is to be purged from the queue.
*
*
- * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
+ * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
+ *
* @param message
* @param sub
+ *
* @return
+ *
* @throws AMQException
*/
private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
@@ -607,6 +612,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
") to :" + System.identityHashCode(sub));
}
+
+ if (messageQueue == _messages)
+ {
+ _totalMessageSize.addAndGet(-message.getSize());
+ }
+
sub.send(message, _queue);
//remove sent message from our queue.
@@ -654,10 +665,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- if ((message != null) && (messageQueue == _messages))
- {
- _totalMessageSize.addAndGet(-message.getSize());
- }
}
catch (AMQException e)
{
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index c8d43a47a5..a9c93d7227 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -114,18 +114,28 @@ public class DurableSubscriptionTest extends TestCase
con.close();
}
- public void testDurability() throws AMQException, JMSException, URLSyntaxException
+ public void testDurabilityNOACK() throws AMQException, JMSException, URLSyntaxException
+ {
+ durabilityImpl(AMQSession.NO_ACKNOWLEDGE);
+ }
+
+ public void testDurabilityAUTOACK() throws AMQException, JMSException, URLSyntaxException
+ {
+ durabilityImpl(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ private void durabilityImpl(int ackMode) throws AMQException, JMSException, URLSyntaxException
{
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
AMQTopic topic = new AMQTopic(con, "MyTopic");
- Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session session1 = con.createSession(false, ackMode);
MessageConsumer consumer1 = session1.createConsumer(topic);
- Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session sessionProd = con.createSession(false, ackMode);
MessageProducer producer = sessionProd.createProducer(topic);
- Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session session2 = con.createSession(false, ackMode);
TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
con.start();
@@ -133,36 +143,41 @@ public class DurableSubscriptionTest extends TestCase
producer.send(session1.createTextMessage("A"));
Message msg;
- msg = consumer1.receive();
- assertEquals("A", ((TextMessage) msg).getText());
msg = consumer1.receive(100);
- assertEquals(null, msg);
+ assertNotNull("Message should be available", msg);
+ assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
+
+ msg = consumer1.receive(100);
+ assertNull("There should be no more messages for consumption on consumer1.", msg);
msg = consumer2.receive();
- assertEquals("A", ((TextMessage) msg).getText());
+ assertNotNull(msg);
+ assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
msg = consumer2.receive(100);
- assertEquals(null, msg);
+ assertNull("There should be no more messages for consumption on consumer2.", msg);
consumer2.close();
- Session session3 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session session3 = con.createSession(false, ackMode);
MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
producer.send(session1.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
msg = consumer1.receive(100);
- assertEquals("B", ((TextMessage) msg).getText());
+ assertNotNull("Consumer 1 should get message 'B'.", msg);
+ assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(100);
- assertEquals(null, msg);
+ assertNull("There should be no more messages for consumption on consumer1.", msg);
_logger.info("Receive message on consumer 3 :expecting B");
msg = consumer3.receive(100);
- assertEquals("B", ((TextMessage) msg).getText());
+ assertNotNull("Consumer 3 should get message 'B'.", msg);
+ assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 3 :expecting null");
msg = consumer3.receive(100);
- assertEquals(null, msg);
+ assertNull("There should be no more messages for consumption on consumer3.", msg);
con.close();
}
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
index 6db27d6be0..3d5a1096b4 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
@@ -407,7 +407,6 @@ public class CommitRollbackTest extends TestCase
{
_logger.info("Got 1 redelivered");
assertTrue("Message is not marked as redelivered", result.getJMSRedelivered());
- assertFalse("Already received message one", _gotone);
_gotone = true;
}
@@ -418,15 +417,15 @@ public class CommitRollbackTest extends TestCase
if (result.getJMSRedelivered())
{
_logger.info("Got 2 redelivered, message was prefetched");
- assertFalse("Already received message redelivered two", _gottwoRedelivered);
-
_gottwoRedelivered = true;
+
}
else
{
_logger.warn("Got 2, message prefetched wasn't cleared or messages was in transit when rollback occured");
assertFalse("Already received message two", _gottwo);
-
+ assertFalse("Already received message redelivered two", _gottwoRedelivered);
+
_gottwo = true;
}
}