summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2012-11-25 17:15:43 +0000
committerRobert Godfrey <rgodfrey@apache.org>2012-11-25 17:15:43 +0000
commita80c4fea8b8c0f2113a917904611e39f6b93100b (patch)
tree316e95ecdebe29295f7591c1fb891ac75449b753 /qpid/java/broker
parentd429e749a971b2157b56e07835ac29c885683342 (diff)
downloadqpid-python-a80c4fea8b8c0f2113a917904611e39f6b93100b.tar.gz
QPID-2796 : Implement hearbeating in Java Broker (all protocol versions) and Java Client (0-8/9/9-1 path)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1413376 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java24
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java36
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java55
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java18
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java20
7 files changed, 168 insertions, 13 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index 976d7fd28a..72c21d357e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -150,6 +151,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private boolean _blocking;
private final Lock _receivedLock;
+ private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
+
public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
{
@@ -541,7 +544,10 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
final ByteBuffer buf = asByteBuffer(frame);
_writtenBytes += buf.remaining();
_sender.send(buf);
- _lastIoTime = System.currentTimeMillis();
+ final long time = System.currentTimeMillis();
+ _lastIoTime = time;
+ _lastWriteTime.set(time);
+
if(!_deferFlush)
{
_sender.flush();
@@ -1058,12 +1064,12 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
public void readerIdle()
{
- // Nothing
+ // TODO - enforce disconnect on lack of inbound data
}
public synchronized void writerIdle()
{
- _sender.send(asByteBuffer(HeartbeatBody.FRAME));
+ writeFrame(HeartbeatBody.FRAME);
}
public void exception(Throwable throwable)
@@ -1461,4 +1467,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
return _receivedLock;
}
+
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReceivedTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime.get();
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index cb7680e9b6..707f02d4a5 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -217,6 +217,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_sender = sender;
}
+ @Override
+ public long getLastReadTime()
+ {
+ return _delegate.getLastReadTime();
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _delegate.getLastWriteTime();
+ }
+
private static interface DelegateCreator
{
@@ -409,6 +421,18 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
+ @Override
+ public long getLastReadTime()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return 0;
+ }
+
public long getConnectionId()
{
return _id;
@@ -566,5 +590,17 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
}
+
+ @Override
+ public long getLastReadTime()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return 0;
+ }
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
index d7d26cc772..7d263b517c 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
@@ -44,6 +44,8 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private ServerConnection _connection;
private final IApplicationRegistry _appRegistry;
private long _createTime = System.currentTimeMillis();
+ private long _lastReadTime;
+ private long _lastWriteTime;
public ProtocolEngine_0_10(ServerConnection conn,
NetworkConnection network,
@@ -71,13 +73,61 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
{
_network = network;
- _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
+ _connection.setNetworkConnection(network);
+ _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
_connection.setPeerPrincipal(_network.getPeerPrincipal());
// FIXME Two log messages to maintain compatibility with earlier protocol versions
_connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, false, false, false));
_connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, false, true, false));
}
+ private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender)
+ {
+ return new Sender<ByteBuffer>()
+ {
+ @Override
+ public void setIdleTimeout(int i)
+ {
+ sender.setIdleTimeout(i);
+
+ }
+
+ @Override
+ public void send(ByteBuffer msg)
+ {
+ _lastWriteTime = System.currentTimeMillis();
+ sender.send(msg);
+
+ }
+
+ @Override
+ public void flush()
+ {
+ sender.flush();
+
+ }
+
+ @Override
+ public void close()
+ {
+ sender.close();
+
+ }
+ };
+ }
+
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
+
public SocketAddress getRemoteAddress()
{
return _network.getRemoteAddress();
@@ -90,6 +140,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void received(final ByteBuffer buf)
{
+ _lastReadTime = System.currentTimeMillis();
super.received(buf);
_connection.receivedComplete();
}
@@ -106,7 +157,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void writerIdle()
{
- //Todo
+ _connection.doHeartbeat();
}
public void readerIdle()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
index da70e497d6..274f206c85 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
@@ -53,6 +53,8 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
//private NetworkConnection _networkDriver;
private long _readBytes;
private long _writtenBytes;
+ private long _lastReadTime;
+ private long _lastWriteTime;
private final IApplicationRegistry _appRegistry;
private long _createTime = System.currentTimeMillis();
private ConnectionEndpoint _conn;
@@ -178,6 +180,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
public synchronized void received(ByteBuffer msg)
{
+ _lastReadTime = System.currentTimeMillis();
if(RAW_LOGGER.isLoggable(Level.FINE))
{
ByteBuffer dup = msg.duplicate();
@@ -320,6 +323,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
synchronized(_sendLock)
{
+ _lastWriteTime = System.currentTimeMillis();
if(FRAME_LOGGER.isLoggable(Level.FINE))
{
FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
@@ -374,4 +378,13 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
return _connectionId;
}
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
index 71d6df27e0..634c5e6255 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
@@ -51,6 +51,9 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
{
private long _readBytes;
private long _writtenBytes;
+
+ private long _lastReadTime;
+ private long _lastWriteTime;
private final IApplicationRegistry _appRegistry;
private long _createTime = System.currentTimeMillis();
private ConnectionEndpoint _conn;
@@ -222,6 +225,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
public synchronized void received(ByteBuffer msg)
{
+ _lastReadTime = System.currentTimeMillis();
if(RAW_LOGGER.isLoggable(Level.FINE))
{
ByteBuffer dup = msg.duplicate();
@@ -364,7 +368,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
synchronized(_sendLock)
{
-
+ _lastWriteTime = System.currentTimeMillis();
if(FRAME_LOGGER.isLoggable(Level.FINE))
{
FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
@@ -425,4 +429,13 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
return _connectionId;
}
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index ce3ede2dba..58de6a0cdf 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -48,6 +48,7 @@ import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.network.NetworkConnection;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
@@ -68,6 +69,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
private Principal _peerPrincipal;
+ private NetworkConnection _networkConnection;
public ServerConnection(final long connectionId)
{
@@ -490,4 +492,20 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
{
super.setLocalAddress(localAddress);
}
+
+ public void setNetworkConnection(NetworkConnection network)
+ {
+ _networkConnection = network;
+ }
+
+ public NetworkConnection getNetworkConnection()
+ {
+ return _networkConnection;
+ }
+
+ public void doHeartbeat()
+ {
+ super.doHeartBeat();
+
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
index 70f5afe5ac..f48121f9f0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
@@ -43,6 +43,8 @@ import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.virtualhost.State;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
+import org.apache.qpid.transport.network.NetworkConnection;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -226,14 +228,18 @@ public class ServerConnectionDelegate extends ServerDelegate
return;
}
- setConnectionTuneOkChannelMax(sconn, okChannelMax);
- }
+ if(ok.hasHeartbeat())
+ {
+ final int heartbeat = ok.getHeartbeat();
+ if(heartbeat > 0)
+ {
+ final NetworkConnection networkConnection = sconn.getNetworkConnection();
+ networkConnection.setMaxReadIdle(2 * heartbeat);
+ networkConnection.setMaxWriteIdle(heartbeat);
+ }
+ }
- @Override
- protected int getHeartbeatMax()
- {
- //TODO: implement broker support for actually sending heartbeats
- return 0;
+ setConnectionTuneOkChannelMax(sconn, okChannelMax);
}
@Override