summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-18 23:40:15 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-18 23:40:15 +0000
commit1688957ffdcafc82dcd94891b0282d414522c51e (patch)
tree50d3c9df3cfd8ff8529e8530413c833309338233
parentd6f465d6a10b4d1d9ced48a10ae980c98697ff5b (diff)
downloadqpid-python-1688957ffdcafc82dcd94891b0282d414522c51e.tar.gz
QPID-5563 : [Java Broker] [AMQP 1.0] Use the hostname field in the open frame to select the virtual host
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1569563 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java9
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java327
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java40
3 files changed, 205 insertions, 171 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
index 1556876681..77880fb145 100644
--- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
+++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
@@ -123,6 +123,8 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
private Map _properties;
private long _syncTimeout = DEFAULT_SYNC_TIMEOUT;
+ private String _localHostname;
+
public ConnectionEndpoint(Container container, SaslServerProvider cbs)
{
_container = container;
@@ -334,6 +336,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
: _desiredMaxFrameSize).intValue();
_remoteContainerId = open.getContainerId();
+ _localHostname = open.getHostname();
if (open.getIdleTimeOut() != null)
{
@@ -1041,6 +1044,12 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour
_remoteHostname = remoteHostname;
}
+ public String getLocalHostname()
+ {
+ return _localHostname;
+ }
+
+
public boolean isOpen()
{
return _state == ConnectionState.OPEN;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 7f8237cc85..cd246c081f 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -27,12 +27,14 @@ import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -47,6 +49,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
{
private final Port _port;
+ private final Broker _broker;
private VirtualHost _vhost;
private final Transport _transport;
private final ConnectionEndpoint _conn;
@@ -84,18 +87,18 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
- public Connection_1_0(VirtualHost virtualHost,
+ public Connection_1_0(Broker broker,
ConnectionEndpoint conn,
long connectionId,
Port port,
Transport transport)
{
- _vhost = virtualHost;
+ _broker = broker;
_port = port;
_transport = transport;
_conn = conn;
_connectionId = connectionId;
- _vhost.getConnectionRegistry().registerConnection(this);
+ //_vhost.getConnectionRegistry().registerConnection(this);
}
@@ -152,65 +155,65 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
}
- @Override
- public void close(AMQConstant cause, String message)
- {
- _conn.close();
- }
+ @Override
+ public void close(AMQConstant cause, String message)
+ {
+ _conn.close();
+ }
- @Override
- public void block()
- {
- // TODO
- }
+ @Override
+ public void block()
+ {
+ // TODO
+ }
- @Override
- public void unblock()
- {
- // TODO
- }
+ @Override
+ public void unblock()
+ {
+ // TODO
+ }
- @Override
- public void closeSession(Session_1_0 session, AMQConstant cause, String message)
- {
- session.close(cause, message);
- }
+ @Override
+ public void closeSession(Session_1_0 session, AMQConstant cause, String message)
+ {
+ session.close(cause, message);
+ }
- @Override
- public long getConnectionId()
- {
- return _connectionId;
- }
+ @Override
+ public long getConnectionId()
+ {
+ return _connectionId;
+ }
- @Override
- public List<Session_1_0> getSessionModels()
- {
- return new ArrayList<Session_1_0>(_sessions);
- }
+ @Override
+ public List<Session_1_0> getSessionModels()
+ {
+ return new ArrayList<Session_1_0>(_sessions);
+ }
- @Override
- public LogSubject getLogSubject()
- {
- return _logSubject;
- }
+ @Override
+ public LogSubject getLogSubject()
+ {
+ return _logSubject;
+ }
- @Override
- public boolean isSessionNameUnique(byte[] name)
- {
- return true; // TODO
- }
+ @Override
+ public boolean isSessionNameUnique(byte[] name)
+ {
+ return true; // TODO
+ }
- @Override
- public String getRemoteAddressString()
- {
- return String.valueOf(_conn.getRemoteAddress());
- }
+ @Override
+ public String getRemoteAddressString()
+ {
+ return String.valueOf(_conn.getRemoteAddress());
+ }
- @Override
- public String getClientId()
- {
- return _conn.getRemoteContainerId();
- }
+ @Override
+ public String getClientId()
+ {
+ return _conn.getRemoteContainerId();
+ }
@Override
public String getRemoteContainerName()
@@ -219,123 +222,123 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
}
@Override
- public String getClientVersion()
- {
- return ""; //TODO
- }
+ public String getClientVersion()
+ {
+ return ""; //TODO
+ }
- @Override
- public String getClientProduct()
- {
- return ""; //TODO
- }
+ @Override
+ public String getClientProduct()
+ {
+ return ""; //TODO
+ }
- public Principal getAuthorizedPrincipal()
- {
- return _conn.getUser();
- }
+ public Principal getAuthorizedPrincipal()
+ {
+ return _conn.getUser();
+ }
- @Override
- public long getSessionCountLimit()
- {
- return 0; // TODO
- }
+ @Override
+ public long getSessionCountLimit()
+ {
+ return 0; // TODO
+ }
- @Override
- public long getLastIoTime()
- {
- return 0; // TODO
- }
+ @Override
+ public long getLastIoTime()
+ {
+ return 0; // TODO
+ }
- @Override
- public String getVirtualHostName()
- {
- return _vhost == null ? null : _vhost.getName();
- }
+ @Override
+ public String getVirtualHostName()
+ {
+ return _vhost == null ? null : _vhost.getName();
+ }
- @Override
- public Port getPort()
- {
- return _port;
- }
+ @Override
+ public Port getPort()
+ {
+ return _port;
+ }
- @Override
- public Transport getTransport()
- {
- return _transport;
- }
+ @Override
+ public Transport getTransport()
+ {
+ return _transport;
+ }
- @Override
- public void stop()
- {
- _stopped = true;
- }
+ @Override
+ public void stop()
+ {
+ _stopped = true;
+ }
- @Override
- public boolean isStopped()
- {
- return _stopped;
- }
+ @Override
+ public boolean isStopped()
+ {
+ return _stopped;
+ }
- @Override
- public void initialiseStatistics()
- {
- _messageDeliveryStatistics = new StatisticsCounter("messages-delivered-" + getConnectionId());
- _dataDeliveryStatistics = new StatisticsCounter("data-delivered-" + getConnectionId());
- _messageReceiptStatistics = new StatisticsCounter("messages-received-" + getConnectionId());
- _dataReceiptStatistics = new StatisticsCounter("data-received-" + getConnectionId());
- }
+ @Override
+ public void initialiseStatistics()
+ {
+ _messageDeliveryStatistics = new StatisticsCounter("messages-delivered-" + getConnectionId());
+ _dataDeliveryStatistics = new StatisticsCounter("data-delivered-" + getConnectionId());
+ _messageReceiptStatistics = new StatisticsCounter("messages-received-" + getConnectionId());
+ _dataReceiptStatistics = new StatisticsCounter("data-received-" + getConnectionId());
+ }
- @Override
- public void registerMessageReceived(long messageSize, long timestamp)
- {
- _messageReceiptStatistics.registerEvent(1L, timestamp);
- _dataReceiptStatistics.registerEvent(messageSize, timestamp);
- _vhost.registerMessageReceived(messageSize,timestamp);
+ @Override
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ _messageReceiptStatistics.registerEvent(1L, timestamp);
+ _dataReceiptStatistics.registerEvent(messageSize, timestamp);
+ _vhost.registerMessageReceived(messageSize,timestamp);
- }
+ }
- @Override
- public void registerMessageDelivered(long messageSize)
- {
+ @Override
+ public void registerMessageDelivered(long messageSize)
+ {
- _messageDeliveryStatistics.registerEvent(1L);
- _dataDeliveryStatistics.registerEvent(messageSize);
- _vhost.registerMessageDelivered(messageSize);
- }
+ _messageDeliveryStatistics.registerEvent(1L);
+ _dataDeliveryStatistics.registerEvent(messageSize);
+ _vhost.registerMessageDelivered(messageSize);
+ }
- @Override
- public StatisticsCounter getMessageDeliveryStatistics()
- {
- return _messageDeliveryStatistics;
- }
+ @Override
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messageDeliveryStatistics;
+ }
- @Override
- public StatisticsCounter getMessageReceiptStatistics()
- {
- return _messageReceiptStatistics;
- }
+ @Override
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messageReceiptStatistics;
+ }
- @Override
- public StatisticsCounter getDataDeliveryStatistics()
- {
- return _dataDeliveryStatistics;
- }
+ @Override
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDeliveryStatistics;
+ }
- @Override
- public StatisticsCounter getDataReceiptStatistics()
- {
- return _dataReceiptStatistics;
- }
+ @Override
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceiptStatistics;
+ }
- @Override
- public void resetStatistics()
- {
- _dataDeliveryStatistics.reset();
- _dataReceiptStatistics.reset();
- _messageDeliveryStatistics.reset();
- _messageReceiptStatistics.reset();
- }
+ @Override
+ public void resetStatistics()
+ {
+ _dataDeliveryStatistics.reset();
+ _dataReceiptStatistics.reset();
+ _messageDeliveryStatistics.reset();
+ _messageReceiptStatistics.reset();
+ }
@@ -345,4 +348,24 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod
}
+ public void frameReceived()
+ {
+ if(_vhost == null && _conn.isOpen())
+ {
+ String host = _conn.getLocalHostname();
+ if(host == null || host.trim().equals(""))
+ {
+ host = (String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST);
+ }
+ _vhost = _broker.getVirtualHostRegistry().getVirtualHost(host);
+ if(_vhost == null)
+ {
+ final Error err = new Error();
+ err.setCondition(AmqpError.NOT_FOUND);
+ err.setDescription("Unknown hostname " + _conn.getLocalHostname());
+ _conn.close(err);
+ }
+
+ }
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index fe214eb899..aa52c6191a 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -73,7 +73,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private long _lastWriteTime;
private final Broker _broker;
private long _createTime = System.currentTimeMillis();
- private ConnectionEndpoint _conn;
+ private ConnectionEndpoint _endpoint;
private long _connectionId;
private static final ByteBuffer HEADER =
@@ -113,6 +113,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
private PrintWriter _out;
private NetworkConnection _network;
private Sender<ByteBuffer> _sender;
+ private Connection_1_0 _connection;
static enum State {
@@ -181,9 +182,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
Container container = new Container(_broker.getId().toString());
- VirtualHost virtualHost = _broker.getVirtualHostRegistry().getVirtualHost((String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST));
SubjectCreator subjectCreator = _broker.getSubjectCreator(getLocalAddress());
- _conn = new ConnectionEndpoint(container, asSaslServerProvider(subjectCreator));
+ _endpoint = new ConnectionEndpoint(container, asSaslServerProvider(subjectCreator));
Map<Symbol,Object> serverProperties = new LinkedHashMap<Symbol, Object>();
serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), QpidProperties.getProductName());
@@ -191,18 +191,19 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), QpidProperties.getBuildVersion());
serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), _broker.getName());
- _conn.setProperties(serverProperties);
+ _endpoint.setProperties(serverProperties);
- _conn.setRemoteAddress(getRemoteAddress());
- _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId, _port, _transport));
- _conn.setFrameOutputHandler(this);
- _conn.setSaslFrameOutput(this);
+ _endpoint.setRemoteAddress(getRemoteAddress());
+ _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport);
+ _endpoint.setConnectionEventListener(_connection);
+ _endpoint.setFrameOutputHandler(this);
+ _endpoint.setSaslFrameOutput(this);
- _conn.setOnSaslComplete(new Runnable()
+ _endpoint.setOnSaslComplete(new Runnable()
{
public void run()
{
- if(_conn.isAuthenticated())
+ if (_endpoint.isAuthenticated())
{
_sender.send(PROTOCOL_HEADER.duplicate());
_sender.flush();
@@ -213,13 +214,13 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
}
}
});
- _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry());
- _frameHandler = new SASLFrameHandler(_conn);
+ _frameWriter = new FrameWriter(_endpoint.getDescribedTypeRegistry());
+ _frameHandler = new SASLFrameHandler(_endpoint);
_sender.send(HEADER.duplicate());
_sender.flush();
- _conn.initiateSASL(subjectCreator.getMechanisms().split(" "));
+ _endpoint.initiateSASL(subjectCreator.getMechanisms().split(" "));
}
@@ -357,6 +358,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
if (msg.hasRemaining())
{
_frameHandler = _frameHandler.parse(msg);
+ _connection.frameReceived();
}
}
}
@@ -380,7 +382,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
final Error err = new Error();
err.setCondition(AmqpError.INTERNAL_ERROR);
err.setDescription(throwable.getMessage());
- _conn.close(err);
+ _endpoint.close(err);
close();
}
catch(TransportException e)
@@ -406,10 +408,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
try
{
// todo
- _conn.inputClosed();
- if (_conn != null && _conn.getConnectionEventListener() != null)
+ _endpoint.inputClosed();
+ if (_endpoint != null && _endpoint.getConnectionEventListener() != null)
{
- ((Connection_1_0) _conn.getConnectionEventListener()).closed();
+ ((Connection_1_0) _endpoint.getConnectionEventListener()).closed();
}
}
catch(RuntimeException e)
@@ -450,10 +452,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
_frameWriter.setValue(amqFrame);
- ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize());
+ ByteBuffer dup = ByteBuffer.allocate(_endpoint.getMaxFrameSize());
int size = _frameWriter.writeToBuffer(dup);
- if (size > _conn.getMaxFrameSize())
+ if (size > _endpoint.getMaxFrameSize())
{
throw new OversizeFrameException(amqFrame, size);
}