summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-11-26 10:48:38 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-11-26 10:48:38 +0000
commit0639f931d3a7cf0aa3c0834ed3bfe670dc9b582d (patch)
tree0d5900125666765eb7cf4a5de108f1ed87096992 /java
parent92626b52ee79478f045dd3308b1c5087095d3d79 (diff)
downloadqpid-python-0639f931d3a7cf0aa3c0834ed3bfe670dc9b582d.tar.gz
QPID-2796 : Addressed review comments
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1413549 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/Connection.java2
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java8
-rw-r--r--java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java6
-rw-r--r--java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java47
-rwxr-xr-xjava/test-profiles/Java010Excludes3
5 files changed, 58 insertions, 8 deletions
diff --git a/java/common/src/main/java/org/apache/qpid/transport/Connection.java b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
index 3bff9aa346..5ae2f1ceb2 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/Connection.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/Connection.java
@@ -229,7 +229,7 @@ public class Connection extends ConnectionInvoker
addConnectionListener((ConnectionListener)secureReceiver);
}
- NetworkConnection network = transport.connect(settings, secureReceiver, null);
+ NetworkConnection network = transport.connect(settings, secureReceiver, new ConnectionActivity());
setRemoteAddress(network.getRemoteAddress());
setLocalAddress(network.getLocalAddress());
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
index b8a8d42c7c..54a2a360bb 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
@@ -43,14 +43,14 @@ class IdleTimeoutTicker implements Ticker
long nextTime = -1;
final long maxReadIdle = 1000l * _connection.getMaxReadIdle();
- if(maxReadIdle != 0)
+ if(maxReadIdle > 0)
{
nextTime = _transport.getLastReadTime() + maxReadIdle;
}
long maxWriteIdle = 1000l * _connection.getMaxWriteIdle();
- if(maxWriteIdle != 0)
+ if(maxWriteIdle > 0)
{
long writeTime = _transport.getLastWriteTime() + maxWriteIdle;
if(nextTime == -1l || writeTime < nextTime)
@@ -66,13 +66,13 @@ class IdleTimeoutTicker implements Ticker
{
// writer Idle
long maxWriteIdle = 1000l * _connection.getMaxWriteIdle();
- if(maxWriteIdle != 0 && maxWriteIdle+ _transport.getLastWriteTime() <= currentTime)
+ if(maxWriteIdle > 0 && maxWriteIdle+ _transport.getLastWriteTime() <= currentTime)
{
_transport.writerIdle();
}
// reader Idle
final long maxReadIdle = 1000l * _connection.getMaxReadIdle();
- if(maxReadIdle != 0 && maxReadIdle+ _transport.getLastReadTime() <= currentTime)
+ if(maxReadIdle > 0 && maxReadIdle+ _transport.getLastReadTime() <= currentTime)
{
_transport.readerIdle();
diff --git a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
index 11f28a2aee..06a43e21c6 100644
--- a/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
+++ b/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
@@ -170,7 +170,11 @@ final class IoReceiver implements Runnable, Closeable
if(_ticker != null)
{
- final int tick = _ticker.getTimeToNextTick(currentTime);
+ int tick = _ticker.getTimeToNextTick(currentTime);
+ if(tick <= 0)
+ {
+ tick = _ticker.tick(currentTime);
+ }
try
{
if(!socket.isClosed())
diff --git a/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java b/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java
index bb36c7dca4..0e01bda8d0 100644
--- a/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/client/HeartbeatTest.java
@@ -18,13 +18,17 @@
*/
package org.apache.qpid.client;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
public class HeartbeatTest extends QpidBrokerTestCase
{
public void testHeartbeats() throws Exception
{
- setTestSystemProperty("amqj.heartbeat.delay","1");
+ setTestSystemProperty("amqj.heartbeat.delay", "1");
AMQConnection conn = (AMQConnection) getConnection();
TestListener listener = new TestListener();
conn.setHeartbeatListener(listener);
@@ -40,7 +44,7 @@ public class HeartbeatTest extends QpidBrokerTestCase
public void testNoHeartbeats() throws Exception
{
- setTestSystemProperty("amqj.heartbeat.delay","0");
+ setTestSystemProperty("amqj.heartbeat.delay", "0");
AMQConnection conn = (AMQConnection) getConnection();
TestListener listener = new TestListener();
conn.setHeartbeatListener(listener);
@@ -54,6 +58,45 @@ public class HeartbeatTest extends QpidBrokerTestCase
conn.close();
}
+ public void testReadOnlyConnectionHeartbeats() throws Exception
+ {
+ setTestSystemProperty("amqj.heartbeat.delay","1");
+ AMQConnection receiveConn = (AMQConnection) getConnection();
+ AMQConnection sendConn = (AMQConnection) getConnection();
+ Destination destination = getTestQueue();
+ TestListener receiveListener = new TestListener();
+ TestListener sendListener = new TestListener();
+
+
+ Session receiveSession = receiveConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Session senderSession = sendConn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer consumer = receiveSession.createConsumer(destination);
+ MessageProducer producer = senderSession.createProducer(destination);
+
+ receiveConn.setHeartbeatListener(receiveListener);
+ sendConn.setHeartbeatListener(sendListener);
+ receiveConn.start();
+
+ for(int i = 0; i < 5; i++)
+ {
+ producer.send(senderSession.createTextMessage("Msg " + i));
+ Thread.sleep(500);
+ assertNotNull("Expected to received message", consumer.receive(500));
+ }
+
+
+
+ assertTrue("Too few heartbeats sent "+receiveListener._heartbeatsSent+" (expected at least 2)", receiveListener._heartbeatsSent>=2);
+ assertEquals("Unexpected sent at the sender: ",0,sendListener._heartbeatsSent);
+
+ assertTrue("Too few heartbeats received at the sender "+sendListener._heartbeatsReceived+" (expected at least 2)", sendListener._heartbeatsReceived>=2);
+ assertEquals("Unexpected received at the receiver: ",0,receiveListener._heartbeatsReceived);
+
+ receiveConn.close();
+ sendConn.close();
+ }
+
private class TestListener implements HeartbeatListener
{
int _heartbeatsReceived;
diff --git a/java/test-profiles/Java010Excludes b/java/test-profiles/Java010Excludes
index ca2383a8f3..3f12076dbe 100755
--- a/java/test-profiles/Java010Excludes
+++ b/java/test-profiles/Java010Excludes
@@ -64,3 +64,6 @@ org.apache.qpid.client.failover.AddressBasedFailoverBehaviourTest#testFlowContro
// QPID-3604: Immediate Prefetch no longer supported by 0-10
org.apache.qpid.client.AsynchMessageListenerTest#testImmediatePrefetchWithMessageListener
+
+// QPID-2796 : Java 0-10 client only sends heartbeats in response to heartbeats from the server, not timeout based
+org.apache.qpid.client.HeartbeatTest#testReadOnlyConnectionHeartbeats