summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-02-14 10:52:47 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-02-14 10:52:47 +0000
commit50b314a51a2c787fcd412a84cb8464f72e3868b4 (patch)
treefa6e85db6da742fbb9b235ca3e1d036d288ae970 /qpid/java/broker-plugins
parent08b64b592cb844cbd746b33e5f17c94b2158a115 (diff)
downloadqpid-python-50b314a51a2c787fcd412a84cb8464f72e3868b4.tar.gz
QPID-5551 : Remove uses of AMQException, add ServerScopedRuntimeException and ConnectionScopedRuntimeException
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1568235 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java28
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java7
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java76
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java50
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java131
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java4
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java3
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java19
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java3
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java15
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java5
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java8
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java12
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java39
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/IllegalStateTransitionException.java52
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java34
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java2
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java5
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java4
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java4
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java3
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java5
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java9
-rwxr-xr-xqpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java3
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java38
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java39
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java58
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java23
-rw-r--r--qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java11
-rw-r--r--qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java12
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java3
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java3
-rw-r--r--qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java1
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java7
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java3
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java14
-rw-r--r--qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java3
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java3
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java3
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java3
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java3
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/LoggingManagementMBean.java3
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java3
-rw-r--r--qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java7
48 files changed, 335 insertions, 448 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index c1ee0bc108..ff4bd1dc2e 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol.v0_10;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.LogActor;
@@ -31,13 +30,13 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.*;
import java.util.Collections;
@@ -507,7 +506,8 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
_creditManager = new WindowCreditManager(0l,0l);
break;
default:
- throw new RuntimeException("Unknown message flow mode: " + flowMode);
+ // this should never happen, as 0-10 is finalised and so the enum should never change
+ throw new ConnectionScopedRuntimeException("Unknown message flow mode: " + flowMode);
}
_flowMode = flowMode;
updateState(State.ACTIVE, State.SUSPENDED);
@@ -532,7 +532,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
}
}
- public void flush() throws AMQException
+ public void flush()
{
flushCreditState(true);
getConsumer().flush();
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
index 32ecc6bd0e..df4c398115 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -32,6 +32,8 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
@@ -183,7 +185,7 @@ public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, M
}
catch (IOException e)
{
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
index bc5f8899f2..68997bbb01 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10_to_Internal.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.message.internal.InternalMessageMetaData;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
@@ -247,11 +248,11 @@ public class MessageConverter_v0_10_to_Internal implements MessageConverter<Mess
}
catch (TypedBytesFormatException e)
{
- throw new RuntimeException(e); // TODO - Implement
+ throw new ConnectionScopedRuntimeException(e);
}
catch (EOFException e)
{
- throw new RuntimeException(e); // TODO - Implement
+ throw new ConnectionScopedRuntimeException(e); // TODO - Implement
}
}
return list;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 66ffd1ef94..f94026d0af 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -28,13 +28,11 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
-import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.AMQPConnectionActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
@@ -44,6 +42,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionCloseCode;
@@ -52,7 +51,6 @@ import org.apache.qpid.transport.ExecutionException;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.network.NetworkConnection;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
@@ -199,7 +197,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
_onOpenTask = task;
}
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
{
ExecutionException ex = new ExecutionException();
ExecutionErrorCode code = ExecutionErrorCode.INTERNAL_ERROR;
@@ -224,6 +222,26 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
}
@Override
+ public void exception(final Throwable t)
+ {
+ try
+ {
+ super.exception(t);
+ }
+ finally
+ {
+ if(t instanceof Error)
+ {
+ throw (Error) t;
+ }
+ if(t instanceof ServerScopedRuntimeException)
+ {
+ throw (ServerScopedRuntimeException) t;
+ }
+ }
+ }
+
+ @Override
public void received(ProtocolEvent event)
{
_lastIoTime.set(System.currentTimeMillis());
@@ -294,7 +312,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel,
return _actor;
}
- public void close(AMQConstant cause, String message) throws AMQException
+ public void close(AMQConstant cause, String message)
{
closeSubscriptions();
ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 87a02b99c1..7e2d614b99 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -41,8 +41,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.server.store.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
@@ -148,7 +147,7 @@ public class ServerSession extends Session
_transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
{
@Override
- public void doTimeoutAction(String reason) throws AMQException
+ public void doTimeoutAction(String reason)
{
getConnectionModel().closeSession(ServerSession.this, AMQConstant.RESOURCE_ERROR, reason);
}
@@ -679,7 +678,7 @@ public class ServerSession extends Session
return (LogSubject) this;
}
- public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
{
_transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 3d50da6ed5..3961bcb080 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -25,11 +25,11 @@ import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.server.store.AMQStoreException;
+import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.HeadersExchange;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
@@ -66,7 +66,7 @@ import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
import org.apache.qpid.transport.*;
import java.nio.ByteBuffer;
@@ -253,7 +253,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
filterManager = FilterManagerFactory.createManager(method.getArguments());
}
- catch (AMQException amqe)
+ catch (AMQInvalidArgumentException amqe)
{
exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Exception Creating FilterManager");
return;
@@ -298,10 +298,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
exception(session, method, ExecutionErrorCode.RESOURCE_LOCKED, "Queue has an existing consumer - can't subscribe exclusively");
}
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot subscribe to queue '" + queueName + "' with destination '" + destination);
- }
catch (QpidSecurityException e)
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
@@ -435,15 +431,7 @@ public class ServerSessionDelegate extends SessionDelegate
}
else
{
-
- try
- {
- sub.flush();
- }
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot flush subscription '" + destination);
- }
+ sub.flush();
}
}
@@ -787,10 +775,6 @@ public class ServerSessionDelegate extends SessionDelegate
+ " to " + method.getAlternateExchange() +".");
}
}
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot declare exchange '" + exchangeName);
- }
catch (QpidSecurityException e)
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
@@ -801,26 +785,6 @@ public class ServerSessionDelegate extends SessionDelegate
}
- // TODO decouple AMQException and AMQConstant error codes
- private void exception(Session session, Method method, AMQException exception, String message)
- {
- ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
- if (exception.getErrorCode() != null)
- {
- try
- {
- errorCode = ExecutionErrorCode.get(exception.getErrorCode().getCode());
- }
- catch (IllegalArgumentException iae)
- {
- // ignore, already set to INTERNAL_ERROR
- }
- }
- String description = message + "': " + exception.getMessage();
-
- exception(session, method, errorCode, description);
- }
-
private void exception(Session session, Method method, ExecutionErrorCode errorCode, String description)
{
ExecutionException ex = new ExecutionException();
@@ -903,10 +867,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Exchange '"+method.getExchange()+"' cannot be deleted");
}
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot delete exchange '" + method.getExchange() );
- }
catch (QpidSecurityException e)
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
@@ -1001,10 +961,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
exchange.addBinding(method.getBindingKey(), queue, method.getArguments());
}
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot add binding '" + method.getBindingKey());
- }
catch (QpidSecurityException e)
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
@@ -1058,10 +1014,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
exchange.removeBinding(method.getBindingKey(), queue, null);
}
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot remove binding '" + method.getBindingKey());
- }
catch (QpidSecurityException e)
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
@@ -1289,10 +1241,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
virtualHost.removeQueue(q);
}
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot delete '" + method.getQueue());
- }
catch (QpidSecurityException e)
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
@@ -1345,10 +1293,6 @@ public class ServerSessionDelegate extends SessionDelegate
exception(session, method, errorCode, description);
}
}
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot declare queue '" + queueName);
- }
catch (QpidSecurityException e)
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
@@ -1426,10 +1370,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
virtualHost.removeQueue(queue);
}
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot delete queue '" + queueName);
- }
catch (QpidSecurityException e)
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
@@ -1461,10 +1401,6 @@ public class ServerSessionDelegate extends SessionDelegate
{
queue.clearQueue();
}
- catch (AMQException e)
- {
- exception(session, method, e, "Cannot purge queue '" + queueName);
- }
catch (QpidSecurityException e)
{
exception(session, method, ExecutionErrorCode.UNAUTHORIZED_ACCESS, e.getMessage());
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 4eeb9d8fb2..b15b3f0bfa 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -29,6 +29,7 @@ import java.util.concurrent.locks.Lock;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQMethodBody;
@@ -81,6 +82,7 @@ import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
@@ -186,9 +188,16 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
_transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
{
@Override
- public void doTimeoutAction(String reason) throws AMQException
+ public void doTimeoutAction(String reason)
{
- closeConnection(reason);
+ try
+ {
+ closeConnection(reason);
+ }
+ catch (AMQException e)
+ {
+ throw new ConnectionScopedRuntimeException(e);
+ }
}
});
}
@@ -516,7 +525,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
*/
public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks,
FieldTable filters, boolean exclusive, boolean noLocal)
- throws AMQException, QpidSecurityException
+ throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive,
+ MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException
{
if (tag == null)
{
@@ -579,17 +589,22 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
AMQShortString.toString(tag),
options);
}
- catch (AMQException e)
+ catch (QpidSecurityException e)
{
_tag2SubscriptionTargetMap.remove(tag);
throw e;
}
- catch (RuntimeException e)
+ catch (MessageSource.ExistingExclusiveConsumer e)
{
_tag2SubscriptionTargetMap.remove(tag);
throw e;
}
- catch (QpidSecurityException e)
+ catch (MessageSource.ExistingConsumerPreventsExclusive e)
+ {
+ _tag2SubscriptionTargetMap.remove(tag);
+ throw e;
+ }
+ catch (AMQInvalidArgumentException e)
{
_tag2SubscriptionTargetMap.remove(tag);
throw e;
@@ -601,9 +616,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
* Unsubscribe a consumer from a queue.
* @param consumerTag
* @return true if the consumerTag had a mapped queue that could be unregistered.
- * @throws AMQException
*/
- public boolean unsubscribeConsumer(AMQShortString consumerTag) throws AMQException
+ public boolean unsubscribeConsumer(AMQShortString consumerTag)
{
ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag);
@@ -622,16 +636,14 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
/**
* Called from the protocol session to close this channel and clean up. T
- *
- * @throws AMQException if there is an error during closure
*/
@Override
- public void close() throws AMQException
+ public void close()
{
close(null, null);
}
- public void close(AMQConstant cause, String message) throws AMQException
+ public void close(AMQConstant cause, String message)
{
if(!_closing.compareAndSet(false, true))
{
@@ -651,17 +663,13 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
{
requeue();
}
- catch (AMQException e)
- {
- _logger.error("Caught AMQException whilst attempting to requeue:" + e);
- }
catch (TransportException e)
{
_logger.error("Caught TransportException whilst attempting to requeue:" + e);
}
}
- private void unsubscribeAllConsumers() throws AMQException
+ private void unsubscribeAllConsumers()
{
if (_logger.isInfoEnabled())
{
@@ -724,9 +732,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
* Called to attempt re-delivery all outstanding unacknowledged messages on the channel. May result in delivery to
* this same channel or to other subscribers.
*
- * @throws org.apache.qpid.AMQException if the requeue fails
*/
- public void requeue() throws AMQException
+ public void requeue()
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Collection<MessageInstance> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
@@ -756,9 +763,8 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
*
* @param deliveryTag The message to requeue
*
- * @throws AMQException If something goes wrong.
*/
- public void requeue(long deliveryTag) throws AMQException
+ public void requeue(long deliveryTag)
{
MessageInstance unacked = _unacknowledgedMessageMap.remove(deliveryTag);
@@ -1455,7 +1461,7 @@ public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.F
return getProtocolSession().getVirtualHost();
}
- public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
{
_transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
index 329aa396b0..5e95701e5a 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
@@ -46,28 +46,11 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.framing.MethodRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -95,6 +78,8 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQState;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
@@ -303,9 +288,24 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
receivedComplete();
}
- catch (Exception e)
+ catch (ConnectionScopedRuntimeException e)
+ {
+ _logger.error("Unexpected exception", e);
+ closeProtocolSession();
+ }
+ catch (AMQProtocolVersionException e)
+ {
+ _logger.error("Unexpected protocol version", e);
+ closeProtocolSession();
+ }
+ catch (AMQFrameDecodingException e)
{
- _logger.error("Unexpected exception when processing datablocks", e);
+ _logger.error("Frame decoding", e);
+ closeProtocolSession();
+ }
+ catch (IOException e)
+ {
+ _logger.error("I/O Exception", e);
closeProtocolSession();
}
finally
@@ -314,34 +314,14 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
- private void receivedComplete() throws AMQException
+ private void receivedComplete()
{
- Exception exception = null;
for (AMQChannel channel : _channelsForCurrentMessage)
{
- try
- {
- channel.receivedComplete();
- }
- catch(Exception exceptionForThisChannel)
- {
- if(exception == null)
- {
- exception = exceptionForThisChannel;
- }
- _logger.error("Error informing channel that receiving is complete. Channel: " + channel, exceptionForThisChannel);
- }
+ channel.receivedComplete();
}
_channelsForCurrentMessage.clear();
-
- if(exception != null)
- {
- throw new AMQException(
- AMQConstant.INTERNAL_ERROR,
- "Error informing channel that receiving is complete: " + exception.getMessage(),
- exception);
- }
}
/**
@@ -549,7 +529,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
catch (IOException e)
{
- throw new RuntimeException(e);
+ throw new ServerScopedRuntimeException(e);
}
final ByteBuffer buf;
@@ -812,16 +792,15 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
*
* @param channelId id of the channel to close
*
- * @throws AMQException if an error occurs closing the channel
* @throws IllegalArgumentException if the channel id is not valid
*/
@Override
- public void closeChannel(int channelId) throws AMQException
+ public void closeChannel(int channelId)
{
closeChannel(channelId, null, null);
}
- public void closeChannel(int channelId, AMQConstant cause, String message) throws AMQException
+ public void closeChannel(int channelId, AMQConstant cause, String message)
{
final AMQChannel channel = getChannel(channelId);
if (channel == null)
@@ -903,7 +882,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
*
* @throws AMQException if an error occurs while closing any channel
*/
- private void closeAllChannels() throws AMQException
+ private void closeAllChannels()
{
for (AMQChannel channel : getChannels())
{
@@ -921,7 +900,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
@Override
- public void closeSession() throws AMQException
+ public void closeSession()
{
if(_closing.compareAndSet(false,true))
{
@@ -996,7 +975,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
}
- private void closeConnection(int channelId, AMQConnectionException e) throws AMQException
+ private void closeConnection(int channelId, AMQConnectionException e)
{
try
{
@@ -1033,7 +1012,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
- catch (AMQException e)
+ catch (ConnectionScopedRuntimeException e)
{
_logger.info(e.getMessage());
}
@@ -1234,9 +1213,9 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
closeProtocolSession();
}
}
- catch (AMQException e)
+ catch (ConnectionScopedRuntimeException e)
{
- _logger.error("Could not close protocol engine", e);
+ _logger.error("Could not close protocol engine", e);
}
catch (TransportException e)
{
@@ -1269,15 +1248,30 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
}
else
{
- _logger.error("Exception caught in " + this + ", closing connection explicitly: " + throwable, throwable);
+ try
+ {
+ _logger.error("Exception caught in " + this + ", closing connection explicitly: " + throwable, throwable);
- MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
- ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
+ ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
- writeFrame(closeBody.generateFrame(0));
+ writeFrame(closeBody.generateFrame(0));
- _sender.close();
+ _sender.close();
+ }
+ finally
+ {
+ if(throwable instanceof Error)
+ {
+ throw (Error) throwable;
+ }
+ if(throwable instanceof ServerScopedRuntimeException)
+ {
+ throw (ServerScopedRuntimeException) throwable;
+ }
+
+ }
}
}
@@ -1441,15 +1435,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
{
writeFrame(responseBody.generateFrame(0));
- try
- {
+ closeSession();
- closeSession();
- }
- catch (AMQException ex)
- {
- throw new RuntimeException(ex);
- }
}
finally
{
@@ -1483,15 +1470,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
try
{
writeFrame(responseBody.generateFrame(channelId));
-
- try
- {
- closeChannel(channelId);
- }
- catch (AMQException ex)
- {
- throw new RuntimeException(ex);
- }
+ closeChannel(channelId);
}
finally
{
@@ -1507,7 +1486,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
return getContextKey().toString();
}
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
{
int channelId = ((AMQChannel)session).getChannelId();
closeChannel(channelId, cause, message);
@@ -1522,7 +1501,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi
writeFrame(responseBody.generateFrame(channelId));
}
- public void close(AMQConstant cause, String message) throws AMQException
+ public void close(AMQConstant cause, String message)
{
closeConnection(0, new AMQConnectionException(cause, message, 0, 0,
getProtocolOutputConverter().getProtocolMajorVersion(),
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
index 6bcd4b9d49..58a3b5df12 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
@@ -71,7 +71,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
public static interface Task
{
- public void doTask(AMQProtocolSession session) throws AMQException;
+ public void doTask(AMQProtocolSession session);
}
/**
@@ -152,7 +152,7 @@ public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, Auth
void initHeartbeats(int delay);
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
- void closeSession() throws AMQException;
+ void closeSession();
void closeProtocolSession();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
index b80ad3e7b8..f2bb95c8d5 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_Internal_to_v0_8.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.codec.BBEncoder;
@@ -236,7 +237,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter<Inter
}
catch (IOException e)
{
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
}
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
index 6076ff66c7..f35d37ecbd 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageConverter_v0_8_to_Internal.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v0_8;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.typedmessage.TypedBytesContentReader;
@@ -124,11 +125,11 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter<AMQMe
}
catch (TypedBytesFormatException e)
{
- throw new RuntimeException(e); // TODO - Implement
+ throw new ConnectionScopedRuntimeException(e);
}
catch (EOFException e)
{
- throw new RuntimeException(e); // TODO - Implement
+ throw new ConnectionScopedRuntimeException(e);
}
}
return list;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index 3665e7f135..fd7fb9ca80 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -22,6 +22,8 @@ package org.apache.qpid.server.protocol.v0_8;
import java.util.Collection;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -32,6 +34,7 @@ import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.util.ByteBufferOutputStream;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.util.ByteBufferInputStream;
import java.io.DataInputStream;
@@ -132,7 +135,7 @@ public class MessageMetaData implements StorableMessageMetaData
catch (IOException e)
{
// This shouldn't happen as we are not actually using anything that can throw an IO Exception
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
return dest.position()-oldPosition;
@@ -196,17 +199,21 @@ public class MessageMetaData implements StorableMessageMetaData
};
return new MessageMetaData(publishBody, chb, arrivalTime);
}
- catch (AMQException e)
+ catch (IOException e)
{
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
- catch (IOException e)
+ catch (AMQProtocolVersionException e)
{
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
+ }
+ catch (AMQFrameDecodingException e)
+ {
+ throw new ConnectionScopedRuntimeException(e);
}
}
- };
+ }
public AMQMessageHeader getMessageHeader()
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
index ad4235b786..ce90de7aac 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
@@ -28,6 +28,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -149,7 +150,7 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener<Basic
}
}
- catch (org.apache.qpid.AMQInvalidArgumentException ise)
+ catch (AMQInvalidArgumentException ise)
{
_logger.debug("Closing connection due to invalid selector");
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
index f55a120a2d..c9a7cc69a1 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
@@ -31,6 +31,7 @@ import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.flow.FlowCreditManager;
@@ -124,6 +125,17 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
e.getMessage());
}
+ catch (MessageSource.ExistingExclusiveConsumer e)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+ "Queue has an exclusive consumer");
+ }
+ catch (MessageSource.ExistingConsumerPreventsExclusive e)
+ {
+ throw body.getConnectionException(AMQConstant.INTERNAL_ERROR,
+ "The GET request has been evaluated as an exclusive consumer, " +
+ "this is likely due to a programming error in the Qpid broker");
+ }
}
}
}
@@ -132,7 +144,8 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB
final AMQProtocolSession session,
final AMQChannel channel,
final boolean acks)
- throws AMQException, QpidSecurityException
+ throws AMQException, QpidSecurityException, MessageSource.ExistingConsumerPreventsExclusive,
+ MessageSource.ExistingExclusiveConsumer
{
final FlowCreditManager singleMessageCredit = new MessageOnlyCreditManager(1L);
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java
index 442c912032..2594242db4 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.io.ByteArrayOutputStream;
@@ -99,7 +100,7 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
catch (IOException e)
{
// This *really* shouldn't happen as we're not doing any I/O
- throw new RuntimeException("I/O exception when writing to byte array", e);
+ throw new ConnectionScopedRuntimeException("I/O exception when writing to byte array", e);
}
// should really associate this channelId to the session
@@ -123,7 +124,7 @@ public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenB
catch (IOException e)
{
// This *really* shouldn't happen as we're not doing any I/O
- throw new RuntimeException("I/O exception when writing to byte array", e);
+ throw new ConnectionScopedRuntimeException("I/O exception when writing to byte array", e);
}
// should really associate this channelId to the session
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
index fa513486a4..5c5b1f141b 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
@@ -24,7 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeDeclareBody;
@@ -38,6 +38,7 @@ import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
@@ -129,6 +130,11 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener<Exchange
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
}
+ catch (UnknownExchangeException e)
+ {
+ // note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e);
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
index 1286a20970..5b5525643c 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
@@ -34,21 +34,17 @@ import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.security.QpidSecurityException;
-import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
-import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.virtualhost.plugins.QueueExistsException;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
{
@@ -131,7 +127,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
final AMQQueue q = queue;
final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
{
- public void doTask(AMQProtocolSession session) throws AMQException
+ public void doTask(AMQProtocolSession session)
{
q.setExclusiveOwningSession(null);
}
@@ -219,7 +215,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
QueueDeclareBody body,
final VirtualHost virtualHost,
final AMQProtocolSession session)
- throws AMQException, QpidSecurityException
+ throws AMQException, QpidSecurityException, QueueExistsException
{
final boolean durable = body.getDurable();
@@ -241,7 +237,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
final AMQProtocolSession.Task deleteQueueTask =
new AMQProtocolSession.Task()
{
- public void doTask(AMQProtocolSession session) throws AMQException
+ public void doTask(AMQProtocolSession session)
{
if (virtualHost.getQueue(queueName.toString()) == queue)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
index 0555bba98b..c3cbb5e26f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/AMQStateManager.java
@@ -52,8 +52,6 @@ public class AMQStateManager implements AMQMethodListener
/** The current state */
private AMQState _currentState;
- private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
-
public AMQStateManager(Broker broker, AMQProtocolSession protocolSession)
{
_broker = broker;
@@ -72,30 +70,17 @@ public class AMQStateManager implements AMQMethodListener
return _broker;
}
- public AMQState getCurrentState()
- {
- return _currentState;
- }
-
- public void changeState(AMQState newState) throws AMQException
+ public void changeState(AMQState newState)
{
_logger.debug("State changing to " + newState + " from old state " + _currentState);
final AMQState oldState = _currentState;
_currentState = newState;
- for (StateListener l : _stateListeners)
- {
- l.stateChanged(oldState, newState);
- }
}
public void error(Exception e)
{
_logger.error("State manager received error notification[Current State:" + _currentState + "]: " + e, e);
- for (StateListener l : _stateListeners)
- {
- l.error(e);
- }
}
public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
@@ -121,28 +106,6 @@ public class AMQStateManager implements AMQMethodListener
}
- private <B extends AMQMethodBody> void checkChannel(AMQMethodEvent<B> evt, AMQProtocolSession protocolSession)
- throws AMQException
- {
- if ((evt.getChannelId() != 0) && !(evt.getMethod() instanceof ChannelOpenBody)
- && (protocolSession.getChannel(evt.getChannelId()) == null)
- && !protocolSession.channelAwaitingClosure(evt.getChannelId()))
- {
- throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
- }
- }
-
- public void addStateListener(StateListener listener)
- {
- _logger.debug("Adding state listener");
- _stateListeners.add(listener);
- }
-
- public void removeStateListener(StateListener listener)
- {
- _stateListeners.remove(listener);
- }
-
public VirtualHostRegistry getVirtualHostRegistry()
{
return _broker.getVirtualHostRegistry();
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/IllegalStateTransitionException.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/IllegalStateTransitionException.java
deleted file mode 100644
index f61553f8a2..0000000000
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/state/IllegalStateTransitionException.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v0_8.state;
-
-import org.apache.qpid.AMQException;
-
-/**
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Not used! Delete.
- */
-public class IllegalStateTransitionException extends AMQException
-{
- private AMQState _originalState;
-
- private Class _frame;
-
- public IllegalStateTransitionException(AMQState originalState, Class frame)
- {
- super("No valid state transition defined for receiving frame " + frame + " from state " + originalState);
- _originalState = originalState;
- _frame = frame;
- }
-
- public AMQState getOriginalState()
- {
- return _originalState;
- }
-
- public Class getFrameClass()
- {
- return _frame;
- }
-}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
index 23c32f988d..8d7de4cd93 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
@@ -139,28 +139,20 @@ public class AckTest extends QpidTestCase
final StoredMessage storedMessage = _messageStore.addMessage(mmd);
final AMQMessage message = new AMQMessage(storedMessage);
ServerTransaction txn = new AutoCommitTransaction(_messageStore);
- txn.enqueue(_queue, message, new ServerTransaction.Action() {
- public void postCommit()
- {
- try
- {
-
- _queue.enqueue(message,null);
- }
- catch (AMQException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- public void onRollback()
- {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- });
+ txn.enqueue(_queue, message,
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ _queue.enqueue(message,null);
+ }
+
+ public void onRollback()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+ });
- // we manually send the message to the subscription
- //_subscription.send(new QueueEntry(_queue,msg), _queue);
}
try
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index 4a2fbe44fa..7661d98cb4 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -223,7 +223,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
// Then the AMQMinaProtocolSession can join on the returning future without a NPE.
}
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
{
super.closeSession(session, cause, message);
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
index 41e2fef03f..cae61f9d80 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.v1_0;
import java.text.MessageFormat;
import java.util.Collection;
-import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
@@ -152,7 +151,7 @@ public class Connection_1_0 implements ConnectionEventListener
private volatile boolean _stopped;
@Override
- public void close(AMQConstant cause, String message) throws AMQException
+ public void close(AMQConstant cause, String message)
{
_conn.close();
}
@@ -170,7 +169,7 @@ public class Connection_1_0 implements ConnectionEventListener
}
@Override
- public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message)
{
// TODO
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index dbfb8f5fc4..f3417710a5 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
@@ -46,6 +45,7 @@ import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -187,7 +187,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget
catch (AmqpErrorException e)
{
//TODO
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
Header header = new Header();
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
index 3d030890e0..5356a6e6a3 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.util.List;
-import org.apache.qpid.AMQException;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
import org.apache.qpid.amqp_1_0.type.messaging.Rejected;
@@ -29,8 +27,6 @@ import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.txn.ServerTransaction;
public class ExchangeDestination implements ReceivingDestination, SendingDestination
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
index f02908391a..1764eec84d 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java
@@ -31,6 +31,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Data;
import org.apache.qpid.amqp_1_0.type.messaging.Header;
import org.apache.qpid.amqp_1_0.type.messaging.Properties;
import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -132,7 +133,7 @@ public class MessageConverter_Internal_to_v1_0 extends MessageConverter_to_1_0<I
}
catch (IOException e)
{
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
index a8a203b247..40f738b8e7 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -39,6 +39,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.typedmessage.TypedBytesContentReader;
@@ -140,11 +141,11 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement
}
catch (TypedBytesFormatException e)
{
- throw new RuntimeException(e); // TODO - Implement
+ throw new ConnectionScopedRuntimeException(e);
}
catch (EOFException e)
{
- throw new RuntimeException(e); // TODO - Implement
+ throw new ConnectionScopedRuntimeException(e);
}
}
return new AmqpValue(list);
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
index f639f98dba..ec6d5a924c 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java
@@ -30,6 +30,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
import org.apache.qpid.amqp_1_0.type.messaging.Data;
import org.apache.qpid.server.message.internal.InternalMessage;
import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.typedmessage.TypedBytesContentReader;
@@ -96,7 +97,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
{
if(previousSection != null && (previousSection.getClass() != section.getClass() || section instanceof AmqpValue))
{
- throw new RuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence");
+ throw new ConnectionScopedRuntimeException("Message is badly formed and has multiple body section which are not all Data or not all AmqpSequence");
}
else
{
@@ -149,7 +150,7 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
}
catch (AmqpErrorException e)
{
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
@@ -257,11 +258,11 @@ public class MessageConverter_v1_0_to_Internal implements MessageConverter<Messa
}
catch (TypedBytesFormatException e)
{
- throw new RuntimeException(e); // TODO - Implement
+ throw new ConnectionScopedRuntimeException(e);
}
catch (EOFException e)
{
- throw new RuntimeException(e); // TODO - Implement
+ throw new ConnectionScopedRuntimeException(e);
}
}
return list;
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index be9d7a2d60..f28e25e080 100755
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -43,6 +43,7 @@ import org.apache.qpid.amqp_1_0.type.messaging.Properties;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
public class MessageMetaData_1_0 implements StorableMessageMetaData
{
@@ -394,7 +395,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
catch (AmqpErrorException e)
{
//TODO
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
index d614f44981..58c3597a5c 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.io.IOException;
import java.io.PrintWriter;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
@@ -40,8 +41,11 @@ import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.Container;
import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.ErrorCondition;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.protocol.ServerProtocolEngine;
@@ -51,12 +55,16 @@ import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOutputHandler
{
+ private static final org.apache.log4j.Logger
+ _logger = org.apache.log4j.Logger.getLogger(ProtocolEngine_1_0_0_SASL.class);
+
private final Port _port;
private final Transport _transport;
private long _readBytes;
@@ -353,9 +361,35 @@ public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, FrameOut
}
- public void exception(Throwable t)
+ public void exception(Throwable throwable)
{
- t.printStackTrace();
+ if (throwable instanceof IOException)
+ {
+ _logger.info("IOException caught in " + this + ", connection closed implicitly: " + throwable);
+ }
+ else
+ {
+
+ try
+ {
+ final Error err = new Error();
+ err.setCondition(AmqpError.INTERNAL_ERROR);
+ err.setDescription(throwable.getMessage());
+ _conn.close(err);
+ close();
+ }
+ finally
+ {
+ if(throwable instanceof java.lang.Error)
+ {
+ throw (java.lang.Error) throwable;
+ }
+ if(throwable instanceof ServerScopedRuntimeException)
+ {
+ throw (ServerScopedRuntimeException) throwable;
+ }
+ }
+ }
}
public void closed()
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
index 3d6bb5e3db..4fefbe9f7d 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
@@ -49,37 +49,22 @@ public class QueueDestination extends MessageSourceDestination implements Sendin
public Outcome send(final Message_1_0 message, ServerTransaction txn)
{
- try
+ txn.enqueue(getQueue(),message, new ServerTransaction.Action()
{
- txn.enqueue(getQueue(),message, new ServerTransaction.Action()
+
+
+ public void postCommit()
{
+ getQueue().enqueue(message,null);
+ }
+
+ public void onRollback()
+ {
+ // NO-OP
+ }
+ });
- public void postCommit()
- {
- try
- {
- getQueue().enqueue(message,null);
- }
- catch (Exception e)
- {
- // TODO
- throw new RuntimeException(e);
- }
-
- }
-
- public void onRollback()
- {
- // NO-OP
- }
- });
- }
- catch(Exception e)
- {
- _logger.error("Send error", e);
- throw new RuntimeException(e);
- }
return ACCEPTED;
}
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 56b574685f..546cc79f9e 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -30,8 +30,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInternalException;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
@@ -64,7 +62,9 @@ import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
{
@@ -324,11 +324,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
_vhost.removeQueue(tempQueue);
}
- catch (AMQException e)
- {
- //TODO
- _logger.error("Error removing queue", e);
- }
catch (QpidSecurityException e)
{
//TODO
@@ -356,17 +351,12 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
catch (QpidSecurityException e)
{
_logger.error("Security error", e);
- throw new RuntimeException(e);
- }
- catch (AMQInternalException e)
- {
- _logger.error("Internal error", e);
- throw new RuntimeException(e);
+ throw new ConnectionScopedRuntimeException(e);
}
- catch (AMQException e)
+ catch (QueueExistsException e)
{
- _logger.error("Error", e);
- throw new RuntimeException(e);
+ _logger.error("A randomly generated temporary queue name collided with an existing queue",e);
+ throw new ConnectionScopedRuntimeException(e);
}
@@ -377,7 +367,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
}
else
{
- throw new RuntimeException("Unknown destination type");
+ throw new ConnectionScopedRuntimeException("Unknown destination type");
}
if(_target != null)
@@ -403,15 +393,21 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
messageFilter == null ? null : new SimpleFilterManager(messageFilter),
Message_1_0.class, name, options);
}
- catch (AMQException e)
+ catch (QpidSecurityException e)
{
//TODO
- _logger.error("Error registering subscription", e);
+ _logger.info("Error registering subscription", e);
+ throw new ConnectionScopedRuntimeException(e);
}
- catch (QpidSecurityException e)
+ catch (MessageSource.ExistingExclusiveConsumer e)
{
- //TODO
- _logger.error("Error registering subscription", e);
+ _logger.info("Cannot add a consumer to the destination as there is already an exclusive consumer");
+ throw new ConnectionScopedRuntimeException(e);
+ }
+ catch (MessageSource.ExistingConsumerPreventsExclusive e)
+ {
+ _logger.info("Cannot add an exclusive consumer to the destination as there is already a consumer");
+ throw new ConnectionScopedRuntimeException(e);
}
}
@@ -429,18 +425,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
// if not durable or close
if(!TerminusDurability.UNSETTLED_STATE.equals(_durability))
{
-
- try
- {
-
- _consumer.close();
-
- }
- catch (AMQException e)
- {
- //TODO
- _logger.error("Error unregistering subscription", e);
- }
+ _consumer.close();
Modified state = new Modified();
state.setDeliveryFailed(true);
@@ -462,11 +447,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
{
_vhost.removeQueue((AMQQueue)_queue);
}
- catch(AMQException e)
- {
- //TODO
- _logger.error("Error registering subscription", e);
- }
catch (QpidSecurityException e)
{
//TODO
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index a0b2fc5289..c055d1e840 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -36,7 +36,6 @@ import org.apache.qpid.amqp_1_0.type.transaction.TxnCapability;
import org.apache.qpid.amqp_1_0.type.transport.*;
import org.apache.qpid.amqp_1_0.type.transport.Error;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.exchange.Exchange;
@@ -51,7 +50,9 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
import java.util.*;
@@ -357,11 +358,6 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
{
_vhost.removeQueue(tempQueue);
}
- catch (AMQException e)
- {
- //TODO
- _logger.error("Error removing queue from vhost", e);
- }
catch (QpidSecurityException e)
{
//TODO
@@ -399,12 +395,13 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
catch (QpidSecurityException e)
{
//TODO
- _logger.error("Security error", e);
+ _logger.info("Security error", e);
+ throw new ConnectionScopedRuntimeException(e);
}
- catch (AMQException e)
+ catch (QueueExistsException e)
{
- //TODO
- _logger.error("Error", e);
+ _logger.error("A temporary queue was created with a name which collided with an existing queue name");
+ throw new ConnectionScopedRuntimeException(e);
}
return queue;
@@ -490,14 +487,14 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
}
@Override
- public void close() throws AMQException
+ public void close()
{
// TODO - required for AMQSessionModel / management initiated closing
}
@Override
- public void close(AMQConstant cause, String message) throws AMQException
+ public void close(AMQConstant cause, String message)
{
// TODO - required for AMQSessionModel
}
@@ -509,7 +506,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSu
}
@Override
- public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
{
// TODO - required for AMQSessionModel / long running transaction detection
}
diff --git a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index bc8d157346..d32ea49e60 100644
--- a/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker-plugins/derby-store/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.MessageStoreConstants;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.util.FileUtils;
/**
@@ -99,7 +100,7 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
return "bigint";
}
- protected void doClose() throws SQLException
+ protected void doClose()
{
try
{
@@ -117,7 +118,7 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
else
{
getLogger().error("Exception whilst shutting down the store: " + e);
- throw e;
+ throw new ServerScopedRuntimeException("Error closing message store", e);
}
}
}
@@ -307,7 +308,7 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
catch (SQLException e)
{
closeConnection(conn);
- throw new RuntimeException("Exception while processing store size change", e);
+ throw new ServerScopedRuntimeException("Exception while processing store size change", e);
}
}
}
@@ -359,7 +360,7 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
catch (SQLException e)
{
closeConnection(conn);
- throw new RuntimeException("Error reducing on disk size", e);
+ throw new ServerScopedRuntimeException("Error reducing on disk size", e);
}
finally
{
@@ -407,7 +408,7 @@ public class DerbyMessageStore extends AbstractJDBCMessageStore implements Messa
catch (SQLException e)
{
closeConnection(conn);
- throw new RuntimeException("Error establishing on disk size", e);
+ throw new ServerScopedRuntimeException("Error establishing on disk size", e);
}
finally
{
diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
index 6fdfa40561..3fac83f5a3 100644
--- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
+++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCMessageStore.java
@@ -30,13 +30,13 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory;
import org.apache.qpid.server.store.AbstractJDBCMessageStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
/**
* An implementation of a {@link org.apache.qpid.server.store.MessageStore} that uses a JDBC database as the persistence
@@ -252,7 +252,7 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
}
@Override
- protected void doClose() throws AMQStoreException
+ protected void doClose()
{
while(!_transactions.isEmpty())
{
@@ -265,7 +265,7 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
}
catch (SQLException e)
{
- throw new AMQStoreException("Unable to close connection provider ", e);
+ throw new ServerScopedRuntimeException("Unable to close connection provider ", e);
}
}
@@ -430,7 +430,7 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
}
@Override
- public void commitTran() throws AMQStoreException
+ public void commitTran()
{
try
{
@@ -443,7 +443,7 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
}
@Override
- public StoreFuture commitTranAsync() throws AMQStoreException
+ public StoreFuture commitTranAsync()
{
try
{
@@ -456,7 +456,7 @@ public class JDBCMessageStore extends AbstractJDBCMessageStore implements Messag
}
@Override
- public void abortTran() throws AMQStoreException
+ public void abortTran()
{
try
{
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
index 4921db5c07..7d76a0ee8e 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.management.amqp;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.consumer.ConsumerTarget;
@@ -940,7 +939,7 @@ class ManagementNode implements MessageSource<ManagementNodeConsumer,ManagementN
final FilterManager filters,
final Class<? extends ServerMessage> messageClass,
final String consumerName,
- final EnumSet<Consumer.Option> options) throws AMQException
+ final EnumSet<Consumer.Option> options)
{
final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target);
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index ea4b60da13..008f16883a 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.management.amqp;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.message.internal.InternalMessage;
@@ -124,7 +123,7 @@ class ManagementNodeConsumer implements Consumer
}
@Override
- public void close() throws AMQException
+ public void close()
{
}
diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
index 283dec0f30..59ab849848 100644
--- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
+++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.management.amqp;
-import org.apache.qpid.AMQException;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.InstanceProperties;
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
index 3375a784ea..08b99a206d 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
@@ -76,6 +76,7 @@ import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.model.adapter.AbstractPluginAdapter;
import org.apache.qpid.server.plugin.PluginFactory;
import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.network.security.ssl.QpidMultipleTrustManager;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
@@ -181,7 +182,7 @@ public class HttpManagement extends AbstractPluginAdapter implements HttpManagem
}
catch (Exception e)
{
- throw new RuntimeException("Failed to start HTTP management on ports : " + httpPorts, e);
+ throw new ServerScopedRuntimeException("Failed to start HTTP management on ports : " + httpPorts, e);
}
CurrentActor.get().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME));
@@ -198,7 +199,7 @@ public class HttpManagement extends AbstractPluginAdapter implements HttpManagem
}
catch (Exception e)
{
- throw new RuntimeException("Failed to stop HTTP management on ports : " + getHttpPorts(getBroker().getPorts()), e);
+ throw new ServerScopedRuntimeException("Failed to stop HTTP management on ports : " + getHttpPorts(getBroker().getPorts()), e);
}
}
@@ -310,7 +311,7 @@ public class HttpManagement extends AbstractPluginAdapter implements HttpManagem
}
catch (GeneralSecurityException e)
{
- throw new RuntimeException("Cannot configure port " + port.getName() + " for transport " + Transport.SSL, e);
+ throw new ServerScopedRuntimeException("Cannot configure port " + port.getName() + " for transport " + Transport.SSL, e);
}
connector = new SslSocketConnector(factory);
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
index f6674b5152..9a2f0dd1f6 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
@@ -50,6 +50,7 @@ import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManager;
import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerFactory;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
public class HttpManagementUtil
@@ -138,7 +139,7 @@ public class HttpManagementUtil
}
catch (PrivilegedActionException e)
{
- throw new RuntimeException("Unable to perform access check", e);
+ throw new ServerScopedRuntimeException("Unable to perform access check", e);
}
}
finally
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
index ee481ebdbe..0381b711bc 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.management.plugin.HttpManagementConfiguration;
import org.apache.qpid.server.management.plugin.HttpManagementUtil;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
@@ -214,7 +215,16 @@ public abstract class AbstractServlet extends HttpServlet
catch (PrivilegedActionException e)
{
LOGGER.error("Unable to perform action", e);
- throw new RuntimeException(e.getCause());
+ Throwable cause = e.getCause();
+ if(cause instanceof RuntimeException)
+ {
+ throw (RuntimeException)cause;
+ }
+ if(cause instanceof Error)
+ {
+ throw (Error)cause;
+ }
+ throw new ConnectionScopedRuntimeException(e.getCause());
}
finally
{
@@ -255,7 +265,7 @@ public abstract class AbstractServlet extends HttpServlet
}
catch (IOException e)
{
- throw new RuntimeException("Failed to send error response code " + errorCode, e);
+ throw new ConnectionScopedRuntimeException("Failed to send error response code " + errorCode, e);
}
}
diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java
index 9ad52007ab..a29a875071 100644
--- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java
+++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.management.plugin.servlet.rest;
import org.apache.commons.codec.binary.Base64;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
@@ -210,7 +211,7 @@ public class SaslServlet extends AbstractServlet
}
if (!saslAuthEnabled)
{
- throw new RuntimeException("Sasl authentication disabled.");
+ throw new ConnectionScopedRuntimeException("Sasl authentication disabled.");
}
}
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java
index 32aac51008..e2b9a98784 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagedObjectRegistry.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.security.auth.jmx.JMXPasswordAuthenticator;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.ssl.SSLContextFactory;
import javax.management.JMException;
@@ -134,7 +135,7 @@ public class JMXManagedObjectRegistry implements ManagedObjectRegistry
}
catch (GeneralSecurityException e)
{
- throw new RuntimeException("Unable to create SSLContext for key store", e);
+ throw new ServerScopedRuntimeException("Unable to create SSLContext for key store", e);
}
CurrentActor.get().message(ManagementConsoleMessages.SSL_KEYSTORE(keyStore.getName()));
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java
index 18e9f9f809..e418275d7e 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagement.java
@@ -54,6 +54,7 @@ import org.apache.qpid.server.model.adapter.AbstractPluginAdapter;
import org.apache.qpid.server.plugin.PluginFactory;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.util.MapValueConverter;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class JMXManagement extends AbstractPluginAdapter implements ConfigurationChangeListener
{
@@ -110,7 +111,7 @@ public class JMXManagement extends AbstractPluginAdapter implements Configuratio
}
catch (Exception e)
{
- throw new RuntimeException("Couldn't start JMX management", e);
+ throw new ServerScopedRuntimeException("Couldn't start JMX management", e);
}
return true;
}
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
index d0c0d5e73f..34366a196c 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ConnectionMBean.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.jmx.ManagedObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.Statistics;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class ConnectionMBean extends AbstractStatisticsGatheringMBean<Connection> implements ManagedConnection
{
@@ -60,7 +61,7 @@ public class ConnectionMBean extends AbstractStatisticsGatheringMBean<Connection
catch (JMException ex)
{
// This is not expected to ever occur.
- throw new RuntimeException("Got JMException in static initializer.", ex);
+ throw new ServerScopedRuntimeException("Got JMException in static initializer.", ex);
}
}
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
index 3e1a47c431..407da0fd3f 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import javax.management.JMException;
import javax.management.MalformedObjectNameException;
@@ -106,7 +107,7 @@ public class ExchangeMBean extends AMQManagedObject implements ManagedExchange
}
catch(OpenDataException e)
{
- throw new RuntimeException("Unexpected Error creating ArrayType", e);
+ throw new ServerScopedRuntimeException("Unexpected Error creating ArrayType", e);
}
}
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/LoggingManagementMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/LoggingManagementMBean.java
index d6f4b5d8c9..848a846911 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/LoggingManagementMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/LoggingManagementMBean.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.jmx.ManagedObject;
import org.apache.qpid.server.jmx.ManagedObjectRegistry;
import org.apache.qpid.server.logging.log4j.LoggingManagementFacade;
import org.apache.qpid.server.logging.log4j.LoggingFacadeException;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import javax.management.JMException;
import javax.management.openmbean.CompositeData;
@@ -314,7 +315,7 @@ public class LoggingManagementMBean extends AMQManagedObject implements LoggingM
catch (OpenDataException ode)
{
// Should not happen
- throw new RuntimeException(ode);
+ throw new ConnectionScopedRuntimeException(ode);
}
}
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
index d74aa41244..1365ceb06a 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
@@ -57,6 +57,7 @@ import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.NotificationCheck;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryVisitor;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueNotificationListener
{
@@ -109,7 +110,7 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
}
catch (OpenDataException e)
{
- throw new RuntimeException(e);
+ throw new ServerScopedRuntimeException(e);
}
}
diff --git a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index fde93a27e3..81dcb05a41 100644
--- a/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -28,6 +28,7 @@ import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.protocol.AmqpProtocolVersion;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.transport.AcceptingTransport;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.transport.Binary;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -144,9 +145,13 @@ class WebSocketProvider implements AcceptingTransport
{
_server.start();
}
+ catch(RuntimeException e)
+ {
+ throw e;
+ }
catch (Exception e)
{
- throw new RuntimeException(e);
+ throw new ServerScopedRuntimeException(e);
}
}