From 0639f931d3a7cf0aa3c0834ed3bfe670dc9b582d Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 26 Nov 2012 10:48:38 +0000 Subject: QPID-2796 : Addressed review comments git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1413549 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/transport/Connection.java | 2 +- .../transport/network/io/IdleTimeoutTicker.java | 8 ++-- .../qpid/transport/network/io/IoReceiver.java | 6 ++- .../java/org/apache/qpid/client/HeartbeatTest.java | 47 +++++++++++++++++++++- java/test-profiles/Java010Excludes | 3 ++ 5 files changed, 58 insertions(+), 8 deletions(-) (limited to 'java') diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java index 3bff9aa346..5ae2f1ceb2 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java +++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java @@ -229,7 +229,7 @@ public class Connection extends ConnectionInvoker addConnectionListener((ConnectionListener)secureReceiver); } - NetworkConnection network = transport.connect(settings, secureReceiver, null); + NetworkConnection network = transport.connect(settings, secureReceiver, new ConnectionActivity()); setRemoteAddress(network.getRemoteAddress()); setLocalAddress(network.getLocalAddress()); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java index b8a8d42c7c..54a2a360bb 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java @@ -43,14 +43,14 @@ class IdleTimeoutTicker implements Ticker long nextTime = -1; final long maxReadIdle = 1000l * _connection.getMaxReadIdle(); - if(maxReadIdle != 0) + if(maxReadIdle > 0) { nextTime = _transport.getLastReadTime() + maxReadIdle; } long maxWriteIdle = 1000l * _connection.getMaxWriteIdle(); - if(maxWriteIdle != 0) + if(maxWriteIdle > 0) { long writeTime = _transport.getLastWriteTime() + maxWriteIdle; if(nextTime == -1l || writeTime < nextTime) @@ -66,13 +66,13 @@ class IdleTimeoutTicker implements Ticker { // writer Idle long maxWriteIdle = 1000l * _connection.getMaxWriteIdle(); - if(maxWriteIdle != 0 && maxWriteIdle+ _transport.getLastWriteTime() <= currentTime) + if(maxWriteIdle > 0 && maxWriteIdle+ _transport.getLastWriteTime() <= currentTime) { _transport.writerIdle(); } // reader Idle final long maxReadIdle = 1000l * _connection.getMaxReadIdle(); - if(maxReadIdle != 0 && maxReadIdle+ _transport.getLastReadTime() <= currentTime) + if(maxReadIdle > 0 && maxReadIdle+ _transport.getLastReadTime() <= currentTime) { _transport.readerIdle(); diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java index 11f28a2aee..06a43e21c6 100644 --- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java +++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java @@ -170,7 +170,11 @@ final class IoReceiver implements Runnable, Closeable if(_ticker != null) { - final int tick = _ticker.getTimeToNextTick(currentTime); + int tick = _ticker.getTimeToNextTick(currentTime); + if(tick <= 0) + { + tick = _ticker.tick(currentTime); + } try { if(!socket.isClosed()) diff --git a/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java b/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java index bb36c7dca4..0e01bda8d0 100644 --- a/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java +++ b/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java @@ -18,13 +18,17 @@ */ package org.apache.qpid.client; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; import org.apache.qpid.test.utils.QpidBrokerTestCase; public class HeartbeatTest extends QpidBrokerTestCase { public void testHeartbeats() throws Exception { - setTestSystemProperty("amqj.heartbeat.delay","1"); + setTestSystemProperty("amqj.heartbeat.delay", "1"); AMQConnection conn = (AMQConnection) getConnection(); TestListener listener = new TestListener(); conn.setHeartbeatListener(listener); @@ -40,7 +44,7 @@ public class HeartbeatTest extends QpidBrokerTestCase public void testNoHeartbeats() throws Exception { - setTestSystemProperty("amqj.heartbeat.delay","0"); + setTestSystemProperty("amqj.heartbeat.delay", "0"); AMQConnection conn = (AMQConnection) getConnection(); TestListener listener = new TestListener(); conn.setHeartbeatListener(listener); @@ -54,6 +58,45 @@ public class HeartbeatTest extends QpidBrokerTestCase conn.close(); } + public void testReadOnlyConnectionHeartbeats() throws Exception + { + setTestSystemProperty("amqj.heartbeat.delay","1"); + AMQConnection receiveConn = (AMQConnection) getConnection(); + AMQConnection sendConn = (AMQConnection) getConnection(); + Destination destination = getTestQueue(); + TestListener receiveListener = new TestListener(); + TestListener sendListener = new TestListener(); + + + Session receiveSession = receiveConn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Session senderSession = sendConn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + MessageConsumer consumer = receiveSession.createConsumer(destination); + MessageProducer producer = senderSession.createProducer(destination); + + receiveConn.setHeartbeatListener(receiveListener); + sendConn.setHeartbeatListener(sendListener); + receiveConn.start(); + + for(int i = 0; i < 5; i++) + { + producer.send(senderSession.createTextMessage("Msg " + i)); + Thread.sleep(500); + assertNotNull("Expected to received message", consumer.receive(500)); + } + + + + assertTrue("Too few heartbeats sent "+receiveListener._heartbeatsSent+" (expected at least 2)", receiveListener._heartbeatsSent>=2); + assertEquals("Unexpected sent at the sender: ",0,sendListener._heartbeatsSent); + + assertTrue("Too few heartbeats received at the sender "+sendListener._heartbeatsReceived+" (expected at least 2)", sendListener._heartbeatsReceived>=2); + assertEquals("Unexpected received at the receiver: ",0,receiveListener._heartbeatsReceived); + + receiveConn.close(); + sendConn.close(); + } + private class TestListener implements HeartbeatListener { int _heartbeatsReceived; diff --git a/java/test-profiles/Java010Excludes b/java/test-profiles/Java010Excludes index ca2383a8f3..3f12076dbe 100755 --- a/java/test-profiles/Java010Excludes +++ b/java/test-profiles/Java010Excludes @@ -64,3 +64,6 @@ org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFlowContro // QPID-3604: Immediate Prefetch no longer supported by 0-10 org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener + +// QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based +org.apache.qpid.client.HeartbeatTest#testReadOnlyConnectionHeartbeats -- cgit v1.2.1