diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2012-11-25 17:15:43 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2012-11-25 17:15:43 +0000 |
| commit | a80c4fea8b8c0f2113a917904611e39f6b93100b (patch) | |
| tree | 316e95ecdebe29295f7591c1fb891ac75449b753 /qpid/java/client/src | |
| parent | d429e749a971b2157b56e07835ac29c885683342 (diff) | |
| download | qpid-python-a80c4fea8b8c0f2113a917904611e39f6b93100b.tar.gz | |
QPID-2796 : Implement hearbeating in Java Broker (all protocol versions) and Java Client (0-8/9/9-1 path)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1413376 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/client/src')
3 files changed, 24 insertions, 2 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 5dd6e55e64..a8cf947f6d 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -484,10 +484,14 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec heartbeat = Integer.getInteger(ClientProperties.IDLE_TIMEOUT_PROP_NAME)/1000; _logger.warn("JVM arg -Didle_timeout=<mili_secs> is deprecated, please use -Dqpid.heartbeat=<secs>"); } - else + else if(Integer.getInteger(ClientProperties.HEARTBEAT) != null) { heartbeat = Integer.getInteger(ClientProperties.HEARTBEAT,ClientProperties.HEARTBEAT_DEFAULT); } + else + { + heartbeat = Integer.getInteger("amqj.heartbeat.delay", ClientProperties.HEARTBEAT_DEFAULT); + } return heartbeat; } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java index 740a81b939..25af7003d0 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java @@ -122,7 +122,9 @@ public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings); OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion()); - NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler())); + + NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()), + _conn.getProtocolHandler()); _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender())); StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates); diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java index af89000c5c..04d57c9fa2 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java @@ -178,6 +178,8 @@ public class AMQProtocolHandler implements ProtocolEngine private NetworkConnection _network; private Sender<ByteBuffer> _sender; + private long _lastReadTime = System.currentTimeMillis(); + private long _lastWriteTime = System.currentTimeMillis(); /** * Creates a new protocol handler, associated with the specified client connection instance. @@ -442,6 +444,7 @@ public class AMQProtocolHandler implements ProtocolEngine public void received(ByteBuffer msg) { _readBytes += msg.remaining(); + _lastReadTime = System.currentTimeMillis(); try { final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg); @@ -560,6 +563,7 @@ public class AMQProtocolHandler implements ProtocolEngine public synchronized void writeFrame(AMQDataBlock frame, boolean flush) { final ByteBuffer buf = asByteBuffer(frame); + _lastWriteTime = System.currentTimeMillis(); _writtenBytes += buf.remaining(); _sender.send(buf); if(flush) @@ -882,6 +886,18 @@ public class AMQProtocolHandler implements ProtocolEngine _sender = sender; } + @Override + public long getLastReadTime() + { + return _lastReadTime; + } + + @Override + public long getLastWriteTime() + { + return _lastWriteTime; + } + protected Sender<ByteBuffer> getSender() { return _sender; |
