summaryrefslogtreecommitdiff
path: root/java/systests
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-04-23 15:54:15 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-04-23 15:54:15 +0000
commite70ee3e8bff300dff233829741d2ddd2cb89b117 (patch)
treeafda2db38f34b367a94d328527edd608f4755cba /java/systests
parent8de38d68d7efe3b286da38c996cd8eabbe67ddc8 (diff)
downloadqpid-python-e70ee3e8bff300dff233829741d2ddd2cb89b117.tar.gz
QPID-436 - topic exchange doesn't obey the mandatory flag
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2@531513 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/systests')
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java140
1 files changed, 137 insertions, 3 deletions
diff --git a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
index 87491ed3d3..ca352b2fd7 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
@@ -22,6 +22,8 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex
private static final Logger _logger = Logger.getLogger(ReturnUnroutableMandatoryMessageTest.class);
private final List<Message> _bouncedMessageList = Collections.synchronizedList(new ArrayList<Message>());
+ private static final String VIRTUALHOST = "test";
+ private static final String BROKER = "vm://:1";
static
{
@@ -53,10 +55,10 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex
*
* @throws Exception
*/
- public void testReturnUnroutableMandatoryMessage() throws Exception
+ public void testReturnUnroutableMandatoryMessage_HEADERS() throws Exception
{
_bouncedMessageList.clear();
- Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test");
+ Connection con = new AMQConnection(BROKER, "guest", "guest", "consumer1", VIRTUALHOST);
AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -70,7 +72,7 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex
//((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
// This is the default now
- Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");
+ Connection con2 = new AMQConnection(BROKER, "guest", "guest", "producer1", VIRTUALHOST);
con2.setExceptionListener(this);
AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -127,6 +129,138 @@ public class ReturnUnroutableMandatoryMessageTest extends TestCase implements Ex
}
+ public void testReturnUnroutableMandatoryMessage_QUEUE() throws Exception
+ {
+ _bouncedMessageList.clear();
+ Connection con = new AMQConnection(BROKER, "guest", "guest", "consumer1", VIRTUALHOST);
+
+
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ AMQQueue valid_queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, "testReturnUnroutableMandatoryMessage_QUEUE");
+ AMQQueue invalid_queue = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_CLASS, "testReturnUnroutableMandatoryMessage_QUEUE_INVALID");
+ MessageConsumer consumer = consumerSession.createConsumer(valid_queue);
+
+ //force synch to ensure the consumer has resulted in a bound queue
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ // This is the default now
+
+ Connection con2 = new AMQConnection(BROKER, "guest", "guest", "producer1", VIRTUALHOST);
+
+ con2.setExceptionListener(this);
+ AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // Need to start the "producer" connection in order to receive bounced messages
+ _logger.info("Starting producer connection");
+ con2.start();
+
+
+ MessageProducer nonMandatoryProducer = producerSession.createProducer(valid_queue, false, false);
+ MessageProducer mandatoryProducer = producerSession.createProducer(invalid_queue);
+
+ // First test - should be routed
+ _logger.info("Sending non-mandatory message");
+ TextMessage msg1 = producerSession.createTextMessage("msg1");
+ nonMandatoryProducer.send(msg1);
+
+ // Second test - should be bounced
+ _logger.info("Sending non-routable mandatory message");
+ TextMessage msg2 = producerSession.createTextMessage("msg2");
+ mandatoryProducer.send(msg2);
+
+
+ _logger.info("Starting consumer connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive(1000L);
+
+ assertTrue("No message routed to receiver", tm != null);
+ assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg1".equals(tm.getText()));
+
+ try
+ {
+ Thread.sleep(1000L);
+ }
+ catch (InterruptedException e)
+ {
+ ;
+ }
+
+ assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
+ Message m = _bouncedMessageList.get(0);
+ assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));
+
+
+ con.close();
+ con2.close();
+ }
+
+
+ public void testReturnUnroutableMandatoryMessage_TOPIC() throws Exception
+ {
+ _bouncedMessageList.clear();
+ Connection con = new AMQConnection(BROKER, "guest", "guest", "consumer1", VIRTUALHOST);
+
+
+ AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ AMQTopic valid_topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, "test.Return.Unroutable.Mandatory.Message.TOPIC");
+ AMQTopic invalid_topic = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_CLASS, "test.Return.Unroutable.Mandatory.Message.TOPIC.invalid");
+ MessageConsumer consumer = consumerSession.createConsumer(valid_topic);
+
+ //force synch to ensure the consumer has resulted in a bound queue
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ // This is the default now
+
+ Connection con2 = new AMQConnection(BROKER, "guest", "guest", "producer1", VIRTUALHOST);
+
+ con2.setExceptionListener(this);
+ AMQSession producerSession = (AMQSession) con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // Need to start the "producer" connection in order to receive bounced messages
+ _logger.info("Starting producer connection");
+ con2.start();
+
+
+ MessageProducer nonMandatoryProducer = producerSession.createProducer(valid_topic, false, false);
+ MessageProducer mandatoryProducer = producerSession.createProducer(invalid_topic);
+
+ // First test - should be routed
+ _logger.info("Sending non-mandatory message");
+ TextMessage msg1 = producerSession.createTextMessage("msg1");
+ nonMandatoryProducer.send(msg1);
+
+ // Second test - should be bounced
+ _logger.info("Sending non-routable mandatory message");
+ TextMessage msg2 = producerSession.createTextMessage("msg2");
+ mandatoryProducer.send(msg2);
+
+
+ _logger.info("Starting consumer connection");
+ con.start();
+ TextMessage tm = (TextMessage) consumer.receive(1000L);
+
+ assertTrue("No message routed to receiver", tm != null);
+ assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg1".equals(tm.getText()));
+
+ try
+ {
+ Thread.sleep(1000L);
+ }
+ catch (InterruptedException e)
+ {
+ ;
+ }
+
+ assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1);
+ Message m = _bouncedMessageList.get(0);
+ assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2"));
+
+
+ con.close();
+ con2.close();
+ }
+
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class);