summaryrefslogtreecommitdiff
path: root/qpid/java/systests
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2011-02-04 08:14:00 +0000
committerRobert Gemmell <robbie@apache.org>2011-02-04 08:14:00 +0000
commitab57937c2f608245bae1671e52620cd4daf96414 (patch)
tree146f5ab60b795926ae53f7c6128e2c4b67a925ab /qpid/java/systests
parentc7d2d3fb92b0e8fe3000adfe5a021bb69ccdc34a (diff)
downloadqpid-python-ab57937c2f608245bae1671e52620cd4daf96414.tar.gz
QPID-1670: Implement an UncaughtExceptionHandler to log exceptions causing the permature termination of Qpid client threads.
Applied patch from Keith Wall <keith.wall@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1067108 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests')
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java106
1 files changed, 90 insertions, 16 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java
index 11d9ce2bbc..e4d1c72208 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerTest.java
@@ -20,12 +20,16 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.LogMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -46,14 +50,15 @@ import java.util.concurrent.TimeUnit;
* the message listener later the _synchronousQueue is just poll()'ed and the first message delivered the remaining
* messages will be left on the queue and lost, subsequent messages on the session will arrive first.
*/
-public class MessageListenerTest extends QpidBrokerTestCase implements MessageListener
+public class MessageListenerTest extends QpidBrokerTestCase implements MessageListener, ExceptionListener
{
private static final Logger _logger = LoggerFactory.getLogger(MessageListenerTest.class);
Context _context;
private static final int MSG_COUNT = 5;
- private int receivedCount = 0;
+ private int _receivedCount = 0;
+ private int _errorCount = 0;
private MessageConsumer _consumer;
private Connection _clientConnection;
private CountDownLatch _awaitMessages = new CountDownLatch(MSG_COUNT);
@@ -94,11 +99,14 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi
protected void tearDown() throws Exception
{
- _clientConnection.close();
+ if (_clientConnection != null)
+ {
+ _clientConnection.close();
+ }
super.tearDown();
}
- public void testSynchronousRecieve() throws Exception
+ public void testSynchronousReceive() throws Exception
{
for (int msg = 0; msg < MSG_COUNT; msg++)
{
@@ -106,15 +114,15 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi
}
}
- public void testSynchronousRecieveNoWait() throws Exception
+ public void testSynchronousReceiveNoWait() throws Exception
{
for (int msg = 0; msg < MSG_COUNT; msg++)
{
- assertTrue(_consumer.receiveNoWait() != null);
+ assertTrue("Failed to receive message " + msg, _consumer.receiveNoWait() != null);
}
}
- public void testAsynchronousRecieve() throws Exception
+ public void testAsynchronousReceive() throws Exception
{
_consumer.setMessageListener(this);
@@ -128,18 +136,17 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi
{
// do nothing
}
- // Should have recieved all async messages
- assertEquals(MSG_COUNT, receivedCount);
+ // Should have received all async messages
+ assertEquals(MSG_COUNT, _receivedCount);
}
- public void testRecieveThenUseMessageListener() throws Exception
+ public void testReceiveThenUseMessageListener() throws Exception
{
-
_logger.error("Test disabled as initial receive is not called first");
// Perform initial receive to start connection
assertTrue(_consumer.receive(2000) != null);
- receivedCount++;
+ _receivedCount++;
// Sleep to ensure remaining 4 msgs end up on _synchronousQueue
Thread.sleep(1000);
@@ -157,8 +164,8 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi
{
// do nothing
}
- // Should have recieved all async messages
- assertEquals(MSG_COUNT, receivedCount);
+ // Should have received all async messages
+ assertEquals(MSG_COUNT, _receivedCount);
_clientConnection.close();
@@ -172,14 +179,81 @@ public class MessageListenerTest extends QpidBrokerTestCase implements MessageLi
assertTrue(cons.receive(2000) == null);
}
+ /**
+ * Tests the case where the message listener throws an java.lang.Error.
+ *
+ */
+ public void testMessageListenerThrowsError() throws Exception
+ {
+ final String javaLangErrorMessageText = "MessageListener failed with java.lang.Error";
+ _clientConnection.setExceptionListener(this);
+
+ _awaitMessages = new CountDownLatch(1);
+
+ _consumer.setMessageListener(new MessageListener()
+ {
+ public void onMessage(Message message)
+ {
+ try
+ {
+ _logger.debug("onMessage called");
+ _receivedCount++;
+
+
+ throw new Error(javaLangErrorMessageText);
+ }
+ finally
+ {
+ _awaitMessages.countDown();
+ }
+ }
+ });
+
+
+ _logger.info("Waiting 3 seconds for message");
+ _awaitMessages.await(3000, TimeUnit.MILLISECONDS);
+
+ assertEquals("onMessage should have been called", 1, _receivedCount);
+ assertEquals("onException should NOT have been called", 0, _errorCount);
+
+ // Check that Error has been written to the application log.
+
+ LogMonitor _monitor = new LogMonitor(_outputFile);
+ assertTrue("The expected message not written to log file.",
+ _monitor.waitForMessage(javaLangErrorMessageText, LOGMONITOR_TIMEOUT));
+
+ if (_clientConnection != null)
+ {
+ try
+ {
+ _clientConnection.close();
+ }
+ catch (JMSException e)
+ {
+ // Ignore connection close errors for this test.
+ }
+ finally
+ {
+ _clientConnection = null;
+ }
+ }
+ }
+
public void onMessage(Message message)
{
- _logger.info("Received Message(" + receivedCount + "):" + message);
+ _logger.info("Received Message(" + _receivedCount + "):" + message);
- receivedCount++;
+ _receivedCount++;
_awaitMessages.countDown();
}
+ @Override
+ public void onException(JMSException e)
+ {
+ _logger.info("Exception received", e);
+ _errorCount++;
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(MessageListenerTest.class);