diff options
| author | Keith Wall <kwall@apache.org> | 2013-10-30 21:38:03 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2013-10-30 21:38:03 +0000 |
| commit | 9b1bf6023a9869af744e2fbc03664d41ced37df6 (patch) | |
| tree | 8999f016b40ce753bb560589326fa84d507759ea /qpid/java/systests/src | |
| parent | f6c119bb54dceaa1451f31c7c0be504a7f6bf3ca (diff) | |
| download | qpid-python-9b1bf6023a9869af744e2fbc03664d41ced37df6.tar.gz | |
QPID-4534: unify client heartbeat system properties/connection url options.
* Connection url 'heartbeat' broker-option (and deprecated 'idle_timeout') now understood for all protocols
* System property 'qpid.heartbeat' (and deprecated 'amqj.heartbeat.delay' and 'idle_timeout') now understood for all protocols
* Enhanced heartbeat system tests
* Docbook updates
Original patch from Keith Wall, plus updates from Robbie Gemmell
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1537313 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/systests/src')
| -rw-r--r-- | qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java | 116 |
1 files changed, 100 insertions, 16 deletions
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java index 0e01bda8d0..143565c648 100644 --- a/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java +++ b/qpid/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java @@ -18,49 +18,86 @@ */ package org.apache.qpid.client; +import static org.apache.qpid.configuration.ClientProperties.AMQJ_HEARTBEAT_DELAY; +import static org.apache.qpid.configuration.ClientProperties.IDLE_TIMEOUT_PROP_NAME; +import static org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL; + import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; + +import org.apache.qpid.server.model.Broker; import org.apache.qpid.test.utils.QpidBrokerTestCase; public class HeartbeatTest extends QpidBrokerTestCase { - public void testHeartbeats() throws Exception + private static final String CONNECTION_URL_WITH_HEARTBEAT = "amqp://guest:guest@clientid/?brokerlist='localhost:%d?heartbeat='%d''"; + private TestListener _listener = new TestListener(); + + @Override + public void setUp() throws Exception + { + if (getName().equals("testHeartbeatsEnabledBrokerSide")) + { + getBrokerConfiguration().setBrokerAttribute(Broker.CONNECTION_HEART_BEAT_DELAY, "1"); + } + super.setUp(); + } + + public void testHeartbeatsEnabledUsingUrl() throws Exception + { + final String url = String.format(CONNECTION_URL_WITH_HEARTBEAT, DEFAULT_PORT, 1); + AMQConnection conn = (AMQConnection) getConnection(new AMQConnectionURL(url)); + conn.setHeartbeatListener(_listener); + conn.start(); + + Thread.sleep(2500); + + assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2); + + conn.close(); + } + + public void testHeartbeatsEnabledUsingSystemProperty() throws Exception { - setTestSystemProperty("amqj.heartbeat.delay", "1"); + setTestSystemProperty(QPID_HEARTBEAT_INTERVAL, "1"); AMQConnection conn = (AMQConnection) getConnection(); - TestListener listener = new TestListener(); - conn.setHeartbeatListener(listener); + conn.setHeartbeatListener(_listener); conn.start(); Thread.sleep(2500); - assertTrue("Too few heartbeats received: "+listener._heartbeatsReceived+" (expected at least 2)", listener._heartbeatsReceived>=2); - assertTrue("Too few heartbeats sent "+listener._heartbeatsSent+" (expected at least 2)", listener._heartbeatsSent>=2); + assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2); conn.close(); } - public void testNoHeartbeats() throws Exception + public void testHeartbeatsDisabledUsingSystemProperty() throws Exception { - setTestSystemProperty("amqj.heartbeat.delay", "0"); + setTestSystemProperty(QPID_HEARTBEAT_INTERVAL, "0"); AMQConnection conn = (AMQConnection) getConnection(); - TestListener listener = new TestListener(); - conn.setHeartbeatListener(listener); + conn.setHeartbeatListener(_listener); conn.start(); Thread.sleep(2500); - assertEquals("Heartbeats unexpectedly received", 0, listener._heartbeatsReceived); - assertEquals("Heartbeats unexpectedly sent ", 0, listener._heartbeatsSent); + assertEquals("Heartbeats unexpectedly received", 0, _listener._heartbeatsReceived); + assertEquals("Heartbeats unexpectedly sent ", 0, _listener._heartbeatsSent); conn.close(); } - public void testReadOnlyConnectionHeartbeats() throws Exception + /** + * This test carefully arranges message flow so that bytes flow only from producer to broker + * on the producer side and broker to consumer on the consumer side, deliberately leaving the + * reverse path quiet so heartbeats will flow. + */ + public void testUnidirectionalHeartbeating() throws Exception { - setTestSystemProperty("amqj.heartbeat.delay","1"); + setTestSystemProperty(QPID_HEARTBEAT_INTERVAL,"1"); AMQConnection receiveConn = (AMQConnection) getConnection(); AMQConnection sendConn = (AMQConnection) getConnection(); Destination destination = getTestQueue(); @@ -83,10 +120,9 @@ public class HeartbeatTest extends QpidBrokerTestCase producer.send(senderSession.createTextMessage("Msg " + i)); Thread.sleep(500); assertNotNull("Expected to received message", consumer.receive(500)); + // Consumer does not ack the message in order to generate no bytes from consumer back to Broker } - - assertTrue("Too few heartbeats sent "+receiveListener._heartbeatsSent+" (expected at least 2)", receiveListener._heartbeatsSent>=2); assertEquals("Unexpected sent at the sender: ",0,sendListener._heartbeatsSent); @@ -97,6 +133,54 @@ public class HeartbeatTest extends QpidBrokerTestCase sendConn.close(); } + public void testHeartbeatsEnabledBrokerSide() throws Exception + { + + AMQConnection conn = (AMQConnection) getConnection(); + conn.setHeartbeatListener(_listener); + conn.start(); + + Thread.sleep(2500); + + assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2); + + conn.close(); + } + + + @SuppressWarnings("deprecation") + public void testHeartbeatsEnabledUsingAmqjLegacySystemProperty() throws Exception + { + setTestSystemProperty(AMQJ_HEARTBEAT_DELAY, "1"); + AMQConnection conn = (AMQConnection) getConnection(); + conn.setHeartbeatListener(_listener); + conn.start(); + + Thread.sleep(2500); + + assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2); + + conn.close(); + } + + @SuppressWarnings("deprecation") + public void testHeartbeatsEnabledUsingOlderLegacySystemProperty() throws Exception + { + setTestSystemProperty(IDLE_TIMEOUT_PROP_NAME, "1000"); + AMQConnection conn = (AMQConnection) getConnection(); + conn.setHeartbeatListener(_listener); + conn.start(); + + Thread.sleep(2500); + + assertTrue("Too few heartbeats received: "+_listener._heartbeatsReceived+" (expected at least 2)", _listener._heartbeatsReceived>=2); + assertTrue("Too few heartbeats sent "+_listener._heartbeatsSent+" (expected at least 2)", _listener._heartbeatsSent>=2); + + conn.close(); + } + private class TestListener implements HeartbeatListener { int _heartbeatsReceived; |
