diff options
| author | Keith Wall <kwall@apache.org> | 2015-03-16 16:52:32 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2015-03-16 16:52:32 +0000 |
| commit | 67a2cb9fe4149dc9d6cd750b3426995033ea9d9d (patch) | |
| tree | 20633ed3c757a764123d5c98811462afe72cb9b2 /qpid/java/broker-plugins | |
| parent | e756d0579c8e0f4373e56a4d608acf9eb5632f57 (diff) | |
| download | qpid-python-67a2cb9fe4149dc9d6cd750b3426995033ea9d9d.tar.gz | |
QPID-6429, QPID-6262: [Java Broker] Improve error handling in new NIO code; Remove MINA terminlogy (session etc) in 0-8 stack
* Also added uncaught exception handler in test framework (QBTC) to guard log the case where a thread exits abnormally.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1667068 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
10 files changed, 62 insertions, 110 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 2280377fca..12aaa09eb0 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -540,11 +540,6 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S return _connectionId; } - public boolean isSessionNameUnique(byte[] name) - { - return !super.hasSessionWithName(name); - } - public String getRemoteAddressString() { return String.valueOf(getRemoteAddress()); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java index 7f646b43b4..b4c0b15dc0 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java @@ -371,12 +371,17 @@ public class ServerConnectionDelegate extends ServerDelegate while(connections.hasNext()) { final AMQConnectionModel amqConnectionModel = connections.next(); - final String userName = amqConnectionModel.getAuthorizedPrincipal() == null - ? "" - : amqConnectionModel.getAuthorizedPrincipal().getName(); - if (userId.equals(userName) && !amqConnectionModel.isSessionNameUnique(name)) + if (amqConnectionModel instanceof ServerConnection) { - return false; + ServerConnection otherConnection = (ServerConnection)amqConnectionModel; + + final String userName = amqConnectionModel.getAuthorizedPrincipal() == null + ? "" + : amqConnectionModel.getAuthorizedPrincipal().getName(); + if (userId.equals(userName) && otherConnection.hasSessionWithName(name)) + { + return false; + } } } return true; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 87becd955d..16ea23b765 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -857,9 +857,6 @@ public class AMQChannel return false; } - /** - * Called from the protocol session to close this channel and clean up. T - */ @Override public void close() { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 3cd5e7bba2..cf61e135b0 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -93,7 +93,6 @@ import org.apache.qpid.transport.SenderClosedException; import org.apache.qpid.transport.SenderException; import org.apache.qpid.transport.TransportException; import org.apache.qpid.transport.network.NetworkConnection; -import org.apache.qpid.util.BytesDataOutput; public class AMQProtocolEngine implements ServerProtocolEngine, AMQConnectionModel<AMQProtocolEngine, AMQChannel>, @@ -117,9 +116,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, // to save boxing the channelId and looking up in a map... cache in an array the low numbered // channels. This value must be of the form 2^x - 1. private static final int CHANNEL_CACHE_SIZE = 0xff; - private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024; - public static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength"; - public static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80; + private static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength"; + private static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80; private static final long AWAIT_CLOSED_TIMEOUT = 60000; private final AmqpPort<?> _port; private final long _creationTime; @@ -156,10 +154,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private volatile boolean _closed; - // maximum number of channels this session should have private long _maxNoOfChannels; - /* AMQP Version for this session */ private ProtocolVersion _protocolVersion = ProtocolVersion.getLatestSupportedVersion(); private final MethodRegistry _methodRegistry = new MethodRegistry(_protocolVersion); private final List<Action<? super AMQProtocolEngine>> _connectionCloseTaskList = @@ -172,7 +168,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private ProtocolOutputConverter _protocolOutputConverter; private final Subject _authorizedSubject = new Subject(); - private final long _connectionID; + private final long _connectionId; private Object _reference = new Object(); private LogSubject _logSubject; @@ -190,7 +186,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, private ByteBufferSender _sender; private volatile boolean _deferFlush; - private long _lastReceivedTime = System.currentTimeMillis(); // TODO consider if this is what we want? + private long _lastReceivedTime = System.currentTimeMillis(); private boolean _blocking; private AtomicLong _lastWriteTime = new AtomicLong(System.currentTimeMillis()); @@ -247,7 +243,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _transport = transport; _maxNoOfChannels = broker.getConnection_sessionCountLimit(); _decoder = new BrokerDecoder(this); - _connectionID = connectionId; + _connectionId = connectionId; _logSubject = new ConnectionLogSubject(this); _binaryDataLimit = _broker.getContextKeys(false).contains(BROKER_DEBUG_BINARY_DATA_LENGTH) ? _broker.getContextValue(Integer.class, BROKER_DEBUG_BINARY_DATA_LENGTH) @@ -272,11 +268,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return null; } }); - - _messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID()); - _dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID()); - _messagesReceived = new StatisticsCounter("messages-received-" + getSessionID()); - _dataReceived = new StatisticsCounter("data-received-" + getSessionID()); + + _messagesDelivered = new StatisticsCounter("messages-delivered-" + _connectionId); + _dataDelivered = new StatisticsCounter("data-delivered-" + _connectionId); + _messagesReceived = new StatisticsCounter("messages-received-" + _connectionId); + _dataReceived = new StatisticsCounter("data-received-" + _connectionId); _creationTime = System.currentTimeMillis(); } @@ -323,11 +319,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _sender = sender; } - public long getSessionID() - { - return _connectionID; - } - public void setMaxFrameSize(int frameMax) { _maxFrameSize = frameMax; @@ -368,7 +359,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, + _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY) + "ms to establish identity. Closing as possible DoS."); getEventLogger().message(ConnectionMessages.IDLE_CLOSE()); - closeProtocolSession(); + closeNetworkConnection(); } _lastReceivedTime = arrivalTime; _lastIoTime = arrivalTime; @@ -382,37 +373,37 @@ public class AMQProtocolEngine implements ServerProtocolEngine, catch (ConnectionScopedRuntimeException e) { _logger.error("Unexpected exception", e); - closeProtocolSession(); + closeNetworkConnection(); } catch (AMQProtocolVersionException e) { _logger.error("Unexpected protocol version", e); - closeProtocolSession(); + closeNetworkConnection(); } catch (SenderClosedException e) { _logger.debug("Sender was closed abruptly, closing network.", e); - closeProtocolSession(); + closeNetworkConnection(); } catch (SenderException e) { _logger.info("Unexpected exception on send, closing network.", e); - closeProtocolSession(); + closeNetworkConnection(); } catch (TransportException e) { _logger.error("Unexpected transport exception", e); - closeProtocolSession(); + closeNetworkConnection(); } catch (AMQFrameDecodingException e) { _logger.error("Frame decoding", e); - closeProtocolSession(); + closeNetworkConnection(); } catch (IOException e) { _logger.error("I/O Exception", e); - closeProtocolSession(); + closeNetworkConnection(); } catch (StoreException e) { @@ -484,7 +475,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, ProtocolVersion pv = pi.checkVersion(); // Fails if not correct - // This sets the protocol version (and hence framing classes) for this session. setProtocolVersion(pv); StringBuilder mechanismBuilder = new StringBuilder(); @@ -538,15 +528,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } - private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY]; - private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes); - /** - * Convenience method that writes a frame to the protocol session. Equivalent to calling - * getProtocolSession().write(). - * - * @param frame the frame to write - */ + public synchronized void writeFrame(AMQDataBlock frame) { if(_logger.isDebugEnabled()) @@ -730,12 +713,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, sessionRemoved(session); } - /** - * Initialise heartbeats on the session. - * - * @param delay delay in seconds (not ms) - */ - public void initHeartbeats(int delay) + private void initHeartbeats(int delay) { if (delay > 0) { @@ -749,9 +727,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } - /** - * Closes all channels that were opened by this protocol session. This frees up all resources used by the channel. - */ private void closeAllChannels() { try @@ -792,7 +767,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } - public void closeSession(final boolean connectionDropped) + private void closeConnectionInternal(final boolean connectionDropped) { if(runningAsSubject()) @@ -823,7 +798,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, @Override public Object run() { - closeSession(connectionDropped); + closeConnectionInternal(connectionDropped); return null; } }); @@ -921,7 +896,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, try { markChannelAwaitingCloseOk(channelId); - closeSession(false); + closeConnectionInternal(false); } finally { @@ -931,7 +906,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } finally { - closeProtocolSession(); + closeNetworkConnection(); } } } @@ -941,7 +916,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } } - public void closeProtocolSession() + public void closeNetworkConnection() { _network.close(); } @@ -951,19 +926,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, return getRemoteAddress() + "(" + (getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName() + ")"); } - /** @return an object that can be used to identity */ - public Object getKey() - { - return getRemoteAddress(); - } - - /** - * Get the fully qualified domain name of the local address to which this session is bound. Since some servers may - * be bound to multiple addresses this could vary depending on the acceptor this session was created from. - * - * @return a String FQDN - */ - public String getLocalFQDN() + private String getLocalFQDN() { SocketAddress address = _network.getLocalAddress(); if (address instanceof InetSocketAddress) @@ -1149,11 +1112,11 @@ public class AMQProtocolEngine implements ServerProtocolEngine, { try { - closeSession(true); + closeConnectionInternal(true); } finally { - closeProtocolSession(); + closeNetworkConnection(); } } catch (ConnectionScopedRuntimeException | TransportException e) @@ -1311,7 +1274,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, public long getConnectionId() { - return getSessionID(); + return _connectionId; } public String getAddress() @@ -1452,12 +1415,6 @@ public class AMQProtocolEngine implements ServerProtocolEngine, _dataReceived.reset(); } - public boolean isSessionNameUnique(byte[] name) - { - // 0-8/0-9/0-9-1 sessions don't have names - return true; - } - public String getRemoteAddressString() { return String.valueOf(getRemoteAddress()); @@ -1640,19 +1597,20 @@ public class AMQProtocolEngine implements ServerProtocolEngine, } try { - closeSession(false); + closeConnectionInternal(false); + + MethodRegistry methodRegistry = getMethodRegistry(); + ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody(); + writeFrame(responseBody.generateFrame(0)); } catch (Exception e) { - _logger.error("Error closing protocol session: " + e, e); + _logger.error("Error closing connection for " + getRemoteAddressString(), e); + } + finally + { + closeNetworkConnection(); } - - MethodRegistry methodRegistry = getMethodRegistry(); - ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody(); - writeFrame(responseBody.generateFrame(0)); - - closeProtocolSession(); - } @Override @@ -1667,12 +1625,17 @@ public class AMQProtocolEngine implements ServerProtocolEngine, try { - closeSession(false); + closeConnectionInternal(false); } catch (Exception e) { - _logger.error("Error closing protocol session: " + e, e); + _logger.error("Error closing connection: " + getRemoteAddressString(), e); } + finally + { + closeNetworkConnection(); + } + } @Override @@ -1692,7 +1655,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, SaslServer ss = getSaslServer(); if (ss == null) { - closeConnection(AMQConstant.INTERNAL_ERROR, "No SASL context set up in session",0 ); + closeConnection(AMQConstant.INTERNAL_ERROR, "No SASL context set up in connection",0 ); } MethodRegistry methodRegistry = getMethodRegistry(); SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, response); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 3e5da53d80..5f7d5fe46e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -377,7 +377,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen { String subscriber = "[channel=" + _channel + ", consumerTag=" + _consumerTag + - ", session=" + getProtocolSession().getKey() ; + ", session=" + getConnection().getRemoteAddressString(); return subscriber + "]"; } @@ -450,7 +450,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen return _consumerTag; } - public AMQProtocolEngine getProtocolSession() + private AMQProtocolEngine getConnection() { return _channel.getConnection(); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java index 4a84ccad37..15ce9262fc 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java @@ -365,7 +365,7 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter return _underlyingBody.writePayload(sender); } - public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) + public void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException { throw new AMQException("This block should never be dispatched!"); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 67c51c78ed..7e68bee661 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -218,7 +218,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr } } - public void closeProtocolSession() + public void closeNetworkConnection() { // Override as we don't have a real IOSession to close. // The alternative is to fully implement the TestIOSession to return a CloseFuture from close(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java index c6f7defe56..459fc94484 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/MaxChannelsTest.java @@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.test.utils.QpidTestCase; -/** Test class to test MBean operations for AMQMinaProtocolSession. */ public class MaxChannelsTest extends QpidTestCase { private AMQProtocolEngine _session; @@ -62,7 +61,6 @@ public class MaxChannelsTest extends QpidTestCase try { _session.getVirtualHost().close(); - _session.closeSession(false); } finally { diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index b515fda4a7..ff8f642c0d 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -324,12 +324,6 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod } @Override - public boolean isSessionNameUnique(byte[] name) - { - return true; // TODO - } - - @Override public String getRemoteAddressString() { return String.valueOf(_conn.getRemoteAddress()); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index 5020ffe3cd..17e306f5bd 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -212,7 +212,7 @@ public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implem if(port.getState() != State.ACTIVE) { - // TODO - RG + // TODO - RG - probably does nothing port.startAsync(); } Connector connector = null; |
