diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-01-11 13:10:36 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-01-11 13:10:36 +0000 |
| commit | 2c5e10a0f46335acf8b09cebd889d5920cda8d48 (patch) | |
| tree | ac1141f56aeddf2fc6e406d24e3355484660f19f /java/systests/src | |
| parent | c9e5b84950a75530b82ac72acf71ee3141d8bef1 (diff) | |
| download | qpid-python-2c5e10a0f46335acf8b09cebd889d5920cda8d48.tar.gz | |
QPID-3720 : Add alternative (C++ style) grouping and apply comments from Robbie Gemmel
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1229996 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests/src')
| -rw-r--r-- | java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java | 263 |
1 files changed, 198 insertions, 65 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java index dc29ef378e..08a932eba1 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java @@ -34,18 +34,13 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; -import javax.naming.NamingException; import java.util.HashMap; import java.util.Map; public class MessageGroupQueueTest extends QpidBrokerTestCase { - private static final int TIMEOUT = 1500; - protected final String QUEUE = "MessageGroupQueue"; - private static final int MSG_COUNT = 50; - private Connection producerConnection; private MessageProducer producer; private Session producerSession; @@ -73,38 +68,53 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase super.tearDown(); } + + public void testSimpleGroupAssignment() throws Exception + { + simpleGroupAssignment(false); + } + + public void testSharedGroupSimpleGroupAssignment() throws Exception + { + simpleGroupAssignment(true); + } + + /** * Pre populate the queue with messages with groups as follows - * + * * ONE * TWO * ONE * TWO - * + * * Create two consumers with prefetch of 1, the first consumer should then be assigned group ONE, the second * consumer assigned group TWO if they are started in sequence. - * + * * Thus doing - * + * * c1 <--- (ONE) * c2 <--- (TWO) * c2 ack ---> - * + * * c2 should now be able to receive a second message from group TWO (skipping over the message from group ONE) - * + * * i.e. - * + * * c2 <--- (TWO) * c2 ack ---> * c1 <--- (ONE) * c1 ack ---> - * + * */ - public void testSimpleGroupAssignment() throws Exception + private void simpleGroupAssignment(boolean sharedGroups) throws AMQException, JMSException { final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("qpid.group_header_key","group"); - arguments.put("qpid.shared_msg_group","1"); + if(sharedGroups) + { + arguments.put("qpid.shared_msg_group","1"); + } ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); @@ -112,7 +122,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase producer = producerSession.createProducer(queue); String[] groups = { "ONE", "TWO"}; - + for (int msg = 0; msg < 4; msg++) { producer.send(createMessage(msg, groups[msg % groups.length])); @@ -125,7 +135,7 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); - + MessageConsumer consumer1 = cs1.createConsumer(queue); MessageConsumer consumer2 = cs2.createConsumer(queue); @@ -154,33 +164,47 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase cs1Received2.acknowledge(); cs2Received2.acknowledge(); - + assertNull(consumer1.receive(1000)); assertNull(consumer2.receive(1000)); } + + public void testConsumerCloseGroupAssignment() throws Exception + { + consumerCloseGroupAssignment(false); + } + + public void testSharedGroupConsumerCloseGroupAssignment() throws Exception + { + consumerCloseGroupAssignment(true); + } + /** - * + * * Tests that upon closing a consumer, groups previously assigned to that consumer are reassigned to a different * consumer. - * + * * Pre-populate the queue as ONE, ONE, TWO, ONE - * + * * create in sequence two consumers - * + * * receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2) - * + * * Then close c1 before acking. - * + * * If we now attempt to receive from c2, then the remaining messages in group ONE should be available (which * requires c2 to go "backwards" in the queue). - * - * */ - public void testConsumerCloseGroupAssignment() throws Exception + * + **/ + private void consumerCloseGroupAssignment(boolean sharedGroups) throws AMQException, JMSException { final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("qpid.group_header_key","group"); - arguments.put("qpid.shared_msg_group","1"); + if(sharedGroups) + { + arguments.put("qpid.shared_msg_group","1"); + } ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); @@ -197,9 +221,8 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase producerSession.close(); producerConnection.close(); - Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); - Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); - + Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); + Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); MessageConsumer consumer1 = cs1.createConsumer(queue); @@ -208,40 +231,54 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase Message cs1Received = consumer1.receive(1000); assertNotNull("Consumer 1 should have received first message", cs1Received); + assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg")); Message cs2Received = consumer2.receive(1000); assertNotNull("Consumer 2 should have received first message", cs2Received); - cs2Received.acknowledge(); + assertEquals("incorrect message received", 3, cs2Received.getIntProperty("msg")); + cs2.commit(); Message cs2Received2 = consumer2.receive(1000); - assertNull("Consumer 2 should not have received second message", cs2Received2); + assertNull("Consumer 2 should not yet have received a second message", cs2Received2); consumer1.close(); - cs1Received.acknowledge(); + cs1.commit(); Message cs2Received3 = consumer2.receive(1000); assertNotNull("Consumer 2 should have received second message", cs2Received3); - assertEquals("Unexpected group", cs2Received3.getStringProperty("group"), - "ONE"); + assertEquals("Unexpected group", "ONE", cs2Received3.getStringProperty("group")); + assertEquals("incorrect message received", 2, cs2Received3.getIntProperty("msg")); - cs2Received3.acknowledge(); + cs2.commit(); Message cs2Received4 = consumer2.receive(1000); assertNotNull("Consumer 2 should have received third message", cs2Received4); - assertEquals("Unexpected group", cs2Received4.getStringProperty("group"), - "ONE"); - - cs2Received4.acknowledge(); + assertEquals("Unexpected group", "ONE", cs2Received4.getStringProperty("group")); + assertEquals("incorrect message received", 4, cs2Received4.getIntProperty("msg")); + cs2.commit(); assertNull(consumer2.receive(1000)); } + + + public void testConsumerCloseWithRelease() throws Exception + { + consumerCloseWithRelease(false); + } + + public void testSharedGroupConsumerCloseWithRelease() throws Exception + { + consumerCloseWithRelease(true); + } + + /** * * Tests that upon closing a consumer and its session, groups previously assigned to that consumer are reassigned @@ -259,12 +296,14 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase * requires c2 to go "backwards" in the queue). The first such message should be marked as redelivered * */ - - public void testConsumerCloseWithRelease() throws Exception + private void consumerCloseWithRelease(boolean sharedGroups) throws AMQException, JMSException { final Map<String,Object> arguments = new HashMap<String, Object>(); arguments.put("qpid.group_header_key","group"); - arguments.put("qpid.shared_msg_group","1"); + if(sharedGroups) + { + arguments.put("qpid.shared_msg_group","1"); + } ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); @@ -282,61 +321,155 @@ public class MessageGroupQueueTest extends QpidBrokerTestCase producerSession.close(); producerConnection.close(); - Session cs1 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); - Session cs2 = ((AMQConnection)consumerConnection).createSession(false, Session.CLIENT_ACKNOWLEDGE,1); + Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); + Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); MessageConsumer consumer1 = cs1.createConsumer(queue); - MessageConsumer consumer2 = cs2.createConsumer(queue); consumerConnection.start(); + + MessageConsumer consumer2 = cs2.createConsumer(queue); + Message cs1Received = consumer1.receive(1000); - assertNotNull("Consumer 1 should have received first message", cs1Received); + assertNotNull("Consumer 1 should have received its first message", cs1Received); + assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg")); Message received = consumer2.receive(1000); - assertNotNull("Consumer 2 should have received first message", received); - Message first = received; + assertNotNull("Consumer 2 should have received its first message", received); + assertEquals("incorrect message received", 3, received.getIntProperty("msg")); received = consumer2.receive(1000); - assertNull("Consumer 2 should not have received second message", received); + assertNull("Consumer 2 should not yet have received second message", received); consumer1.close(); cs1.close(); - first.acknowledge(); + cs2.commit(); received = consumer2.receive(1000); - assertNotNull("Consumer 2 should have received second message", received); - assertEquals("Unexpected group", received.getStringProperty("group"), - "ONE"); + assertNotNull("Consumer 2 should now have received second message", received); + assertEquals("Unexpected group", "ONE", received.getStringProperty("group")); + assertEquals("incorrect message received", 1, received.getIntProperty("msg")); assertTrue("Expected second message to be marked as redelivered " + received.getIntProperty("msg"), received.getJMSRedelivered()); - received.acknowledge(); + cs2.commit(); received = consumer2.receive(1000); - assertNotNull("Consumer 2 should have received third message", received); - assertEquals("Unexpected group", received.getStringProperty("group"), - "ONE"); + assertNotNull("Consumer 2 should have received a third message", received); + assertEquals("Unexpected group", "ONE", received.getStringProperty("group")); + assertEquals("incorrect message received", 2, received.getIntProperty("msg")); - received.acknowledge(); + cs2.commit(); received = consumer2.receive(1000); - assertNotNull("Consumer 2 should have received fourth message", received); - assertEquals("Unexpected group", received.getStringProperty("group"), - "ONE"); + assertNotNull("Consumer 2 should have received a fourth message", received); + assertEquals("Unexpected group", "ONE", received.getStringProperty("group")); + assertEquals("incorrect message received", 4, received.getIntProperty("msg")); - received.acknowledge(); + cs2.commit(); assertNull(consumer2.receive(1000)); } - + public void testGroupAssignmentSurvivesEmpty() throws JMSException, AMQException + { + groupAssignmentOnEmpty(false); + } + + public void testSharedGroupAssignmentDoesNotSurviveEmpty() throws JMSException, AMQException + { + groupAssignmentOnEmpty(true); + } + + private void groupAssignmentOnEmpty(boolean sharedGroups) throws AMQException, JMSException + { + final Map<String,Object> arguments = new HashMap<String, Object>(); + arguments.put("qpid.group_header_key","group"); + if(sharedGroups) + { + arguments.put("qpid.shared_msg_group","1"); + } + + ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments); + queue = (Queue) producerSession.createQueue("direct://amq.direct/"+QUEUE+"/"+QUEUE+"?durable='false'&autodelete='true'"); + + ((AMQSession) producerSession).declareAndBind((AMQDestination)queue); + producer = producerSession.createProducer(queue); + + producer.send(createMessage(1, "ONE")); + producer.send(createMessage(2, "TWO")); + producer.send(createMessage(3, "THREE")); + producer.send(createMessage(4, "ONE")); + + producerSession.commit(); + producer.close(); + producerSession.close(); + producerConnection.close(); + + Session cs1 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); + Session cs2 = ((AMQConnection)consumerConnection).createSession(true, Session.SESSION_TRANSACTED,1); + + + MessageConsumer consumer1 = cs1.createConsumer(queue); + + consumerConnection.start(); + + MessageConsumer consumer2 = cs2.createConsumer(queue); + + Message received = consumer1.receive(1000); + assertNotNull("Consumer 1 should have received its first message", received); + assertEquals("incorrect message received", 1, received.getIntProperty("msg")); + + received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received its first message", received); + assertEquals("incorrect message received", 2, received.getIntProperty("msg")); + + cs1.commit(); + + received = consumer1.receive(1000); + assertNotNull("Consumer 1 should have received its second message", received); + assertEquals("incorrect message received", 3, received.getIntProperty("msg")); + + // We expect different behaviours from "shared groups": here the assignment of a subscription to a group + // is terminated when there are no outstanding delivered but unacknowledged messages. In contrast, with a + // standard message grouping queue the assignment will be retained until the subscription is no longer + // registered + if(sharedGroups) + { + cs2.commit(); + received = consumer2.receive(1000); + + assertNotNull("Consumer 2 should have received its second message", received); + assertEquals("incorrect message received", 4, received.getIntProperty("msg")); + + cs2.commit(); + } + else + { + cs2.commit(); + received = consumer2.receive(1000); + + assertNull("Consumer 2 should not have received a second message", received); + + cs1.commit(); + + received = consumer1.receive(1000); + assertNotNull("Consumer 1 should have received its third message", received); + assertEquals("incorrect message received", 4, received.getIntProperty("msg")); + + } + + } + + private Message createMessage(int msg, String group) throws JMSException { Message send = producerSession.createTextMessage("Message: " + msg); |
