summaryrefslogtreecommitdiff
path: root/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java')
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java149
1 files changed, 57 insertions, 92 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
index fd6e9300ec..d5f7fe486c 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
@@ -21,13 +21,7 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.ConnectionConfig;
-import org.apache.qpid.server.configuration.ConnectionConfigType;
-import org.apache.qpid.server.configuration.VirtualHostConfig;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.Assembler;
@@ -37,9 +31,9 @@ import org.apache.qpid.transport.network.NetworkConnection;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.util.UUID;
-public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine, ConnectionConfig
+
+public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine
{
public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
@@ -47,20 +41,17 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private long _readBytes;
private long _writtenBytes;
private ServerConnection _connection;
- private final UUID _qmfId;
- private final IApplicationRegistry _appRegistry;
+
private long _createTime = System.currentTimeMillis();
+ private long _lastReadTime;
+ private long _lastWriteTime;
public ProtocolEngine_0_10(ServerConnection conn,
- NetworkConnection network,
- final IApplicationRegistry appRegistry)
+ NetworkConnection network)
{
super(new Assembler(conn));
_connection = conn;
- _connection.setConnectionConfig(this);
- _qmfId = appRegistry.getConfigStore().createId();
- _appRegistry = appRegistry;
if(network != null)
{
@@ -68,14 +59,6 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
}
- _connection.onOpen(new Runnable()
- {
- public void run()
- {
- getConfigStore().addConfiguredObject(ProtocolEngine_0_10.this);
- }
- });
-
}
public void setNetworkConnection(NetworkConnection network)
@@ -87,13 +70,61 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
{
_network = network;
- _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
+ _connection.setNetworkConnection(network);
+ _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE));
_connection.setPeerPrincipal(_network.getPeerPrincipal());
// FIXME Two log messages to maintain compatibility with earlier protocol versions
_connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, false, false, false));
_connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, false, true, false));
}
+ private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender)
+ {
+ return new Sender<ByteBuffer>()
+ {
+ @Override
+ public void setIdleTimeout(int i)
+ {
+ sender.setIdleTimeout(i);
+
+ }
+
+ @Override
+ public void send(ByteBuffer msg)
+ {
+ _lastWriteTime = System.currentTimeMillis();
+ sender.send(msg);
+
+ }
+
+ @Override
+ public void flush()
+ {
+ sender.flush();
+
+ }
+
+ @Override
+ public void close()
+ {
+ sender.close();
+
+ }
+ };
+ }
+
+ @Override
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
+
public SocketAddress getRemoteAddress()
{
return _network.getRemoteAddress();
@@ -106,6 +137,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void received(final ByteBuffer buf)
{
+ _lastReadTime = System.currentTimeMillis();
super.received(buf);
_connection.receivedComplete();
}
@@ -122,7 +154,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void writerIdle()
{
- //Todo
+ _connection.doHeartbeat();
}
public void readerIdle()
@@ -130,72 +162,16 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
//Todo
}
- public VirtualHostConfig getVirtualHost()
- {
- return _connection.getVirtualHost();
- }
-
public String getAddress()
{
return getRemoteAddress().toString();
}
- public Boolean isIncoming()
- {
- return true;
- }
-
- public Boolean isSystemConnection()
- {
- return false;
- }
-
- public Boolean isFederationLink()
- {
- return false;
- }
-
public String getAuthId()
{
return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName();
}
- public String getRemoteProcessName()
- {
- return null;
- }
-
- public Integer getRemotePID()
- {
- return null;
- }
-
- public Integer getRemoteParentPID()
- {
- return null;
- }
-
- public ConfigStore getConfigStore()
- {
- return _appRegistry.getConfigStore();
- }
-
- @Override
- public UUID getQMFId()
- {
- return _qmfId;
- }
-
- public ConnectionConfigType getConfigType()
- {
- return ConnectionConfigType.getInstance();
- }
-
- public ConfiguredObject getParent()
- {
- return getVirtualHost();
- }
-
public boolean isDurable()
{
return false;
@@ -205,7 +181,6 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
public void closed()
{
super.closed();
- getConfigStore().removeConfiguredObject(this);
}
public long getCreateTime()
@@ -213,16 +188,6 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
return _createTime;
}
- public Boolean isShadow()
- {
- return false;
- }
-
- public void mgmtClose()
- {
- _connection.mgmtClose();
- }
-
public long getConnectionId()
{
return _connection.getConnectionId();