summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-12-10 11:08:29 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-12-10 11:08:29 +0000
commit7ffe761004ac65d26c5b8b601aa2f2d1c4e234b1 (patch)
treec1f9820e6a6a6bb0661479401492013ca4d9c0c3 /qpid/java/broker-plugins
parent7e31771bc163e945510512066608da03ebfe4e52 (diff)
downloadqpid-python-7ffe761004ac65d26c5b8b601aa2f2d1c4e234b1.tar.gz
Force close connection on 0-10 when message sent to closing virtual host
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1644374 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java86
1 files changed, 49 insertions, 37 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 288a4f946c..a117ddb0c6 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -37,6 +37,7 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -381,58 +382,69 @@ public class ServerSessionDelegate extends SessionDelegate
new MessageTransferMessage(storeMessage, serverSession.getReference());
MessageReference<MessageTransferMessage> reference = message.newReference();
- final InstanceProperties instanceProperties = new InstanceProperties()
+ try
{
- @Override
- public Object getProperty(final Property prop)
+ final InstanceProperties instanceProperties = new InstanceProperties()
{
- switch (prop)
+ @Override
+ public Object getProperty(final Property prop)
{
- case EXPIRATION:
- return message.getExpiration();
- case IMMEDIATE:
- return message.isImmediate();
- case MANDATORY:
- return (delvProps == null || !delvProps.getDiscardUnroutable())
- && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
- case PERSISTENT:
- return message.isPersistent();
- case REDELIVERED:
- return delvProps.getRedelivered();
+ switch (prop)
+ {
+ case EXPIRATION:
+ return message.getExpiration();
+ case IMMEDIATE:
+ return message.isImmediate();
+ case MANDATORY:
+ return (delvProps == null || !delvProps.getDiscardUnroutable())
+ && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT;
+ case PERSISTENT:
+ return message.isPersistent();
+ case REDELIVERED:
+ return delvProps.getRedelivered();
+ }
+ return null;
}
- return null;
- }
- };
+ };
- int enqueues = serverSession.enqueue(message, instanceProperties, destination);
+ int enqueues = serverSession.enqueue(message, instanceProperties, destination);
- if (enqueues == 0)
- {
- if ((delvProps == null || !delvProps.getDiscardUnroutable())
- && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ if (enqueues == 0)
+ {
+ if ((delvProps == null || !delvProps.getDiscardUnroutable())
+ && xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
+ {
+ RangeSet rejects = RangeSetFactory.createRangeSet();
+ rejects.add(xfr.getId());
+ MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
+ ssn.invoke(reject);
+ }
+ else
+ {
+ virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(),
+ messageMetaData.getRoutingKey()));
+ }
+ }
+
+ if (serverSession.isTransactional())
{
- RangeSet rejects = RangeSetFactory.createRangeSet();
- rejects.add(xfr.getId());
- MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
- ssn.invoke(reject);
+ serverSession.processed(xfr);
}
else
{
- virtualHost.getEventLogger().message(ExchangeMessages.DISCARDMSG(destination.getName(),
- messageMetaData.getRoutingKey()));
+ serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
+ new CommandProcessedAction(serverSession, xfr));
}
}
-
- if (serverSession.isTransactional())
+ catch (VirtualHostUnavailableException e)
{
- serverSession.processed(xfr);
+ getServerConnection(serverSession).close(AMQConstant.CONNECTION_FORCED, e.getMessage());
}
- else
+ finally
{
- serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
- new CommandProcessedAction(serverSession, xfr));
+ reference.release();
}
- reference.release();
+
}
}
@@ -549,7 +561,7 @@ public class ServerSessionDelegate extends SessionDelegate
{
try
{
- ((ServerSession)session).endDtx(method.getXid(), method.getFail(), method.getSuspend());
+ ((ServerSession) session).endDtx(method.getXid(), method.getFail(), method.getSuspend());
}
catch (TimeoutDtxException e)
{