diff options
Diffstat (limited to 'java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java')
-rwxr-xr-x | java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java | 149 |
1 files changed, 57 insertions, 92 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java index fd6e9300ec..d5f7fe486c 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java @@ -21,13 +21,7 @@ package org.apache.qpid.server.protocol; import org.apache.qpid.protocol.ServerProtocolEngine; -import org.apache.qpid.server.configuration.ConfigStore; -import org.apache.qpid.server.configuration.ConfiguredObject; -import org.apache.qpid.server.configuration.ConnectionConfig; -import org.apache.qpid.server.configuration.ConnectionConfigType; -import org.apache.qpid.server.configuration.VirtualHostConfig; import org.apache.qpid.server.logging.messages.ConnectionMessages; -import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.transport.ServerConnection; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.Assembler; @@ -37,9 +31,9 @@ import org.apache.qpid.transport.network.NetworkConnection; import java.net.SocketAddress; import java.nio.ByteBuffer; -import java.util.UUID; -public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine, ConnectionConfig + +public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocolEngine { public static final int MAX_FRAME_SIZE = 64 * 1024 - 1; @@ -47,20 +41,17 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol private long _readBytes; private long _writtenBytes; private ServerConnection _connection; - private final UUID _qmfId; - private final IApplicationRegistry _appRegistry; + private long _createTime = System.currentTimeMillis(); + private long _lastReadTime; + private long _lastWriteTime; public ProtocolEngine_0_10(ServerConnection conn, - NetworkConnection network, - final IApplicationRegistry appRegistry) + NetworkConnection network) { super(new Assembler(conn)); _connection = conn; - _connection.setConnectionConfig(this); - _qmfId = appRegistry.getConfigStore().createId(); - _appRegistry = appRegistry; if(network != null) { @@ -68,14 +59,6 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol } - _connection.onOpen(new Runnable() - { - public void run() - { - getConfigStore().addConfiguredObject(ProtocolEngine_0_10.this); - } - }); - } public void setNetworkConnection(NetworkConnection network) @@ -87,13 +70,61 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol { _network = network; - _connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE)); + _connection.setNetworkConnection(network); + _connection.setSender(new Disassembler(wrapSender(sender), MAX_FRAME_SIZE)); _connection.setPeerPrincipal(_network.getPeerPrincipal()); // FIXME Two log messages to maintain compatibility with earlier protocol versions _connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, false, false, false)); _connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, false, true, false)); } + private Sender<ByteBuffer> wrapSender(final Sender<ByteBuffer> sender) + { + return new Sender<ByteBuffer>() + { + @Override + public void setIdleTimeout(int i) + { + sender.setIdleTimeout(i); + + } + + @Override + public void send(ByteBuffer msg) + { + _lastWriteTime = System.currentTimeMillis(); + sender.send(msg); + + } + + @Override + public void flush() + { + sender.flush(); + + } + + @Override + public void close() + { + sender.close(); + + } + }; + } + + @Override + public long getLastReadTime() + { + return _lastReadTime; + } + + @Override + public long getLastWriteTime() + { + return _lastWriteTime; + } + public SocketAddress getRemoteAddress() { return _network.getRemoteAddress(); @@ -106,6 +137,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void received(final ByteBuffer buf) { + _lastReadTime = System.currentTimeMillis(); super.received(buf); _connection.receivedComplete(); } @@ -122,7 +154,7 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void writerIdle() { - //Todo + _connection.doHeartbeat(); } public void readerIdle() @@ -130,72 +162,16 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol //Todo } - public VirtualHostConfig getVirtualHost() - { - return _connection.getVirtualHost(); - } - public String getAddress() { return getRemoteAddress().toString(); } - public Boolean isIncoming() - { - return true; - } - - public Boolean isSystemConnection() - { - return false; - } - - public Boolean isFederationLink() - { - return false; - } - public String getAuthId() { return _connection.getAuthorizedPrincipal() == null ? null : _connection.getAuthorizedPrincipal().getName(); } - public String getRemoteProcessName() - { - return null; - } - - public Integer getRemotePID() - { - return null; - } - - public Integer getRemoteParentPID() - { - return null; - } - - public ConfigStore getConfigStore() - { - return _appRegistry.getConfigStore(); - } - - @Override - public UUID getQMFId() - { - return _qmfId; - } - - public ConnectionConfigType getConfigType() - { - return ConnectionConfigType.getInstance(); - } - - public ConfiguredObject getParent() - { - return getVirtualHost(); - } - public boolean isDurable() { return false; @@ -205,7 +181,6 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol public void closed() { super.closed(); - getConfigStore().removeConfiguredObject(this); } public long getCreateTime() @@ -213,16 +188,6 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol return _createTime; } - public Boolean isShadow() - { - return false; - } - - public void mgmtClose() - { - _connection.mgmtClose(); - } - public long getConnectionId() { return _connection.getConnectionId(); |