diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-02-14 10:52:47 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-02-14 10:52:47 +0000 |
| commit | 50b314a51a2c787fcd412a84cb8464f72e3868b4 (patch) | |
| tree | fa6e85db6da742fbb9b235ca3e1d036d288ae970 /qpid/java/broker-plugins | |
| parent | 08b64b592cb844cbd746b33e5f17c94b2158a115 (diff) | |
| download | qpid-python-50b314a51a2c787fcd412a84cb8464f72e3868b4.tar.gz | |
QPID-5551 : Remove uses of AMQException, add ServerScopedRuntimeException and ConnectionScopedRuntimeException
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1568235 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
48 files changed, 335 insertions, 448 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index c1ee0bc108..ff4bd1dc2e 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.protocol.v0_10; -import org.apache.qpid.AMQException; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.logging.LogActor; @@ -31,13 +30,13 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.transport.*; import java.util.Collections; @@ -507,7 +506,8 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC _creditManager = new WindowCreditManager(0l,0l); break; default: - throw new RuntimeException("Unknown message flow mode: " + flowMode); + // this should never happen, as 0-10 is finalised and so the enum should never change + throw new ConnectionScopedRuntimeException("Unknown message flow mode: " + flowMode); } _flowMode = flowMode; updateState(State.ACTIVE, State.SUSPENDED); @@ -532,7 +532,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC } } - public void flush() throws AMQException + public void flush() { flushCreditState(true); getConsumer().flush(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java index 32ecc6bd0e..df4c398115 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java @@ -32,6 +32,8 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; @@ -183,7 +185,7 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M } catch (IOException e) { - throw new RuntimeException(e); + throw new ConnectionScopedRuntimeException(e); } } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java index bc5f8899f2..68997bbb01 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.message.internal.InternalMessageMetaData; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; @@ -247,11 +248,11 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess } catch (TypedBytesFormatException e) { - throw new RuntimeException(e); // TODO - Implement + throw new ConnectionScopedRuntimeException(e); } catch (EOFException e) { - throw new RuntimeException(e); // TODO - Implement + throw new ConnectionScopedRuntimeException(e); // TODO - Implement } } return list; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 66ffd1ef94..f94026d0af 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -28,13 +28,11 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.security.auth.Subject; -import org.apache.qpid.AMQException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.actors.AMQPConnectionActor; import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ConnectionMessages; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; @@ -44,6 +42,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Connection; import org.apache.qpid.transport.ConnectionCloseCode; @@ -52,7 +51,6 @@ import org.apache.qpid.transport.ExecutionException; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.ProtocolEvent; import org.apache.qpid.transport.Session; -import org.apache.qpid.transport.network.NetworkConnection; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT; @@ -199,7 +197,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, _onOpenTask = task; } - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException + public void closeSession(AMQSessionModel session, AMQConstant cause, String message) { ExecutionException ex = new ExecutionException(); ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR; @@ -224,6 +222,26 @@ public class ServerConnection extends Connection implements AMQConnectionModel, } @Override + public void exception(final Throwable t) + { + try + { + super.exception(t); + } + finally + { + if(t instanceof Error) + { + throw (Error) t; + } + if(t instanceof ServerScopedRuntimeException) + { + throw (ServerScopedRuntimeException) t; + } + } + } + + @Override public void received(ProtocolEvent event) { _lastIoTime.set(System.currentTimeMillis()); @@ -294,7 +312,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel, return _actor; } - public void close(AMQConstant cause, String message) throws AMQException + public void close(AMQConstant cause, String message) { closeSubscriptions(); ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL; diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index 87a02b99c1..7e2d614b99 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -41,8 +41,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQStoreException; +import org.apache.qpid.server.store.AMQStoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; @@ -148,7 +147,7 @@ public class ServerSession extends Session _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction() { @Override - public void doTimeoutAction(String reason) throws AMQException + public void doTimeoutAction(String reason) { getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason); } @@ -679,7 +678,7 @@ public class ServerSession extends Session return (LogSubject) this; } - public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) { _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose); } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 3d50da6ed5..3961bcb080 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -25,11 +25,11 @@ import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQStoreException; -import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.server.store.AMQStoreException; +import org.apache.qpid.server.exchange.AMQUnknownExchangeType; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.HeadersExchange; +import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; import org.apache.qpid.server.logging.messages.ExchangeMessages; @@ -66,7 +66,7 @@ import org.apache.qpid.server.virtualhost.RequiredExchangeException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.virtualhost.plugins.QueueExistsException; +import org.apache.qpid.server.virtualhost.QueueExistsException; import org.apache.qpid.transport.*; import java.nio.ByteBuffer; @@ -253,7 +253,7 @@ public class ServerSessionDelegate extends SessionDelegate { filterManager = FilterManagerFactory.createManager(method.getArguments()); } - catch (AMQException amqe) + catch (AMQInvalidArgumentException amqe) { exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Exception Creating FilterManager"); return; @@ -298,10 +298,6 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively"); } - catch (AMQException e) - { - exception(session, method, e, "Cannot subscribe to queue '" + queueName + "' with destination '" + destination); - } catch (QpidSecurityException e) { exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); @@ -435,15 +431,7 @@ public class ServerSessionDelegate extends SessionDelegate } else { - - try - { - sub.flush(); - } - catch (AMQException e) - { - exception(session, method, e, "Cannot flush subscription '" + destination); - } + sub.flush(); } } @@ -787,10 +775,6 @@ public class ServerSessionDelegate extends SessionDelegate + " to " + method.getAlternateExchange() +"."); } } - catch (AMQException e) - { - exception(session, method, e, "Cannot declare exchange '" + exchangeName); - } catch (QpidSecurityException e) { exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); @@ -801,26 +785,6 @@ public class ServerSessionDelegate extends SessionDelegate } - // TODO decouple AMQException and AMQConstant error codes - private void exception(Session session, Method method, AMQException exception, String message) - { - ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR; - if (exception.getErrorCode() != null) - { - try - { - errorCode = ExecutionErrorCode.get(exception.getErrorCode().getCode()); - } - catch (IllegalArgumentException iae) - { - // ignore, already set to INTERNAL_ERROR - } - } - String description = message + "': " + exception.getMessage(); - - exception(session, method, errorCode, description); - } - private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description) { ExecutionException ex = new ExecutionException(); @@ -903,10 +867,6 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange '"+method.getExchange()+"' cannot be deleted"); } - catch (AMQException e) - { - exception(session, method, e, "Cannot delete exchange '" + method.getExchange() ); - } catch (QpidSecurityException e) { exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); @@ -1001,10 +961,6 @@ public class ServerSessionDelegate extends SessionDelegate { exchange.addBinding(method.getBindingKey(), queue, method.getArguments()); } - catch (AMQException e) - { - exception(session, method, e, "Cannot add binding '" + method.getBindingKey()); - } catch (QpidSecurityException e) { exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); @@ -1058,10 +1014,6 @@ public class ServerSessionDelegate extends SessionDelegate { exchange.removeBinding(method.getBindingKey(), queue, null); } - catch (AMQException e) - { - exception(session, method, e, "Cannot remove binding '" + method.getBindingKey()); - } catch (QpidSecurityException e) { exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); @@ -1289,10 +1241,6 @@ public class ServerSessionDelegate extends SessionDelegate { virtualHost.removeQueue(q); } - catch (AMQException e) - { - exception(session, method, e, "Cannot delete '" + method.getQueue()); - } catch (QpidSecurityException e) { exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); @@ -1345,10 +1293,6 @@ public class ServerSessionDelegate extends SessionDelegate exception(session, method, errorCode, description); } } - catch (AMQException e) - { - exception(session, method, e, "Cannot declare queue '" + queueName); - } catch (QpidSecurityException e) { exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); @@ -1426,10 +1370,6 @@ public class ServerSessionDelegate extends SessionDelegate { virtualHost.removeQueue(queue); } - catch (AMQException e) - { - exception(session, method, e, "Cannot delete queue '" + queueName); - } catch (QpidSecurityException e) { exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); @@ -1461,10 +1401,6 @@ public class ServerSessionDelegate extends SessionDelegate { queue.clearQueue(); } - catch (AMQException e) - { - exception(session, method, e, "Cannot purge queue '" + queueName); - } catch (QpidSecurityException e) { exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 4eeb9d8fb2..b15b3f0bfa 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -29,6 +29,7 @@ import java.util.concurrent.locks.Lock; import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; +import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQMethodBody; @@ -81,6 +82,7 @@ import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.TransportException; @@ -186,9 +188,16 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction() { @Override - public void doTimeoutAction(String reason) throws AMQException + public void doTimeoutAction(String reason) { - closeConnection(reason); + try + { + closeConnection(reason); + } + catch (AMQException e) + { + throw new ConnectionScopedRuntimeException(e); + } } }); } @@ -516,7 +525,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F */ public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, FieldTable filters, boolean exclusive, boolean noLocal) - throws AMQException, QpidSecurityException + throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive, + MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException { if (tag == null) { @@ -579,17 +589,22 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F AMQShortString.toString(tag), options); } - catch (AMQException e) + catch (QpidSecurityException e) { _tag2SubscriptionTargetMap.remove(tag); throw e; } - catch (RuntimeException e) + catch (MessageSource.ExistingExclusiveConsumer e) { _tag2SubscriptionTargetMap.remove(tag); throw e; } - catch (QpidSecurityException e) + catch (MessageSource.ExistingConsumerPreventsExclusive e) + { + _tag2SubscriptionTargetMap.remove(tag); + throw e; + } + catch (AMQInvalidArgumentException e) { _tag2SubscriptionTargetMap.remove(tag); throw e; @@ -601,9 +616,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F * Unsubscribe a consumer from a queue. * @param consumerTag * @return true if the consumerTag had a mapped queue that could be unregistered. - * @throws AMQException */ - public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException + public boolean unsubscribeConsumer(AMQShortString consumerTag) { ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag); @@ -622,16 +636,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F /** * Called from the protocol session to close this channel and clean up. T - * - * @throws AMQException if there is an error during closure */ @Override - public void close() throws AMQException + public void close() { close(null, null); } - public void close(AMQConstant cause, String message) throws AMQException + public void close(AMQConstant cause, String message) { if(!_closing.compareAndSet(false, true)) { @@ -651,17 +663,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F { requeue(); } - catch (AMQException e) - { - _logger.error("Caught AMQException whilst attempting to requeue:" + e); - } catch (TransportException e) { _logger.error("Caught TransportException whilst attempting to requeue:" + e); } } - private void unsubscribeAllConsumers() throws AMQException + private void unsubscribeAllConsumers() { if (_logger.isInfoEnabled()) { @@ -724,9 +732,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F * Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to * this same channel or to other subscribers. * - * @throws org.apache.qpid.AMQException if the requeue fails */ - public void requeue() throws AMQException + public void requeue() { // we must create a new map since all the messages will get a new delivery tag when they are redelivered Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages(); @@ -756,9 +763,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F * * @param deliveryTag The message to requeue * - * @throws AMQException If something goes wrong. */ - public void requeue(long deliveryTag) throws AMQException + public void requeue(long deliveryTag) { MessageInstance unacked = _unacknowledgedMessageMap.remove(deliveryTag); @@ -1455,7 +1461,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F return getProtocolSession().getVirtualHost(); } - public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) { _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 329aa396b0..5e95701e5a 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -46,28 +46,11 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQChannelException; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.*; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; -import org.apache.qpid.framing.AMQBody; -import org.apache.qpid.framing.AMQDataBlock; -import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.AMQMethodBody; -import org.apache.qpid.framing.AMQProtocolHeaderException; -import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.ChannelCloseBody; -import org.apache.qpid.framing.ChannelCloseOkBody; -import org.apache.qpid.framing.ConnectionCloseBody; -import org.apache.qpid.framing.ContentBody; -import org.apache.qpid.framing.ContentHeaderBody; -import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.framing.FieldTableFactory; -import org.apache.qpid.framing.HeartbeatBody; -import org.apache.qpid.framing.MethodDispatcher; -import org.apache.qpid.framing.MethodRegistry; -import org.apache.qpid.framing.ProtocolInitiation; -import org.apache.qpid.framing.ProtocolVersion; import org.apache.qpid.properties.ConnectionStartProperties; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; @@ -95,6 +78,8 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.TransportException; @@ -303,9 +288,24 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } receivedComplete(); } - catch (Exception e) + catch (ConnectionScopedRuntimeException e) + { + _logger.error("Unexpected exception", e); + closeProtocolSession(); + } + catch (AMQProtocolVersionException e) + { + _logger.error("Unexpected protocol version", e); + closeProtocolSession(); + } + catch (AMQFrameDecodingException e) { - _logger.error("Unexpected exception when processing datablocks", e); + _logger.error("Frame decoding", e); + closeProtocolSession(); + } + catch (IOException e) + { + _logger.error("I/O Exception", e); closeProtocolSession(); } finally @@ -314,34 +314,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - private void receivedComplete() throws AMQException + private void receivedComplete() { - Exception exception = null; for (AMQChannel channel : _channelsForCurrentMessage) { - try - { - channel.receivedComplete(); - } - catch(Exception exceptionForThisChannel) - { - if(exception == null) - { - exception = exceptionForThisChannel; - } - _logger.error("Error informing channel that receiving is complete. Channel: " + channel, exceptionForThisChannel); - } + channel.receivedComplete(); } _channelsForCurrentMessage.clear(); - - if(exception != null) - { - throw new AMQException( - AMQConstant.INTERNAL_ERROR, - "Error informing channel that receiving is complete: " + exception.getMessage(), - exception); - } } /** @@ -549,7 +529,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } catch (IOException e) { - throw new RuntimeException(e); + throw new ServerScopedRuntimeException(e); } final ByteBuffer buf; @@ -812,16 +792,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi * * @param channelId id of the channel to close * - * @throws AMQException if an error occurs closing the channel * @throws IllegalArgumentException if the channel id is not valid */ @Override - public void closeChannel(int channelId) throws AMQException + public void closeChannel(int channelId) { closeChannel(channelId, null, null); } - public void closeChannel(int channelId, AMQConstant cause, String message) throws AMQException + public void closeChannel(int channelId, AMQConstant cause, String message) { final AMQChannel channel = getChannel(channelId); if (channel == null) @@ -903,7 +882,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi * * @throws AMQException if an error occurs while closing any channel */ - private void closeAllChannels() throws AMQException + private void closeAllChannels() { for (AMQChannel channel : getChannels()) { @@ -921,7 +900,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi /** This must be called when the session is _closed in order to free up any resources managed by the session. */ @Override - public void closeSession() throws AMQException + public void closeSession() { if(_closing.compareAndSet(false,true)) { @@ -996,7 +975,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } - private void closeConnection(int channelId, AMQConnectionException e) throws AMQException + private void closeConnection(int channelId, AMQConnectionException e) { try { @@ -1033,7 +1012,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { _stateManager.changeState(AMQState.CONNECTION_CLOSED); } - catch (AMQException e) + catch (ConnectionScopedRuntimeException e) { _logger.info(e.getMessage()); } @@ -1234,9 +1213,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi closeProtocolSession(); } } - catch (AMQException e) + catch (ConnectionScopedRuntimeException e) { - _logger.error("Could not close protocol engine", e); + _logger.error("Could not close protocol engine", e); } catch (TransportException e) { @@ -1269,15 +1248,30 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } else { - _logger.error("Exception caught in " + this + ", closing connection explicitly: " + throwable, throwable); + try + { + _logger.error("Exception caught in " + this + ", closing connection explicitly: " + throwable, throwable); - MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion()); - ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0); + MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion()); + ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0); - writeFrame(closeBody.generateFrame(0)); + writeFrame(closeBody.generateFrame(0)); - _sender.close(); + _sender.close(); + } + finally + { + if(throwable instanceof Error) + { + throw (Error) throwable; + } + if(throwable instanceof ServerScopedRuntimeException) + { + throw (ServerScopedRuntimeException) throwable; + } + + } } } @@ -1441,15 +1435,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi { writeFrame(responseBody.generateFrame(0)); - try - { + closeSession(); - closeSession(); - } - catch (AMQException ex) - { - throw new RuntimeException(ex); - } } finally { @@ -1483,15 +1470,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi try { writeFrame(responseBody.generateFrame(channelId)); - - try - { - closeChannel(channelId); - } - catch (AMQException ex) - { - throw new RuntimeException(ex); - } + closeChannel(channelId); } finally { @@ -1507,7 +1486,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return getContextKey().toString(); } - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException + public void closeSession(AMQSessionModel session, AMQConstant cause, String message) { int channelId = ((AMQChannel)session).getChannelId(); closeChannel(channelId, cause, message); @@ -1522,7 +1501,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi writeFrame(responseBody.generateFrame(channelId)); } - public void close(AMQConstant cause, String message) throws AMQException + public void close(AMQConstant cause, String message) { closeConnection(0, new AMQConnectionException(cause, message, 0, 0, getProtocolOutputConverter().getProtocolMajorVersion(), diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java index 6bcd4b9d49..58a3b5df12 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java @@ -71,7 +71,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth public static interface Task { - public void doTask(AMQProtocolSession session) throws AMQException; + public void doTask(AMQProtocolSession session); } /** @@ -152,7 +152,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth void initHeartbeats(int delay); /** This must be called when the session is _closed in order to free up any resources managed by the session. */ - void closeSession() throws AMQException; + void closeSession(); void closeProtocolSession(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java index b80ad3e7b8..f2bb95c8d5 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.codec.BBEncoder; @@ -236,7 +237,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter } catch (IOException e) { - throw new RuntimeException(e); + throw new ConnectionScopedRuntimeException(e); } } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java index 6076ff66c7..f35d37ecbd 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.typedmessage.TypedBytesContentReader; @@ -124,11 +125,11 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe } catch (TypedBytesFormatException e) { - throw new RuntimeException(e); // TODO - Implement + throw new ConnectionScopedRuntimeException(e); } catch (EOFException e) { - throw new RuntimeException(e); // TODO - Implement + throw new ConnectionScopedRuntimeException(e); } } return list; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java index 3665e7f135..fd7fb9ca80 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol.v0_8; import java.util.Collection; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.AMQFrameDecodingException; +import org.apache.qpid.framing.AMQProtocolVersionException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; @@ -32,6 +34,7 @@ import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.util.ByteBufferOutputStream; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.util.ByteBufferInputStream; import java.io.DataInputStream; @@ -132,7 +135,7 @@ public class MessageMetaData implements StorableMessageMetaData catch (IOException e) { // This shouldn't happen as we are not actually using anything that can throw an IO Exception - throw new RuntimeException(e); + throw new ConnectionScopedRuntimeException(e); } return dest.position()-oldPosition; @@ -196,17 +199,21 @@ public class MessageMetaData implements StorableMessageMetaData }; return new MessageMetaData(publishBody, chb, arrivalTime); } - catch (AMQException e) + catch (IOException e) { - throw new RuntimeException(e); + throw new ConnectionScopedRuntimeException(e); } - catch (IOException e) + catch (AMQProtocolVersionException e) { - throw new RuntimeException(e); + throw new ConnectionScopedRuntimeException(e); + } + catch (AMQFrameDecodingException e) + { + throw new ConnectionScopedRuntimeException(e); } } - }; + } public AMQMessageHeader getMessageHeader() { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java index ad4235b786..ce90de7aac 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java @@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicConsumeBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; @@ -149,7 +150,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic } } - catch (org.apache.qpid.AMQInvalidArgumentException ise) + catch (AMQInvalidArgumentException ise) { _logger.debug("Closing connection due to invalid selector"); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index f55a120a2d..c9a7cc69a1 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -31,6 +31,7 @@ import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.flow.FlowCreditManager; @@ -124,6 +125,17 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); } + catch (MessageSource.ExistingExclusiveConsumer e) + { + throw body.getConnectionException(AMQConstant.NOT_ALLOWED, + "Queue has an exclusive consumer"); + } + catch (MessageSource.ExistingConsumerPreventsExclusive e) + { + throw body.getConnectionException(AMQConstant.INTERNAL_ERROR, + "The GET request has been evaluated as an exclusive consumer, " + + "this is likely due to a programming error in the Qpid broker"); + } } } } @@ -132,7 +144,8 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final AMQProtocolSession session, final AMQChannel channel, final boolean acks) - throws AMQException, QpidSecurityException + throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive, + MessageSource.ExistingExclusiveConsumer { final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java index 442c912032..2594242db4 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.protocol.v0_8.AMQChannel; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import java.io.ByteArrayOutputStream; @@ -99,7 +100,7 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB catch (IOException e) { // This *really* shouldn't happen as we're not doing any I/O - throw new RuntimeException("I/O exception when writing to byte array", e); + throw new ConnectionScopedRuntimeException("I/O exception when writing to byte array", e); } // should really associate this channelId to the session @@ -123,7 +124,7 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB catch (IOException e) { // This *really* shouldn't happen as we're not doing any I/O - throw new RuntimeException("I/O exception when writing to byte array", e); + throw new ConnectionScopedRuntimeException("I/O exception when writing to byte array", e); } // should really associate this channelId to the session diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java index fa513486a4..5c5b1f141b 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java @@ -24,7 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; -import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.server.exchange.AMQUnknownExchangeType; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ExchangeDeclareBody; @@ -38,6 +38,7 @@ import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.virtualhost.ExchangeExistsException; import org.apache.qpid.server.virtualhost.ReservedExchangeNameException; +import org.apache.qpid.server.virtualhost.UnknownExchangeException; import org.apache.qpid.server.virtualhost.VirtualHost; public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody> @@ -129,6 +130,11 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange { throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage()); } + catch (UnknownExchangeException e) + { + // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur + throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e); + } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java index 1286a20970..5b5525643c 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java @@ -34,21 +34,17 @@ import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueArgumentsConverter; -import org.apache.qpid.server.queue.QueueRegistry; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.security.QpidSecurityException; -import org.apache.qpid.server.store.DurableConfigurationStoreHelper; -import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; import java.util.UUID; -import org.apache.qpid.server.virtualhost.plugins.QueueExistsException; +import org.apache.qpid.server.virtualhost.QueueExistsException; public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody> { @@ -131,7 +127,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar final AMQQueue q = queue; final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task() { - public void doTask(AMQProtocolSession session) throws AMQException + public void doTask(AMQProtocolSession session) { q.setExclusiveOwningSession(null); } @@ -219,7 +215,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar QueueDeclareBody body, final VirtualHost virtualHost, final AMQProtocolSession session) - throws AMQException, QpidSecurityException + throws AMQException, QpidSecurityException, QueueExistsException { final boolean durable = body.getDurable(); @@ -241,7 +237,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar final AMQProtocolSession.Task deleteQueueTask = new AMQProtocolSession.Task() { - public void doTask(AMQProtocolSession session) throws AMQException + public void doTask(AMQProtocolSession session) { if (virtualHost.getQueue(queueName.toString()) == queue) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java index 0555bba98b..c3cbb5e26f 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java @@ -52,8 +52,6 @@ public class AMQStateManager implements AMQMethodListener /** The current state */ private AMQState _currentState; - private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>(); - public AMQStateManager(Broker broker, AMQProtocolSession protocolSession) { _broker = broker; @@ -72,30 +70,17 @@ public class AMQStateManager implements AMQMethodListener return _broker; } - public AMQState getCurrentState() - { - return _currentState; - } - - public void changeState(AMQState newState) throws AMQException + public void changeState(AMQState newState) { _logger.debug("State changing to " + newState + " from old state " + _currentState); final AMQState oldState = _currentState; _currentState = newState; - for (StateListener l : _stateListeners) - { - l.stateChanged(oldState, newState); - } } public void error(Exception e) { _logger.error("State manager received error notification[Current State:" + _currentState + "]: " + e, e); - for (StateListener l : _stateListeners) - { - l.error(e); - } } public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException @@ -121,28 +106,6 @@ public class AMQStateManager implements AMQMethodListener } - private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession) - throws AMQException - { - if ((evt.getChannelId() != 0) && !(evt.getMethod() instanceof ChannelOpenBody) - && (protocolSession.getChannel(evt.getChannelId()) == null) - && !protocolSession.channelAwaitingClosure(evt.getChannelId())) - { - throw evt.getMethod().getChannelNotFoundException(evt.getChannelId()); - } - } - - public void addStateListener(StateListener listener) - { - _logger.debug("Adding state listener"); - _stateListeners.add(listener); - } - - public void removeStateListener(StateListener listener) - { - _stateListeners.remove(listener); - } - public VirtualHostRegistry getVirtualHostRegistry() { return _broker.getVirtualHostRegistry(); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/IllegalStateTransitionException.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/IllegalStateTransitionException.java deleted file mode 100644 index f61553f8a2..0000000000 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/IllegalStateTransitionException.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.protocol.v0_8.state; - -import org.apache.qpid.AMQException; - -/** - * @todo Not an AMQP exception as no status code. - * - * @todo Not used! Delete. - */ -public class IllegalStateTransitionException extends AMQException -{ - private AMQState _originalState; - - private Class _frame; - - public IllegalStateTransitionException(AMQState originalState, Class frame) - { - super("No valid state transition defined for receiving frame " + frame + " from state " + originalState); - _originalState = originalState; - _frame = frame; - } - - public AMQState getOriginalState() - { - return _originalState; - } - - public Class getFrameClass() - { - return _frame; - } -} diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index 23c32f988d..8d7de4cd93 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -139,28 +139,20 @@ public class AckTest extends QpidTestCase final StoredMessage storedMessage = _messageStore.addMessage(mmd); final AMQMessage message = new AMQMessage(storedMessage); ServerTransaction txn = new AutoCommitTransaction(_messageStore); - txn.enqueue(_queue, message, new ServerTransaction.Action() { - public void postCommit() - { - try - { - - _queue.enqueue(message,null); - } - catch (AMQException e) - { - throw new RuntimeException(e); - } - } - - public void onRollback() - { - //To change body of implemented methods use File | Settings | File Templates. - } - }); + txn.enqueue(_queue, message, + new ServerTransaction.Action() + { + public void postCommit() + { + _queue.enqueue(message,null); + } + + public void onRollback() + { + //To change body of implemented methods use File | Settings | File Templates. + } + }); - // we manually send the message to the subscription - //_subscription.send(new QueueEntry(_queue,msg), _queue); } try { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index 4a2fbe44fa..7661d98cb4 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -223,7 +223,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr // Then the AMQMinaProtocolSession can join on the returning future without a NPE. } - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException + public void closeSession(AMQSessionModel session, AMQConstant cause, String message) { super.closeSession(session, cause, message); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 41e2fef03f..cae61f9d80 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.v1_0; import java.text.MessageFormat; import java.util.Collection; -import org.apache.qpid.AMQException; import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener; import org.apache.qpid.amqp_1_0.transport.SessionEndpoint; @@ -152,7 +151,7 @@ public class Connection_1_0 implements ConnectionEventListener private volatile boolean _stopped; @Override - public void close(AMQConstant cause, String message) throws AMQException + public void close(AMQConstant cause, String message) { _conn.close(); } @@ -170,7 +169,7 @@ public class Connection_1_0 implements ConnectionEventListener } @Override - public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException + public void closeSession(AMQSessionModel session, AMQConstant cause, String message) { // TODO } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index dbfb8f5fc4..f3417710a5 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.AMQException; import org.apache.qpid.amqp_1_0.codec.ValueHandler; import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; @@ -46,6 +45,7 @@ import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import java.nio.ByteBuffer; import java.util.List; @@ -187,7 +187,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget catch (AmqpErrorException e) { //TODO - throw new RuntimeException(e); + throw new ConnectionScopedRuntimeException(e); } Header header = new Header(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index 3d030890e0..5356a6e6a3 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -20,8 +20,6 @@ */ package org.apache.qpid.server.protocol.v1_0; -import java.util.List; -import org.apache.qpid.AMQException; import org.apache.qpid.amqp_1_0.type.Outcome; import org.apache.qpid.amqp_1_0.type.messaging.Accepted; import org.apache.qpid.amqp_1_0.type.messaging.Rejected; @@ -29,8 +27,6 @@ import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.txn.ServerTransaction; public class ExchangeDestination implements ReceivingDestination, SendingDestination diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java index f02908391a..1764eec84d 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java @@ -31,6 +31,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Data; import org.apache.qpid.amqp_1_0.type.messaging.Header; import org.apache.qpid.amqp_1_0.type.messaging.Properties; import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -132,7 +133,7 @@ public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<I } catch (IOException e) { - throw new RuntimeException(e); + throw new ConnectionScopedRuntimeException(e); } } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index a8a203b247..40f738b8e7 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -39,6 +39,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.typedmessage.TypedBytesContentReader; @@ -140,11 +141,11 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement } catch (TypedBytesFormatException e) { - throw new RuntimeException(e); // TODO - Implement + throw new ConnectionScopedRuntimeException(e); } catch (EOFException e) { - throw new RuntimeException(e); // TODO - Implement + throw new ConnectionScopedRuntimeException(e); } } return new AmqpValue(list); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java index f639f98dba..ec6d5a924c 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java @@ -30,6 +30,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; import org.apache.qpid.amqp_1_0.type.messaging.Data; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.codec.BBDecoder; import org.apache.qpid.typedmessage.TypedBytesContentReader; @@ -96,7 +97,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa { if(previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValue)) { - throw new RuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence"); + throw new ConnectionScopedRuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence"); } else { @@ -149,7 +150,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa } catch (AmqpErrorException e) { - throw new RuntimeException(e); + throw new ConnectionScopedRuntimeException(e); } @@ -257,11 +258,11 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa } catch (TypedBytesFormatException e) { - throw new RuntimeException(e); // TODO - Implement + throw new ConnectionScopedRuntimeException(e); } catch (EOFException e) { - throw new RuntimeException(e); // TODO - Implement + throw new ConnectionScopedRuntimeException(e); } } return list; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java index be9d7a2d60..f28e25e080 100755 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java @@ -43,6 +43,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Properties; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; public class MessageMetaData_1_0 implements StorableMessageMetaData { @@ -394,7 +395,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData catch (AmqpErrorException e) { //TODO - throw new RuntimeException(e); + throw new ConnectionScopedRuntimeException(e); } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java index d614f44981..58c3597a5c 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0; +import java.io.IOException; import java.io.PrintWriter; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -40,8 +41,11 @@ import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; import org.apache.qpid.amqp_1_0.transport.Container; import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler; import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.ErrorCondition; import org.apache.qpid.amqp_1_0.type.FrameBody; import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.transport.*; +import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.common.QpidProperties; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.protocol.ServerProtocolEngine; @@ -51,12 +55,16 @@ import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.v1_0.Connection_1_0; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.UsernamePrincipal; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler { + private static final org.apache.log4j.Logger + _logger = org.apache.log4j.Logger.getLogger(ProtocolEngine_1_0_0_SASL.class); + private final Port _port; private final Transport _transport; private long _readBytes; @@ -353,9 +361,35 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut } - public void exception(Throwable t) + public void exception(Throwable throwable) { - t.printStackTrace(); + if (throwable instanceof IOException) + { + _logger.info("IOException caught in " + this + ", connection closed implicitly: " + throwable); + } + else + { + + try + { + final Error err = new Error(); + err.setCondition(AmqpError.INTERNAL_ERROR); + err.setDescription(throwable.getMessage()); + _conn.close(err); + close(); + } + finally + { + if(throwable instanceof java.lang.Error) + { + throw (java.lang.Error) throwable; + } + if(throwable instanceof ServerScopedRuntimeException) + { + throw (ServerScopedRuntimeException) throwable; + } + } + } } public void closed() diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java index 3d6bb5e3db..4fefbe9f7d 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java @@ -49,37 +49,22 @@ public class QueueDestination extends MessageSourceDestination implements Sendin public Outcome send(final Message_1_0 message, ServerTransaction txn) { - try + txn.enqueue(getQueue(),message, new ServerTransaction.Action() { - txn.enqueue(getQueue(),message, new ServerTransaction.Action() + + + public void postCommit() { + getQueue().enqueue(message,null); + } + + public void onRollback() + { + // NO-OP + } + }); - public void postCommit() - { - try - { - getQueue().enqueue(message,null); - } - catch (Exception e) - { - // TODO - throw new RuntimeException(e); - } - - } - - public void onRollback() - { - // NO-OP - } - }); - } - catch(Exception e) - { - _logger.error("Send error", e); - throw new RuntimeException(e); - } return ACCEPTED; } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 56b574685f..546cc79f9e 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -30,8 +30,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; -import org.apache.qpid.AMQException; -import org.apache.qpid.AMQInternalException; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; @@ -64,7 +62,9 @@ import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.QueueExistsException; public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler { @@ -324,11 +324,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { _vhost.removeQueue(tempQueue); } - catch (AMQException e) - { - //TODO - _logger.error("Error removing queue", e); - } catch (QpidSecurityException e) { //TODO @@ -356,17 +351,12 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS catch (QpidSecurityException e) { _logger.error("Security error", e); - throw new RuntimeException(e); - } - catch (AMQInternalException e) - { - _logger.error("Internal error", e); - throw new RuntimeException(e); + throw new ConnectionScopedRuntimeException(e); } - catch (AMQException e) + catch (QueueExistsException e) { - _logger.error("Error", e); - throw new RuntimeException(e); + _logger.error("A randomly generated temporary queue name collided with an existing queue",e); + throw new ConnectionScopedRuntimeException(e); } @@ -377,7 +367,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } else { - throw new RuntimeException("Unknown destination type"); + throw new ConnectionScopedRuntimeException("Unknown destination type"); } if(_target != null) @@ -403,15 +393,21 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS messageFilter == null ? null : new SimpleFilterManager(messageFilter), Message_1_0.class, name, options); } - catch (AMQException e) + catch (QpidSecurityException e) { //TODO - _logger.error("Error registering subscription", e); + _logger.info("Error registering subscription", e); + throw new ConnectionScopedRuntimeException(e); } - catch (QpidSecurityException e) + catch (MessageSource.ExistingExclusiveConsumer e) { - //TODO - _logger.error("Error registering subscription", e); + _logger.info("Cannot add a consumer to the destination as there is already an exclusive consumer"); + throw new ConnectionScopedRuntimeException(e); + } + catch (MessageSource.ExistingConsumerPreventsExclusive e) + { + _logger.info("Cannot add an exclusive consumer to the destination as there is already a consumer"); + throw new ConnectionScopedRuntimeException(e); } } @@ -429,18 +425,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS // if not durable or close if(!TerminusDurability.UNSETTLED_STATE.equals(_durability)) { - - try - { - - _consumer.close(); - - } - catch (AMQException e) - { - //TODO - _logger.error("Error unregistering subscription", e); - } + _consumer.close(); Modified state = new Modified(); state.setDeliveryFailed(true); @@ -462,11 +447,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { _vhost.removeQueue((AMQQueue)_queue); } - catch(AMQException e) - { - //TODO - _logger.error("Error registering subscription", e); - } catch (QpidSecurityException e) { //TODO diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index a0b2fc5289..c055d1e840 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -36,7 +36,6 @@ import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability; import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; -import org.apache.qpid.AMQException; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.exchange.Exchange; @@ -51,7 +50,9 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.QueueExistsException; import java.util.*; @@ -357,11 +358,6 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu { _vhost.removeQueue(tempQueue); } - catch (AMQException e) - { - //TODO - _logger.error("Error removing queue from vhost", e); - } catch (QpidSecurityException e) { //TODO @@ -399,12 +395,13 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu catch (QpidSecurityException e) { //TODO - _logger.error("Security error", e); + _logger.info("Security error", e); + throw new ConnectionScopedRuntimeException(e); } - catch (AMQException e) + catch (QueueExistsException e) { - //TODO - _logger.error("Error", e); + _logger.error("A temporary queue was created with a name which collided with an existing queue name"); + throw new ConnectionScopedRuntimeException(e); } return queue; @@ -490,14 +487,14 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } @Override - public void close() throws AMQException + public void close() { // TODO - required for AMQSessionModel / management initiated closing } @Override - public void close(AMQConstant cause, String message) throws AMQException + public void close(AMQConstant cause, String message) { // TODO - required for AMQSessionModel } @@ -509,7 +506,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu } @Override - public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException + public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) { // TODO - required for AMQSessionModel / long running transaction detection } diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index bc8d157346..d32ea49e60 100644 --- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreConstants; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.util.FileUtils; /** @@ -99,7 +100,7 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa return "bigint"; } - protected void doClose() throws SQLException + protected void doClose() { try { @@ -117,7 +118,7 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa else { getLogger().error("Exception whilst shutting down the store: " + e); - throw e; + throw new ServerScopedRuntimeException("Error closing message store", e); } } } @@ -307,7 +308,7 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa catch (SQLException e) { closeConnection(conn); - throw new RuntimeException("Exception while processing store size change", e); + throw new ServerScopedRuntimeException("Exception while processing store size change", e); } } } @@ -359,7 +360,7 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa catch (SQLException e) { closeConnection(conn); - throw new RuntimeException("Error reducing on disk size", e); + throw new ServerScopedRuntimeException("Error reducing on disk size", e); } finally { @@ -407,7 +408,7 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa catch (SQLException e) { closeConnection(conn); - throw new RuntimeException("Error establishing on disk size", e); + throw new ServerScopedRuntimeException("Error establishing on disk size", e); } finally { diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java index 6fdfa40561..3fac83f5a3 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java @@ -30,13 +30,13 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.log4j.Logger; -import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; import org.apache.qpid.server.store.AbstractJDBCMessageStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.util.ServerScopedRuntimeException; /** * An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence @@ -252,7 +252,7 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag } @Override - protected void doClose() throws AMQStoreException + protected void doClose() { while(!_transactions.isEmpty()) { @@ -265,7 +265,7 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag } catch (SQLException e) { - throw new AMQStoreException("Unable to close connection provider ", e); + throw new ServerScopedRuntimeException("Unable to close connection provider ", e); } } @@ -430,7 +430,7 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag } @Override - public void commitTran() throws AMQStoreException + public void commitTran() { try { @@ -443,7 +443,7 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag } @Override - public StoreFuture commitTranAsync() throws AMQStoreException + public StoreFuture commitTranAsync() { try { @@ -456,7 +456,7 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag } @Override - public void abortTran() throws AMQStoreException + public void abortTran() { try { diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index 4921db5c07..7d76a0ee8e 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.management.amqp; -import org.apache.qpid.AMQException; import org.apache.qpid.server.security.QpidSecurityException; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; @@ -940,7 +939,7 @@ class ManagementNode implements MessageSource<ManagementNodeConsumer,ManagementN final FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, - final EnumSet<Consumer.Option> options) throws AMQException + final EnumSet<Consumer.Option> options) { final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target); diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index ea4b60da13..008f16883a 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.management.amqp; -import org.apache.qpid.AMQException; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.internal.InternalMessage; @@ -124,7 +123,7 @@ class ManagementNodeConsumer implements Consumer } @Override - public void close() throws AMQException + public void close() { } diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java index 283dec0f30..59ab849848 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.management.amqp; -import org.apache.qpid.AMQException; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.InstanceProperties; diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java index 3375a784ea..08b99a206d 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java @@ -76,6 +76,7 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.adapter.AbstractPluginAdapter; import org.apache.qpid.server.plugin.PluginFactory; import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Server; @@ -181,7 +182,7 @@ public class HttpManagement extends AbstractPluginAdapter implements HttpManagem } catch (Exception e) { - throw new RuntimeException("Failed to start HTTP management on ports : " + httpPorts, e); + throw new ServerScopedRuntimeException("Failed to start HTTP management on ports : " + httpPorts, e); } CurrentActor.get().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); @@ -198,7 +199,7 @@ public class HttpManagement extends AbstractPluginAdapter implements HttpManagem } catch (Exception e) { - throw new RuntimeException("Failed to stop HTTP management on ports : " + getHttpPorts(getBroker().getPorts()), e); + throw new ServerScopedRuntimeException("Failed to stop HTTP management on ports : " + getHttpPorts(getBroker().getPorts()), e); } } @@ -310,7 +311,7 @@ public class HttpManagement extends AbstractPluginAdapter implements HttpManagem } catch (GeneralSecurityException e) { - throw new RuntimeException("Cannot configure port " + port.getName() + " for transport " + Transport.SSL, e); + throw new ServerScopedRuntimeException("Cannot configure port " + port.getName() + " for transport " + Transport.SSL, e); } connector = new SslSocketConnector(factory); diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java index f6674b5152..9a2f0dd1f6 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java @@ -50,6 +50,7 @@ import org.apache.qpid.server.security.auth.SubjectAuthenticationResult; import org.apache.qpid.server.security.auth.UsernamePrincipal; import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManager; import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerFactory; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.transport.network.security.ssl.SSLUtil; public class HttpManagementUtil @@ -138,7 +139,7 @@ public class HttpManagementUtil } catch (PrivilegedActionException e) { - throw new RuntimeException("Unable to perform access check", e); + throw new ServerScopedRuntimeException("Unable to perform access check", e); } } finally diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java index ee481ebdbe..0381b711bc 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.management.plugin.HttpManagementConfiguration; import org.apache.qpid.server.management.plugin.HttpManagementUtil; import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.security.SecurityManager; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; @@ -214,7 +215,16 @@ public abstract class AbstractServlet extends HttpServlet catch (PrivilegedActionException e) { LOGGER.error("Unable to perform action", e); - throw new RuntimeException(e.getCause()); + Throwable cause = e.getCause(); + if(cause instanceof RuntimeException) + { + throw (RuntimeException)cause; + } + if(cause instanceof Error) + { + throw (Error)cause; + } + throw new ConnectionScopedRuntimeException(e.getCause()); } finally { @@ -255,7 +265,7 @@ public abstract class AbstractServlet extends HttpServlet } catch (IOException e) { - throw new RuntimeException("Failed to send error response code " + errorCode, e); + throw new ConnectionScopedRuntimeException("Failed to send error response code " + errorCode, e); } } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java index 9ad52007ab..a29a875071 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.management.plugin.servlet.rest; import org.apache.commons.codec.binary.Base64; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; @@ -210,7 +211,7 @@ public class SaslServlet extends AbstractServlet } if (!saslAuthEnabled) { - throw new RuntimeException("Sasl authentication disabled."); + throw new ConnectionScopedRuntimeException("Sasl authentication disabled."); } } diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java index 32aac51008..e2b9a98784 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java @@ -31,6 +31,7 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.security.auth.jmx.JMXPasswordAuthenticator; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.ssl.SSLContextFactory; import javax.management.JMException; @@ -134,7 +135,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry } catch (GeneralSecurityException e) { - throw new RuntimeException("Unable to create SSLContext for key store", e); + throw new ServerScopedRuntimeException("Unable to create SSLContext for key store", e); } CurrentActor.get().message(ManagementConsoleMessages.SSL_KEYSTORE(keyStore.getName())); diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java index 18e9f9f809..e418275d7e 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java @@ -54,6 +54,7 @@ import org.apache.qpid.server.model.adapter.AbstractPluginAdapter; import org.apache.qpid.server.plugin.PluginFactory; import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.util.ServerScopedRuntimeException; public class JMXManagement extends AbstractPluginAdapter implements ConfigurationChangeListener { @@ -110,7 +111,7 @@ public class JMXManagement extends AbstractPluginAdapter implements Configuratio } catch (Exception e) { - throw new RuntimeException("Couldn't start JMX management", e); + throw new ServerScopedRuntimeException("Couldn't start JMX management", e); } return true; } diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java index d0c0d5e73f..34366a196c 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.jmx.ManagedObject; import org.apache.qpid.server.model.Connection; import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.Statistics; +import org.apache.qpid.server.util.ServerScopedRuntimeException; public class ConnectionMBean extends AbstractStatisticsGatheringMBean<Connection> implements ManagedConnection { @@ -60,7 +61,7 @@ public class ConnectionMBean extends AbstractStatisticsGatheringMBean<Connection catch (JMException ex) { // This is not expected to ever occur. - throw new RuntimeException("Got JMException in static initializer.", ex); + throw new ServerScopedRuntimeException("Got JMException in static initializer.", ex); } } diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java index 3e1a47c431..407da0fd3f 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import javax.management.JMException; import javax.management.MalformedObjectNameException; @@ -106,7 +107,7 @@ public class ExchangeMBean extends AMQManagedObject implements ManagedExchange } catch(OpenDataException e) { - throw new RuntimeException("Unexpected Error creating ArrayType", e); + throw new ServerScopedRuntimeException("Unexpected Error creating ArrayType", e); } } diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/LoggingManagementMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/LoggingManagementMBean.java index d6f4b5d8c9..848a846911 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/LoggingManagementMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/LoggingManagementMBean.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.jmx.ManagedObject; import org.apache.qpid.server.jmx.ManagedObjectRegistry; import org.apache.qpid.server.logging.log4j.LoggingManagementFacade; import org.apache.qpid.server.logging.log4j.LoggingFacadeException; +import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import javax.management.JMException; import javax.management.openmbean.CompositeData; @@ -314,7 +315,7 @@ public class LoggingManagementMBean extends AMQManagedObject implements LoggingM catch (OpenDataException ode) { // Should not happen - throw new RuntimeException(ode); + throw new ConnectionScopedRuntimeException(ode); } } diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java index d74aa41244..1365ceb06a 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -57,6 +57,7 @@ import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.queue.NotificationCheck; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntryVisitor; +import org.apache.qpid.server.util.ServerScopedRuntimeException; public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener { @@ -109,7 +110,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN } catch (OpenDataException e) { - throw new RuntimeException(e); + throw new ServerScopedRuntimeException(e); } } diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java index fde93a27e3..81dcb05a41 100644 --- a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java +++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java @@ -28,6 +28,7 @@ import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AmqpProtocolVersion; import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory; import org.apache.qpid.server.transport.AcceptingTransport; +import org.apache.qpid.server.util.ServerScopedRuntimeException; import org.apache.qpid.transport.Binary; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; @@ -144,9 +145,13 @@ class WebSocketProvider implements AcceptingTransport { _server.start(); } + catch(RuntimeException e) + { + throw e; + } catch (Exception e) { - throw new RuntimeException(e); + throw new ServerScopedRuntimeException(e); } } |
