summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-05-11 14:27:01 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-05-11 14:27:01 +0000
commit64db4df9cbd3c88b103f43b8860efb3c2b9e2791 (patch)
treecef2834cb93d38757c2705cfe8e4edc977a89760 /qpid/java/broker/src
parent0b22baa11318fc7e86c9d1b9b74ad3d83e276859 (diff)
downloadqpid-python-64db4df9cbd3c88b103f43b8860efb3c2b9e2791.tar.gz
QPID-4831 : [Java Broker] Allow SSL and non-SSL connections on the same port
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1481331 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java24
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java266
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java27
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java7
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java14
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java240
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java24
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java27
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java6
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java14
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java4
15 files changed, 518 insertions, 165 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
index 451754e6d8..a539743081 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
@@ -88,13 +88,6 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
}
}
}
- synchronized (_listeners)
- {
- for(RegistryChangeListener listener : _listeners)
- {
- listener.connectionRegistered(connnection);
- }
- }
}
public void deregisterConnection(AMQConnectionModel connnection)
@@ -111,14 +104,6 @@ public class ConnectionRegistry implements IConnectionRegistry, Closeable
}
}
}
-
- synchronized (_listeners)
- {
- for(RegistryChangeListener listener : _listeners)
- {
- listener.connectionUnregistered(connnection);
- }
- }
}
public void addRegistryChangeListener(RegistryChangeListener listener)
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
index 1fc22c736c..ddfbf51322 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
@@ -99,9 +99,11 @@ public class AmqpPortAdapter extends PortAdapter
_transport = org.apache.qpid.transport.network.Transport.getIncomingTransportInstance();
final MultiVersionProtocolEngineFactory protocolEngineFactory = new MultiVersionProtocolEngineFactory(
- _broker, supported, defaultSupportedProtocolReply);
+ _broker, transports.contains(Transport.TCP) ? sslContext : null,
+ settings.wantClientAuth(), settings.needClientAuth(),
+ supported, defaultSupportedProtocolReply, this, transports.contains(Transport.TCP) ? Transport.TCP : Transport.SSL);
- _transport.accept(settings, protocolEngineFactory, sslContext);
+ _transport.accept(settings, protocolEngineFactory, transports.contains(Transport.TCP) ? null : sslContext);
for(Transport transport : getTransports())
{
CurrentActor.get().message(BrokerMessages.LISTENING(String.valueOf(transport), getPort()));
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
index 36fafba1cd..b52da3039d 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
@@ -23,6 +23,8 @@ package org.apache.qpid.server.protocol;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.stats.StatisticsGatherer;
import java.util.List;
@@ -82,4 +84,9 @@ public interface AMQConnectionModel extends StatisticsGatherer
long getSessionCountLimit();
long getLastIoTime();
+
+ Port getPort();
+
+ Transport getTransport();
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
index a84a4783ac..4574f87b54 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
@@ -62,6 +62,8 @@ import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.queue.QueueEntry;
@@ -86,6 +88,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
// channels. This value must be of the form 2^x - 1.
private static final int CHANNEL_CACHE_SIZE = 0xff;
private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
+ private final Port _port;
private AMQShortString _contextKey;
@@ -153,11 +156,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
private final Lock _receivedLock;
private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis());
private final Broker _broker;
+ private final Transport _transport;
- public AMQProtocolEngine(Broker broker, NetworkConnection network, final long connectionId)
+ public AMQProtocolEngine(Broker broker,
+ NetworkConnection network,
+ final long connectionId,
+ Port port,
+ Transport transport)
{
_broker = broker;
+ _port = port;
+ _transport = transport;
_maxNoOfChannels = (Integer)broker.getAttribute(Broker.CONNECTION_SESSION_COUNT_LIMIT);
_receivedLock = new ReentrantLock();
_stateManager = new AMQStateManager(broker, this);
@@ -1142,6 +1152,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return _lastIoTime;
}
+ @Override
+ public Port getPort()
+ {
+ return _port;
+ }
+
+ @Override
+ public Transport getTransport()
+ {
+ return _transport;
+ }
+
public long getLastReceivedTime()
{
return _lastReceivedTime;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
index d9e5e1c473..267857a34a 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
@@ -24,21 +24,35 @@ package org.apache.qpid.server.protocol;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.security.Principal;
import java.util.Set;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLPeerUnverifiedException;
import org.apache.log4j.Logger;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.security.SSLStatus;
+import org.apache.qpid.transport.network.security.ssl.SSLBufferingSender;
+import org.apache.qpid.transport.network.security.ssl.SSLReceiver;
public class MultiVersionProtocolEngine implements ServerProtocolEngine
{
private static final Logger _logger = Logger.getLogger(MultiVersionProtocolEngine.class);
private final long _id;
+ private final SSLContext _sslContext;
+ private final boolean _wantClientAuth;
+ private final boolean _needClientAuth;
+ private final Port _port;
+ private final Transport _transport;
private Set<AmqpProtocolVersion> _supported;
private String _fqdn;
@@ -50,19 +64,10 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
private volatile ServerProtocolEngine _delegate = new SelfDelegateProtocolEngine();
public MultiVersionProtocolEngine(final Broker broker,
+ SSLContext sslContext, boolean wantClientAuth, boolean needClientAuth,
final Set<AmqpProtocolVersion> supported,
final AmqpProtocolVersion defaultSupportedReply,
- final long id,
- final NetworkConnection network)
- {
- this(broker, supported, defaultSupportedReply, id);
- setNetworkConnection(network);
- }
-
- public MultiVersionProtocolEngine(final Broker broker,
- final Set<AmqpProtocolVersion> supported,
- final AmqpProtocolVersion defaultSupportedReply,
- final long id)
+ Port port, Transport transport, final long id)
{
if(defaultSupportedReply != null && !supported.contains(defaultSupportedReply))
{
@@ -74,6 +79,11 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
_broker = broker;
_supported = supported;
_defaultSupportedReply = defaultSupportedReply;
+ _sslContext = sslContext;
+ _wantClientAuth = wantClientAuth;
+ _needClientAuth = needClientAuth;
+ _port = port;
+ _transport = transport;
}
@@ -252,7 +262,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_broker, _network, _id);
+ return new AMQProtocolEngine(_broker, _network, _id, _port, _transport);
}
};
@@ -272,7 +282,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_broker, _network, _id);
+ return new AMQProtocolEngine(_broker, _network, _id, _port, _transport);
}
};
@@ -292,7 +302,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public ServerProtocolEngine getProtocolEngine()
{
- return new AMQProtocolEngine(_broker, _network, _id);
+ return new AMQProtocolEngine(_broker, _network, _id, _port, _transport);
}
};
@@ -321,7 +331,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
conn.setConnectionDelegate(connDelegate);
conn.setRemoteAddress(_network.getRemoteAddress());
conn.setLocalAddress(_network.getLocalAddress());
- return new ProtocolEngine_0_10( conn, _network);
+ return new ProtocolEngine_0_10( conn, _network, _port, _transport);
}
};
@@ -341,7 +351,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public ServerProtocolEngine getProtocolEngine()
{
- return new ProtocolEngine_1_0_0(_network, _broker, _id);
+ return new ProtocolEngine_1_0_0(_network, _broker, _id, _port, _transport);
}
};
@@ -361,7 +371,7 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
public ServerProtocolEngine getProtocolEngine()
{
- return new ProtocolEngine_1_0_0_SASL(_network, _broker, _id);
+ return new ProtocolEngine_1_0_0_SASL(_network, _broker, _id, _port, _transport);
}
};
@@ -518,6 +528,14 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
}
}
+ if(newDelegate == null && looksLikeSSL(headerBytes))
+ {
+ if(_sslContext != null)
+ {
+ newDelegate = new SslDelegateProtocolEngine();
+ }
+ }
+
// If no delegate is found then send back a supported protocol version id
if(newDelegate == null)
{
@@ -625,4 +643,218 @@ public class MultiVersionProtocolEngine implements ServerProtocolEngine
return 0;
}
}
+
+ private class SslDelegateProtocolEngine implements ServerProtocolEngine
+ {
+ private final MultiVersionProtocolEngine _decryptEngine;
+ private final SSLEngine _engine;
+ private final SSLReceiver _sslReceiver;
+ private final SSLBufferingSender _sslSender;
+
+ private SslDelegateProtocolEngine()
+ {
+
+ _decryptEngine = new MultiVersionProtocolEngine(_broker, null, false, false, _supported,
+ _defaultSupportedReply, _port, Transport.SSL, _id);
+
+ _engine = _sslContext.createSSLEngine();
+ _engine.setUseClientMode(false);
+
+ if(_needClientAuth)
+ {
+ _engine.setNeedClientAuth(_needClientAuth);
+ }
+ else if(_wantClientAuth)
+ {
+ _engine.setWantClientAuth(_wantClientAuth);
+ }
+
+ SSLStatus sslStatus = new SSLStatus();
+ _sslReceiver = new SSLReceiver(_engine,_decryptEngine,sslStatus);
+ _sslSender = new SSLBufferingSender(_engine,_sender,sslStatus);
+ _decryptEngine.setNetworkConnection(new SSLNetworkConnection(_engine,_network, _sslSender));
+ }
+
+ @Override
+ public void received(ByteBuffer msg)
+ {
+ _sslReceiver.received(msg);
+ _sslSender.send();
+ _sslSender.flush();
+ }
+
+ @Override
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender)
+ {
+ //TODO - Implement
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress()
+ {
+ return _decryptEngine.getRemoteAddress();
+ }
+
+ @Override
+ public SocketAddress getLocalAddress()
+ {
+ return _decryptEngine.getLocalAddress();
+ }
+
+ @Override
+ public long getWrittenBytes()
+ {
+ return _decryptEngine.getWrittenBytes();
+ }
+
+ @Override
+ public long getReadBytes()
+ {
+ return _decryptEngine.getReadBytes();
+ }
+
+ @Override
+ public void closed()
+ {
+ _decryptEngine.closed();
+ }
+
+ @Override
+ public void writerIdle()
+ {
+ _decryptEngine.writerIdle();
+ }
+
+ @Override
+ public void readerIdle()
+ {
+ _decryptEngine.readerIdle();
+ }
+
+ @Override
+ public void exception(Throwable t)
+ {
+ _decryptEngine.exception(t);
+ }
+
+ @Override
+ public long getConnectionId()
+ {
+ return _decryptEngine.getConnectionId();
+ }
+
+ @Override
+ public long getLastReadTime()
+ {
+ return _decryptEngine.getLastReadTime();
+ }
+
+ @Override
+ public long getLastWriteTime()
+ {
+ return _decryptEngine.getLastWriteTime();
+ }
+ }
+
+ private boolean looksLikeSSL(byte[] headerBytes)
+ {
+ return headerBytes[0] == 22 && // SSL Handshake
+ (headerBytes[1] == 3 && // SSL 3.0 / TLS 1.x
+ (headerBytes[2] == 0 || // SSL 3.0
+ headerBytes[2] == 1 || // TLS 1.0
+ headerBytes[2] == 2 || // TLS 1.1
+ headerBytes[2] == 3)) && // TLS1.2
+ (headerBytes[5] == 1); // client_hello
+ }
+
+ private static class SSLNetworkConnection implements NetworkConnection
+ {
+ private final NetworkConnection _network;
+ private final SSLBufferingSender _sslSender;
+ private final SSLEngine _engine;
+
+ public SSLNetworkConnection(SSLEngine engine, NetworkConnection network,
+ SSLBufferingSender sslSender)
+ {
+ _engine = engine;
+ _network = network;
+ _sslSender = sslSender;
+
+ }
+
+ @Override
+ public Sender<ByteBuffer> getSender()
+ {
+ return _sslSender;
+ }
+
+ @Override
+ public void start()
+ {
+ _network.start();
+ }
+
+ @Override
+ public void close()
+ {
+ _sslSender.close();
+
+ _network.close();
+ }
+
+ @Override
+ public SocketAddress getRemoteAddress()
+ {
+ return _network.getRemoteAddress();
+ }
+
+ @Override
+ public SocketAddress getLocalAddress()
+ {
+ return _network.getLocalAddress();
+ }
+
+ @Override
+ public void setMaxWriteIdle(int sec)
+ {
+ _network.setMaxWriteIdle(sec);
+ }
+
+ @Override
+ public void setMaxReadIdle(int sec)
+ {
+ _network.setMaxReadIdle(sec);
+ }
+
+ @Override
+ public void setPeerPrincipal(Principal principal)
+ {
+ _network.setPeerPrincipal(principal);
+ }
+
+ @Override
+ public Principal getPeerPrincipal()
+ {
+ try
+ {
+ return _engine.getSession().getPeerPrincipal();
+ }
+ catch (SSLPeerUnverifiedException e)
+ {
+ return null;
+ }
+ }
+
+ @Override
+ public int getMaxReadIdle()
+ {
+ return _network.getMaxReadIdle();
+ }
+
+ @Override
+ public int getMaxWriteIdle()
+ {
+ return _network.getMaxWriteIdle();
+ }
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
index 9f078c8999..4b76546da1 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.server.protocol;
+import javax.net.ssl.SSLContext;
import org.apache.qpid.protocol.ProtocolEngineFactory;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
{
@@ -34,9 +37,20 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
private final Broker _broker;
private final Set<AmqpProtocolVersion> _supported;
private final AmqpProtocolVersion _defaultSupportedReply;
+ private final SSLContext _sslContext;
+ private final boolean _wantClientAuth;
+ private final boolean _needClientAuth;
+ private final Port _port;
+ private final Transport _transport;
public MultiVersionProtocolEngineFactory(Broker broker,
- final Set<AmqpProtocolVersion> supportedVersions, final AmqpProtocolVersion defaultSupportedReply)
+ SSLContext sslContext,
+ boolean wantClientAuth,
+ boolean needClientAuth,
+ final Set<AmqpProtocolVersion> supportedVersions,
+ final AmqpProtocolVersion defaultSupportedReply,
+ Port port,
+ Transport transport)
{
if(defaultSupportedReply != null && !supportedVersions.contains(defaultSupportedReply))
{
@@ -45,13 +59,20 @@ public class MultiVersionProtocolEngineFactory implements ProtocolEngineFactory
}
_broker = broker;
+ _sslContext = sslContext;
_supported = supportedVersions;
_defaultSupportedReply = defaultSupportedReply;
+ _wantClientAuth = wantClientAuth;
+ _needClientAuth = needClientAuth;
+ _port = port;
+ _transport = transport;
}
public ServerProtocolEngine newProtocolEngine()
{
- return new MultiVersionProtocolEngine(_broker, _supported, _defaultSupportedReply, ID_GENERATOR.getAndIncrement());
+ return new MultiVersionProtocolEngine(_broker, _sslContext, _wantClientAuth, _needClientAuth,
+ _supported, _defaultSupportedReply, _port, _transport,
+ ID_GENERATOR.getAndIncrement()
+ );
}
-
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
index d5f7fe486c..8275ab690c 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.Assembler;
@@ -47,11 +49,12 @@ public class ProtocolEngine_0_10 extends InputHandler implements ServerProtocol
private long _lastWriteTime;
public ProtocolEngine_0_10(ServerConnection conn,
- NetworkConnection network)
+ NetworkConnection network, Port port, Transport transport)
{
super(new Assembler(conn));
_connection = conn;
-
+ _connection.setPort(port);
+ _connection.setTransport(transport);
if(network != null)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
index ed9cd324b4..c2210be935 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
@@ -40,6 +40,8 @@ import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
@@ -50,6 +52,8 @@ import org.apache.qpid.transport.network.NetworkConnection;
public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHandler
{
static final AtomicLong _connectionIdSource = new AtomicLong(0L);
+ private final Port _port;
+ private final Transport _transport;
//private NetworkConnection _networkDriver;
private long _readBytes;
@@ -100,9 +104,15 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
- public ProtocolEngine_1_0_0(final NetworkConnection networkDriver, final Broker broker, long id)
+ public ProtocolEngine_1_0_0(final NetworkConnection networkDriver,
+ final Broker broker,
+ long id,
+ Port port,
+ Transport transport)
{
_broker = broker;
+ _port = port;
+ _transport = transport;
_connectionId = id;
if(networkDriver != null)
{
@@ -153,7 +163,7 @@ public class ProtocolEngine_1_0_0 implements ServerProtocolEngine, FrameOutputHa
_conn = new ConnectionEndpoint(container, asSaslServerProvider(_broker.getSubjectCreator(
getLocalAddress())));
_conn.setRemoteAddress(_network.getRemoteAddress());
- _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId));
+ _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId, _port, _transport));
_conn.setFrameOutputHandler(this);
_frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
index 124eb779d5..8e64ca74f9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
@@ -41,6 +41,8 @@ import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
@@ -50,50 +52,52 @@ import org.apache.qpid.transport.network.NetworkConnection;
public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler
{
- private long _readBytes;
- private long _writtenBytes;
-
- private long _lastReadTime;
- private long _lastWriteTime;
- private final Broker _broker;
- private long _createTime = System.currentTimeMillis();
- private ConnectionEndpoint _conn;
- private long _connectionId;
-
- private static final ByteBuffer HEADER =
- ByteBuffer.wrap(new byte[]
- {
- (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte) 3,
- (byte) 1,
- (byte) 0,
- (byte) 0
- });
-
- private static final ByteBuffer PROTOCOL_HEADER =
- ByteBuffer.wrap(new byte[]
- {
- (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte) 0,
- (byte) 1,
- (byte) 0,
- (byte) 0
- });
-
-
- private FrameWriter _frameWriter;
- private ProtocolHandler _frameHandler;
- private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024);
- private Object _sendLock = new Object();
- private byte _major;
- private byte _minor;
- private byte _revision;
+ private final Port _port;
+ private final Transport _transport;
+ private long _readBytes;
+ private long _writtenBytes;
+
+ private long _lastReadTime;
+ private long _lastWriteTime;
+ private final Broker _broker;
+ private long _createTime = System.currentTimeMillis();
+ private ConnectionEndpoint _conn;
+ private long _connectionId;
+
+ private static final ByteBuffer HEADER =
+ ByteBuffer.wrap(new byte[]
+ {
+ (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte) 3,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ });
+
+ private static final ByteBuffer PROTOCOL_HEADER =
+ ByteBuffer.wrap(new byte[]
+ {
+ (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte) 0,
+ (byte) 1,
+ (byte) 0,
+ (byte) 0
+ });
+
+
+ private FrameWriter _frameWriter;
+ private ProtocolHandler _frameHandler;
+ private ByteBuffer _buf = ByteBuffer.allocate(1024 * 1024);
+ private Object _sendLock = new Object();
+ private byte _major;
+ private byte _minor;
+ private byte _revision;
private PrintWriter _out;
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
@@ -111,14 +115,16 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
FRAME
}
- private State _state = State.A;
+ private State _state = State.A;
public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker broker,
- long id)
+ long id, Port port, Transport transport)
{
_connectionId = id;
_broker = broker;
+ _port = port;
+ _transport = transport;
if(networkDriver != null)
{
setNetworkConnection(networkDriver, networkDriver.getSender());
@@ -167,7 +173,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
SubjectCreator subjectCreator = _broker.getSubjectCreator(getLocalAddress());
_conn = new ConnectionEndpoint(container, asSaslServerProvider(subjectCreator));
_conn.setRemoteAddress(getRemoteAddress());
- _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId));
+ _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId, _port, _transport));
_conn.setFrameOutputHandler(this);
_conn.setSaslFrameOutput(this);
@@ -333,102 +339,102 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
}
- public void exception(Throwable t)
- {
- t.printStackTrace();
- }
+ public void exception(Throwable t)
+ {
+ t.printStackTrace();
+ }
- public void closed()
- {
- // todo
+ public void closed()
+ {
+ // todo
_conn.inputClosed();
- if(_conn != null && _conn.getConnectionEventListener() != null)
+ if (_conn != null && _conn.getConnectionEventListener() != null)
{
- ((Connection_1_0)_conn.getConnectionEventListener()).closed();
+ ((Connection_1_0) _conn.getConnectionEventListener()).closed();
}
- }
+ }
- public long getCreateTime()
- {
- return _createTime;
- }
+ public long getCreateTime()
+ {
+ return _createTime;
+ }
- public boolean canSend()
- {
- return true;
- }
+ public boolean canSend()
+ {
+ return true;
+ }
- public void send(final AMQFrame amqFrame)
- {
- send(amqFrame, null);
- }
+ public void send(final AMQFrame amqFrame)
+ {
+ send(amqFrame, null);
+ }
- private static final Logger FRAME_LOGGER = Logger.getLogger("FRM");
+ private static final Logger FRAME_LOGGER = Logger.getLogger("FRM");
- public void send(final AMQFrame amqFrame, ByteBuffer buf)
- {
+ public void send(final AMQFrame amqFrame, ByteBuffer buf)
+ {
- synchronized(_sendLock)
- {
- _lastWriteTime = System.currentTimeMillis();
- if(FRAME_LOGGER.isLoggable(Level.FINE))
- {
- FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
- }
+ synchronized (_sendLock)
+ {
+ _lastWriteTime = System.currentTimeMillis();
+ if (FRAME_LOGGER.isLoggable(Level.FINE))
+ {
+ FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
+ }
- _frameWriter.setValue(amqFrame);
+ _frameWriter.setValue(amqFrame);
- ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize());
+ ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize());
- int size = _frameWriter.writeToBuffer(dup);
- if(size > _conn.getMaxFrameSize())
- {
- throw new OversizeFrameException(amqFrame,size);
- }
+ int size = _frameWriter.writeToBuffer(dup);
+ if (size > _conn.getMaxFrameSize())
+ {
+ throw new OversizeFrameException(amqFrame, size);
+ }
- dup.flip();
- _writtenBytes += dup.limit();
+ dup.flip();
+ _writtenBytes += dup.limit();
- if(RAW_LOGGER.isLoggable(Level.FINE))
- {
- ByteBuffer dup2 = dup.duplicate();
- byte[] data = new byte[dup2.remaining()];
- dup2.get(data);
- Binary bin = new Binary(data);
- RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString());
- }
+ if (RAW_LOGGER.isLoggable(Level.FINE))
+ {
+ ByteBuffer dup2 = dup.duplicate();
+ byte[] data = new byte[dup2.remaining()];
+ dup2.get(data);
+ Binary bin = new Binary(data);
+ RAW_LOGGER.fine("SEND[" + getRemoteAddress() + "] : " + bin.toString());
+ }
- _sender.send(dup);
- _sender.flush();
+ _sender.send(dup);
+ _sender.flush();
- }
- }
+ }
+ }
- public void send(short channel, FrameBody body)
- {
- AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
- send(frame);
+ public void send(short channel, FrameBody body)
+ {
+ AMQFrame frame = AMQFrame.createAMQFrame(channel, body);
+ send(frame);
- }
+ }
- public void close()
- {
- _sender.close();
- }
+ public void close()
+ {
+ _sender.close();
+ }
- public void setLogOutput(final PrintWriter out)
- {
- _out = out;
- }
+ public void setLogOutput(final PrintWriter out)
+ {
+ _out = out;
+ }
- public long getConnectionId()
- {
- return _connectionId;
- }
+ public long getConnectionId()
+ {
+ return _connectionId;
+ }
public long getLastReadTime()
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index f79d34ea71..b274eabf04 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -29,6 +29,8 @@ import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -43,7 +45,9 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTIO
public class Connection_1_0 implements ConnectionEventListener
{
+ private final Port _port;
private VirtualHost _vhost;
+ private final Transport _transport;
private final ConnectionEndpoint _conn;
private final long _connectionId;
private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
@@ -60,9 +64,15 @@ public class Connection_1_0 implements ConnectionEventListener
- public Connection_1_0(VirtualHost virtualHost, ConnectionEndpoint conn, long connectionId)
+ public Connection_1_0(VirtualHost virtualHost,
+ ConnectionEndpoint conn,
+ long connectionId,
+ Port port,
+ Transport transport)
{
_vhost = virtualHost;
+ _port = port;
+ _transport = transport;
_conn = conn;
_connectionId = connectionId;
_vhost.getConnectionRegistry().registerConnection(_model);
@@ -230,6 +240,18 @@ public class Connection_1_0 implements ConnectionEventListener
}
@Override
+ public Port getPort()
+ {
+ return _port;
+ }
+
+ @Override
+ public Transport getTransport()
+ {
+ return _transport;
+ }
+
+ @Override
public void initialiseStatistics()
{
_messageDeliveryStatistics = new StatisticsCounter("messages-delivered-" + getConnectionId());
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
index 58de6a0cdf..b49bd89266 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
@@ -35,6 +35,8 @@ import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
@@ -66,10 +68,12 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
private final long _connectionId;
private final Object _reference = new Object();
private VirtualHost _virtualHost;
+ private Port _port;
private AtomicLong _lastIoTime = new AtomicLong();
private boolean _blocking;
private Principal _peerPrincipal;
private NetworkConnection _networkConnection;
+ private Transport _transport;
public ServerConnection(final long connectionId)
{
@@ -148,6 +152,28 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
initialiseStatistics();
}
+ @Override
+ public Port getPort()
+ {
+ return _port;
+ }
+
+ public void setPort(Port port)
+ {
+ _port = port;
+ }
+
+ @Override
+ public Transport getTransport()
+ {
+ return _transport;
+ }
+
+ public void setTransport(Transport transport)
+ {
+ _transport = transport;
+ }
+
public void onOpen(final Runnable task)
{
_onOpenTask = task;
@@ -451,6 +477,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
return _lastIoTime.longValue();
}
+
public String getClientId()
{
return getConnectionDelegate().getClientId();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
index 3216f8886a..5d3051f7b2 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -60,7 +60,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
public InternalTestProtocolSession(VirtualHost virtualHost, Broker broker) throws AMQException
{
- super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement());
+ super(broker, new TestNetworkConnection(), ID_GENERATOR.getAndIncrement(), null, null);
_channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
index 02b8c74feb..0d4b4ba9f8 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
@@ -154,7 +154,8 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
Set<AmqpProtocolVersion> versions = EnumSet.allOf(AmqpProtocolVersion.class);
MultiVersionProtocolEngineFactory factory =
- new MultiVersionProtocolEngineFactory(_broker, versions, null);
+ new MultiVersionProtocolEngineFactory(_broker, null, false, false, versions, null, null,
+ org.apache.qpid.server.model.Transport.TCP);
//create a dummy to retrieve the 'current' ID number
long previousId = factory.newProtocolEngine().getConnectionId();
@@ -192,7 +193,8 @@ public class MultiVersionProtocolEngineFactoryTest extends QpidTestCase
try
{
- new MultiVersionProtocolEngineFactory(_broker, versions, AmqpProtocolVersion.v0_9);
+ new MultiVersionProtocolEngineFactory(_broker, null, false, false, versions, AmqpProtocolVersion.v0_9, null,
+ org.apache.qpid.server.model.Transport.TCP);
fail("should not have been allowed to create the factory");
}
catch(IllegalArgumentException iae)
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
index 65dbf7bae1..0bc20836f6 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
@@ -27,6 +27,8 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -539,5 +541,17 @@ public class MockSubscription implements Subscription
{
return 0;
}
+
+ @Override
+ public Port getPort()
+ {
+ return null;
+ }
+
+ @Override
+ public Transport getTransport()
+ {
+ return null;
+ }
}
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
index 89d434e95d..efb62ba085 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/SubscriptionFactoryImplTest.java
@@ -73,7 +73,7 @@ public class SubscriptionFactoryImplTest extends QpidTestCase
}
/**
- * Tests that while creating Subscriptions of various types, the
+ * Tests that while creating Subscriptions of various types, the
* ID numbers assigned are allocated from a common sequence
* (in increasing order).
*/
@@ -104,7 +104,7 @@ public class SubscriptionFactoryImplTest extends QpidTestCase
//create a 0-10 subscription
ServerConnection conn = new ServerConnection(1);
- ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection());
+ ProtocolEngine_0_10 engine = new ProtocolEngine_0_10(conn, new TestNetworkConnection(), null, null);
conn.setVirtualHost(_session.getVirtualHost());
ServerSessionDelegate sesDel = new ServerSessionDelegate();
Binary name = new Binary(new byte[]{new Byte("1")});