From 83393913d1a7c9bced3c6681423b55fe1d6717e9 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 11 Feb 2015 11:06:23 +0000 Subject: QPID-6383 : don't auto delete queues because the children are being removed due to broker closure git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1658923 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/queue/AbstractQueue.java | 14 +++++++++++++- .../qpid/server/protocol/v1_0/SendingLink_1_0.java | 16 +++++++--------- 2 files changed, 20 insertions(+), 10 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index f905558f13..8f4c3d6df0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -245,6 +245,7 @@ public abstract class AbstractQueue> private final ConcurrentLinkedQueue _postRecoveryQueue = new ConcurrentLinkedQueue<>(); private final QueueRunner _queueRunner = new QueueRunner(this); + private boolean _closing; protected AbstractQueue(Map attributes, VirtualHostImpl virtualHost) { @@ -753,6 +754,15 @@ public abstract class AbstractQueue> } + @Override + protected void beforeClose() + { + _closing = true; + super.beforeClose(); + } + + + synchronized void unregisterConsumer(final QueueConsumerImpl consumer) { if (consumer == null) @@ -793,7 +803,8 @@ public abstract class AbstractQueue> if(!consumer.isTransient() && ( getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS || getLifetimePolicy() == LifetimePolicy.DELETE_ON_NO_LINKS ) - && getConsumerCount() == 0) + && getConsumerCount() == 0 + && !(consumer.isDurable() && _closing)) { if (_logger.isInfoEnabled()) @@ -1797,6 +1808,7 @@ public abstract class AbstractQueue> { ReferenceCountingExecutorService.getInstance().releaseExecutorService(); } + _closing = false; } public void checkCapacity(AMQSessionModel channel) diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index e3994005d6..d1d1227818 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -292,15 +292,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS actualFilters.put(entry.getKey(), entry.getValue()); } - catch (ParseException e) - { - Error error = new Error(); - error.setCondition(AmqpError.INVALID_FIELD); - error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue()); - error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter"))); - throw new AmqpErrorException(error); - } - catch (SelectorParsingException e) + catch (ParseException | SelectorParsingException e) { Error error = new Error(); error.setCondition(AmqpError.INVALID_FIELD); @@ -364,6 +356,12 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS options.add(ConsumerImpl.Option.NO_LOCAL); } + if(_durability == TerminusDurability.CONFIGURATION || + _durability == TerminusDurability.UNSETTLED_STATE ) + { + options.add(ConsumerImpl.Option.DURABLE); + } + try { final String name; -- cgit v1.2.1