diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-18 23:40:15 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-18 23:40:15 +0000 |
| commit | 1688957ffdcafc82dcd94891b0282d414522c51e (patch) | |
| tree | 50d3c9df3cfd8ff8529e8530413c833309338233 | |
| parent | d6f465d6a10b4d1d9ced48a10ae980c98697ff5b (diff) | |
| download | qpid-python-1688957ffdcafc82dcd94891b0282d414522c51e.tar.gz | |
QPID-5563 : [Java Broker] [AMQP 1.0] Use the hostname field in the open frame to select the virtual host
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1569563 13f79535-47bb-0310-9956-ffa450edef68
3 files changed, 205 insertions, 171 deletions
diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java index 1556876681..77880fb145 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java @@ -123,6 +123,8 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour private Map _properties; private long _syncTimeout = DEFAULT_SYNC_TIMEOUT; + private String _localHostname; + public ConnectionEndpoint(Container container, SaslServerProvider cbs) { _container = container; @@ -334,6 +336,7 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour : _desiredMaxFrameSize).intValue(); _remoteContainerId = open.getContainerId(); + _localHostname = open.getHostname(); if (open.getIdleTimeOut() != null) { @@ -1041,6 +1044,12 @@ public class ConnectionEndpoint implements DescribedTypeConstructorRegistry.Sour _remoteHostname = remoteHostname; } + public String getLocalHostname() + { + return _localHostname; + } + + public boolean isOpen() { return _state == ConnectionState.OPEN; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 7f8237cc85..cd246c081f 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -27,12 +27,14 @@ import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener; import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; +import org.apache.qpid.amqp_1_0.type.transport.*; +import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -47,6 +49,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod { private final Port _port; + private final Broker _broker; private VirtualHost _vhost; private final Transport _transport; private final ConnectionEndpoint _conn; @@ -84,18 +87,18 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod - public Connection_1_0(VirtualHost virtualHost, + public Connection_1_0(Broker broker, ConnectionEndpoint conn, long connectionId, Port port, Transport transport) { - _vhost = virtualHost; + _broker = broker; _port = port; _transport = transport; _conn = conn; _connectionId = connectionId; - _vhost.getConnectionRegistry().registerConnection(this); + //_vhost.getConnectionRegistry().registerConnection(this); } @@ -152,65 +155,65 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } - @Override - public void close(AMQConstant cause, String message) - { - _conn.close(); - } + @Override + public void close(AMQConstant cause, String message) + { + _conn.close(); + } - @Override - public void block() - { - // TODO - } + @Override + public void block() + { + // TODO + } - @Override - public void unblock() - { - // TODO - } + @Override + public void unblock() + { + // TODO + } - @Override - public void closeSession(Session_1_0 session, AMQConstant cause, String message) - { - session.close(cause, message); - } + @Override + public void closeSession(Session_1_0 session, AMQConstant cause, String message) + { + session.close(cause, message); + } - @Override - public long getConnectionId() - { - return _connectionId; - } + @Override + public long getConnectionId() + { + return _connectionId; + } - @Override - public List<Session_1_0> getSessionModels() - { - return new ArrayList<Session_1_0>(_sessions); - } + @Override + public List<Session_1_0> getSessionModels() + { + return new ArrayList<Session_1_0>(_sessions); + } - @Override - public LogSubject getLogSubject() - { - return _logSubject; - } + @Override + public LogSubject getLogSubject() + { + return _logSubject; + } - @Override - public boolean isSessionNameUnique(byte[] name) - { - return true; // TODO - } + @Override + public boolean isSessionNameUnique(byte[] name) + { + return true; // TODO + } - @Override - public String getRemoteAddressString() - { - return String.valueOf(_conn.getRemoteAddress()); - } + @Override + public String getRemoteAddressString() + { + return String.valueOf(_conn.getRemoteAddress()); + } - @Override - public String getClientId() - { - return _conn.getRemoteContainerId(); - } + @Override + public String getClientId() + { + return _conn.getRemoteContainerId(); + } @Override public String getRemoteContainerName() @@ -219,123 +222,123 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } @Override - public String getClientVersion() - { - return ""; //TODO - } + public String getClientVersion() + { + return ""; //TODO + } - @Override - public String getClientProduct() - { - return ""; //TODO - } + @Override + public String getClientProduct() + { + return ""; //TODO + } - public Principal getAuthorizedPrincipal() - { - return _conn.getUser(); - } + public Principal getAuthorizedPrincipal() + { + return _conn.getUser(); + } - @Override - public long getSessionCountLimit() - { - return 0; // TODO - } + @Override + public long getSessionCountLimit() + { + return 0; // TODO + } - @Override - public long getLastIoTime() - { - return 0; // TODO - } + @Override + public long getLastIoTime() + { + return 0; // TODO + } - @Override - public String getVirtualHostName() - { - return _vhost == null ? null : _vhost.getName(); - } + @Override + public String getVirtualHostName() + { + return _vhost == null ? null : _vhost.getName(); + } - @Override - public Port getPort() - { - return _port; - } + @Override + public Port getPort() + { + return _port; + } - @Override - public Transport getTransport() - { - return _transport; - } + @Override + public Transport getTransport() + { + return _transport; + } - @Override - public void stop() - { - _stopped = true; - } + @Override + public void stop() + { + _stopped = true; + } - @Override - public boolean isStopped() - { - return _stopped; - } + @Override + public boolean isStopped() + { + return _stopped; + } - @Override - public void initialiseStatistics() - { - _messageDeliveryStatistics = new StatisticsCounter("messages-delivered-" + getConnectionId()); - _dataDeliveryStatistics = new StatisticsCounter("data-delivered-" + getConnectionId()); - _messageReceiptStatistics = new StatisticsCounter("messages-received-" + getConnectionId()); - _dataReceiptStatistics = new StatisticsCounter("data-received-" + getConnectionId()); - } + @Override + public void initialiseStatistics() + { + _messageDeliveryStatistics = new StatisticsCounter("messages-delivered-" + getConnectionId()); + _dataDeliveryStatistics = new StatisticsCounter("data-delivered-" + getConnectionId()); + _messageReceiptStatistics = new StatisticsCounter("messages-received-" + getConnectionId()); + _dataReceiptStatistics = new StatisticsCounter("data-received-" + getConnectionId()); + } - @Override - public void registerMessageReceived(long messageSize, long timestamp) - { - _messageReceiptStatistics.registerEvent(1L, timestamp); - _dataReceiptStatistics.registerEvent(messageSize, timestamp); - _vhost.registerMessageReceived(messageSize,timestamp); + @Override + public void registerMessageReceived(long messageSize, long timestamp) + { + _messageReceiptStatistics.registerEvent(1L, timestamp); + _dataReceiptStatistics.registerEvent(messageSize, timestamp); + _vhost.registerMessageReceived(messageSize,timestamp); - } + } - @Override - public void registerMessageDelivered(long messageSize) - { + @Override + public void registerMessageDelivered(long messageSize) + { - _messageDeliveryStatistics.registerEvent(1L); - _dataDeliveryStatistics.registerEvent(messageSize); - _vhost.registerMessageDelivered(messageSize); - } + _messageDeliveryStatistics.registerEvent(1L); + _dataDeliveryStatistics.registerEvent(messageSize); + _vhost.registerMessageDelivered(messageSize); + } - @Override - public StatisticsCounter getMessageDeliveryStatistics() - { - return _messageDeliveryStatistics; - } + @Override + public StatisticsCounter getMessageDeliveryStatistics() + { + return _messageDeliveryStatistics; + } - @Override - public StatisticsCounter getMessageReceiptStatistics() - { - return _messageReceiptStatistics; - } + @Override + public StatisticsCounter getMessageReceiptStatistics() + { + return _messageReceiptStatistics; + } - @Override - public StatisticsCounter getDataDeliveryStatistics() - { - return _dataDeliveryStatistics; - } + @Override + public StatisticsCounter getDataDeliveryStatistics() + { + return _dataDeliveryStatistics; + } - @Override - public StatisticsCounter getDataReceiptStatistics() - { - return _dataReceiptStatistics; - } + @Override + public StatisticsCounter getDataReceiptStatistics() + { + return _dataReceiptStatistics; + } - @Override - public void resetStatistics() - { - _dataDeliveryStatistics.reset(); - _dataReceiptStatistics.reset(); - _messageDeliveryStatistics.reset(); - _messageReceiptStatistics.reset(); - } + @Override + public void resetStatistics() + { + _dataDeliveryStatistics.reset(); + _dataReceiptStatistics.reset(); + _messageDeliveryStatistics.reset(); + _messageReceiptStatistics.reset(); + } @@ -345,4 +348,24 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } + public void frameReceived() + { + if(_vhost == null && _conn.isOpen()) + { + String host = _conn.getLocalHostname(); + if(host == null || host.trim().equals("")) + { + host = (String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST); + } + _vhost = _broker.getVirtualHostRegistry().getVirtualHost(host); + if(_vhost == null) + { + final Error err = new Error(); + err.setCondition(AmqpError.NOT_FOUND); + err.setDescription("Unknown hostname " + _conn.getLocalHostname()); + _conn.close(err); + } + + } + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index fe214eb899..aa52c6191a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -73,7 +73,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private long _lastWriteTime; private final Broker _broker; private long _createTime = System.currentTimeMillis(); - private ConnectionEndpoint _conn; + private ConnectionEndpoint _endpoint; private long _connectionId; private static final ByteBuffer HEADER = @@ -113,6 +113,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut private PrintWriter _out; private NetworkConnection _network; private Sender<ByteBuffer> _sender; + private Connection_1_0 _connection; static enum State { @@ -181,9 +182,8 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut Container container = new Container(_broker.getId().toString()); - VirtualHost virtualHost = _broker.getVirtualHostRegistry().getVirtualHost((String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST)); SubjectCreator subjectCreator = _broker.getSubjectCreator(getLocalAddress()); - _conn = new ConnectionEndpoint(container, asSaslServerProvider(subjectCreator)); + _endpoint = new ConnectionEndpoint(container, asSaslServerProvider(subjectCreator)); Map<Symbol,Object> serverProperties = new LinkedHashMap<Symbol, Object>(); serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), QpidProperties.getProductName()); @@ -191,18 +191,19 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), QpidProperties.getBuildVersion()); serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), _broker.getName()); - _conn.setProperties(serverProperties); + _endpoint.setProperties(serverProperties); - _conn.setRemoteAddress(getRemoteAddress()); - _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId, _port, _transport)); - _conn.setFrameOutputHandler(this); - _conn.setSaslFrameOutput(this); + _endpoint.setRemoteAddress(getRemoteAddress()); + _connection = new Connection_1_0(_broker, _endpoint, _connectionId, _port, _transport); + _endpoint.setConnectionEventListener(_connection); + _endpoint.setFrameOutputHandler(this); + _endpoint.setSaslFrameOutput(this); - _conn.setOnSaslComplete(new Runnable() + _endpoint.setOnSaslComplete(new Runnable() { public void run() { - if(_conn.isAuthenticated()) + if (_endpoint.isAuthenticated()) { _sender.send(PROTOCOL_HEADER.duplicate()); _sender.flush(); @@ -213,13 +214,13 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } } }); - _frameWriter = new FrameWriter(_conn.getDescribedTypeRegistry()); - _frameHandler = new SASLFrameHandler(_conn); + _frameWriter = new FrameWriter(_endpoint.getDescribedTypeRegistry()); + _frameHandler = new SASLFrameHandler(_endpoint); _sender.send(HEADER.duplicate()); _sender.flush(); - _conn.initiateSASL(subjectCreator.getMechanisms().split(" ")); + _endpoint.initiateSASL(subjectCreator.getMechanisms().split(" ")); } @@ -357,6 +358,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut if (msg.hasRemaining()) { _frameHandler = _frameHandler.parse(msg); + _connection.frameReceived(); } } } @@ -380,7 +382,7 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut final Error err = new Error(); err.setCondition(AmqpError.INTERNAL_ERROR); err.setDescription(throwable.getMessage()); - _conn.close(err); + _endpoint.close(err); close(); } catch(TransportException e) @@ -406,10 +408,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut try { // todo - _conn.inputClosed(); - if (_conn != null && _conn.getConnectionEventListener() != null) + _endpoint.inputClosed(); + if (_endpoint != null && _endpoint.getConnectionEventListener() != null) { - ((Connection_1_0) _conn.getConnectionEventListener()).closed(); + ((Connection_1_0) _endpoint.getConnectionEventListener()).closed(); } } catch(RuntimeException e) @@ -450,10 +452,10 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut _frameWriter.setValue(amqFrame); - ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize()); + ByteBuffer dup = ByteBuffer.allocate(_endpoint.getMaxFrameSize()); int size = _frameWriter.writeToBuffer(dup); - if (size > _conn.getMaxFrameSize()) + if (size > _endpoint.getMaxFrameSize()) { throw new OversizeFrameException(amqFrame, size); } |
