summaryrefslogtreecommitdiff
path: root/java/systests/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-01-11 13:10:36 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-01-11 13:10:36 +0000
commit2c5e10a0f46335acf8b09cebd889d5920cda8d48 (patch)
treeac1141f56aeddf2fc6e406d24e3355484660f19f /java/systests/src
parentc9e5b84950a75530b82ac72acf71ee3141d8bef1 (diff)
downloadqpid-python-2c5e10a0f46335acf8b09cebd889d5920cda8d48.tar.gz
QPID-3720 : Add alternative (C++ style) grouping and apply comments from Robbie Gemmel
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1229996 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java263
1 files changed, 198 insertions, 65 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
index dc29ef378e..08a932eba1 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
@@ -34,18 +34,13 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
-import javax.naming.NamingException;
import java.util.HashMap;
import java.util.Map;
public class MessageGroupQueueTest extends QpidBrokerTestCase
{
- private static final int TIMEOUT = 1500;
-
protected final String QUEUE = "MessageGroupQueue";
- private static final int MSG_COUNT = 50;
-
private Connection producerConnection;
private MessageProducer producer;
private Session producerSession;
@@ -73,38 +68,53 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
super.tearDown();
}
+
+ public void testSimpleGroupAssignment() throws Exception
+ {
+ simpleGroupAssignment(false);
+ }
+
+ public void testSharedGroupSimpleGroupAssignment() throws Exception
+ {
+ simpleGroupAssignment(true);
+ }
+
+
/**
* Pre populate the queue with messages with groups as follows
- *
+ *
* ONE
* TWO
* ONE
* TWO
- *
+ *
* Create two consumers with prefetch of 1, the first consumer should then be assigned group ONE, the second
* consumer assigned group TWO if they are started in sequence.
- *
+ *
* Thus doing
- *
+ *
* c1 <--- (ONE)
* c2 <--- (TWO)
* c2 ack --->
- *
+ *
* c2 should now be able to receive a second message from group TWO (skipping over the message from group ONE)
- *
+ *
* i.e.
- *
+ *
* c2 <--- (TWO)
* c2 ack --->
* c1 <--- (ONE)
* c1 ack --->
- *
+ *
*/
- public void testSimpleGroupAssignment() throws Exception
+ private void simpleGroupAssignment(boolean sharedGroups) throws AMQException, JMSException
{
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("qpid.group_header_key","group");
- arguments.put("qpid.shared_msg_group","1");
+ if(sharedGroups)
+ {
+ arguments.put("qpid.shared_msg_group","1");
+ }
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
@@ -112,7 +122,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
producer = producerSession.createProducer(queue);
String[] groups = { "ONE", "TWO"};
-
+
for (int msg = 0; msg < 4; msg++)
{
producer.send(createMessage(msg, groups[msg % groups.length]));
@@ -125,7 +135,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
-
+
MessageConsumer consumer1 = cs1.createConsumer(queue);
MessageConsumer consumer2 = cs2.createConsumer(queue);
@@ -154,33 +164,47 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
cs1Received2.acknowledge();
cs2Received2.acknowledge();
-
+
assertNull(consumer1.receive(1000));
assertNull(consumer2.receive(1000));
}
+
+ public void testConsumerCloseGroupAssignment() throws Exception
+ {
+ consumerCloseGroupAssignment(false);
+ }
+
+ public void testSharedGroupConsumerCloseGroupAssignment() throws Exception
+ {
+ consumerCloseGroupAssignment(true);
+ }
+
/**
- *
+ *
* Tests that upon closing a consumer, groups previously assigned to that consumer are reassigned to a different
* consumer.
- *
+ *
* Pre-populate the queue as ONE, ONE, TWO, ONE
- *
+ *
* create in sequence two consumers
- *
+ *
* receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2)
- *
+ *
* Then close c1 before acking.
- *
+ *
* If we now attempt to receive from c2, then the remaining messages in group ONE should be available (which
* requires c2 to go "backwards" in the queue).
- *
- * */
- public void testConsumerCloseGroupAssignment() throws Exception
+ *
+ **/
+ private void consumerCloseGroupAssignment(boolean sharedGroups) throws AMQException, JMSException
{
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("qpid.group_header_key","group");
- arguments.put("qpid.shared_msg_group","1");
+ if(sharedGroups)
+ {
+ arguments.put("qpid.shared_msg_group","1");
+ }
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
@@ -197,9 +221,8 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
producerSession.close();
producerConnection.close();
- Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
- Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
-
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
MessageConsumer consumer1 = cs1.createConsumer(queue);
@@ -208,40 +231,54 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
Message cs1Received = consumer1.receive(1000);
assertNotNull("Consumer 1 should have received first message", cs1Received);
+ assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg"));
Message cs2Received = consumer2.receive(1000);
assertNotNull("Consumer 2 should have received first message", cs2Received);
- cs2Received.acknowledge();
+ assertEquals("incorrect message received", 3, cs2Received.getIntProperty("msg"));
+ cs2.commit();
Message cs2Received2 = consumer2.receive(1000);
- assertNull("Consumer 2 should not have received second message", cs2Received2);
+ assertNull("Consumer 2 should not yet have received a second message", cs2Received2);
consumer1.close();
- cs1Received.acknowledge();
+ cs1.commit();
Message cs2Received3 = consumer2.receive(1000);
assertNotNull("Consumer 2 should have received second message", cs2Received3);
- assertEquals("Unexpected group", cs2Received3.getStringProperty("group"),
- "ONE");
+ assertEquals("Unexpected group", "ONE", cs2Received3.getStringProperty("group"));
+ assertEquals("incorrect message received", 2, cs2Received3.getIntProperty("msg"));
- cs2Received3.acknowledge();
+ cs2.commit();
Message cs2Received4 = consumer2.receive(1000);
assertNotNull("Consumer 2 should have received third message", cs2Received4);
- assertEquals("Unexpected group", cs2Received4.getStringProperty("group"),
- "ONE");
-
- cs2Received4.acknowledge();
+ assertEquals("Unexpected group", "ONE", cs2Received4.getStringProperty("group"));
+ assertEquals("incorrect message received", 4, cs2Received4.getIntProperty("msg"));
+ cs2.commit();
assertNull(consumer2.receive(1000));
}
+
+
+ public void testConsumerCloseWithRelease() throws Exception
+ {
+ consumerCloseWithRelease(false);
+ }
+
+ public void testSharedGroupConsumerCloseWithRelease() throws Exception
+ {
+ consumerCloseWithRelease(true);
+ }
+
+
/**
*
* Tests that upon closing a consumer and its session, groups previously assigned to that consumer are reassigned
@@ -259,12 +296,14 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
* requires c2 to go "backwards" in the queue). The first such message should be marked as redelivered
*
*/
-
- public void testConsumerCloseWithRelease() throws Exception
+ private void consumerCloseWithRelease(boolean sharedGroups) throws AMQException, JMSException
{
final Map<String,Object> arguments = new HashMap<String, Object>();
arguments.put("qpid.group_header_key","group");
- arguments.put("qpid.shared_msg_group","1");
+ if(sharedGroups)
+ {
+ arguments.put("qpid.shared_msg_group","1");
+ }
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
@@ -282,61 +321,155 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase
producerSession.close();
producerConnection.close();
- Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
- Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1);
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
MessageConsumer consumer1 = cs1.createConsumer(queue);
- MessageConsumer consumer2 = cs2.createConsumer(queue);
consumerConnection.start();
+
+ MessageConsumer consumer2 = cs2.createConsumer(queue);
+
Message cs1Received = consumer1.receive(1000);
- assertNotNull("Consumer 1 should have received first message", cs1Received);
+ assertNotNull("Consumer 1 should have received its first message", cs1Received);
+ assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg"));
Message received = consumer2.receive(1000);
- assertNotNull("Consumer 2 should have received first message", received);
- Message first = received;
+ assertNotNull("Consumer 2 should have received its first message", received);
+ assertEquals("incorrect message received", 3, received.getIntProperty("msg"));
received = consumer2.receive(1000);
- assertNull("Consumer 2 should not have received second message", received);
+ assertNull("Consumer 2 should not yet have received second message", received);
consumer1.close();
cs1.close();
- first.acknowledge();
+ cs2.commit();
received = consumer2.receive(1000);
- assertNotNull("Consumer 2 should have received second message", received);
- assertEquals("Unexpected group", received.getStringProperty("group"),
- "ONE");
+ assertNotNull("Consumer 2 should now have received second message", received);
+ assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
+ assertEquals("incorrect message received", 1, received.getIntProperty("msg"));
assertTrue("Expected second message to be marked as redelivered " + received.getIntProperty("msg"),
received.getJMSRedelivered());
- received.acknowledge();
+ cs2.commit();
received = consumer2.receive(1000);
- assertNotNull("Consumer 2 should have received third message", received);
- assertEquals("Unexpected group", received.getStringProperty("group"),
- "ONE");
+ assertNotNull("Consumer 2 should have received a third message", received);
+ assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
+ assertEquals("incorrect message received", 2, received.getIntProperty("msg"));
- received.acknowledge();
+ cs2.commit();
received = consumer2.receive(1000);
- assertNotNull("Consumer 2 should have received fourth message", received);
- assertEquals("Unexpected group", received.getStringProperty("group"),
- "ONE");
+ assertNotNull("Consumer 2 should have received a fourth message", received);
+ assertEquals("Unexpected group", "ONE", received.getStringProperty("group"));
+ assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
- received.acknowledge();
+ cs2.commit();
assertNull(consumer2.receive(1000));
}
-
+ public void testGroupAssignmentSurvivesEmpty() throws JMSException, AMQException
+ {
+ groupAssignmentOnEmpty(false);
+ }
+
+ public void testSharedGroupAssignmentDoesNotSurviveEmpty() throws JMSException, AMQException
+ {
+ groupAssignmentOnEmpty(true);
+ }
+
+ private void groupAssignmentOnEmpty(boolean sharedGroups) throws AMQException, JMSException
+ {
+ final Map<String,Object> arguments = new HashMap<String, Object>();
+ arguments.put("qpid.group_header_key","group");
+ if(sharedGroups)
+ {
+ arguments.put("qpid.shared_msg_group","1");
+ }
+
+ ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+ queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'");
+
+ ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
+ producer = producerSession.createProducer(queue);
+
+ producer.send(createMessage(1, "ONE"));
+ producer.send(createMessage(2, "TWO"));
+ producer.send(createMessage(3, "THREE"));
+ producer.send(createMessage(4, "ONE"));
+
+ producerSession.commit();
+ producer.close();
+ producerSession.close();
+ producerConnection.close();
+
+ Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+ Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1);
+
+
+ MessageConsumer consumer1 = cs1.createConsumer(queue);
+
+ consumerConnection.start();
+
+ MessageConsumer consumer2 = cs2.createConsumer(queue);
+
+ Message received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received its first message", received);
+ assertEquals("incorrect message received", 1, received.getIntProperty("msg"));
+
+ received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received its first message", received);
+ assertEquals("incorrect message received", 2, received.getIntProperty("msg"));
+
+ cs1.commit();
+
+ received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received its second message", received);
+ assertEquals("incorrect message received", 3, received.getIntProperty("msg"));
+
+ // We expect different behaviours from "shared groups": here the assignment of a subscription to a group
+ // is terminated when there are no outstanding delivered but unacknowledged messages. In contrast, with a
+ // standard message grouping queue the assignment will be retained until the subscription is no longer
+ // registered
+ if(sharedGroups)
+ {
+ cs2.commit();
+ received = consumer2.receive(1000);
+
+ assertNotNull("Consumer 2 should have received its second message", received);
+ assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
+
+ cs2.commit();
+ }
+ else
+ {
+ cs2.commit();
+ received = consumer2.receive(1000);
+
+ assertNull("Consumer 2 should not have received a second message", received);
+
+ cs1.commit();
+
+ received = consumer1.receive(1000);
+ assertNotNull("Consumer 1 should have received its third message", received);
+ assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
+
+ }
+
+ }
+
+
private Message createMessage(int msg, String group) throws JMSException
{
Message send = producerSession.createTextMessage("Message: " + msg);