From 9e0e8ef7be2cd693d64d8c3d718a4cfab6bda789 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Sun, 23 Feb 2014 20:31:19 +0000 Subject: QPID-5581 : [Java Broker] rename SimpleAMQQueue to AbstractQueue git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1571086 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/exchange/AbstractExchange.java | 6 +- .../qpid/server/exchange/DefaultExchange.java | 7 +- .../qpid/server/exchange/DirectExchange.java | 2 +- .../org/apache/qpid/server/exchange/Exchange.java | 2 +- .../qpid/server/exchange/FanoutExchange.java | 2 +- .../qpid/server/exchange/HeadersExchange.java | 2 +- .../apache/qpid/server/exchange/TopicExchange.java | 2 +- .../server/logging/subjects/BindingLogSubject.java | 2 +- .../logging/subjects/ExchangeLogSubject.java | 2 +- .../qpid/server/model/adapter/ExchangeAdapter.java | 2 +- .../apache/qpid/server/queue/AbstractQueue.java | 2627 ++++++++++++++++++++ .../queue/AssignedConsumerMessageGroupManager.java | 2 +- .../apache/qpid/server/queue/ConflationQueue.java | 2 +- .../queue/DefinedGroupMessageGroupManager.java | 2 +- .../qpid/server/queue/MessageGroupManager.java | 4 +- .../qpid/server/queue/OrderedQueueEntry.java | 2 +- .../qpid/server/queue/OrderedQueueEntryList.java | 7 +- .../apache/qpid/server/queue/OutOfOrderQueue.java | 2 +- .../qpid/server/queue/QueueArgumentsConverter.java | 4 +- .../apache/qpid/server/queue/QueueConsumer.java | 2 +- .../qpid/server/queue/QueueConsumerImpl.java | 3 +- .../qpid/server/queue/QueueConsumerList.java | 6 +- .../org/apache/qpid/server/queue/QueueContext.java | 2 +- .../apache/qpid/server/queue/QueueEntryImpl.java | 2 +- .../qpid/server/queue/QueueEntryListBase.java | 25 + .../qpid/server/queue/QueueEntryListFactory.java | 2 +- .../org/apache/qpid/server/queue/QueueRunner.java | 4 +- .../apache/qpid/server/queue/SimpleAMQQueue.java | 2624 ------------------- .../qpid/server/queue/SimpleQueueEntryList.java | 25 - .../qpid/server/queue/SortedQueueEntryList.java | 2 +- .../apache/qpid/server/queue/StandardQueue.java | 2 +- .../apache/qpid/server/queue/SubFlushRunner.java | 6 +- .../virtualhost/DefaultUpgraderProvider.java | 2 +- .../logging/subjects/AbstractTestLogSubject.java | 2 +- .../qpid/server/queue/AMQQueueFactoryTest.java | 3 +- .../qpid/server/queue/AbstractQueueTestBase.java | 1128 +++++++++ .../qpid/server/queue/PriorityQueueTest.java | 4 +- .../qpid/server/queue/QueueThreadPoolTest.java | 75 + .../qpid/server/queue/SimpleAMQQueueTestBase.java | 1128 --------- .../server/queue/SimpleAMQQueueThreadPoolTest.java | 75 - .../qpid/server/queue/StandardQueueTest.java | 8 +- .../DurableConfigurationRecovererTest.java | 6 +- .../protocol/v0_10/ServerSessionDelegate.java | 2 +- .../protocol/v0_8/handler/QueueBindHandler.java | 2 +- .../qpid/server/protocol/v1_0/SendingLink_1_0.java | 4 +- 45 files changed, 3911 insertions(+), 3914 deletions(-) create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java delete mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java create mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java create mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueThreadPoolTest.java delete mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java delete mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java (limited to 'qpid/java') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 21d8da0b94..0131ad0458 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -144,15 +144,15 @@ public abstract class AbstractExchange implements Exchange getType(); + public abstract ExchangeType getExchangeType(); @Override public String getTypeName() { - return getType().getType(); + return getExchangeType().getType(); } public boolean isDurable() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java index 3c001f8b31..764c169781 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java @@ -31,9 +31,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.consumer.Consumer; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; @@ -72,7 +69,7 @@ public class DefaultExchange implements Exchange } @Override - public ExchangeType getType() + public ExchangeType getExchangeType() { return DirectExchange.TYPE; } @@ -166,7 +163,7 @@ public class DefaultExchange implements Exchange @Override public String getTypeName() { - return getType().getType(); + return getExchangeType().getType(); } @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index f9e654f1f2..74eb063455 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -137,7 +137,7 @@ public class DirectExchange extends AbstractExchange } @Override - public ExchangeType getType() + public ExchangeType getExchangeType() { return TYPE; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 7cce4fde88..7902892a46 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -37,7 +37,7 @@ public interface Exchange extends ExchangeReferrer, MessageD String getName(); - ExchangeType getType(); + ExchangeType getExchangeType(); String getTypeName(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index a4d73ed4c4..d302ed6041 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -73,7 +73,7 @@ public class FanoutExchange extends AbstractExchange } @Override - public ExchangeType getType() + public ExchangeType getExchangeType() { return TYPE; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 6017e15446..8c1ea48031 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -87,7 +87,7 @@ public class HeadersExchange extends AbstractExchange } @Override - public ExchangeType getType() + public ExchangeType getExchangeType() { return TYPE; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 2efeb4f2ff..7c8c7ccb88 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -65,7 +65,7 @@ public class TopicExchange extends AbstractExchange } @Override - public ExchangeType getType() + public ExchangeType getExchangeType() { return TYPE; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java index a633162e85..476dff249a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java @@ -42,7 +42,7 @@ public class BindingLogSubject extends AbstractLogSubject { setLogStringWithFormat(BINDING_FORMAT, queue.getVirtualHost().getName(), - exchange.getType().getType(), + exchange.getExchangeType().getType(), exchange.getName(), queue.getName(), routingKey); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java index 5affafad75..6a03ce3693 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java @@ -32,6 +32,6 @@ public class ExchangeLogSubject extends AbstractLogSubject public ExchangeLogSubject(Exchange exchange, VirtualHost vhost) { setLogStringWithFormat(EXCHANGE_FORMAT, vhost.getName(), - exchange.getType().getType(), exchange.getName()); + exchange.getExchangeType().getType(), exchange.getName()); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java index cfd1af1f4b..3c38ba89c8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java @@ -79,7 +79,7 @@ final class ExchangeAdapter extends AbstractConfiguredObject im @Override public String getType() { - return _exchange.getType().getType(); + return _exchange.getExchangeType().getType(); } public Collection getBindings() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java new file mode 100644 index 0000000000..ac118e76a3 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -0,0 +1,2627 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.queue; + +import java.security.AccessController; +import java.security.Principal; +import java.util.*; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.model.ExclusivityPolicy; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.pool.ReferenceCountingExecutorService; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.QueueActor; +import org.apache.qpid.server.logging.messages.QueueMessages; +import org.apache.qpid.server.logging.subjects.QueueLogSubject; +import org.apache.qpid.server.message.InstanceProperties; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.txn.AutoCommitTransaction; +import org.apache.qpid.server.txn.LocalTransaction; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.Deletable; +import org.apache.qpid.server.util.MapValueConverter; +import org.apache.qpid.server.util.ServerScopedRuntimeException; +import org.apache.qpid.server.util.StateChangeListener; +import org.apache.qpid.server.virtualhost.VirtualHost; + +import javax.security.auth.Subject; + +abstract class AbstractQueue, + Q extends AbstractQueue, + L extends QueueEntryListBase> + implements AMQQueue>, + StateChangeListener, QueueConsumer.State>, + MessageGroupManager.ConsumerResetHelper +{ + + private static final Logger _logger = Logger.getLogger(AbstractQueue.class); + + public static final String SHARED_MSG_GROUP_ARG_VALUE = "1"; + private static final String QPID_NO_GROUP = "qpid.no-group"; + private static final String DEFAULT_SHARED_MESSAGE_GROUP = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, QPID_NO_GROUP); + + // TODO - should make this configurable at the vhost / broker level + private static final int DEFAULT_MAX_GROUPS = 255; + + private final VirtualHost _virtualHost; + + private final String _name; + + /** null means shared */ + private String _description; + + private final boolean _durable; + + private Exchange _alternateExchange; + + + private final L _entries; + + private final QueueConsumerList _consumerList = new QueueConsumerList(); + + private volatile QueueConsumer _exclusiveSubscriber; + + + + private final AtomicInteger _atomicQueueCount = new AtomicInteger(0); + + private final AtomicLong _atomicQueueSize = new AtomicLong(0L); + + private final AtomicInteger _activeSubscriberCount = new AtomicInteger(); + + private final AtomicLong _totalMessagesReceived = new AtomicLong(); + + private final AtomicLong _dequeueCount = new AtomicLong(); + private final AtomicLong _dequeueSize = new AtomicLong(); + private final AtomicLong _enqueueCount = new AtomicLong(); + private final AtomicLong _enqueueSize = new AtomicLong(); + private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong(); + private final AtomicLong _persistentMessageDequeueSize = new AtomicLong(); + private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong(); + private final AtomicLong _persistentMessageDequeueCount = new AtomicLong(); + private final AtomicLong _unackedMsgCount = new AtomicLong(0); + private final AtomicLong _unackedMsgBytes = new AtomicLong(); + + private final AtomicInteger _bindingCountHigh = new AtomicInteger(); + + /** max allowed size(KB) of a single message */ + private long _maximumMessageSize; + + /** max allowed number of messages on a queue. */ + private long _maximumMessageCount; + + /** max queue depth for the queue */ + private long _maximumQueueDepth; + + /** maximum message age before alerts occur */ + private long _maximumMessageAge; + + /** the minimum interval between sending out consecutive alerts of the same type */ + private long _minimumAlertRepeatGap; + + private long _capacity; + + private long _flowResumeCapacity; + + private ExclusivityPolicy _exclusivityPolicy; + private LifetimePolicy _lifetimePolicy; + private Object _exclusiveOwner; // could be connection, session or Principal + + private final Set _notificationChecks = EnumSet.noneOf(NotificationCheck.class); + + + static final int MAX_ASYNC_DELIVERIES = 80; + + + private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE); + + private final Executor _asyncDelivery; + private AtomicInteger _deliveredMessages = new AtomicInteger(); + private AtomicBoolean _stopped = new AtomicBoolean(false); + + private final Set _blockedChannels = new ConcurrentSkipListSet(); + + private final AtomicBoolean _deleted = new AtomicBoolean(false); + private final List> _deleteTaskList = + new CopyOnWriteArrayList>(); + + + private LogSubject _logSubject; + private LogActor _logActor; + + private boolean _noLocal; + + private final AtomicBoolean _overfull = new AtomicBoolean(false); + private final CopyOnWriteArrayList _bindings = new CopyOnWriteArrayList(); + private UUID _id; + private final Map _arguments; + + //TODO : persist creation time + private long _createTime = System.currentTimeMillis(); + + /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */ + private int _maximumDeliveryCount; + private final MessageGroupManager _messageGroupManager; + + private final Collection> _consumerListeners = + new ArrayList>(); + + private AMQQueue.NotificationListener _notificationListener; + private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; + + protected AbstractQueue(VirtualHost virtualHost, + Map attributes, + QueueEntryListFactory entryListFactory) + { + if (virtualHost == null) + { + throw new IllegalArgumentException("Virtual Host must not be null"); + } + + UUID id = MapValueConverter.getUUIDAttribute(Queue.ID, attributes); + String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes); + + if (name == null) + { + throw new IllegalArgumentException("Queue name must not be null"); + } + + boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE,attributes,false); + + + _exclusivityPolicy = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class, + Queue.EXCLUSIVE, + attributes, + ExclusivityPolicy.NONE); + _lifetimePolicy = MapValueConverter.getEnumAttribute(LifetimePolicy.class, + Queue.LIFETIME_POLICY, + attributes, + LifetimePolicy.PERMANENT); + + + _name = name; + _durable = durable; + _virtualHost = virtualHost; + _entries = entryListFactory.createQueueEntryList((Q) this); + final LinkedHashMap arguments = new LinkedHashMap(attributes); + + arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy); + arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy); + + _arguments = Collections.synchronizedMap(arguments); + _description = MapValueConverter.getStringAttribute(Queue.DESCRIPTION, attributes, null); + + _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false); + + + _id = id; + _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); + + _logSubject = new QueueLogSubject(this); + _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger()); + + virtualHost.getSecurityManager().authoriseCreateQueue(this); + + Subject activeSubject = Subject.getSubject(AccessController.getContext()); + Set sessionPrincipals = activeSubject == null ? Collections.emptySet() : activeSubject.getPrincipals(SessionPrincipal.class); + AMQSessionModel sessionModel; + if(sessionPrincipals.isEmpty()) + { + sessionModel = null; + } + else + { + final SessionPrincipal sessionPrincipal = sessionPrincipals.iterator().next(); + sessionModel = sessionPrincipal.getSession(); + } + + if(sessionModel != null) + { + + switch(_exclusivityPolicy) + { + + case PRINCIPAL: + _exclusiveOwner = sessionModel.getConnectionModel().getAuthorizedPrincipal(); + break; + case CONTAINER: + _exclusiveOwner = sessionModel.getConnectionModel().getRemoteContainerName(); + break; + case CONNECTION: + _exclusiveOwner = sessionModel.getConnectionModel(); + addExclusivityConstraint(sessionModel.getConnectionModel()); + break; + case SESSION: + _exclusiveOwner = sessionModel; + addExclusivityConstraint(sessionModel); + break; + case NONE: + case LINK: + // nothing to do as if link no link associated until there is a consumer associated + break; + default: + throw new ServerScopedRuntimeException("Unknown exclusivity policy: " + + _exclusivityPolicy + + " this is a coding error inside Qpid"); + } + } + else if(_exclusivityPolicy == ExclusivityPolicy.PRINCIPAL) + { + String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); + if(owner != null) + { + _exclusiveOwner = new AuthenticatedPrincipal(owner); + } + } + else if(_exclusivityPolicy == ExclusivityPolicy.CONTAINER) + { + String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); + if(owner != null) + { + _exclusiveOwner = owner; + } + } + + + if(_lifetimePolicy == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE) + { + if(sessionModel != null) + { + addLifetimeConstraint(sessionModel.getConnectionModel()); + } + else + { + throw new IllegalArgumentException("Queues created with a lifetime policy of " + + _lifetimePolicy + + " must be created from a connection."); + } + } + else if(_lifetimePolicy == LifetimePolicy.DELETE_ON_SESSION_END) + { + if(sessionModel != null) + { + addLifetimeConstraint(sessionModel); + } + else + { + throw new IllegalArgumentException("Queues created with a lifetime policy of " + + _lifetimePolicy + + " must be created from a connection."); + } + } + + + + + if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE)) + { + setMaximumMessageAge(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE, attributes)); + } + else + { + setMaximumMessageAge(virtualHost.getDefaultAlertThresholdMessageAge()); + } + if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE)) + { + setMaximumMessageSize(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, attributes)); + } + else + { + setMaximumMessageSize(virtualHost.getDefaultAlertThresholdMessageSize()); + } + if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)) + { + setMaximumMessageCount(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, + attributes)); + } + else + { + setMaximumMessageCount(virtualHost.getDefaultAlertThresholdQueueDepthMessages()); + } + if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES)) + { + setMaximumQueueDepth(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, + attributes)); + } + else + { + setMaximumQueueDepth(virtualHost.getDefaultAlertThresholdQueueDepthBytes()); + } + if (attributes.containsKey(Queue.ALERT_REPEAT_GAP)) + { + setMinimumAlertRepeatGap(MapValueConverter.getLongAttribute(Queue.ALERT_REPEAT_GAP, attributes)); + } + else + { + setMinimumAlertRepeatGap(virtualHost.getDefaultAlertRepeatGap()); + } + if (attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES)) + { + setCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, attributes)); + } + else + { + setCapacity(virtualHost.getDefaultQueueFlowControlSizeBytes()); + } + if (attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES)) + { + setFlowResumeCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, attributes)); + } + else + { + setFlowResumeCapacity(virtualHost.getDefaultQueueFlowResumeSizeBytes()); + } + if (attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS)) + { + setMaximumDeliveryCount(MapValueConverter.getIntegerAttribute(Queue.MAXIMUM_DELIVERY_ATTEMPTS, attributes)); + } + else + { + setMaximumDeliveryCount(virtualHost.getDefaultMaximumDeliveryAttempts()); + } + + final String ownerString; + switch(_exclusivityPolicy) + { + case PRINCIPAL: + ownerString = ((Principal) _exclusiveOwner).getName(); + break; + case CONTAINER: + ownerString = (String) _exclusiveOwner; + break; + default: + ownerString = null; + + } + + // Log the creation of this Queue. + // The priorities display is toggled on if we set priorities > 0 + CurrentActor.get().message(_logSubject, + QueueMessages.CREATED(ownerString, + _entries.getPriorities(), + ownerString != null , + _lifetimePolicy != LifetimePolicy.PERMANENT, + durable, + !durable, + _entries.getPriorities() > 0)); + + if(attributes != null && attributes.containsKey(Queue.MESSAGE_GROUP_KEY)) + { + if(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null + && (Boolean)(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS))) + { + Object defaultGroup = attributes.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP); + _messageGroupManager = + new DefinedGroupMessageGroupManager(String.valueOf(attributes.get(Queue.MESSAGE_GROUP_KEY)), + defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(), + this); + } + else + { + _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(attributes.get( + Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS); + } + } + else + { + _messageGroupManager = null; + } + + resetNotifications(); + + } + + private void addLifetimeConstraint(final Deletable lifetimeObject) + { + final Action deleteQueueTask = new Action() + { + @Override + public void performAction(final Deletable object) + { + getVirtualHost().removeQueue(AbstractQueue.this); + } + }; + + lifetimeObject.addDeleteTask(deleteQueueTask); + addDeleteTask(new DeleteDeleteTask(lifetimeObject, deleteQueueTask)); + } + + private void addExclusivityConstraint(final Deletable lifetimeObject) + { + final ClearOwnerAction clearOwnerAction = new ClearOwnerAction(lifetimeObject); + final DeleteDeleteTask deleteDeleteTask = new DeleteDeleteTask(lifetimeObject, clearOwnerAction); + clearOwnerAction.setDeleteTask(deleteDeleteTask); + lifetimeObject.addDeleteTask(clearOwnerAction); + addDeleteTask(deleteDeleteTask); + } + + public void resetNotifications() + { + // This ensure that the notification checks for the configured alerts are created. + setMaximumMessageAge(_maximumMessageAge); + setMaximumMessageCount(_maximumMessageCount); + setMaximumMessageSize(_maximumMessageSize); + setMaximumQueueDepth(_maximumQueueDepth); + } + + // ------ Getters and Setters + + public void execute(Runnable runnable) + { + try + { + _asyncDelivery.execute(runnable); + } + catch (RejectedExecutionException ree) + { + // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped. + if(!_stopped.get()) + { + _logger.error("Unexpected rejected execution", ree); + throw ree; + + } + + } + } + + public void setNoLocal(boolean nolocal) + { + _noLocal = nolocal; + } + + public UUID getId() + { + return _id; + } + + public boolean isDurable() + { + return _durable; + } + + public boolean isExclusive() + { + return _exclusivityPolicy != ExclusivityPolicy.NONE; + } + + public Exchange getAlternateExchange() + { + return _alternateExchange; + } + + public void setAlternateExchange(Exchange exchange) + { + if(_alternateExchange != null) + { + _alternateExchange.removeReference(this); + } + if(exchange != null) + { + exchange.addReference(this); + } + _alternateExchange = exchange; + } + + + @Override + public Collection getAvailableAttributes() + { + return new ArrayList(_arguments.keySet()); + } + + @Override + public Object getAttribute(String attrName) + { + return _arguments.get(attrName); + } + + @Override + public LifetimePolicy getLifetimePolicy() + { + return _lifetimePolicy; + } + + public String getOwner() + { + if(_exclusiveOwner != null) + { + switch(_exclusivityPolicy) + { + case CONTAINER: + return (String) _exclusiveOwner; + case PRINCIPAL: + return ((Principal)_exclusiveOwner).getName(); + } + } + return null; + } + + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public String getName() + { + return _name; + } + + // ------ Manage Consumers + + + @Override + public synchronized QueueConsumer addConsumer(final T target, + final FilterManager filters, + final Class messageClass, + final String consumerName, + EnumSet optionSet) + throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, + ConsumerAccessRefused + { + + + if (hasExclusiveConsumer()) + { + throw new ExistingExclusiveConsumer(); + } + + Object exclusiveOwner = _exclusiveOwner; + switch(_exclusivityPolicy) + { + case CONNECTION: + if(exclusiveOwner == null) + { + exclusiveOwner = target.getSessionModel().getConnectionModel(); + addExclusivityConstraint(target.getSessionModel().getConnectionModel()); + } + else + { + if(exclusiveOwner != target.getSessionModel().getConnectionModel()) + { + throw new ConsumerAccessRefused(); + } + } + break; + case SESSION: + if(exclusiveOwner == null) + { + exclusiveOwner = target.getSessionModel(); + addExclusivityConstraint(target.getSessionModel()); + } + else + { + if(exclusiveOwner != target.getSessionModel()) + { + throw new ConsumerAccessRefused(); + } + } + break; + case LINK: + if(getConsumerCount() != 0) + { + throw new ConsumerAccessRefused(); + } + break; + case PRINCIPAL: + if(exclusiveOwner == null) + { + exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); + } + else + { + if(!exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal())) + { + throw new ConsumerAccessRefused(); + } + } + break; + case CONTAINER: + if(exclusiveOwner == null) + { + exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName(); + } + else + { + if(!exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName())) + { + throw new ConsumerAccessRefused(); + } + } + break; + case NONE: + break; + default: + throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy); + } + + boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE); + boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT); + + if(_noLocal && !optionSet.contains(Consumer.Option.NO_LOCAL)) + { + optionSet = EnumSet.copyOf(optionSet); + optionSet.add(Consumer.Option.NO_LOCAL); + } + + if(exclusive && getConsumerCount() != 0) + { + throw new ExistingConsumerPreventsExclusive(); + } + + QueueConsumerImpl consumer = new QueueConsumerImpl((Q)this, + target, + consumerName, + filters, messageClass, + optionSet); + + _exclusiveOwner = exclusiveOwner; + target.consumerAdded(consumer); + + + if (exclusive && !isTransient) + { + _exclusiveSubscriber = consumer; + } + + if(consumer.isActive()) + { + _activeSubscriberCount.incrementAndGet(); + } + + consumer.setStateListener(this); + consumer.setQueueContext(new QueueContext(_entries.getHead())); + + if (!isDeleted()) + { + synchronized (_consumerListeners) + { + for(ConsumerRegistrationListener listener : _consumerListeners) + { + listener.consumerAdded((Q)this, consumer); + } + } + + _consumerList.add(consumer); + + if (isDeleted()) + { + consumer.queueDeleted(); + } + } + else + { + // TODO + } + + deliverAsync(consumer); + + return consumer; + + } + + synchronized void unregisterConsumer(final QueueConsumerImpl consumer) + { + if (consumer == null) + { + throw new NullPointerException("consumer argument is null"); + } + + boolean removed = _consumerList.remove(consumer); + + if (removed) + { + consumer.close(); + // No longer can the queue have an exclusive consumer + setExclusiveSubscriber(null); + + consumer.setQueueContext(null); + + if(_exclusivityPolicy == ExclusivityPolicy.LINK) + { + _exclusiveOwner = null; + } + + if(_messageGroupManager != null) + { + resetSubPointersForGroups(consumer, true); + } + + synchronized (_consumerListeners) + { + for(ConsumerRegistrationListener listener : _consumerListeners) + { + listener.consumerRemoved((Q)this, consumer); + } + } + + // auto-delete queues must be deleted if there are no remaining subscribers + + if(!consumer.isTransient() + && ( _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS + || _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_LINKS ) + && getConsumerCount() == 0) + { + + if (_logger.isInfoEnabled()) + { + _logger.info("Auto-deleting queue:" + this); + } + + getVirtualHost().removeQueue(this); + + // we need to manually fire the event to the removed consumer (which was the last one left for this + // queue. This is because the delete method uses the consumer set which has just been cleared + consumer.queueDeleted(); + } + } + + } + + public Collection> getConsumers() + { + List> consumers = new ArrayList>(); + QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator(); + while(iter.advance()) + { + consumers.add(iter.getNode().getConsumer()); + } + return consumers; + + } + + public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener) + { + synchronized (_consumerListeners) + { + _consumerListeners.add(listener); + } + } + + public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener) + { + synchronized (_consumerListeners) + { + _consumerListeners.remove(listener); + } + } + + public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments) + { + E entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer); + if(clearAssignments) + { + _messageGroupManager.clearAssignments(consumer); + } + + if(entry != null) + { + QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator(); + // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards + while (subscriberIter.advance()) + { + QueueConsumer sub = subscriberIter.getNode().getConsumer(); + + // we don't make browsers send the same stuff twice + if (sub.seesRequeues()) + { + updateSubRequeueEntry(sub, entry); + } + } + + deliverAsync(); + + } + } + + public void addBinding(final Binding binding) + { + _bindings.add(binding); + int bindingCount = _bindings.size(); + int bindingCountHigh; + while(bindingCount > (bindingCountHigh = _bindingCountHigh.get())) + { + if(_bindingCountHigh.compareAndSet(bindingCountHigh, bindingCount)) + { + break; + } + } + } + + public void removeBinding(final Binding binding) + { + _bindings.remove(binding); + } + + public List getBindings() + { + return Collections.unmodifiableList(_bindings); + } + + public int getBindingCount() + { + return getBindings().size(); + } + + public LogSubject getLogSubject() + { + return _logSubject; + } + + // ------ Enqueue / Dequeue + + public void enqueue(ServerMessage message, Action>> action) + { + incrementQueueCount(); + incrementQueueSize(message); + + _totalMessagesReceived.incrementAndGet(); + + + E entry; + final QueueConsumer exclusiveSub = _exclusiveSubscriber; + entry = _entries.add(message); + + if(action != null || (exclusiveSub == null && _queueRunner.isIdle())) + { + /* + + iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message + + */ + QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode(); + QueueConsumerList.ConsumerNode nextNode = node.findNext(); + if (nextNode == null) + { + nextNode = _consumerList.getHead().findNext(); + } + while (nextNode != null) + { + if (_consumerList.updateMarkedNode(node, nextNode)) + { + break; + } + else + { + node = _consumerList.getMarkedNode(); + nextNode = node.findNext(); + if (nextNode == null) + { + nextNode = _consumerList.getHead().findNext(); + } + } + } + + // always do one extra loop after we believe we've finished + // this catches the case where we *just* miss an update + int loops = 2; + + while (entry.isAvailable() && loops != 0) + { + if (nextNode == null) + { + loops--; + nextNode = _consumerList.getHead(); + } + else + { + // if consumer at end, and active, offer + QueueConsumer sub = nextNode.getConsumer(); + deliverToConsumer(sub, entry); + } + nextNode = nextNode.findNext(); + + } + } + + + if (entry.isAvailable()) + { + checkConsumersNotAheadOfDelivery(entry); + + if (exclusiveSub != null) + { + deliverAsync(exclusiveSub); + } + else + { + deliverAsync(); + } + } + + checkForNotification(entry.getMessage()); + + if(action != null) + { + action.performAction(entry); + } + + } + + private void deliverToConsumer(final QueueConsumer sub, final E entry) + { + + if(sub.trySendLock()) + { + try + { + if (!sub.isSuspended() + && consumerReadyAndHasInterest(sub, entry) + && mightAssign(sub, entry) + && !sub.wouldSuspend(entry)) + { + if (sub.acquires() && !assign(sub, entry)) + { + // restore credit here that would have been taken away by wouldSuspend since we didn't manage + // to acquire the entry for this consumer + sub.restoreCredit(entry); + } + else + { + deliverMessage(sub, entry, false); + } + } + } + finally + { + sub.releaseSendLock(); + } + } + } + + private boolean assign(final QueueConsumer sub, final E entry) + { + if(_messageGroupManager == null) + { + //no grouping, try to acquire immediately. + return entry.acquire(sub); + } + else + { + //the group manager is responsible for acquiring the message if/when appropriate + return _messageGroupManager.acceptMessage(sub, entry); + } + } + + private boolean mightAssign(final QueueConsumer sub, final E entry) + { + if(_messageGroupManager == null || !sub.acquires()) + { + return true; + } + QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry); + return (assigned == null) || (assigned == sub); + } + + protected void checkConsumersNotAheadOfDelivery(final E entry) + { + // This method is only required for queues which mess with ordering + // Simple Queues don't :-) + } + + private void incrementQueueSize(final ServerMessage message) + { + long size = message.getSize(); + getAtomicQueueSize().addAndGet(size); + _enqueueCount.incrementAndGet(); + _enqueueSize.addAndGet(size); + if(message.isPersistent() && isDurable()) + { + _persistentMessageEnqueueSize.addAndGet(size); + _persistentMessageEnqueueCount.incrementAndGet(); + } + } + + public long getTotalDequeueCount() + { + return _dequeueCount.get(); + } + + public long getTotalEnqueueCount() + { + return _enqueueCount.get(); + } + + private void incrementQueueCount() + { + getAtomicQueueCount().incrementAndGet(); + } + + private void deliverMessage(final QueueConsumer sub, final E entry, boolean batch) + { + setLastSeenEntry(sub, entry); + + _deliveredMessages.incrementAndGet(); + incrementUnackedMsgCount(entry); + + sub.send(entry, batch); + } + + private boolean consumerReadyAndHasInterest(final QueueConsumer sub, final E entry) + { + return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry); + } + + + private void setLastSeenEntry(final QueueConsumer sub, final E entry) + { + QueueContext subContext = sub.getQueueContext(); + if (subContext != null) + { + E releasedEntry = subContext.getReleasedEntry(); + + QueueContext._lastSeenUpdater.set(subContext, entry); + if(releasedEntry == entry) + { + QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null); + } + } + } + + private void updateSubRequeueEntry(final QueueConsumer sub, final E entry) + { + + QueueContext subContext = sub.getQueueContext(); + if(subContext != null) + { + E oldEntry; + + while((oldEntry = subContext.getReleasedEntry()) == null || oldEntry.compareTo(entry) > 0) + { + if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry)) + { + break; + } + } + } + } + + public void requeue(E entry) + { + QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator(); + // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards + while (subscriberIter.advance() && entry.isAvailable()) + { + QueueConsumer sub = subscriberIter.getNode().getConsumer(); + + // we don't make browsers send the same stuff twice + if (sub.seesRequeues()) + { + updateSubRequeueEntry(sub, entry); + } + } + + deliverAsync(); + + } + + @Override + public void dequeue(E entry) + { + decrementQueueCount(); + decrementQueueSize(entry); + if (entry.acquiredByConsumer()) + { + _deliveredMessages.decrementAndGet(); + } + + checkCapacity(); + + } + + private void decrementQueueSize(final E entry) + { + final ServerMessage message = entry.getMessage(); + long size = message.getSize(); + getAtomicQueueSize().addAndGet(-size); + _dequeueSize.addAndGet(size); + if(message.isPersistent() && isDurable()) + { + _persistentMessageDequeueSize.addAndGet(size); + _persistentMessageDequeueCount.incrementAndGet(); + } + } + + void decrementQueueCount() + { + getAtomicQueueCount().decrementAndGet(); + _dequeueCount.incrementAndGet(); + } + + public boolean resend(final E entry, final QueueConsumer consumer) + { + /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message + entry to resend and move back the consumer pointer. */ + + consumer.getSendLock(); + try + { + if (!consumer.isClosed()) + { + deliverMessage(consumer, entry, false); + return true; + } + else + { + return false; + } + } + finally + { + consumer.releaseSendLock(); + } + } + + + + public int getConsumerCount() + { + return _consumerList.size(); + } + + public int getActiveConsumerCount() + { + return _activeSubscriberCount.get(); + } + + public boolean isUnused() + { + return getConsumerCount() == 0; + } + + public boolean isEmpty() + { + return getMessageCount() == 0; + } + + public int getMessageCount() + { + return getAtomicQueueCount().get(); + } + + public long getQueueDepth() + { + return getAtomicQueueSize().get(); + } + + public int getUndeliveredMessageCount() + { + int count = getMessageCount() - _deliveredMessages.get(); + if (count < 0) + { + return 0; + } + else + { + return count; + } + } + + public long getReceivedMessageCount() + { + return _totalMessagesReceived.get(); + } + + public long getOldestMessageArrivalTime() + { + E entry = getOldestQueueEntry(); + return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime(); + } + + protected E getOldestQueueEntry() + { + return _entries.next(_entries.getHead()); + } + + public boolean isDeleted() + { + return _deleted.get(); + } + + public List getMessagesOnTheQueue() + { + ArrayList entryList = new ArrayList(); + QueueEntryIterator> queueListIterator = _entries.iterator(); + while (queueListIterator.advance()) + { + E node = queueListIterator.getNode(); + if (node != null && !node.isDeleted()) + { + entryList.add(node); + } + } + return entryList; + + } + + public void stateChanged(QueueConsumer sub, QueueConsumer.State oldState, QueueConsumer.State newState) + { + if (oldState == QueueConsumer.State.ACTIVE && newState != QueueConsumer.State.ACTIVE) + { + _activeSubscriberCount.decrementAndGet(); + + } + else if (newState == QueueConsumer.State.ACTIVE) + { + if (oldState != QueueConsumer.State.ACTIVE) + { + _activeSubscriberCount.incrementAndGet(); + + } + deliverAsync(sub); + } + } + + public int compareTo(final Q o) + { + return _name.compareTo(o.getName()); + } + + public AtomicInteger getAtomicQueueCount() + { + return _atomicQueueCount; + } + + public AtomicLong getAtomicQueueSize() + { + return _atomicQueueSize; + } + + public boolean hasExclusiveConsumer() + { + return _exclusiveSubscriber != null; + } + + private void setExclusiveSubscriber(QueueConsumer exclusiveSubscriber) + { + _exclusiveSubscriber = exclusiveSubscriber; + } + + long getStateChangeCount() + { + return _stateChangeCount.get(); + } + + /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */ + protected L getEntries() + { + return _entries; + } + + protected QueueConsumerList getConsumerList() + { + return _consumerList; + } + + + public static interface QueueEntryFilter + { + public boolean accept(E entry); + + public boolean filterComplete(); + } + + + + public List getMessagesOnTheQueue(final long fromMessageId, final long toMessageId) + { + return getMessagesOnTheQueue(new QueueEntryFilter() + { + + public boolean accept(E entry) + { + final long messageId = entry.getMessage().getMessageNumber(); + return messageId >= fromMessageId && messageId <= toMessageId; + } + + public boolean filterComplete() + { + return false; + } + }); + } + + public E getMessageOnTheQueue(final long messageId) + { + List entries = getMessagesOnTheQueue(new QueueEntryFilter() + { + private boolean _complete; + + public boolean accept(E entry) + { + _complete = entry.getMessage().getMessageNumber() == messageId; + return _complete; + } + + public boolean filterComplete() + { + return _complete; + } + }); + return entries.isEmpty() ? null : entries.get(0); + } + + public List getMessagesOnTheQueue(QueueEntryFilter filter) + { + ArrayList entryList = new ArrayList(); + QueueEntryIterator> queueListIterator = _entries.iterator(); + while (queueListIterator.advance() && !filter.filterComplete()) + { + E node = queueListIterator.getNode(); + if (!node.isDeleted() && filter.accept(node)) + { + entryList.add(node); + } + } + return entryList; + + } + + public void visit(final QueueEntryVisitor visitor) + { + QueueEntryIterator> queueListIterator = _entries.iterator(); + + while(queueListIterator.advance()) + { + E node = queueListIterator.getNode(); + + if(!node.isDeleted()) + { + if(visitor.visit(node)) + { + break; + } + } + } + } + + /** + * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue. + * + * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1. + * Using 0 in the 'to' field will return an empty list regardless of the 'from' value. + * @param fromPosition first message position + * @param toPosition last message position + * @return list of messages + */ + public List getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition) + { + return getMessagesOnTheQueue(new QueueEntryFilter() + { + private long position = 0; + + public boolean accept(E entry) + { + position++; + return (position >= fromPosition) && (position <= toPosition); + } + + public boolean filterComplete() + { + return position >= toPosition; + } + }); + + } + + public long getCreateTime() + { + return _createTime; + } + + // ------ Management functions + + public long clearQueue() + { + return clear(0l); + } + + private long clear(final long request) + { + //Perform ACLs + getVirtualHost().getSecurityManager().authorisePurge(this); + + QueueEntryIterator> queueListIterator = _entries.iterator(); + long count = 0; + + ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); + + while (queueListIterator.advance()) + { + E node = queueListIterator.getNode(); + if (node.acquire()) + { + dequeueEntry(node, txn); + if(++count == request) + { + break; + } + } + + } + + txn.commit(); + + return count; + } + + private void dequeueEntry(final E node) + { + ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore()); + dequeueEntry(node, txn); + } + + private void dequeueEntry(final E node, ServerTransaction txn) + { + txn.dequeue(this, node.getMessage(), + new ServerTransaction.Action() + { + + public void postCommit() + { + node.delete(); + } + + public void onRollback() + { + + } + }); + } + + @Override + public void addDeleteTask(final Action task) + { + _deleteTaskList.add(task); + } + + @Override + public void removeDeleteTask(final Action task) + { + _deleteTaskList.remove(task); + } + + // TODO list all thrown exceptions + public int delete() + { + // Check access + _virtualHost.getSecurityManager().authoriseDelete(this); + + if (!_deleted.getAndSet(true)) + { + + final ArrayList bindingCopy = new ArrayList(_bindings); + + for (Binding b : bindingCopy) + { + b.getExchange().removeBinding(b); + } + + QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); + + while (consumerNodeIterator.advance()) + { + QueueConsumer s = consumerNodeIterator.getNode().getConsumer(); + if (s != null) + { + s.queueDeleted(); + } + } + + + List entries = getMessagesOnTheQueue(new QueueEntryFilter() + { + + public boolean accept(E entry) + { + return entry.acquire(); + } + + public boolean filterComplete() + { + return false; + } + }); + + ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); + + + for(final E entry : entries) + { + // TODO log requeues with a post enqueue action + int requeues = entry.routeToAlternate(null, txn); + + if(requeues == 0) + { + // TODO log discard + } + } + + txn.commit(); + + if(_alternateExchange != null) + { + _alternateExchange.removeReference(this); + } + + + for (Action task : _deleteTaskList) + { + task.performAction((Q)this); + } + + _deleteTaskList.clear(); + stop(); + + //Log Queue Deletion + CurrentActor.get().message(_logSubject, QueueMessages.DELETED()); + + } + return getMessageCount(); + + } + + public void stop() + { + if (!_stopped.getAndSet(true)) + { + ReferenceCountingExecutorService.getInstance().releaseExecutorService(); + } + } + + public void checkCapacity(AMQSessionModel channel) + { + if(_capacity != 0l) + { + if(_atomicQueueSize.get() > _capacity) + { + _overfull.set(true); + //Overfull log message + _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity)); + + _blockedChannels.add(channel); + + channel.block(this); + + if(_atomicQueueSize.get() <= _flowResumeCapacity) + { + + //Underfull log message + _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); + + channel.unblock(this); + _blockedChannels.remove(channel); + + } + + } + + + + } + } + + private void checkCapacity() + { + if(_capacity != 0L) + { + if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity) + { + if(_overfull.compareAndSet(true,false)) + {//Underfull log message + _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); + } + + for(final AMQSessionModel blockedChannel : _blockedChannels) + { + blockedChannel.unblock(this); + _blockedChannels.remove(blockedChannel); + } + } + } + } + + private QueueRunner _queueRunner = new QueueRunner(this); + + public void deliverAsync() + { + _stateChangeCount.incrementAndGet(); + + _queueRunner.execute(_asyncDelivery); + + } + + public void deliverAsync(QueueConsumer sub) + { + if(_exclusiveSubscriber == null) + { + deliverAsync(); + } + else + { + SubFlushRunner flusher = sub.getRunner(); + flusher.execute(_asyncDelivery); + } + + } + + void flushConsumer(QueueConsumer sub) + { + + flushConsumer(sub, Long.MAX_VALUE); + } + + boolean flushConsumer(QueueConsumer sub, long iterations) + { + boolean atTail = false; + final boolean keepSendLockHeld = iterations <= AbstractQueue.MAX_ASYNC_DELIVERIES; + boolean queueEmpty = false; + + try + { + if(keepSendLockHeld) + { + sub.getSendLock(); + } + while (!sub.isSuspended() && !atTail && iterations != 0) + { + try + { + if(!keepSendLockHeld) + { + sub.getSendLock(); + } + + atTail = attemptDelivery(sub, true); + if (atTail && getNextAvailableEntry(sub) == null) + { + queueEmpty = true; + } + else if (!atTail) + { + iterations--; + } + } + finally + { + if(!keepSendLockHeld) + { + sub.releaseSendLock(); + } + } + } + } + finally + { + if(keepSendLockHeld) + { + sub.releaseSendLock(); + } + if(queueEmpty) + { + sub.queueEmpty(); + } + + sub.flushBatched(); + + } + + + // if there's (potentially) more than one consumer the others will potentially not have been advanced to the + // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc + // which would give us memory "leak". + + if (!hasExclusiveConsumer()) + { + advanceAllConsumers(); + } + return atTail; + } + + /** + * Attempt delivery for the given consumer. + * + * Looks up the next node for the consumer and attempts to deliver it. + * + * + * @param sub the consumer + * @param batch true if processing can be batched + * @return true if we have completed all possible deliveries for this sub. + */ + private boolean attemptDelivery(QueueConsumer sub, boolean batch) + { + boolean atTail = false; + + boolean subActive = sub.isActive() && !sub.isSuspended(); + if (subActive) + { + + E node = getNextAvailableEntry(sub); + + if (node != null && node.isAvailable()) + { + if (sub.hasInterest(node) && mightAssign(sub, node)) + { + if (!sub.wouldSuspend(node)) + { + if (sub.acquires() && !assign(sub, node)) + { + // restore credit here that would have been taken away by wouldSuspend since we didn't manage + // to acquire the entry for this consumer + sub.restoreCredit(node); + } + else + { + deliverMessage(sub, node, batch); + } + + } + else // Not enough Credit for message and wouldSuspend + { + //QPID-1187 - Treat the consumer as suspended for this message + // and wait for the message to be removed to continue delivery. + subActive = false; + node.addStateChangeListener(new QueueEntryListener(sub)); + } + } + + } + atTail = (node == null) || (_entries.next(node) == null); + } + return atTail || !subActive; + } + + protected void advanceAllConsumers() + { + QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); + while (consumerNodeIterator.advance()) + { + QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode(); + QueueConsumer sub = subNode.getConsumer(); + if(sub.acquires()) + { + getNextAvailableEntry(sub); + } + else + { + // TODO + } + } + } + + private E getNextAvailableEntry(final QueueConsumer sub) + { + QueueContext context = sub.getQueueContext(); + if(context != null) + { + E lastSeen = context.getLastSeenEntry(); + E releasedNode = context.getReleasedEntry(); + + E node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); + + boolean expired = false; + while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) || + !mightAssign(sub,node))) + { + if (expired) + { + expired = false; + if (node.acquire()) + { + dequeueEntry(node); + } + } + + if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node)) + { + QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null); + } + + lastSeen = context.getLastSeenEntry(); + releasedNode = context.getReleasedEntry(); + node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen); + } + return node; + } + else + { + return null; + } + } + + public boolean isEntryAheadOfConsumer(E entry, QueueConsumer sub) + { + QueueContext context = sub.getQueueContext(); + if(context != null) + { + E releasedNode = context.getReleasedEntry(); + return releasedNode != null && releasedNode.compareTo(entry) < 0; + } + else + { + return false; + } + } + + /** + * Used by queue Runners to asynchronously deliver messages to consumers. + * + * A queue Runner is started whenever a state change occurs, e.g when a new + * message arrives on the queue and cannot be immediately delivered to a + * consumer (i.e. asynchronous delivery is required). Unless there are + * SubFlushRunners operating (due to consumers unsuspending) which are + * capable of accepting/delivering all messages then these messages would + * otherwise remain on the queue. + * + * processQueue should be running while there are messages on the queue AND + * there are consumers that can deliver them. If there are no + * consumers capable of delivering the remaining messages on the queue + * then processQueue should stop to prevent spinning. + * + * Since processQueue is runs in a fixed size Executor, it should not run + * indefinitely to prevent starving other tasks of CPU (e.g jobs to process + * incoming messages may not be able to be scheduled in the thread pool + * because all threads are working on clearing down large queues). To solve + * this problem, after an arbitrary number of message deliveries the + * processQueue job stops iterating, resubmits itself to the executor, and + * ends the current instance + * + * @param runner the Runner to schedule + */ + public long processQueue(QueueRunner runner) + { + long stateChangeCount; + long previousStateChangeCount = Long.MIN_VALUE; + long rVal = Long.MIN_VALUE; + boolean deliveryIncomplete = true; + + boolean lastLoop = false; + int iterations = MAX_ASYNC_DELIVERIES; + + final int numSubs = _consumerList.size(); + + final int perSub = Math.max(iterations / Math.max(numSubs,1), 1); + + // For every message enqueue/requeue the we fire deliveryAsync() which + // increases _stateChangeCount. If _sCC changes whilst we are in our loop + // (detected by setting previousStateChangeCount to stateChangeCount in the loop body) + // then we will continue to run for a maximum of iterations. + // So whilst delivery/rejection is going on a processQueue thread will be running + while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete)) + { + // we want to have one extra loop after every consumer has reached the point where it cannot move + // further, just in case the advance of one consumer in the last loop allows a different consumer to + // move forward in the next iteration + + if (previousStateChangeCount != stateChangeCount) + { + //further asynchronous delivery is required since the + //previous loop. keep going if iteration slicing allows. + lastLoop = false; + rVal = stateChangeCount; + } + + previousStateChangeCount = stateChangeCount; + boolean allConsumersDone = true; + boolean consumerDone; + + QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); + //iterate over the subscribers and try to advance their pointer + while (consumerNodeIterator.advance()) + { + QueueConsumer sub = consumerNodeIterator.getNode().getConsumer(); + sub.getSendLock(); + + try + { + for(int i = 0 ; i < perSub; i++) + { + //attempt delivery. returns true if no further delivery currently possible to this sub + consumerDone = attemptDelivery(sub, true); + if (consumerDone) + { + sub.flushBatched(); + if (lastLoop && !sub.isSuspended()) + { + sub.queueEmpty(); + } + break; + } + else + { + //this consumer can accept additional deliveries, so we must + //keep going after this (if iteration slicing allows it) + allConsumersDone = false; + lastLoop = false; + if(--iterations == 0) + { + sub.flushBatched(); + break; + } + } + + } + + sub.flushBatched(); + } + finally + { + sub.releaseSendLock(); + } + } + + if(allConsumersDone && lastLoop) + { + //We have done an extra loop already and there are again + //again no further delivery attempts possible, only + //keep going if state change demands it. + deliveryIncomplete = false; + } + else if(allConsumersDone) + { + //All consumers reported being done, but we have to do + //an extra loop if the iterations are not exhausted and + //there is still any work to be done + deliveryIncomplete = _consumerList.size() != 0; + lastLoop = true; + } + else + { + //some consumers can still accept more messages, + //keep going if iteration count allows. + lastLoop = false; + deliveryIncomplete = true; + } + + } + + // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit + // therefore we should schedule this runner again (unless someone beats us to it :-) ). + if (iterations == 0) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Rescheduling runner:" + runner); + } + return 0L; + } + return rVal; + + } + + public void checkMessageStatus() + { + QueueEntryIterator> queueListIterator = _entries.iterator(); + + while (queueListIterator.advance()) + { + E node = queueListIterator.getNode(); + // Only process nodes that are not currently deleted and not dequeued + if (!node.isDeleted()) + { + // If the node has expired then acquire it + if (node.expired() && node.acquire()) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("Dequeuing expired node " + node); + } + // Then dequeue it. + dequeueEntry(node); + } + else + { + // There is a chance that the node could be deleted by + // the time the check actually occurs. So verify we + // can actually get the message to perform the check. + ServerMessage msg = node.getMessage(); + if (msg != null) + { + checkForNotification(msg); + } + } + } + } + + } + + public long getMinimumAlertRepeatGap() + { + return _minimumAlertRepeatGap; + } + + public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap) + { + _minimumAlertRepeatGap = minimumAlertRepeatGap; + } + + public long getMaximumMessageAge() + { + return _maximumMessageAge; + } + + public void setMaximumMessageAge(long maximumMessageAge) + { + _maximumMessageAge = maximumMessageAge; + if (maximumMessageAge == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT); + } + } + + public long getMaximumMessageCount() + { + return _maximumMessageCount; + } + + public void setMaximumMessageCount(final long maximumMessageCount) + { + _maximumMessageCount = maximumMessageCount; + if (maximumMessageCount == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT); + } + + } + + public long getMaximumQueueDepth() + { + return _maximumQueueDepth; + } + + // Sets the queue depth, the max queue size + public void setMaximumQueueDepth(final long maximumQueueDepth) + { + _maximumQueueDepth = maximumQueueDepth; + if (maximumQueueDepth == 0L) + { + _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT); + } + + } + + public long getMaximumMessageSize() + { + return _maximumMessageSize; + } + + public void setMaximumMessageSize(final long maximumMessageSize) + { + _maximumMessageSize = maximumMessageSize; + if (maximumMessageSize == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT); + } + } + + public long getCapacity() + { + return _capacity; + } + + public void setCapacity(long capacity) + { + _capacity = capacity; + } + + public long getFlowResumeCapacity() + { + return _flowResumeCapacity; + } + + public void setFlowResumeCapacity(long flowResumeCapacity) + { + _flowResumeCapacity = flowResumeCapacity; + + checkCapacity(); + } + + public boolean isOverfull() + { + return _overfull.get(); + } + + public Set getNotificationChecks() + { + return _notificationChecks; + } + + private static class DeleteDeleteTask implements Action + { + + private final Deletable _lifetimeObject; + private final Action _deleteQueueOwnerTask; + + public DeleteDeleteTask(final Deletable lifetimeObject, + final Action deleteQueueOwnerTask) + { + _lifetimeObject = lifetimeObject; + _deleteQueueOwnerTask = deleteQueueOwnerTask; + } + + @Override + public void performAction(final Deletable object) + { + _lifetimeObject.removeDeleteTask(_deleteQueueOwnerTask); + } + } + + private final class QueueEntryListener implements StateChangeListener + { + + private final QueueConsumer _sub; + + public QueueEntryListener(final QueueConsumer sub) + { + _sub = sub; + } + + public boolean equals(Object o) + { + return o instanceof AbstractQueue.QueueEntryListener + && _sub == ((QueueEntryListener) o)._sub; + } + + public int hashCode() + { + return System.identityHashCode(_sub); + } + + public void stateChanged(E entry, QueueEntry.State oldSate, QueueEntry.State newState) + { + entry.removeStateChangeListener(this); + deliverAsync(_sub); + } + } + + public List getMessagesOnTheQueue(int num) + { + return getMessagesOnTheQueue(num, 0); + } + + public List getMessagesOnTheQueue(int num, int offset) + { + ArrayList ids = new ArrayList(num); + QueueEntryIterator it = _entries.iterator(); + for (int i = 0; i < offset; i++) + { + it.advance(); + } + + for (int i = 0; i < num && !it.atTail(); i++) + { + it.advance(); + ids.add(it.getNode().getMessage().getMessageNumber()); + } + return ids; + } + + public long getTotalEnqueueSize() + { + return _enqueueSize.get(); + } + + public long getTotalDequeueSize() + { + return _dequeueSize.get(); + } + + public long getPersistentByteEnqueues() + { + return _persistentMessageEnqueueSize.get(); + } + + public long getPersistentByteDequeues() + { + return _persistentMessageDequeueSize.get(); + } + + public long getPersistentMsgEnqueues() + { + return _persistentMessageEnqueueCount.get(); + } + + public long getPersistentMsgDequeues() + { + return _persistentMessageDequeueCount.get(); + } + + + @Override + public String toString() + { + return getName(); + } + + public long getUnackedMessageCount() + { + return _unackedMsgCount.get(); + } + + public long getUnackedMessageBytes() + { + return _unackedMsgBytes.get(); + } + + public void decrementUnackedMsgCount(E queueEntry) + { + _unackedMsgCount.decrementAndGet(); + _unackedMsgBytes.addAndGet(-queueEntry.getSize()); + } + + private void incrementUnackedMsgCount(E entry) + { + _unackedMsgCount.incrementAndGet(); + _unackedMsgBytes.addAndGet(entry.getSize()); + } + + public LogActor getLogActor() + { + return _logActor; + } + + public int getMaximumDeliveryCount() + { + return _maximumDeliveryCount; + } + + public void setMaximumDeliveryCount(final int maximumDeliveryCount) + { + _maximumDeliveryCount = maximumDeliveryCount; + } + + /** + * Checks if there is any notification to send to the listeners + */ + private void checkForNotification(ServerMessage msg) + { + final Set notificationChecks = getNotificationChecks(); + final AMQQueue.NotificationListener listener = _notificationListener; + + if(listener != null && !notificationChecks.isEmpty()) + { + final long currentTime = System.currentTimeMillis(); + final long thresholdTime = currentTime - getMinimumAlertRepeatGap(); + + for (NotificationCheck check : notificationChecks) + { + if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) + { + if (check.notifyIfNecessary(msg, this, listener)) + { + _lastNotificationTimes[check.ordinal()] = currentTime; + } + } + } + } + } + + public void setNotificationListener(AMQQueue.NotificationListener listener) + { + _notificationListener = listener; + } + + @Override + public void setDescription(String description) + { + _description = description; + } + + @Override + public String getDescription() + { + return _description; + } + + public final > int send(final M message, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action> postEnqueueAction) + { + txn.enqueue(this,message, new ServerTransaction.Action() + { + MessageReference _reference = message.newReference(); + + public void postCommit() + { + try + { + AbstractQueue.this.enqueue(message, postEnqueueAction); + } + finally + { + _reference.release(); + } + } + + public void onRollback() + { + _reference.release(); + } + }); + return 1; + + } + + @Override + public boolean verifySessionAccess(final AMQSessionModel session) + { + boolean allowed; + switch(_exclusivityPolicy) + { + case NONE: + allowed = true; + break; + case SESSION: + allowed = _exclusiveOwner == null || _exclusiveOwner == session; + break; + case CONNECTION: + allowed = _exclusiveOwner == null || _exclusiveOwner == session.getConnectionModel(); + break; + case PRINCIPAL: + allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getAuthorizedPrincipal()); + break; + case CONTAINER: + allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getRemoteContainerName()); + break; + case LINK: + allowed = _exclusiveSubscriber == null || _exclusiveSubscriber.getSessionModel() == session; + break; + default: + throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy); + } + return allowed; + } + + @Override + public synchronized void setExclusivityPolicy(final ExclusivityPolicy desiredPolicy) + throws ExistingConsumerPreventsExclusive + { + if(desiredPolicy != _exclusivityPolicy && !(desiredPolicy == null && _exclusivityPolicy == ExclusivityPolicy.NONE)) + { + switch(desiredPolicy) + { + case NONE: + _exclusiveOwner = null; + break; + case PRINCIPAL: + switchToPrincipalExclusivity(); + break; + case CONTAINER: + switchToContainerExclusivity(); + break; + case CONNECTION: + switchToConnectionExclusivity(); + break; + case SESSION: + switchToSessionExclusivity(); + break; + case LINK: + switchToLinkExclusivity(); + break; + } + _exclusivityPolicy = desiredPolicy; + } + } + + private void switchToLinkExclusivity() throws ExistingConsumerPreventsExclusive + { + switch (getConsumerCount()) + { + case 1: + _exclusiveSubscriber = getConsumerList().getHead().getConsumer(); + // deliberate fall through + case 0: + _exclusiveOwner = null; + break; + default: + throw new ExistingConsumerPreventsExclusive(); + } + + } + + private void switchToSessionExclusivity() throws ExistingConsumerPreventsExclusive + { + + switch(_exclusivityPolicy) + { + case NONE: + case PRINCIPAL: + case CONTAINER: + case CONNECTION: + AMQSessionModel session = null; + for(Consumer c : getConsumers()) + { + if(session == null) + { + session = c.getSessionModel(); + } + else if(!session.equals(c.getSessionModel())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = session; + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel(); + } + } + + private void switchToConnectionExclusivity() throws ExistingConsumerPreventsExclusive + { + switch(_exclusivityPolicy) + { + case NONE: + case CONTAINER: + case PRINCIPAL: + AMQConnectionModel con = null; + for(Consumer c : getConsumers()) + { + if(con == null) + { + con = c.getSessionModel().getConnectionModel(); + } + else if(!con.equals(c.getSessionModel().getConnectionModel())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = con; + break; + case SESSION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel(); + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel(); + } + } + + private void switchToContainerExclusivity() throws ExistingConsumerPreventsExclusive + { + switch(_exclusivityPolicy) + { + case NONE: + case PRINCIPAL: + String containerID = null; + for(Consumer c : getConsumers()) + { + if(containerID == null) + { + containerID = c.getSessionModel().getConnectionModel().getRemoteContainerName(); + } + else if(!containerID.equals(c.getSessionModel().getConnectionModel().getRemoteContainerName())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = containerID; + break; + case CONNECTION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getRemoteContainerName(); + break; + case SESSION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getRemoteContainerName(); + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getRemoteContainerName(); + } + } + + private void switchToPrincipalExclusivity() throws ExistingConsumerPreventsExclusive + { + switch(_exclusivityPolicy) + { + case NONE: + case CONTAINER: + Principal principal = null; + for(Consumer c : getConsumers()) + { + if(principal == null) + { + principal = c.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); + } + else if(!principal.equals(c.getSessionModel().getConnectionModel().getAuthorizedPrincipal())) + { + throw new ExistingConsumerPreventsExclusive(); + } + } + _exclusiveOwner = principal; + break; + case CONNECTION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getAuthorizedPrincipal(); + break; + case SESSION: + _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getAuthorizedPrincipal(); + break; + case LINK: + _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); + } + } + + private class ClearOwnerAction implements Action + { + private final Deletable _lifetimeObject; + private DeleteDeleteTask _deleteTask; + + public ClearOwnerAction(final Deletable lifetimeObject) + { + _lifetimeObject = lifetimeObject; + } + + @Override + public void performAction(final Deletable object) + { + if(AbstractQueue.this._exclusiveOwner == _lifetimeObject) + { + AbstractQueue.this._exclusiveOwner = null; + } + if(_deleteTask != null) + { + removeDeleteTask(_deleteTask); + } + } + + public void setDeleteTask(final DeleteDeleteTask deleteTask) + { + _deleteTask = deleteTask; + } + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java index efc82c0ab4..d8209e66ca 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java @@ -28,7 +28,7 @@ import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; -public class AssignedConsumerMessageGroupManager, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> implements MessageGroupManager +public class AssignedConsumerMessageGroupManager, Q extends AbstractQueue, L extends QueueEntryListBase> implements MessageGroupManager { private static final Logger _logger = LoggerFactory.getLogger(AssignedConsumerMessageGroupManager.class); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java index d9c130953a..1ffa404caf 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java @@ -28,7 +28,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.util.MapValueConverter; import org.apache.qpid.server.virtualhost.VirtualHost; -public class ConflationQueue extends SimpleAMQQueue +public class ConflationQueue extends AbstractQueue { public static final String DEFAULT_LVQ_KEY = "qpid.LVQ_key"; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java index e67591ae07..ebf8058584 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java @@ -32,7 +32,7 @@ import org.apache.qpid.server.message.ServerMessage; import java.util.HashMap; import java.util.Map; -public class DefinedGroupMessageGroupManager, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> implements MessageGroupManager +public class DefinedGroupMessageGroupManager, Q extends AbstractQueue, L extends QueueEntryListBase> implements MessageGroupManager { private static final Logger _logger = LoggerFactory.getLogger(DefinedGroupMessageGroupManager.class); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java index ba9dbc8a70..820114f33f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java @@ -20,9 +20,9 @@ */ package org.apache.qpid.server.queue; -public interface MessageGroupManager, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> +public interface MessageGroupManager, Q extends AbstractQueue, L extends QueueEntryListBase> { - public interface ConsumerResetHelper, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> + public interface ConsumerResetHelper, Q extends AbstractQueue, L extends QueueEntryListBase> { public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java index 369f42b183..7e4ed5401d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java @@ -24,7 +24,7 @@ import org.apache.qpid.server.message.ServerMessage; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -public abstract class OrderedQueueEntry, Q extends SimpleAMQQueue, L extends OrderedQueueEntryList> extends QueueEntryImpl +public abstract class OrderedQueueEntry, Q extends AbstractQueue, L extends OrderedQueueEntryList> extends QueueEntryImpl { static final AtomicReferenceFieldUpdater _nextUpdater = diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java index f2491bdb0c..a8bf3eecca 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java @@ -26,7 +26,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -public abstract class OrderedQueueEntryList, Q extends SimpleAMQQueue, L extends OrderedQueueEntryList> implements SimpleQueueEntryList +public abstract class OrderedQueueEntryList, Q extends AbstractQueue, L extends OrderedQueueEntryList> implements + QueueEntryListBase { private final E _head; @@ -111,12 +112,12 @@ public abstract class OrderedQueueEntryList, return node.getNextValidEntry(); } - public static interface HeadCreator, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> + public static interface HeadCreator, Q extends AbstractQueue, L extends QueueEntryListBase> { E createHead(L list); } - public static class QueueEntryIteratorImpl, Q extends SimpleAMQQueue, L extends OrderedQueueEntryList> implements QueueEntryIterator> + public static class QueueEntryIteratorImpl, Q extends AbstractQueue, L extends OrderedQueueEntryList> implements QueueEntryIterator> { private E _lastNode; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java index 28d926686f..20229ec3ff 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java @@ -24,7 +24,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; -public abstract class OutOfOrderQueue, Q extends OutOfOrderQueue, L extends SimpleQueueEntryList> extends SimpleAMQQueue +public abstract class OutOfOrderQueue, Q extends OutOfOrderQueue, L extends QueueEntryListBase> extends AbstractQueue { protected OutOfOrderQueue(VirtualHost virtualHost, diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java index 43f0b4254c..757db35af9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java @@ -113,7 +113,7 @@ public class QueueArgumentsConverter if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP)) { modelArguments.put(Queue.MESSAGE_GROUP_SHARED_GROUPS, - SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE.equals(String.valueOf(wireArguments.get(QPID_SHARED_MSG_GROUP)))); + AbstractQueue.SHARED_MSG_GROUP_ARG_VALUE.equals(String.valueOf(wireArguments.get(QPID_SHARED_MSG_GROUP)))); } if(wireArguments.get(X_QPID_DLQ_ENABLED) != null) { @@ -143,7 +143,7 @@ public class QueueArgumentsConverter if(Boolean.TRUE.equals(modelArguments.get(Queue.MESSAGE_GROUP_SHARED_GROUPS))) { - wireArguments.put(QPID_SHARED_MSG_GROUP, SimpleAMQQueue.SHARED_MSG_GROUP_ARG_VALUE); + wireArguments.put(QPID_SHARED_MSG_GROUP, AbstractQueue.SHARED_MSG_GROUP_ARG_VALUE); } return wireArguments; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index f7654d63fa..b2f473f9ff 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -24,7 +24,7 @@ import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.MessageInstance; -interface QueueConsumer, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> extends Consumer +interface QueueConsumer, Q extends AbstractQueue, L extends QueueEntryListBase> extends Consumer { void flushBatched(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java index eeeab76656..ae30e53e9a 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -33,7 +33,6 @@ import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.util.StateChangeListener; @@ -47,7 +46,7 @@ import java.util.concurrent.locks.ReentrantLock; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; -class QueueConsumerImpl, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> implements QueueConsumer +class QueueConsumerImpl, Q extends AbstractQueue, L extends QueueEntryListBase> implements QueueConsumer { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java index 2cf0e93e9c..1f1320cfd2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -class QueueConsumerList, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> +class QueueConsumerList, Q extends AbstractQueue, L extends QueueEntryListBase> { private final ConsumerNode _head = new ConsumerNode(); @@ -32,7 +32,7 @@ class QueueConsumerList, Q extends SimpleAMQQueu private final AtomicReference> _subNodeMarker = new AtomicReference>(_head); private final AtomicInteger _size = new AtomicInteger(); - public static final class ConsumerNode, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> + public static final class ConsumerNode, Q extends AbstractQueue, L extends QueueEntryListBase> { private final AtomicBoolean _deleted = new AtomicBoolean(); private final AtomicReference> _next = new AtomicReference>(); @@ -237,7 +237,7 @@ class QueueConsumerList, Q extends SimpleAMQQueu } - public static class ConsumerNodeIterator, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> + public static class ConsumerNodeIterator, Q extends AbstractQueue, L extends QueueEntryListBase> { private ConsumerNode _lastNode; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java index 7a59c154b6..45673aa555 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java @@ -23,7 +23,7 @@ package org.apache.qpid.server.queue; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -final class QueueContext, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> +final class QueueContext, Q extends AbstractQueue, L extends QueueEntryListBase> { private volatile E _lastSeenEntry; private volatile E _releasedEntry; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 1f09fe3fb9..5407985cd8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -public abstract class QueueEntryImpl, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> implements QueueEntry> +public abstract class QueueEntryImpl, Q extends AbstractQueue, L extends QueueEntryListBase> implements QueueEntry> { private static final Logger _log = Logger.getLogger(QueueEntryImpl.class); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java new file mode 100644 index 0000000000..f997e7383f --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java @@ -0,0 +1,25 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +public interface QueueEntryListBase, Q extends AbstractQueue, L extends QueueEntryListBase> extends QueueEntryList> +{ +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java index ae8b6560be..6e66fd2905 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.queue; -interface QueueEntryListFactory, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> +interface QueueEntryListFactory, Q extends AbstractQueue, L extends QueueEntryListBase> { public L createQueueEntryList(Q queue); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java index fb4740dd4a..d9f3297acf 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java @@ -40,7 +40,7 @@ public class QueueRunner implements Runnable { private static final Logger _logger = Logger.getLogger(QueueRunner.class); - private final SimpleAMQQueue _queue; + private final AbstractQueue _queue; private static int IDLE = 0; private static int SCHEDULED = 1; @@ -53,7 +53,7 @@ public class QueueRunner implements Runnable private final AtomicLong _lastRunAgain = new AtomicLong(); private final AtomicLong _lastRunTime = new AtomicLong(); - public QueueRunner(SimpleAMQQueue queue) + public QueueRunner(AbstractQueue queue) { _queue = queue; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java deleted file mode 100644 index 45660dec37..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ /dev/null @@ -1,2624 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.server.queue; - -import java.security.AccessController; -import java.security.Principal; -import java.util.*; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.connection.SessionPrincipal; -import org.apache.qpid.server.model.ExclusivityPolicy; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.protocol.AMQConnectionModel; -import org.apache.qpid.pool.ReferenceCountingExecutorService; -import org.apache.qpid.server.binding.Binding; -import org.apache.qpid.server.configuration.BrokerProperties; -import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.QueueActor; -import org.apache.qpid.server.logging.messages.QueueMessages; -import org.apache.qpid.server.logging.subjects.QueueLogSubject; -import org.apache.qpid.server.message.InstanceProperties; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.consumer.Consumer; -import org.apache.qpid.server.consumer.ConsumerTarget; -import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.txn.AutoCommitTransaction; -import org.apache.qpid.server.txn.LocalTransaction; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.Deletable; -import org.apache.qpid.server.util.MapValueConverter; -import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.server.util.StateChangeListener; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import javax.security.auth.Subject; - -abstract class SimpleAMQQueue, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> implements AMQQueue>, - StateChangeListener, QueueConsumer.State>, - MessageGroupManager.ConsumerResetHelper -{ - - private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class); - - public static final String SHARED_MSG_GROUP_ARG_VALUE = "1"; - private static final String QPID_NO_GROUP = "qpid.no-group"; - private static final String DEFAULT_SHARED_MESSAGE_GROUP = System.getProperty(BrokerProperties.PROPERTY_DEFAULT_SHARED_MESSAGE_GROUP, QPID_NO_GROUP); - - // TODO - should make this configurable at the vhost / broker level - private static final int DEFAULT_MAX_GROUPS = 255; - - private final VirtualHost _virtualHost; - - private final String _name; - - /** null means shared */ - private String _description; - - private final boolean _durable; - - private Exchange _alternateExchange; - - - private final L _entries; - - private final QueueConsumerList _consumerList = new QueueConsumerList(); - - private volatile QueueConsumer _exclusiveSubscriber; - - - - private final AtomicInteger _atomicQueueCount = new AtomicInteger(0); - - private final AtomicLong _atomicQueueSize = new AtomicLong(0L); - - private final AtomicInteger _activeSubscriberCount = new AtomicInteger(); - - private final AtomicLong _totalMessagesReceived = new AtomicLong(); - - private final AtomicLong _dequeueCount = new AtomicLong(); - private final AtomicLong _dequeueSize = new AtomicLong(); - private final AtomicLong _enqueueCount = new AtomicLong(); - private final AtomicLong _enqueueSize = new AtomicLong(); - private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong(); - private final AtomicLong _persistentMessageDequeueSize = new AtomicLong(); - private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong(); - private final AtomicLong _persistentMessageDequeueCount = new AtomicLong(); - private final AtomicLong _unackedMsgCount = new AtomicLong(0); - private final AtomicLong _unackedMsgBytes = new AtomicLong(); - - private final AtomicInteger _bindingCountHigh = new AtomicInteger(); - - /** max allowed size(KB) of a single message */ - private long _maximumMessageSize; - - /** max allowed number of messages on a queue. */ - private long _maximumMessageCount; - - /** max queue depth for the queue */ - private long _maximumQueueDepth; - - /** maximum message age before alerts occur */ - private long _maximumMessageAge; - - /** the minimum interval between sending out consecutive alerts of the same type */ - private long _minimumAlertRepeatGap; - - private long _capacity; - - private long _flowResumeCapacity; - - private ExclusivityPolicy _exclusivityPolicy; - private LifetimePolicy _lifetimePolicy; - private Object _exclusiveOwner; // could be connection, session or Principal - - private final Set _notificationChecks = EnumSet.noneOf(NotificationCheck.class); - - - static final int MAX_ASYNC_DELIVERIES = 80; - - - private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE); - - private final Executor _asyncDelivery; - private AtomicInteger _deliveredMessages = new AtomicInteger(); - private AtomicBoolean _stopped = new AtomicBoolean(false); - - private final Set _blockedChannels = new ConcurrentSkipListSet(); - - private final AtomicBoolean _deleted = new AtomicBoolean(false); - private final List> _deleteTaskList = - new CopyOnWriteArrayList>(); - - - private LogSubject _logSubject; - private LogActor _logActor; - - private boolean _noLocal; - - private final AtomicBoolean _overfull = new AtomicBoolean(false); - private final CopyOnWriteArrayList _bindings = new CopyOnWriteArrayList(); - private UUID _id; - private final Map _arguments; - - //TODO : persist creation time - private long _createTime = System.currentTimeMillis(); - - /** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */ - private int _maximumDeliveryCount; - private final MessageGroupManager _messageGroupManager; - - private final Collection> _consumerListeners = - new ArrayList>(); - - private AMQQueue.NotificationListener _notificationListener; - private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; - - protected SimpleAMQQueue(VirtualHost virtualHost, - Map attributes, - QueueEntryListFactory entryListFactory) - { - if (virtualHost == null) - { - throw new IllegalArgumentException("Virtual Host must not be null"); - } - - UUID id = MapValueConverter.getUUIDAttribute(Queue.ID, attributes); - String name = MapValueConverter.getStringAttribute(Queue.NAME, attributes); - - if (name == null) - { - throw new IllegalArgumentException("Queue name must not be null"); - } - - boolean durable = MapValueConverter.getBooleanAttribute(Queue.DURABLE,attributes,false); - - - _exclusivityPolicy = MapValueConverter.getEnumAttribute(ExclusivityPolicy.class, - Queue.EXCLUSIVE, - attributes, - ExclusivityPolicy.NONE); - _lifetimePolicy = MapValueConverter.getEnumAttribute(LifetimePolicy.class, - Queue.LIFETIME_POLICY, - attributes, - LifetimePolicy.PERMANENT); - - - _name = name; - _durable = durable; - _virtualHost = virtualHost; - _entries = entryListFactory.createQueueEntryList((Q) this); - final LinkedHashMap arguments = new LinkedHashMap(attributes); - - arguments.put(Queue.EXCLUSIVE, _exclusivityPolicy); - arguments.put(Queue.LIFETIME_POLICY, _lifetimePolicy); - - _arguments = Collections.synchronizedMap(arguments); - _description = MapValueConverter.getStringAttribute(Queue.DESCRIPTION, attributes, null); - - _noLocal = MapValueConverter.getBooleanAttribute(Queue.NO_LOCAL, attributes, false); - - - _id = id; - _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); - - _logSubject = new QueueLogSubject(this); - _logActor = new QueueActor(this, CurrentActor.get().getRootMessageLogger()); - - virtualHost.getSecurityManager().authoriseCreateQueue(this); - - Subject activeSubject = Subject.getSubject(AccessController.getContext()); - Set sessionPrincipals = activeSubject == null ? Collections.emptySet() : activeSubject.getPrincipals(SessionPrincipal.class); - AMQSessionModel sessionModel; - if(sessionPrincipals.isEmpty()) - { - sessionModel = null; - } - else - { - final SessionPrincipal sessionPrincipal = sessionPrincipals.iterator().next(); - sessionModel = sessionPrincipal.getSession(); - } - - if(sessionModel != null) - { - - switch(_exclusivityPolicy) - { - - case PRINCIPAL: - _exclusiveOwner = sessionModel.getConnectionModel().getAuthorizedPrincipal(); - break; - case CONTAINER: - _exclusiveOwner = sessionModel.getConnectionModel().getRemoteContainerName(); - break; - case CONNECTION: - _exclusiveOwner = sessionModel.getConnectionModel(); - addExclusivityConstraint(sessionModel.getConnectionModel()); - break; - case SESSION: - _exclusiveOwner = sessionModel; - addExclusivityConstraint(sessionModel); - break; - case NONE: - case LINK: - // nothing to do as if link no link associated until there is a consumer associated - break; - default: - throw new ServerScopedRuntimeException("Unknown exclusivity policy: " - + _exclusivityPolicy - + " this is a coding error inside Qpid"); - } - } - else if(_exclusivityPolicy == ExclusivityPolicy.PRINCIPAL) - { - String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); - if(owner != null) - { - _exclusiveOwner = new AuthenticatedPrincipal(owner); - } - } - else if(_exclusivityPolicy == ExclusivityPolicy.CONTAINER) - { - String owner = MapValueConverter.getStringAttribute(Queue.OWNER, attributes, null); - if(owner != null) - { - _exclusiveOwner = owner; - } - } - - - if(_lifetimePolicy == LifetimePolicy.DELETE_ON_CONNECTION_CLOSE) - { - if(sessionModel != null) - { - addLifetimeConstraint(sessionModel.getConnectionModel()); - } - else - { - throw new IllegalArgumentException("Queues created with a lifetime policy of " - + _lifetimePolicy - + " must be created from a connection."); - } - } - else if(_lifetimePolicy == LifetimePolicy.DELETE_ON_SESSION_END) - { - if(sessionModel != null) - { - addLifetimeConstraint(sessionModel); - } - else - { - throw new IllegalArgumentException("Queues created with a lifetime policy of " - + _lifetimePolicy - + " must be created from a connection."); - } - } - - - - - if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_AGE)) - { - setMaximumMessageAge(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_AGE, attributes)); - } - else - { - setMaximumMessageAge(virtualHost.getDefaultAlertThresholdMessageAge()); - } - if (attributes.containsKey(Queue.ALERT_THRESHOLD_MESSAGE_SIZE)) - { - setMaximumMessageSize(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_MESSAGE_SIZE, attributes)); - } - else - { - setMaximumMessageSize(virtualHost.getDefaultAlertThresholdMessageSize()); - } - if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES)) - { - setMaximumMessageCount(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, - attributes)); - } - else - { - setMaximumMessageCount(virtualHost.getDefaultAlertThresholdQueueDepthMessages()); - } - if (attributes.containsKey(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES)) - { - setMaximumQueueDepth(MapValueConverter.getLongAttribute(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, - attributes)); - } - else - { - setMaximumQueueDepth(virtualHost.getDefaultAlertThresholdQueueDepthBytes()); - } - if (attributes.containsKey(Queue.ALERT_REPEAT_GAP)) - { - setMinimumAlertRepeatGap(MapValueConverter.getLongAttribute(Queue.ALERT_REPEAT_GAP, attributes)); - } - else - { - setMinimumAlertRepeatGap(virtualHost.getDefaultAlertRepeatGap()); - } - if (attributes.containsKey(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES)) - { - setCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, attributes)); - } - else - { - setCapacity(virtualHost.getDefaultQueueFlowControlSizeBytes()); - } - if (attributes.containsKey(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES)) - { - setFlowResumeCapacity(MapValueConverter.getLongAttribute(Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, attributes)); - } - else - { - setFlowResumeCapacity(virtualHost.getDefaultQueueFlowResumeSizeBytes()); - } - if (attributes.containsKey(Queue.MAXIMUM_DELIVERY_ATTEMPTS)) - { - setMaximumDeliveryCount(MapValueConverter.getIntegerAttribute(Queue.MAXIMUM_DELIVERY_ATTEMPTS, attributes)); - } - else - { - setMaximumDeliveryCount(virtualHost.getDefaultMaximumDeliveryAttempts()); - } - - final String ownerString; - switch(_exclusivityPolicy) - { - case PRINCIPAL: - ownerString = ((Principal) _exclusiveOwner).getName(); - break; - case CONTAINER: - ownerString = (String) _exclusiveOwner; - break; - default: - ownerString = null; - - } - - // Log the creation of this Queue. - // The priorities display is toggled on if we set priorities > 0 - CurrentActor.get().message(_logSubject, - QueueMessages.CREATED(ownerString, - _entries.getPriorities(), - ownerString != null , - _lifetimePolicy != LifetimePolicy.PERMANENT, - durable, - !durable, - _entries.getPriorities() > 0)); - - if(attributes != null && attributes.containsKey(Queue.MESSAGE_GROUP_KEY)) - { - if(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS) != null - && (Boolean)(attributes.get(Queue.MESSAGE_GROUP_SHARED_GROUPS))) - { - Object defaultGroup = attributes.get(Queue.MESSAGE_GROUP_DEFAULT_GROUP); - _messageGroupManager = - new DefinedGroupMessageGroupManager(String.valueOf(attributes.get(Queue.MESSAGE_GROUP_KEY)), - defaultGroup == null ? DEFAULT_SHARED_MESSAGE_GROUP : defaultGroup.toString(), - this); - } - else - { - _messageGroupManager = new AssignedConsumerMessageGroupManager(String.valueOf(attributes.get( - Queue.MESSAGE_GROUP_KEY)), DEFAULT_MAX_GROUPS); - } - } - else - { - _messageGroupManager = null; - } - - resetNotifications(); - - } - - private void addLifetimeConstraint(final Deletable lifetimeObject) - { - final Action deleteQueueTask = new Action() - { - @Override - public void performAction(final Deletable object) - { - getVirtualHost().removeQueue(SimpleAMQQueue.this); - } - }; - - lifetimeObject.addDeleteTask(deleteQueueTask); - addDeleteTask(new DeleteDeleteTask(lifetimeObject, deleteQueueTask)); - } - - private void addExclusivityConstraint(final Deletable lifetimeObject) - { - final ClearOwnerAction clearOwnerAction = new ClearOwnerAction(lifetimeObject); - final DeleteDeleteTask deleteDeleteTask = new DeleteDeleteTask(lifetimeObject, clearOwnerAction); - clearOwnerAction.setDeleteTask(deleteDeleteTask); - lifetimeObject.addDeleteTask(clearOwnerAction); - addDeleteTask(deleteDeleteTask); - } - - public void resetNotifications() - { - // This ensure that the notification checks for the configured alerts are created. - setMaximumMessageAge(_maximumMessageAge); - setMaximumMessageCount(_maximumMessageCount); - setMaximumMessageSize(_maximumMessageSize); - setMaximumQueueDepth(_maximumQueueDepth); - } - - // ------ Getters and Setters - - public void execute(Runnable runnable) - { - try - { - _asyncDelivery.execute(runnable); - } - catch (RejectedExecutionException ree) - { - // Ignore - SubFlusherRunner or QueueRunner submitted execution as queue was being stopped. - if(!_stopped.get()) - { - _logger.error("Unexpected rejected execution", ree); - throw ree; - - } - - } - } - - public void setNoLocal(boolean nolocal) - { - _noLocal = nolocal; - } - - public UUID getId() - { - return _id; - } - - public boolean isDurable() - { - return _durable; - } - - public boolean isExclusive() - { - return _exclusivityPolicy != ExclusivityPolicy.NONE; - } - - public Exchange getAlternateExchange() - { - return _alternateExchange; - } - - public void setAlternateExchange(Exchange exchange) - { - if(_alternateExchange != null) - { - _alternateExchange.removeReference(this); - } - if(exchange != null) - { - exchange.addReference(this); - } - _alternateExchange = exchange; - } - - - @Override - public Collection getAvailableAttributes() - { - return new ArrayList(_arguments.keySet()); - } - - @Override - public Object getAttribute(String attrName) - { - return _arguments.get(attrName); - } - - @Override - public LifetimePolicy getLifetimePolicy() - { - return _lifetimePolicy; - } - - public String getOwner() - { - if(_exclusiveOwner != null) - { - switch(_exclusivityPolicy) - { - case CONTAINER: - return (String) _exclusiveOwner; - case PRINCIPAL: - return ((Principal)_exclusiveOwner).getName(); - } - } - return null; - } - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - public String getName() - { - return _name; - } - - // ------ Manage Consumers - - - @Override - public synchronized QueueConsumer addConsumer(final T target, - final FilterManager filters, - final Class messageClass, - final String consumerName, - EnumSet optionSet) - throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, - ConsumerAccessRefused - { - - - if (hasExclusiveConsumer()) - { - throw new ExistingExclusiveConsumer(); - } - - Object exclusiveOwner = _exclusiveOwner; - switch(_exclusivityPolicy) - { - case CONNECTION: - if(exclusiveOwner == null) - { - exclusiveOwner = target.getSessionModel().getConnectionModel(); - addExclusivityConstraint(target.getSessionModel().getConnectionModel()); - } - else - { - if(exclusiveOwner != target.getSessionModel().getConnectionModel()) - { - throw new ConsumerAccessRefused(); - } - } - break; - case SESSION: - if(exclusiveOwner == null) - { - exclusiveOwner = target.getSessionModel(); - addExclusivityConstraint(target.getSessionModel()); - } - else - { - if(exclusiveOwner != target.getSessionModel()) - { - throw new ConsumerAccessRefused(); - } - } - break; - case LINK: - if(getConsumerCount() != 0) - { - throw new ConsumerAccessRefused(); - } - break; - case PRINCIPAL: - if(exclusiveOwner == null) - { - exclusiveOwner = target.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); - } - else - { - if(!exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getAuthorizedPrincipal())) - { - throw new ConsumerAccessRefused(); - } - } - break; - case CONTAINER: - if(exclusiveOwner == null) - { - exclusiveOwner = target.getSessionModel().getConnectionModel().getRemoteContainerName(); - } - else - { - if(!exclusiveOwner.equals(target.getSessionModel().getConnectionModel().getRemoteContainerName())) - { - throw new ConsumerAccessRefused(); - } - } - break; - case NONE: - break; - default: - throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy); - } - - boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE); - boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT); - - if(_noLocal && !optionSet.contains(Consumer.Option.NO_LOCAL)) - { - optionSet = EnumSet.copyOf(optionSet); - optionSet.add(Consumer.Option.NO_LOCAL); - } - - if(exclusive && getConsumerCount() != 0) - { - throw new ExistingConsumerPreventsExclusive(); - } - - QueueConsumerImpl consumer = new QueueConsumerImpl((Q)this, - target, - consumerName, - filters, messageClass, - optionSet); - - _exclusiveOwner = exclusiveOwner; - target.consumerAdded(consumer); - - - if (exclusive && !isTransient) - { - _exclusiveSubscriber = consumer; - } - - if(consumer.isActive()) - { - _activeSubscriberCount.incrementAndGet(); - } - - consumer.setStateListener(this); - consumer.setQueueContext(new QueueContext(_entries.getHead())); - - if (!isDeleted()) - { - synchronized (_consumerListeners) - { - for(ConsumerRegistrationListener listener : _consumerListeners) - { - listener.consumerAdded((Q)this, consumer); - } - } - - _consumerList.add(consumer); - - if (isDeleted()) - { - consumer.queueDeleted(); - } - } - else - { - // TODO - } - - deliverAsync(consumer); - - return consumer; - - } - - synchronized void unregisterConsumer(final QueueConsumerImpl consumer) - { - if (consumer == null) - { - throw new NullPointerException("consumer argument is null"); - } - - boolean removed = _consumerList.remove(consumer); - - if (removed) - { - consumer.close(); - // No longer can the queue have an exclusive consumer - setExclusiveSubscriber(null); - - consumer.setQueueContext(null); - - if(_exclusivityPolicy == ExclusivityPolicy.LINK) - { - _exclusiveOwner = null; - } - - if(_messageGroupManager != null) - { - resetSubPointersForGroups(consumer, true); - } - - synchronized (_consumerListeners) - { - for(ConsumerRegistrationListener listener : _consumerListeners) - { - listener.consumerRemoved((Q)this, consumer); - } - } - - // auto-delete queues must be deleted if there are no remaining subscribers - - if(!consumer.isTransient() - && ( _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS - || _lifetimePolicy == LifetimePolicy.DELETE_ON_NO_LINKS ) - && getConsumerCount() == 0) - { - - if (_logger.isInfoEnabled()) - { - _logger.info("Auto-deleting queue:" + this); - } - - getVirtualHost().removeQueue(this); - - // we need to manually fire the event to the removed consumer (which was the last one left for this - // queue. This is because the delete method uses the consumer set which has just been cleared - consumer.queueDeleted(); - } - } - - } - - public Collection> getConsumers() - { - List> consumers = new ArrayList>(); - QueueConsumerList.ConsumerNodeIterator iter = _consumerList.iterator(); - while(iter.advance()) - { - consumers.add(iter.getNode().getConsumer()); - } - return consumers; - - } - - public void addConsumerRegistrationListener(final ConsumerRegistrationListener listener) - { - synchronized (_consumerListeners) - { - _consumerListeners.add(listener); - } - } - - public void removeConsumerRegistrationListener(final ConsumerRegistrationListener listener) - { - synchronized (_consumerListeners) - { - _consumerListeners.remove(listener); - } - } - - public void resetSubPointersForGroups(QueueConsumer consumer, boolean clearAssignments) - { - E entry = _messageGroupManager.findEarliestAssignedAvailableEntry(consumer); - if(clearAssignments) - { - _messageGroupManager.clearAssignments(consumer); - } - - if(entry != null) - { - QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator(); - // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards - while (subscriberIter.advance()) - { - QueueConsumer sub = subscriberIter.getNode().getConsumer(); - - // we don't make browsers send the same stuff twice - if (sub.seesRequeues()) - { - updateSubRequeueEntry(sub, entry); - } - } - - deliverAsync(); - - } - } - - public void addBinding(final Binding binding) - { - _bindings.add(binding); - int bindingCount = _bindings.size(); - int bindingCountHigh; - while(bindingCount > (bindingCountHigh = _bindingCountHigh.get())) - { - if(_bindingCountHigh.compareAndSet(bindingCountHigh, bindingCount)) - { - break; - } - } - } - - public void removeBinding(final Binding binding) - { - _bindings.remove(binding); - } - - public List getBindings() - { - return Collections.unmodifiableList(_bindings); - } - - public int getBindingCount() - { - return getBindings().size(); - } - - public LogSubject getLogSubject() - { - return _logSubject; - } - - // ------ Enqueue / Dequeue - - public void enqueue(ServerMessage message, Action>> action) - { - incrementQueueCount(); - incrementQueueSize(message); - - _totalMessagesReceived.incrementAndGet(); - - - E entry; - final QueueConsumer exclusiveSub = _exclusiveSubscriber; - entry = _entries.add(message); - - if(action != null || (exclusiveSub == null && _queueRunner.isIdle())) - { - /* - - iterate over consumers and if any is at the end of the queue and can deliver this message, then deliver the message - - */ - QueueConsumerList.ConsumerNode node = _consumerList.getMarkedNode(); - QueueConsumerList.ConsumerNode nextNode = node.findNext(); - if (nextNode == null) - { - nextNode = _consumerList.getHead().findNext(); - } - while (nextNode != null) - { - if (_consumerList.updateMarkedNode(node, nextNode)) - { - break; - } - else - { - node = _consumerList.getMarkedNode(); - nextNode = node.findNext(); - if (nextNode == null) - { - nextNode = _consumerList.getHead().findNext(); - } - } - } - - // always do one extra loop after we believe we've finished - // this catches the case where we *just* miss an update - int loops = 2; - - while (entry.isAvailable() && loops != 0) - { - if (nextNode == null) - { - loops--; - nextNode = _consumerList.getHead(); - } - else - { - // if consumer at end, and active, offer - QueueConsumer sub = nextNode.getConsumer(); - deliverToConsumer(sub, entry); - } - nextNode = nextNode.findNext(); - - } - } - - - if (entry.isAvailable()) - { - checkConsumersNotAheadOfDelivery(entry); - - if (exclusiveSub != null) - { - deliverAsync(exclusiveSub); - } - else - { - deliverAsync(); - } - } - - checkForNotification(entry.getMessage()); - - if(action != null) - { - action.performAction(entry); - } - - } - - private void deliverToConsumer(final QueueConsumer sub, final E entry) - { - - if(sub.trySendLock()) - { - try - { - if (!sub.isSuspended() - && consumerReadyAndHasInterest(sub, entry) - && mightAssign(sub, entry) - && !sub.wouldSuspend(entry)) - { - if (sub.acquires() && !assign(sub, entry)) - { - // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this consumer - sub.restoreCredit(entry); - } - else - { - deliverMessage(sub, entry, false); - } - } - } - finally - { - sub.releaseSendLock(); - } - } - } - - private boolean assign(final QueueConsumer sub, final E entry) - { - if(_messageGroupManager == null) - { - //no grouping, try to acquire immediately. - return entry.acquire(sub); - } - else - { - //the group manager is responsible for acquiring the message if/when appropriate - return _messageGroupManager.acceptMessage(sub, entry); - } - } - - private boolean mightAssign(final QueueConsumer sub, final E entry) - { - if(_messageGroupManager == null || !sub.acquires()) - { - return true; - } - QueueConsumer assigned = _messageGroupManager.getAssignedConsumer(entry); - return (assigned == null) || (assigned == sub); - } - - protected void checkConsumersNotAheadOfDelivery(final E entry) - { - // This method is only required for queues which mess with ordering - // Simple Queues don't :-) - } - - private void incrementQueueSize(final ServerMessage message) - { - long size = message.getSize(); - getAtomicQueueSize().addAndGet(size); - _enqueueCount.incrementAndGet(); - _enqueueSize.addAndGet(size); - if(message.isPersistent() && isDurable()) - { - _persistentMessageEnqueueSize.addAndGet(size); - _persistentMessageEnqueueCount.incrementAndGet(); - } - } - - public long getTotalDequeueCount() - { - return _dequeueCount.get(); - } - - public long getTotalEnqueueCount() - { - return _enqueueCount.get(); - } - - private void incrementQueueCount() - { - getAtomicQueueCount().incrementAndGet(); - } - - private void deliverMessage(final QueueConsumer sub, final E entry, boolean batch) - { - setLastSeenEntry(sub, entry); - - _deliveredMessages.incrementAndGet(); - incrementUnackedMsgCount(entry); - - sub.send(entry, batch); - } - - private boolean consumerReadyAndHasInterest(final QueueConsumer sub, final E entry) - { - return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry); - } - - - private void setLastSeenEntry(final QueueConsumer sub, final E entry) - { - QueueContext subContext = sub.getQueueContext(); - if (subContext != null) - { - E releasedEntry = subContext.getReleasedEntry(); - - QueueContext._lastSeenUpdater.set(subContext, entry); - if(releasedEntry == entry) - { - QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null); - } - } - } - - private void updateSubRequeueEntry(final QueueConsumer sub, final E entry) - { - - QueueContext subContext = sub.getQueueContext(); - if(subContext != null) - { - E oldEntry; - - while((oldEntry = subContext.getReleasedEntry()) == null || oldEntry.compareTo(entry) > 0) - { - if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry)) - { - break; - } - } - } - } - - public void requeue(E entry) - { - QueueConsumerList.ConsumerNodeIterator subscriberIter = _consumerList.iterator(); - // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards - while (subscriberIter.advance() && entry.isAvailable()) - { - QueueConsumer sub = subscriberIter.getNode().getConsumer(); - - // we don't make browsers send the same stuff twice - if (sub.seesRequeues()) - { - updateSubRequeueEntry(sub, entry); - } - } - - deliverAsync(); - - } - - @Override - public void dequeue(E entry) - { - decrementQueueCount(); - decrementQueueSize(entry); - if (entry.acquiredByConsumer()) - { - _deliveredMessages.decrementAndGet(); - } - - checkCapacity(); - - } - - private void decrementQueueSize(final E entry) - { - final ServerMessage message = entry.getMessage(); - long size = message.getSize(); - getAtomicQueueSize().addAndGet(-size); - _dequeueSize.addAndGet(size); - if(message.isPersistent() && isDurable()) - { - _persistentMessageDequeueSize.addAndGet(size); - _persistentMessageDequeueCount.incrementAndGet(); - } - } - - void decrementQueueCount() - { - getAtomicQueueCount().decrementAndGet(); - _dequeueCount.incrementAndGet(); - } - - public boolean resend(final E entry, final QueueConsumer consumer) - { - /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message - entry to resend and move back the consumer pointer. */ - - consumer.getSendLock(); - try - { - if (!consumer.isClosed()) - { - deliverMessage(consumer, entry, false); - return true; - } - else - { - return false; - } - } - finally - { - consumer.releaseSendLock(); - } - } - - - - public int getConsumerCount() - { - return _consumerList.size(); - } - - public int getActiveConsumerCount() - { - return _activeSubscriberCount.get(); - } - - public boolean isUnused() - { - return getConsumerCount() == 0; - } - - public boolean isEmpty() - { - return getMessageCount() == 0; - } - - public int getMessageCount() - { - return getAtomicQueueCount().get(); - } - - public long getQueueDepth() - { - return getAtomicQueueSize().get(); - } - - public int getUndeliveredMessageCount() - { - int count = getMessageCount() - _deliveredMessages.get(); - if (count < 0) - { - return 0; - } - else - { - return count; - } - } - - public long getReceivedMessageCount() - { - return _totalMessagesReceived.get(); - } - - public long getOldestMessageArrivalTime() - { - E entry = getOldestQueueEntry(); - return entry == null ? Long.MAX_VALUE : entry.getMessage().getArrivalTime(); - } - - protected E getOldestQueueEntry() - { - return _entries.next(_entries.getHead()); - } - - public boolean isDeleted() - { - return _deleted.get(); - } - - public List getMessagesOnTheQueue() - { - ArrayList entryList = new ArrayList(); - QueueEntryIterator> queueListIterator = _entries.iterator(); - while (queueListIterator.advance()) - { - E node = queueListIterator.getNode(); - if (node != null && !node.isDeleted()) - { - entryList.add(node); - } - } - return entryList; - - } - - public void stateChanged(QueueConsumer sub, QueueConsumer.State oldState, QueueConsumer.State newState) - { - if (oldState == QueueConsumer.State.ACTIVE && newState != QueueConsumer.State.ACTIVE) - { - _activeSubscriberCount.decrementAndGet(); - - } - else if (newState == QueueConsumer.State.ACTIVE) - { - if (oldState != QueueConsumer.State.ACTIVE) - { - _activeSubscriberCount.incrementAndGet(); - - } - deliverAsync(sub); - } - } - - public int compareTo(final Q o) - { - return _name.compareTo(o.getName()); - } - - public AtomicInteger getAtomicQueueCount() - { - return _atomicQueueCount; - } - - public AtomicLong getAtomicQueueSize() - { - return _atomicQueueSize; - } - - public boolean hasExclusiveConsumer() - { - return _exclusiveSubscriber != null; - } - - private void setExclusiveSubscriber(QueueConsumer exclusiveSubscriber) - { - _exclusiveSubscriber = exclusiveSubscriber; - } - - long getStateChangeCount() - { - return _stateChangeCount.get(); - } - - /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */ - protected L getEntries() - { - return _entries; - } - - protected QueueConsumerList getConsumerList() - { - return _consumerList; - } - - - public static interface QueueEntryFilter - { - public boolean accept(E entry); - - public boolean filterComplete(); - } - - - - public List getMessagesOnTheQueue(final long fromMessageId, final long toMessageId) - { - return getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(E entry) - { - final long messageId = entry.getMessage().getMessageNumber(); - return messageId >= fromMessageId && messageId <= toMessageId; - } - - public boolean filterComplete() - { - return false; - } - }); - } - - public E getMessageOnTheQueue(final long messageId) - { - List entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - private boolean _complete; - - public boolean accept(E entry) - { - _complete = entry.getMessage().getMessageNumber() == messageId; - return _complete; - } - - public boolean filterComplete() - { - return _complete; - } - }); - return entries.isEmpty() ? null : entries.get(0); - } - - public List getMessagesOnTheQueue(QueueEntryFilter filter) - { - ArrayList entryList = new ArrayList(); - QueueEntryIterator> queueListIterator = _entries.iterator(); - while (queueListIterator.advance() && !filter.filterComplete()) - { - E node = queueListIterator.getNode(); - if (!node.isDeleted() && filter.accept(node)) - { - entryList.add(node); - } - } - return entryList; - - } - - public void visit(final QueueEntryVisitor visitor) - { - QueueEntryIterator> queueListIterator = _entries.iterator(); - - while(queueListIterator.advance()) - { - E node = queueListIterator.getNode(); - - if(!node.isDeleted()) - { - if(visitor.visit(node)) - { - break; - } - } - } - } - - /** - * Returns a list of QueEntries from a given range of queue positions, eg messages 5 to 10 on the queue. - * - * The 'queue position' index starts from 1. Using 0 in 'from' will be ignored and continue from 1. - * Using 0 in the 'to' field will return an empty list regardless of the 'from' value. - * @param fromPosition first message position - * @param toPosition last message position - * @return list of messages - */ - public List getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition) - { - return getMessagesOnTheQueue(new QueueEntryFilter() - { - private long position = 0; - - public boolean accept(E entry) - { - position++; - return (position >= fromPosition) && (position <= toPosition); - } - - public boolean filterComplete() - { - return position >= toPosition; - } - }); - - } - - public long getCreateTime() - { - return _createTime; - } - - // ------ Management functions - - public long clearQueue() - { - return clear(0l); - } - - private long clear(final long request) - { - //Perform ACLs - getVirtualHost().getSecurityManager().authorisePurge(this); - - QueueEntryIterator> queueListIterator = _entries.iterator(); - long count = 0; - - ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); - - while (queueListIterator.advance()) - { - E node = queueListIterator.getNode(); - if (node.acquire()) - { - dequeueEntry(node, txn); - if(++count == request) - { - break; - } - } - - } - - txn.commit(); - - return count; - } - - private void dequeueEntry(final E node) - { - ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getMessageStore()); - dequeueEntry(node, txn); - } - - private void dequeueEntry(final E node, ServerTransaction txn) - { - txn.dequeue(this, node.getMessage(), - new ServerTransaction.Action() - { - - public void postCommit() - { - node.delete(); - } - - public void onRollback() - { - - } - }); - } - - @Override - public void addDeleteTask(final Action task) - { - _deleteTaskList.add(task); - } - - @Override - public void removeDeleteTask(final Action task) - { - _deleteTaskList.remove(task); - } - - // TODO list all thrown exceptions - public int delete() - { - // Check access - _virtualHost.getSecurityManager().authoriseDelete(this); - - if (!_deleted.getAndSet(true)) - { - - final ArrayList bindingCopy = new ArrayList(_bindings); - - for (Binding b : bindingCopy) - { - b.getExchange().removeBinding(b); - } - - QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); - - while (consumerNodeIterator.advance()) - { - QueueConsumer s = consumerNodeIterator.getNode().getConsumer(); - if (s != null) - { - s.queueDeleted(); - } - } - - - List entries = getMessagesOnTheQueue(new QueueEntryFilter() - { - - public boolean accept(E entry) - { - return entry.acquire(); - } - - public boolean filterComplete() - { - return false; - } - }); - - ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore()); - - - for(final E entry : entries) - { - // TODO log requeues with a post enqueue action - int requeues = entry.routeToAlternate(null, txn); - - if(requeues == 0) - { - // TODO log discard - } - } - - txn.commit(); - - if(_alternateExchange != null) - { - _alternateExchange.removeReference(this); - } - - - for (Action task : _deleteTaskList) - { - task.performAction((Q)this); - } - - _deleteTaskList.clear(); - stop(); - - //Log Queue Deletion - CurrentActor.get().message(_logSubject, QueueMessages.DELETED()); - - } - return getMessageCount(); - - } - - public void stop() - { - if (!_stopped.getAndSet(true)) - { - ReferenceCountingExecutorService.getInstance().releaseExecutorService(); - } - } - - public void checkCapacity(AMQSessionModel channel) - { - if(_capacity != 0l) - { - if(_atomicQueueSize.get() > _capacity) - { - _overfull.set(true); - //Overfull log message - _logActor.message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity)); - - _blockedChannels.add(channel); - - channel.block(this); - - if(_atomicQueueSize.get() <= _flowResumeCapacity) - { - - //Underfull log message - _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); - - channel.unblock(this); - _blockedChannels.remove(channel); - - } - - } - - - - } - } - - private void checkCapacity() - { - if(_capacity != 0L) - { - if(_overfull.get() && _atomicQueueSize.get() <= _flowResumeCapacity) - { - if(_overfull.compareAndSet(true,false)) - {//Underfull log message - _logActor.message(_logSubject, QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity)); - } - - for(final AMQSessionModel blockedChannel : _blockedChannels) - { - blockedChannel.unblock(this); - _blockedChannels.remove(blockedChannel); - } - } - } - } - - private QueueRunner _queueRunner = new QueueRunner(this); - - public void deliverAsync() - { - _stateChangeCount.incrementAndGet(); - - _queueRunner.execute(_asyncDelivery); - - } - - public void deliverAsync(QueueConsumer sub) - { - if(_exclusiveSubscriber == null) - { - deliverAsync(); - } - else - { - SubFlushRunner flusher = sub.getRunner(); - flusher.execute(_asyncDelivery); - } - - } - - void flushConsumer(QueueConsumer sub) - { - - flushConsumer(sub, Long.MAX_VALUE); - } - - boolean flushConsumer(QueueConsumer sub, long iterations) - { - boolean atTail = false; - final boolean keepSendLockHeld = iterations <= SimpleAMQQueue.MAX_ASYNC_DELIVERIES; - boolean queueEmpty = false; - - try - { - if(keepSendLockHeld) - { - sub.getSendLock(); - } - while (!sub.isSuspended() && !atTail && iterations != 0) - { - try - { - if(!keepSendLockHeld) - { - sub.getSendLock(); - } - - atTail = attemptDelivery(sub, true); - if (atTail && getNextAvailableEntry(sub) == null) - { - queueEmpty = true; - } - else if (!atTail) - { - iterations--; - } - } - finally - { - if(!keepSendLockHeld) - { - sub.releaseSendLock(); - } - } - } - } - finally - { - if(keepSendLockHeld) - { - sub.releaseSendLock(); - } - if(queueEmpty) - { - sub.queueEmpty(); - } - - sub.flushBatched(); - - } - - - // if there's (potentially) more than one consumer the others will potentially not have been advanced to the - // next entry they are interested in yet. This would lead to holding on to references to expired messages, etc - // which would give us memory "leak". - - if (!hasExclusiveConsumer()) - { - advanceAllConsumers(); - } - return atTail; - } - - /** - * Attempt delivery for the given consumer. - * - * Looks up the next node for the consumer and attempts to deliver it. - * - * - * @param sub the consumer - * @param batch true if processing can be batched - * @return true if we have completed all possible deliveries for this sub. - */ - private boolean attemptDelivery(QueueConsumer sub, boolean batch) - { - boolean atTail = false; - - boolean subActive = sub.isActive() && !sub.isSuspended(); - if (subActive) - { - - E node = getNextAvailableEntry(sub); - - if (node != null && node.isAvailable()) - { - if (sub.hasInterest(node) && mightAssign(sub, node)) - { - if (!sub.wouldSuspend(node)) - { - if (sub.acquires() && !assign(sub, node)) - { - // restore credit here that would have been taken away by wouldSuspend since we didn't manage - // to acquire the entry for this consumer - sub.restoreCredit(node); - } - else - { - deliverMessage(sub, node, batch); - } - - } - else // Not enough Credit for message and wouldSuspend - { - //QPID-1187 - Treat the consumer as suspended for this message - // and wait for the message to be removed to continue delivery. - subActive = false; - node.addStateChangeListener(new QueueEntryListener(sub)); - } - } - - } - atTail = (node == null) || (_entries.next(node) == null); - } - return atTail || !subActive; - } - - protected void advanceAllConsumers() - { - QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); - while (consumerNodeIterator.advance()) - { - QueueConsumerList.ConsumerNode subNode = consumerNodeIterator.getNode(); - QueueConsumer sub = subNode.getConsumer(); - if(sub.acquires()) - { - getNextAvailableEntry(sub); - } - else - { - // TODO - } - } - } - - private E getNextAvailableEntry(final QueueConsumer sub) - { - QueueContext context = sub.getQueueContext(); - if(context != null) - { - E lastSeen = context.getLastSeenEntry(); - E releasedNode = context.getReleasedEntry(); - - E node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen); - - boolean expired = false; - while (node != null && (!node.isAvailable() || (expired = node.expired()) || !sub.hasInterest(node) || - !mightAssign(sub,node))) - { - if (expired) - { - expired = false; - if (node.acquire()) - { - dequeueEntry(node); - } - } - - if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node)) - { - QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null); - } - - lastSeen = context.getLastSeenEntry(); - releasedNode = context.getReleasedEntry(); - node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen); - } - return node; - } - else - { - return null; - } - } - - public boolean isEntryAheadOfConsumer(E entry, QueueConsumer sub) - { - QueueContext context = sub.getQueueContext(); - if(context != null) - { - E releasedNode = context.getReleasedEntry(); - return releasedNode != null && releasedNode.compareTo(entry) < 0; - } - else - { - return false; - } - } - - /** - * Used by queue Runners to asynchronously deliver messages to consumers. - * - * A queue Runner is started whenever a state change occurs, e.g when a new - * message arrives on the queue and cannot be immediately delivered to a - * consumer (i.e. asynchronous delivery is required). Unless there are - * SubFlushRunners operating (due to consumers unsuspending) which are - * capable of accepting/delivering all messages then these messages would - * otherwise remain on the queue. - * - * processQueue should be running while there are messages on the queue AND - * there are consumers that can deliver them. If there are no - * consumers capable of delivering the remaining messages on the queue - * then processQueue should stop to prevent spinning. - * - * Since processQueue is runs in a fixed size Executor, it should not run - * indefinitely to prevent starving other tasks of CPU (e.g jobs to process - * incoming messages may not be able to be scheduled in the thread pool - * because all threads are working on clearing down large queues). To solve - * this problem, after an arbitrary number of message deliveries the - * processQueue job stops iterating, resubmits itself to the executor, and - * ends the current instance - * - * @param runner the Runner to schedule - */ - public long processQueue(QueueRunner runner) - { - long stateChangeCount; - long previousStateChangeCount = Long.MIN_VALUE; - long rVal = Long.MIN_VALUE; - boolean deliveryIncomplete = true; - - boolean lastLoop = false; - int iterations = MAX_ASYNC_DELIVERIES; - - final int numSubs = _consumerList.size(); - - final int perSub = Math.max(iterations / Math.max(numSubs,1), 1); - - // For every message enqueue/requeue the we fire deliveryAsync() which - // increases _stateChangeCount. If _sCC changes whilst we are in our loop - // (detected by setting previousStateChangeCount to stateChangeCount in the loop body) - // then we will continue to run for a maximum of iterations. - // So whilst delivery/rejection is going on a processQueue thread will be running - while (iterations != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete)) - { - // we want to have one extra loop after every consumer has reached the point where it cannot move - // further, just in case the advance of one consumer in the last loop allows a different consumer to - // move forward in the next iteration - - if (previousStateChangeCount != stateChangeCount) - { - //further asynchronous delivery is required since the - //previous loop. keep going if iteration slicing allows. - lastLoop = false; - rVal = stateChangeCount; - } - - previousStateChangeCount = stateChangeCount; - boolean allConsumersDone = true; - boolean consumerDone; - - QueueConsumerList.ConsumerNodeIterator consumerNodeIterator = _consumerList.iterator(); - //iterate over the subscribers and try to advance their pointer - while (consumerNodeIterator.advance()) - { - QueueConsumer sub = consumerNodeIterator.getNode().getConsumer(); - sub.getSendLock(); - - try - { - for(int i = 0 ; i < perSub; i++) - { - //attempt delivery. returns true if no further delivery currently possible to this sub - consumerDone = attemptDelivery(sub, true); - if (consumerDone) - { - sub.flushBatched(); - if (lastLoop && !sub.isSuspended()) - { - sub.queueEmpty(); - } - break; - } - else - { - //this consumer can accept additional deliveries, so we must - //keep going after this (if iteration slicing allows it) - allConsumersDone = false; - lastLoop = false; - if(--iterations == 0) - { - sub.flushBatched(); - break; - } - } - - } - - sub.flushBatched(); - } - finally - { - sub.releaseSendLock(); - } - } - - if(allConsumersDone && lastLoop) - { - //We have done an extra loop already and there are again - //again no further delivery attempts possible, only - //keep going if state change demands it. - deliveryIncomplete = false; - } - else if(allConsumersDone) - { - //All consumers reported being done, but we have to do - //an extra loop if the iterations are not exhausted and - //there is still any work to be done - deliveryIncomplete = _consumerList.size() != 0; - lastLoop = true; - } - else - { - //some consumers can still accept more messages, - //keep going if iteration count allows. - lastLoop = false; - deliveryIncomplete = true; - } - - } - - // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit - // therefore we should schedule this runner again (unless someone beats us to it :-) ). - if (iterations == 0) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Rescheduling runner:" + runner); - } - return 0L; - } - return rVal; - - } - - public void checkMessageStatus() - { - QueueEntryIterator> queueListIterator = _entries.iterator(); - - while (queueListIterator.advance()) - { - E node = queueListIterator.getNode(); - // Only process nodes that are not currently deleted and not dequeued - if (!node.isDeleted()) - { - // If the node has expired then acquire it - if (node.expired() && node.acquire()) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("Dequeuing expired node " + node); - } - // Then dequeue it. - dequeueEntry(node); - } - else - { - // There is a chance that the node could be deleted by - // the time the check actually occurs. So verify we - // can actually get the message to perform the check. - ServerMessage msg = node.getMessage(); - if (msg != null) - { - checkForNotification(msg); - } - } - } - } - - } - - public long getMinimumAlertRepeatGap() - { - return _minimumAlertRepeatGap; - } - - public void setMinimumAlertRepeatGap(long minimumAlertRepeatGap) - { - _minimumAlertRepeatGap = minimumAlertRepeatGap; - } - - public long getMaximumMessageAge() - { - return _maximumMessageAge; - } - - public void setMaximumMessageAge(long maximumMessageAge) - { - _maximumMessageAge = maximumMessageAge; - if (maximumMessageAge == 0L) - { - _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT); - } - } - - public long getMaximumMessageCount() - { - return _maximumMessageCount; - } - - public void setMaximumMessageCount(final long maximumMessageCount) - { - _maximumMessageCount = maximumMessageCount; - if (maximumMessageCount == 0L) - { - _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT); - } - - } - - public long getMaximumQueueDepth() - { - return _maximumQueueDepth; - } - - // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(final long maximumQueueDepth) - { - _maximumQueueDepth = maximumQueueDepth; - if (maximumQueueDepth == 0L) - { - _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT); - } - - } - - public long getMaximumMessageSize() - { - return _maximumMessageSize; - } - - public void setMaximumMessageSize(final long maximumMessageSize) - { - _maximumMessageSize = maximumMessageSize; - if (maximumMessageSize == 0L) - { - _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT); - } - else - { - _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT); - } - } - - public long getCapacity() - { - return _capacity; - } - - public void setCapacity(long capacity) - { - _capacity = capacity; - } - - public long getFlowResumeCapacity() - { - return _flowResumeCapacity; - } - - public void setFlowResumeCapacity(long flowResumeCapacity) - { - _flowResumeCapacity = flowResumeCapacity; - - checkCapacity(); - } - - public boolean isOverfull() - { - return _overfull.get(); - } - - public Set getNotificationChecks() - { - return _notificationChecks; - } - - private static class DeleteDeleteTask implements Action - { - - private final Deletable _lifetimeObject; - private final Action _deleteQueueOwnerTask; - - public DeleteDeleteTask(final Deletable lifetimeObject, - final Action deleteQueueOwnerTask) - { - _lifetimeObject = lifetimeObject; - _deleteQueueOwnerTask = deleteQueueOwnerTask; - } - - @Override - public void performAction(final Deletable object) - { - _lifetimeObject.removeDeleteTask(_deleteQueueOwnerTask); - } - } - - private final class QueueEntryListener implements StateChangeListener - { - - private final QueueConsumer _sub; - - public QueueEntryListener(final QueueConsumer sub) - { - _sub = sub; - } - - public boolean equals(Object o) - { - return o instanceof SimpleAMQQueue.QueueEntryListener - && _sub == ((QueueEntryListener) o)._sub; - } - - public int hashCode() - { - return System.identityHashCode(_sub); - } - - public void stateChanged(E entry, QueueEntry.State oldSate, QueueEntry.State newState) - { - entry.removeStateChangeListener(this); - deliverAsync(_sub); - } - } - - public List getMessagesOnTheQueue(int num) - { - return getMessagesOnTheQueue(num, 0); - } - - public List getMessagesOnTheQueue(int num, int offset) - { - ArrayList ids = new ArrayList(num); - QueueEntryIterator it = _entries.iterator(); - for (int i = 0; i < offset; i++) - { - it.advance(); - } - - for (int i = 0; i < num && !it.atTail(); i++) - { - it.advance(); - ids.add(it.getNode().getMessage().getMessageNumber()); - } - return ids; - } - - public long getTotalEnqueueSize() - { - return _enqueueSize.get(); - } - - public long getTotalDequeueSize() - { - return _dequeueSize.get(); - } - - public long getPersistentByteEnqueues() - { - return _persistentMessageEnqueueSize.get(); - } - - public long getPersistentByteDequeues() - { - return _persistentMessageDequeueSize.get(); - } - - public long getPersistentMsgEnqueues() - { - return _persistentMessageEnqueueCount.get(); - } - - public long getPersistentMsgDequeues() - { - return _persistentMessageDequeueCount.get(); - } - - - @Override - public String toString() - { - return getName(); - } - - public long getUnackedMessageCount() - { - return _unackedMsgCount.get(); - } - - public long getUnackedMessageBytes() - { - return _unackedMsgBytes.get(); - } - - public void decrementUnackedMsgCount(E queueEntry) - { - _unackedMsgCount.decrementAndGet(); - _unackedMsgBytes.addAndGet(-queueEntry.getSize()); - } - - private void incrementUnackedMsgCount(E entry) - { - _unackedMsgCount.incrementAndGet(); - _unackedMsgBytes.addAndGet(entry.getSize()); - } - - public LogActor getLogActor() - { - return _logActor; - } - - public int getMaximumDeliveryCount() - { - return _maximumDeliveryCount; - } - - public void setMaximumDeliveryCount(final int maximumDeliveryCount) - { - _maximumDeliveryCount = maximumDeliveryCount; - } - - /** - * Checks if there is any notification to send to the listeners - */ - private void checkForNotification(ServerMessage msg) - { - final Set notificationChecks = getNotificationChecks(); - final AMQQueue.NotificationListener listener = _notificationListener; - - if(listener != null && !notificationChecks.isEmpty()) - { - final long currentTime = System.currentTimeMillis(); - final long thresholdTime = currentTime - getMinimumAlertRepeatGap(); - - for (NotificationCheck check : notificationChecks) - { - if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) - { - if (check.notifyIfNecessary(msg, this, listener)) - { - _lastNotificationTimes[check.ordinal()] = currentTime; - } - } - } - } - } - - public void setNotificationListener(AMQQueue.NotificationListener listener) - { - _notificationListener = listener; - } - - @Override - public void setDescription(String description) - { - _description = description; - } - - @Override - public String getDescription() - { - return _description; - } - - public final > int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action> postEnqueueAction) - { - txn.enqueue(this,message, new ServerTransaction.Action() - { - MessageReference _reference = message.newReference(); - - public void postCommit() - { - try - { - SimpleAMQQueue.this.enqueue(message, postEnqueueAction); - } - finally - { - _reference.release(); - } - } - - public void onRollback() - { - _reference.release(); - } - }); - return 1; - - } - - @Override - public boolean verifySessionAccess(final AMQSessionModel session) - { - boolean allowed; - switch(_exclusivityPolicy) - { - case NONE: - allowed = true; - break; - case SESSION: - allowed = _exclusiveOwner == null || _exclusiveOwner == session; - break; - case CONNECTION: - allowed = _exclusiveOwner == null || _exclusiveOwner == session.getConnectionModel(); - break; - case PRINCIPAL: - allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getAuthorizedPrincipal()); - break; - case CONTAINER: - allowed = _exclusiveOwner == null || _exclusiveOwner.equals(session.getConnectionModel().getRemoteContainerName()); - break; - case LINK: - allowed = _exclusiveSubscriber == null || _exclusiveSubscriber.getSessionModel() == session; - break; - default: - throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy); - } - return allowed; - } - - @Override - public synchronized void setExclusivityPolicy(final ExclusivityPolicy desiredPolicy) - throws ExistingConsumerPreventsExclusive - { - if(desiredPolicy != _exclusivityPolicy && !(desiredPolicy == null && _exclusivityPolicy == ExclusivityPolicy.NONE)) - { - switch(desiredPolicy) - { - case NONE: - _exclusiveOwner = null; - break; - case PRINCIPAL: - switchToPrincipalExclusivity(); - break; - case CONTAINER: - switchToContainerExclusivity(); - break; - case CONNECTION: - switchToConnectionExclusivity(); - break; - case SESSION: - switchToSessionExclusivity(); - break; - case LINK: - switchToLinkExclusivity(); - break; - } - _exclusivityPolicy = desiredPolicy; - } - } - - private void switchToLinkExclusivity() throws ExistingConsumerPreventsExclusive - { - switch (getConsumerCount()) - { - case 1: - _exclusiveSubscriber = getConsumerList().getHead().getConsumer(); - // deliberate fall through - case 0: - _exclusiveOwner = null; - break; - default: - throw new ExistingConsumerPreventsExclusive(); - } - - } - - private void switchToSessionExclusivity() throws ExistingConsumerPreventsExclusive - { - - switch(_exclusivityPolicy) - { - case NONE: - case PRINCIPAL: - case CONTAINER: - case CONNECTION: - AMQSessionModel session = null; - for(Consumer c : getConsumers()) - { - if(session == null) - { - session = c.getSessionModel(); - } - else if(!session.equals(c.getSessionModel())) - { - throw new ExistingConsumerPreventsExclusive(); - } - } - _exclusiveOwner = session; - break; - case LINK: - _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel(); - } - } - - private void switchToConnectionExclusivity() throws ExistingConsumerPreventsExclusive - { - switch(_exclusivityPolicy) - { - case NONE: - case CONTAINER: - case PRINCIPAL: - AMQConnectionModel con = null; - for(Consumer c : getConsumers()) - { - if(con == null) - { - con = c.getSessionModel().getConnectionModel(); - } - else if(!con.equals(c.getSessionModel().getConnectionModel())) - { - throw new ExistingConsumerPreventsExclusive(); - } - } - _exclusiveOwner = con; - break; - case SESSION: - _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel(); - break; - case LINK: - _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel(); - } - } - - private void switchToContainerExclusivity() throws ExistingConsumerPreventsExclusive - { - switch(_exclusivityPolicy) - { - case NONE: - case PRINCIPAL: - String containerID = null; - for(Consumer c : getConsumers()) - { - if(containerID == null) - { - containerID = c.getSessionModel().getConnectionModel().getRemoteContainerName(); - } - else if(!containerID.equals(c.getSessionModel().getConnectionModel().getRemoteContainerName())) - { - throw new ExistingConsumerPreventsExclusive(); - } - } - _exclusiveOwner = containerID; - break; - case CONNECTION: - _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getRemoteContainerName(); - break; - case SESSION: - _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getRemoteContainerName(); - break; - case LINK: - _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getRemoteContainerName(); - } - } - - private void switchToPrincipalExclusivity() throws ExistingConsumerPreventsExclusive - { - switch(_exclusivityPolicy) - { - case NONE: - case CONTAINER: - Principal principal = null; - for(Consumer c : getConsumers()) - { - if(principal == null) - { - principal = c.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); - } - else if(!principal.equals(c.getSessionModel().getConnectionModel().getAuthorizedPrincipal())) - { - throw new ExistingConsumerPreventsExclusive(); - } - } - _exclusiveOwner = principal; - break; - case CONNECTION: - _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQConnectionModel)_exclusiveOwner).getAuthorizedPrincipal(); - break; - case SESSION: - _exclusiveOwner = _exclusiveOwner == null ? null : ((AMQSessionModel)_exclusiveOwner).getConnectionModel().getAuthorizedPrincipal(); - break; - case LINK: - _exclusiveOwner = _exclusiveSubscriber == null ? null : _exclusiveSubscriber.getSessionModel().getConnectionModel().getAuthorizedPrincipal(); - } - } - - private class ClearOwnerAction implements Action - { - private final Deletable _lifetimeObject; - private DeleteDeleteTask _deleteTask; - - public ClearOwnerAction(final Deletable lifetimeObject) - { - _lifetimeObject = lifetimeObject; - } - - @Override - public void performAction(final Deletable object) - { - if(SimpleAMQQueue.this._exclusiveOwner == _lifetimeObject) - { - SimpleAMQQueue.this._exclusiveOwner = null; - } - if(_deleteTask != null) - { - removeDeleteTask(_deleteTask); - } - } - - public void setDeleteTask(final DeleteDeleteTask deleteTask) - { - _deleteTask = deleteTask; - } - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java deleted file mode 100644 index b6954e0696..0000000000 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -public interface SimpleQueueEntryList, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> extends QueueEntryList> -{ -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java index 05b874cd91..801f9261f8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java @@ -30,7 +30,7 @@ import org.apache.qpid.server.queue.SortedQueueEntry.Colour; * ISBN-13: 978-0262033848 * see http://en.wikipedia.org/wiki/Red-black_tree */ -public class SortedQueueEntryList implements SimpleQueueEntryList +public class SortedQueueEntryList implements QueueEntryListBase { private final SortedQueueEntry _head; private SortedQueueEntry _root; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java index e5050eca86..51c2d7c7b5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java @@ -25,7 +25,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; -public class StandardQueue extends SimpleAMQQueue +public class StandardQueue extends AbstractQueue { public StandardQueue(final VirtualHost virtualHost, final Map arguments) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java index 3026f33604..9ef32e61e2 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java @@ -46,7 +46,7 @@ class SubFlushRunner implements Runnable private final AtomicInteger _scheduled = new AtomicInteger(IDLE); - private static final long ITERATIONS = SimpleAMQQueue.MAX_ASYNC_DELIVERIES; + private static final long ITERATIONS = AbstractQueue.MAX_ASYNC_DELIVERIES; private final AtomicBoolean _stateChange = new AtomicBoolean(); public SubFlushRunner(QueueConsumerImpl sub) @@ -92,9 +92,9 @@ class SubFlushRunner implements Runnable } } - private SimpleAMQQueue getQueue() + private AbstractQueue getQueue() { - return (SimpleAMQQueue) _sub.getQueue(); + return (AbstractQueue) _sub.getQueue(); } public String toString() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java index 88caf73032..efdca3b67d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/DefaultUpgraderProvider.java @@ -132,7 +132,7 @@ public class DefaultUpgraderProvider implements UpgraderProvider else { return _exchangeRegistry.getExchange(exchangeId) != null - && _exchangeRegistry.getExchange(exchangeId).getType() == TopicExchange.TYPE; + && _exchangeRegistry.getExchange(exchangeId).getExchangeType() == TopicExchange.TYPE; } } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java index 258ee1ba85..0d4f2b2227 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java @@ -190,7 +190,7 @@ public abstract class AbstractTestLogSubject extends QpidTestCase exchangeParts.length); assertEquals("Exchange type not correct", - exchange.getType().getType(), exchangeParts[0]); + exchange.getExchangeType().getType(), exchangeParts[0]); assertEquals("Exchange name not correct", exchange.getName(), exchangeParts[1]); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 4e517609bc..e05a9d43da 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -43,7 +43,6 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.plugin.ExchangeType; -import org.apache.qpid.server.security.*; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.util.MapValueConverter; @@ -155,7 +154,7 @@ public class AMQQueueFactoryTest extends QpidTestCase when(exchange.getName()).thenReturn(name); when(exchange.getId()).thenReturn(id); - when(exchange.getType()).thenReturn(exType); + when(exchange.getExchangeType()).thenReturn(exType); final String typeName = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.TYPE, attributeValues); when(exType.getType()).thenReturn(typeName); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java new file mode 100644 index 0000000000..d5e7a92e8a --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -0,0 +1,1128 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.queue; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Matchers.contains; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + +import java.util.*; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.server.exchange.DirectExchange; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter; +import org.apache.qpid.server.consumer.MockConsumer; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; + +abstract class AbstractQueueTestBase, Q extends AbstractQueue, L extends QueueEntryListBase> extends QpidTestCase +{ + private static final Logger _logger = Logger.getLogger(AbstractQueueTestBase.class); + + + private Q _queue; + private VirtualHost _virtualHost; + private String _qname = "qname"; + private String _owner = "owner"; + private String _routingKey = "routing key"; + private DirectExchange _exchange; + private MockConsumer _consumerTarget = new MockConsumer(); + private QueueConsumer _consumer; + private Map _arguments = Collections.emptyMap(); + + @Override + public void setUp() throws Exception + { + super.setUp(); + BrokerTestHelper.setUp(); + + _virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName()); + + Map attributes = new HashMap(_arguments); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + attributes.put(Queue.NAME, _qname); + attributes.put(Queue.OWNER, _owner); + + _queue = (Q) _virtualHost.createQueue(attributes); + + _exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME); + } + + @Override + public void tearDown() throws Exception + { + try + { + _queue.stop(); + _virtualHost.close(); + } + finally + { + BrokerTestHelper.tearDown(); + super.tearDown(); + } + } + + public void testCreateQueue() throws Exception + { + _queue.stop(); + try + { + Map attributes = new HashMap(_arguments); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + + _queue = (Q) _virtualHost.createQueue(attributes); + assertNull("Queue was created", _queue); + } + catch (IllegalArgumentException e) + { + assertTrue("Exception was not about missing name", + e.getMessage().contains("name")); + } + + Map attributes = new HashMap(_arguments); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + attributes.put(Queue.NAME, "differentName"); + _queue = (Q) _virtualHost.createQueue(attributes); + assertNotNull("Queue was not created", _queue); + } + + + public void testGetVirtualHost() + { + assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost()); + } + + public void testBinding() + { + _exchange.addBinding(_routingKey, _queue, Collections.EMPTY_MAP); + + assertTrue("Routing key was not bound", + _exchange.isBound(_routingKey)); + assertTrue("Queue was not bound to key", + _exchange.isBound(_routingKey,_queue)); + assertEquals("Exchange binding count", 1, + _queue.getBindings().size()); + assertEquals("Wrong exchange bound", _routingKey, + _queue.getBindings().get(0).getBindingKey()); + assertEquals("Wrong exchange bound", _exchange, + _queue.getBindings().get(0).getExchange()); + + _exchange.removeBinding(_routingKey, _queue, Collections.EMPTY_MAP); + assertFalse("Routing key was still bound", + _exchange.isBound(_routingKey)); + + } + + public void testRegisterConsumerThenEnqueueMessage() throws Exception + { + ServerMessage messageA = createMessage(new Long(24)); + + // Check adding a consumer adds it to the queue + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + assertEquals("Queue does not have consumer", 1, + _queue.getConsumerCount()); + assertEquals("Queue does not have active consumer", 1, + _queue.getActiveConsumerCount()); + + // Check sending a message ends up with the subscriber + _queue.enqueue(messageA, null); + try + { + Thread.sleep(2000L); + } + catch(InterruptedException e) + { + } + assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); + assertNull(_consumer.getQueueContext().getReleasedEntry()); + + // Check removing the consumer removes it's information from the queue + _consumer.close(); + assertTrue("Consumer still had queue", _consumerTarget.isClosed()); + assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount()); + assertFalse("Queue still has active consumer", + 1 == _queue.getActiveConsumerCount()); + + ServerMessage messageB = createMessage(new Long (25)); + _queue.enqueue(messageB, null); + assertNull(_consumer.getQueueContext()); + + } + + public void testEnqueueMessageThenRegisterConsumer() throws Exception, InterruptedException + { + ServerMessage messageA = createMessage(new Long(24)); + _queue.enqueue(messageA, null); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + Thread.sleep(150); + assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); + assertNull("There should be no releasedEntry after an enqueue", + _consumer.getQueueContext().getReleasedEntry()); + } + + /** + * Tests enqueuing two messages. + */ + public void testEnqueueTwoMessagesThenRegisterConsumer() throws Exception + { + ServerMessage messageA = createMessage(new Long(24)); + ServerMessage messageB = createMessage(new Long(25)); + _queue.enqueue(messageA, null); + _queue.enqueue(messageB, null); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + Thread.sleep(150); + assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage()); + assertNull("There should be no releasedEntry after enqueues", + _consumer.getQueueContext().getReleasedEntry()); + } + + /** + * Tests that a released queue entry is resent to the subscriber. Verifies also that the + * QueueContext._releasedEntry is reset to null after the entry has been reset. + */ + public void testReleasedMessageIsResentToSubscriber() throws Exception + { + + ServerMessage messageA = createMessage(new Long(24)); + ServerMessage messageB = createMessage(new Long(25)); + ServerMessage messageC = createMessage(new Long(26)); + + + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + + final ArrayList queueEntries = new ArrayList(); + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); + + /* Enqueue three messages */ + + _queue.enqueue(messageA, postEnqueueAction); + _queue.enqueue(messageB, postEnqueueAction); + _queue.enqueue(messageC, postEnqueueAction); + + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + + assertEquals("Unexpected total number of messages sent to consumer", + 3, + _consumerTarget.getMessages().size()); + assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); + assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); + assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); + + /* Now release the first message only, causing it to be requeued */ + + queueEntries.get(0).release(); + + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + + assertEquals("Unexpected total number of messages sent to consumer", + 4, + _consumerTarget.getMessages().size()); + assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); + assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); + assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered()); + assertNull("releasedEntry should be cleared after requeue processed", + _consumer.getQueueContext().getReleasedEntry()); + } + + /** + * Tests that a released message that becomes expired is not resent to the subscriber. + * This tests ensures that SimpleAMQQueueEntry.getNextAvailableEntry avoids expired entries. + * Verifies also that the QueueContext._releasedEntry is reset to null after the entry has been reset. + */ + public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception + { + ServerMessage messageA = createMessage(new Long(24)); + + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.SEES_REQUEUES, + Consumer.Option.ACQUIRES)); + + final ArrayList queueEntries = new ArrayList(); + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); + + /* Enqueue one message with expiration set for a short time in the future */ + + int messageExpirationOffset = 200; + final long expiration = System.currentTimeMillis() + messageExpirationOffset; + when(messageA.getExpiration()).thenReturn(expiration); + + _queue.enqueue(messageA, postEnqueueAction); + + int subFlushWaitTime = 150; + Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads + + assertEquals("Unexpected total number of messages sent to consumer", + 1, + _consumerTarget.getMessages().size()); + assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); + + /* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */ + Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10); + queueEntries.get(0).release(); + + Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads + + assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired()); + assertEquals("Total number of messages sent should not have changed", + 1, + _consumerTarget.getMessages().size()); + assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); + assertNull("releasedEntry should be cleared after requeue processed", + _consumer.getQueueContext().getReleasedEntry()); + + } + + /** + * Tests that if a client releases entries 'out of order' (the order + * used by QueueEntryImpl.compareTo) that messages are still resent + * successfully. Specifically this test ensures the {@see AbstractQueue#requeue()} + * can correctly move the _releasedEntry to an earlier position in the QueueEntry list. + */ + public void testReleasedOutOfComparableOrderAreRedelivered() throws Exception + { + + ServerMessage messageA = createMessage(new Long(24)); + ServerMessage messageB = createMessage(new Long(25)); + ServerMessage messageC = createMessage(new Long(26)); + + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + + final ArrayList queueEntries = new ArrayList(); + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); + + /* Enqueue three messages */ + + _queue.enqueue(messageA, postEnqueueAction); + _queue.enqueue(messageB, postEnqueueAction); + _queue.enqueue(messageC, postEnqueueAction); + + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + + assertEquals("Unexpected total number of messages sent to consumer", + 3, + _consumerTarget.getMessages().size()); + assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); + assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); + assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); + + /* Now release the third and first message only, causing it to be requeued */ + + queueEntries.get(2).release(); + queueEntries.get(0).release(); + + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + + assertEquals("Unexpected total number of messages sent to consumer", + 5, + _consumerTarget.getMessages().size()); + assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); + assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); + assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered()); + assertNull("releasedEntry should be cleared after requeue processed", + _consumer.getQueueContext().getReleasedEntry()); + } + + + /** + * Tests that a release requeues an entry for a queue with multiple consumers. Verifies that a + * requeue resends a message to a single subscriber. + */ + public void testReleaseForQueueWithMultipleConsumers() throws Exception + { + ServerMessage messageA = createMessage(new Long(24)); + ServerMessage messageB = createMessage(new Long(25)); + + MockConsumer target1 = new MockConsumer(); + MockConsumer target2 = new MockConsumer(); + + + QueueConsumer consumer1 = _queue.addConsumer(target1, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + + QueueConsumer consumer2 = _queue.addConsumer(target2, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + + + final ArrayList queueEntries = new ArrayList(); + EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); + + /* Enqueue two messages */ + + _queue.enqueue(messageA, postEnqueueAction); + _queue.enqueue(messageB, postEnqueueAction); + + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + + assertEquals("Unexpected total number of messages sent to both after enqueue", + 2, + target1.getMessages().size() + target2.getMessages().size()); + + /* Now release the first message only, causing it to be requeued */ + queueEntries.get(0).release(); + + Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads + + assertEquals("Unexpected total number of messages sent to both consumers after release", + 3, + target1.getMessages().size() + target2.getMessages().size()); + assertNull("releasedEntry should be cleared after requeue processed", + consumer1.getQueueContext().getReleasedEntry()); + assertNull("releasedEntry should be cleared after requeue processed", + consumer2.getQueueContext().getReleasedEntry()); + } + + public void testExclusiveConsumer() throws Exception + { + ServerMessage messageA = createMessage(new Long(24)); + // Check adding an exclusive consumer adds it to the queue + + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.EXCLUSIVE, Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + + assertEquals("Queue does not have consumer", 1, + _queue.getConsumerCount()); + assertEquals("Queue does not have active consumer", 1, + _queue.getActiveConsumerCount()); + + // Check sending a message ends up with the subscriber + _queue.enqueue(messageA, null); + try + { + Thread.sleep(2000L); + } + catch (InterruptedException e) + { + } + assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); + + // Check we cannot add a second subscriber to the queue + MockConsumer subB = new MockConsumer(); + Exception ex = null; + try + { + + _queue.addConsumer(subB, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + + } + catch (MessageSource.ExistingExclusiveConsumer e) + { + ex = e; + } + assertNotNull(ex); + + // Check we cannot add an exclusive subscriber to a queue with an + // existing consumer + _consumer.close(); + _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, + Consumer.Option.SEES_REQUEUES)); + + try + { + + _consumer = _queue.addConsumer(subB, null, messageA.getClass(), "test", + EnumSet.of(Consumer.Option.EXCLUSIVE)); + + } + catch (MessageSource.ExistingConsumerPreventsExclusive e) + { + ex = e; + } + assertNotNull(ex); + } + + + public void testResend() throws Exception + { + Long id = new Long(26); + ServerMessage message = createMessage(id); + + _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + + _queue.enqueue(message, new Action>() + { + @Override + public void performAction(final MessageInstance object) + { + QueueEntryImpl entry = (QueueEntryImpl) object; + entry.setRedelivered(); + _consumer.resend(entry); + + } + }); + + + + } + + public void testGetFirstMessageId() throws Exception + { + // Create message + Long messageId = new Long(23); + ServerMessage message = createMessage(messageId); + + // Put message on queue + _queue.enqueue(message, null); + // Get message id + Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0); + + // Check message id + assertEquals("Message ID was wrong", messageId, testmsgid); + } + + public void testGetFirstFiveMessageIds() throws Exception + { + for (int i = 0 ; i < 5; i++) + { + // Create message + Long messageId = new Long(i); + ServerMessage message = createMessage(messageId); + // Put message on queue + _queue.enqueue(message, null); + } + // Get message ids + List msgids = _queue.getMessagesOnTheQueue(5); + + // Check message id + for (int i = 0; i < 5; i++) + { + Long messageId = new Long(i); + assertEquals("Message ID was wrong", messageId, msgids.get(i)); + } + } + + public void testGetLastFiveMessageIds() throws Exception + { + for (int i = 0 ; i < 10; i++) + { + // Create message + Long messageId = new Long(i); + ServerMessage message = createMessage(messageId); + // Put message on queue + _queue.enqueue(message, null); + } + // Get message ids + List msgids = _queue.getMessagesOnTheQueue(5, 5); + + // Check message id + for (int i = 0; i < 5; i++) + { + Long messageId = new Long(i+5); + assertEquals("Message ID was wrong", messageId, msgids.get(i)); + } + } + + public void testGetMessagesRangeOnTheQueue() throws Exception + { + for (int i = 1 ; i <= 10; i++) + { + // Create message + Long messageId = new Long(i); + ServerMessage message = createMessage(messageId); + // Put message on queue + _queue.enqueue(message, null); + } + + // Get non-existent 0th QueueEntry & check returned list was empty + // (the position parameters in this method are indexed from 1) + List entries = _queue.getMessagesRangeOnTheQueue(0, 0); + assertTrue(entries.size() == 0); + + // Check that when 'from' is 0 it is ignored and the range continues from 1 + entries = _queue.getMessagesRangeOnTheQueue(0, 2); + assertTrue(entries.size() == 2); + long msgID = entries.get(0).getMessage().getMessageNumber(); + assertEquals("Message ID was wrong", msgID, 1L); + msgID = entries.get(1).getMessage().getMessageNumber(); + assertEquals("Message ID was wrong", msgID, 2L); + + // Check that when 'from' is greater than 'to' the returned list is empty + entries = _queue.getMessagesRangeOnTheQueue(5, 4); + assertTrue(entries.size() == 0); + + // Get first QueueEntry & check id + entries = _queue.getMessagesRangeOnTheQueue(1, 1); + assertTrue(entries.size() == 1); + msgID = entries.get(0).getMessage().getMessageNumber(); + assertEquals("Message ID was wrong", msgID, 1L); + + // Get 5th,6th,7th entries and check id's + entries = _queue.getMessagesRangeOnTheQueue(5, 7); + assertTrue(entries.size() == 3); + msgID = entries.get(0).getMessage().getMessageNumber(); + assertEquals("Message ID was wrong", msgID, 5L); + msgID = entries.get(1).getMessage().getMessageNumber(); + assertEquals("Message ID was wrong", msgID, 6L); + msgID = entries.get(2).getMessage().getMessageNumber(); + assertEquals("Message ID was wrong", msgID, 7L); + + // Get 10th QueueEntry & check id + entries = _queue.getMessagesRangeOnTheQueue(10, 10); + assertTrue(entries.size() == 1); + msgID = entries.get(0).getMessage().getMessageNumber(); + assertEquals("Message ID was wrong", msgID, 10L); + + // Get non-existent 11th QueueEntry & check returned set was empty + entries = _queue.getMessagesRangeOnTheQueue(11, 11); + assertTrue(entries.size() == 0); + + // Get 9th,10th, and non-existent 11th entries & check result is of size 2 with correct IDs + entries = _queue.getMessagesRangeOnTheQueue(9, 11); + assertTrue(entries.size() == 2); + msgID = entries.get(0).getMessage().getMessageNumber(); + assertEquals("Message ID was wrong", msgID, 9L); + msgID = entries.get(1).getMessage().getMessageNumber(); + assertEquals("Message ID was wrong", msgID, 10L); + } + + + /** + * processQueue() is used when asynchronously delivering messages to + * consumers which could not be delivered immediately during the + * enqueue() operation. + * + * A defect within the method would mean that delivery of these messages may + * not occur should the Runner stop before all messages have been processed. + * Such a defect was discovered when Selectors were used such that one and + * only one consumer can/will accept any given messages, but multiple + * consumers are present, and one of the earlier consumers receives + * more messages than the others. + * + * This test is to validate that the processQueue() method is able to + * correctly deliver all of the messages present for asynchronous delivery + * to consumers in such a scenario. + */ + public void testProcessQueueWithUniqueSelectors() throws Exception + { + AbstractQueue testQueue = createNonAsyncDeliverQueue(); + + // retrieve the QueueEntryList the queue creates and insert the test + // messages, thus avoiding straight-through delivery attempts during + //enqueue() process. + QueueEntryList list = testQueue.getEntries(); + assertNotNull("QueueEntryList should have been created", list); + + QueueEntry msg1 = list.add(createMessage(1L)); + QueueEntry msg2 = list.add(createMessage(2L)); + QueueEntry msg3 = list.add(createMessage(3L)); + QueueEntry msg4 = list.add(createMessage(4L)); + QueueEntry msg5 = list.add(createMessage(5L)); + + // Create lists of the entries each consumer should be interested + // in.Bias over 50% of the messages to the first consumer so that + // the later consumers reject them and report being done before + // the first consumer as the processQueue method proceeds. + List msgListSub1 = createEntriesList(msg1, msg2, msg3); + List msgListSub2 = createEntriesList(msg4); + List msgListSub3 = createEntriesList(msg5); + + MockConsumer sub1 = new MockConsumer(msgListSub1); + MockConsumer sub2 = new MockConsumer(msgListSub2); + MockConsumer sub3 = new MockConsumer(msgListSub3); + + // register the consumers + testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test", + EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + + //check that no messages have been delivered to the + //consumers during registration + assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size()); + assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size()); + assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size()); + + // call processQueue to deliver the messages + testQueue.processQueue(new QueueRunner(testQueue) + { + @Override + public void run() + { + // we don't actually want/need this runner to do any work + // because we we are already doing it! + } + }); + + // check expected messages delivered to correct consumers + verifyReceivedMessages(Arrays.asList((MessageInstance)msg1,msg2,msg3), sub1.getMessages()); + verifyReceivedMessages(Collections.singletonList((MessageInstance)msg4), sub2.getMessages()); + verifyReceivedMessages(Collections.singletonList((MessageInstance)msg5), sub3.getMessages()); + } + + private AbstractQueue createNonAsyncDeliverQueue() + { + TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory(); + return new NonAsyncDeliverQueue(factory, getVirtualHost()); + } + + /** + * Tests that dequeued message is not present in the list returned form + * {@link AbstractQueue#getMessagesOnTheQueue()} + */ + public void testGetMessagesOnTheQueueWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // send test messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // get messages on the queue + List entries = _queue.getMessagesOnTheQueue(); + + // assert queue entries + assertEquals(messageNumber - 1, entries.size()); + int expectedId = 0; + for (int i = 0; i < messageNumber - 1; i++) + { + Long id = ( entries.get(i).getMessage()).getMessageNumber(); + if (i == dequeueMessageIndex) + { + assertFalse("Message with id " + dequeueMessageIndex + + " was dequeued and should not be returned by method getMessagesOnTheQueue!", + new Long(expectedId).equals(id)); + expectedId++; + } + assertEquals("Expected message with id " + expectedId + " but got message with id " + id, + new Long(expectedId), id); + expectedId++; + } + } + + /** + * Tests that dequeued message is not present in the list returned form + * {@link AbstractQueue#getMessagesOnTheQueue(QueueEntryFilter)} + */ + public void testGetMessagesOnTheQueueByQueueEntryFilterWithDequeuedEntry() + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // send test messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message + dequeueMessage(_queue, dequeueMessageIndex); + + // get messages on the queue with filter accepting all available messages + List entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter() + { + public boolean accept(E entry) + { + return true; + } + + public boolean filterComplete() + { + return false; + } + }); + + // assert entries on the queue + assertEquals(messageNumber - 1, entries.size()); + int expectedId = 0; + for (int i = 0; i < messageNumber - 1; i++) + { + Long id = (entries.get(i).getMessage()).getMessageNumber(); + if (i == dequeueMessageIndex) + { + assertFalse("Message with id " + dequeueMessageIndex + + " was dequeued and should not be returned by method getMessagesOnTheQueue!", + new Long(expectedId).equals(id)); + expectedId++; + } + assertEquals("Expected message with id " + expectedId + " but got message with id " + id, + new Long(expectedId), id); + expectedId++; + } + } + + /** + * Tests that all messages including dequeued one are deleted from the queue + * on invocation of {@link AbstractQueue#clearQueue()} + */ + public void testClearQueueWithDequeuedEntry() throws Exception + { + int messageNumber = 4; + int dequeueMessageIndex = 1; + + // put messages into a test queue + enqueueGivenNumberOfMessages(_queue, messageNumber); + + // dequeue message on a test queue + dequeueMessage(_queue, dequeueMessageIndex); + + // clean queue + _queue.clearQueue(); + + // get queue entries + List entries = _queue.getMessagesOnTheQueue(); + + // assert queue entries + assertNotNull(entries); + assertEquals(0, entries.size()); + } + + + public void testNotificationFiredOnEnqueue() throws Exception + { + AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class); + + _queue.setNotificationListener(listener); + _queue.setMaximumMessageCount(2); + + _queue.enqueue(createMessage(new Long(24)), null); + verifyZeroInteractions(listener); + + _queue.enqueue(createMessage(new Long(25)), null); + + verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold")); + } + + public void testNotificationFiredAsync() throws Exception + { + AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class); + + _queue.enqueue(createMessage(new Long(24)), null); + _queue.enqueue(createMessage(new Long(25)), null); + _queue.enqueue(createMessage(new Long(26)), null); + + _queue.setNotificationListener(listener); + _queue.setMaximumMessageCount(2); + + verifyZeroInteractions(listener); + + _queue.checkMessageStatus(); + + verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold")); + } + + /** + * A helper method to put given number of messages into queue + *

+ * All messages are asserted that they are present on queue + * + * @param queue + * queue to put messages into + * @param messageNumber + * number of messages to put into queue + */ + protected List enqueueGivenNumberOfMessages(Q queue, int messageNumber) + { + putGivenNumberOfMessages(queue, messageNumber); + + // make sure that all enqueued messages are on the queue + List entries = queue.getMessagesOnTheQueue(); + assertEquals(messageNumber, entries.size()); + for (int i = 0; i < messageNumber; i++) + { + assertEquals((long)i, (entries.get(i).getMessage()).getMessageNumber()); + } + return entries; + } + + /** + * A helper method to put given number of messages into queue + *

+ * Queue is not checked if messages are added into queue + * + * @param queue + * queue to put messages into + * @param messageNumber + * number of messages to put into queue + * @param queue + * @param messageNumber + */ + protected void putGivenNumberOfMessages(T queue, int messageNumber) + { + for (int i = 0; i < messageNumber; i++) + { + // Create message + ServerMessage message = null; + message = createMessage((long)i); + + // Put message on queue + queue.enqueue(message,null); + + } + try + { + Thread.sleep(2000L); + } + catch (InterruptedException e) + { + _logger.error("Thread interrupted", e); + } + } + + /** + * A helper method to dequeue an entry on queue with given index + * + * @param queue + * queue to dequeue message on + * @param dequeueMessageIndex + * entry index to dequeue. + */ + protected QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex) + { + List entries = queue.getMessagesOnTheQueue(); + QueueEntry entry = entries.get(dequeueMessageIndex); + entry.acquire(); + entry.delete(); + assertTrue(entry.isDeleted()); + return entry; + } + + private List createEntriesList(QueueEntry... entries) + { + ArrayList entriesList = new ArrayList(); + for (QueueEntry entry : entries) + { + entriesList.add(entry.getMessage().getMessageHeader().getMessageId()); + } + return entriesList; + } + + protected void verifyReceivedMessages(List expected, + List delivered) + { + assertEquals("Consumer did not receive the expected number of messages", + expected.size(), delivered.size()); + + for (MessageInstance msg : expected) + { + assertTrue("Consumer did not receive msg: " + + msg.getMessage().getMessageNumber(), delivered.contains(msg)); + } + } + + public Q getQueue() + { + return _queue; + } + + protected void setQueue(Q queue) + { + _queue = queue; + } + + public MockConsumer getConsumer() + { + return _consumerTarget; + } + + public Map getArguments() + { + return _arguments; + } + + public void setArguments(Map arguments) + { + _arguments = arguments; + } + + + protected ServerMessage createMessage(Long id) + { + AMQMessageHeader header = mock(AMQMessageHeader.class); + when(header.getMessageId()).thenReturn(String.valueOf(id)); + ServerMessage message = mock(ServerMessage.class); + when(message.getMessageNumber()).thenReturn(id); + when(message.getMessageHeader()).thenReturn(header); + + MessageReference ref = mock(MessageReference.class); + when(ref.getMessage()).thenReturn(message); + + + when(message.newReference()).thenReturn(ref); + + return message; + } + + private static class EntryListAddingAction implements Action> + { + private final ArrayList _queueEntries; + + public EntryListAddingAction(final ArrayList queueEntries) + { + _queueEntries = queueEntries; + } + + public void performAction(MessageInstance entry) + { + _queueEntries.add((QueueEntry) entry); + } + } + + + public VirtualHost getVirtualHost() + { + return _virtualHost; + } + + public String getQname() + { + return _qname; + } + + public String getOwner() + { + return _owner; + } + + public String getRoutingKey() + { + return _routingKey; + } + + public DirectExchange getExchange() + { + return _exchange; + } + + public MockConsumer getConsumerTarget() + { + return _consumerTarget; + } + + + static class TestSimpleQueueEntryListFactory implements QueueEntryListFactory + { + + @Override + public NonAsyncDeliverList createQueueEntryList(final NonAsyncDeliverQueue queue) + { + return new NonAsyncDeliverList(queue); + } + } + + private static class NonAsyncDeliverEntry extends OrderedQueueEntry + { + + public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList) + { + super(queueEntryList); + } + + public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList, + final ServerMessage message, + final long entryId) + { + super(queueEntryList, message, entryId); + } + + public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList, final ServerMessage message) + { + super(queueEntryList, message); + } + } + + private static class NonAsyncDeliverList extends OrderedQueueEntryList + { + + private static final HeadCreator HEAD_CREATOR = + new HeadCreator() + { + + @Override + public NonAsyncDeliverEntry createHead(final NonAsyncDeliverList list) + { + return new NonAsyncDeliverEntry(list); + } + }; + + public NonAsyncDeliverList(final NonAsyncDeliverQueue queue) + { + super(queue, HEAD_CREATOR); + } + + @Override + protected NonAsyncDeliverEntry createQueueEntry(final ServerMessage message) + { + return new NonAsyncDeliverEntry(this,message); + } + } + + + private static class NonAsyncDeliverQueue extends AbstractQueue + { + public NonAsyncDeliverQueue(final TestSimpleQueueEntryListFactory factory, VirtualHost vhost) + { + super(vhost, attributes(), factory); + } + + private static Map attributes() + { + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUID.randomUUID()); + attributes.put(Queue.NAME, "test"); + attributes.put(Queue.DURABLE, false); + attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); + return attributes; + } + + @Override + public void deliverAsync(QueueConsumer sub) + { + // do nothing, i.e prevent deliveries by the SubFlushRunner + // when registering the new consumers + } + } +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java index acddd49c04..f6c700f959 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java @@ -35,7 +35,7 @@ import org.apache.qpid.server.consumer.Consumer; import static org.mockito.Mockito.when; -public class PriorityQueueTest extends SimpleAMQQueueTestBase +public class PriorityQueueTest extends AbstractQueueTestBase { @Override @@ -49,7 +49,7 @@ public class PriorityQueueTest extends SimpleAMQQueueTestBase { // Enqueue messages in order - SimpleAMQQueue queue = getQueue(); + AbstractQueue queue = getQueue(); queue.enqueue(createMessage(1L, (byte) 10), null); queue.enqueue(createMessage(2L, (byte) 4), null); queue.enqueue(createMessage(3L, (byte) 0), null); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueThreadPoolTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueThreadPoolTest.java new file mode 100644 index 0000000000..234ac5dffa --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueThreadPoolTest.java @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.qpid.pool.ReferenceCountingExecutorService; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.test.utils.QpidTestCase; + +import java.util.HashMap; +import java.util.Map; + +public class QueueThreadPoolTest extends QpidTestCase +{ + + @Override + public void setUp() throws Exception + { + super.setUp(); + BrokerTestHelper.setUp(); + } + + @Override + public void tearDown() throws Exception + { + BrokerTestHelper.tearDown(); + super.tearDown(); + } + + public void test() throws Exception + { + int initialCount = ReferenceCountingExecutorService.getInstance().getReferenceCount(); + VirtualHost test = BrokerTestHelper.createVirtualHost("test"); + + try + { + Map attributes = new HashMap(); + attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); + attributes.put(Queue.NAME, "test"); + AMQQueue queue = test.createQueue(attributes); + + assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown()); + + assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount()); + + queue.stop(); + + assertEquals("References not decreased", initialCount , ReferenceCountingExecutorService.getInstance().getReferenceCount()); + } + finally + { + test.close(); + } + } +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java deleted file mode 100644 index 8f4ebf1b2e..0000000000 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTestBase.java +++ /dev/null @@ -1,1128 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.queue; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Matchers.contains; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.when; - -import java.util.*; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.model.LifetimePolicy; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.exchange.ExchangeDefaults; -import org.apache.qpid.server.exchange.DirectExchange; -import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; -import org.apache.qpid.server.consumer.MockConsumer; -import org.apache.qpid.server.consumer.Consumer; -import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; - -abstract class SimpleAMQQueueTestBase, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> extends QpidTestCase -{ - private static final Logger _logger = Logger.getLogger(SimpleAMQQueueTestBase.class); - - - private Q _queue; - private VirtualHost _virtualHost; - private String _qname = "qname"; - private String _owner = "owner"; - private String _routingKey = "routing key"; - private DirectExchange _exchange; - private MockConsumer _consumerTarget = new MockConsumer(); - private QueueConsumer _consumer; - private Map _arguments = Collections.emptyMap(); - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - - _virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName()); - - Map attributes = new HashMap(_arguments); - attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); - attributes.put(Queue.NAME, _qname); - attributes.put(Queue.OWNER, _owner); - - _queue = (Q) _virtualHost.createQueue(attributes); - - _exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME); - } - - @Override - public void tearDown() throws Exception - { - try - { - _queue.stop(); - _virtualHost.close(); - } - finally - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - } - - public void testCreateQueue() throws Exception - { - _queue.stop(); - try - { - Map attributes = new HashMap(_arguments); - attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); - - _queue = (Q) _virtualHost.createQueue(attributes); - assertNull("Queue was created", _queue); - } - catch (IllegalArgumentException e) - { - assertTrue("Exception was not about missing name", - e.getMessage().contains("name")); - } - - Map attributes = new HashMap(_arguments); - attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); - attributes.put(Queue.NAME, "differentName"); - _queue = (Q) _virtualHost.createQueue(attributes); - assertNotNull("Queue was not created", _queue); - } - - - public void testGetVirtualHost() - { - assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost()); - } - - public void testBinding() - { - _exchange.addBinding(_routingKey, _queue, Collections.EMPTY_MAP); - - assertTrue("Routing key was not bound", - _exchange.isBound(_routingKey)); - assertTrue("Queue was not bound to key", - _exchange.isBound(_routingKey,_queue)); - assertEquals("Exchange binding count", 1, - _queue.getBindings().size()); - assertEquals("Wrong exchange bound", _routingKey, - _queue.getBindings().get(0).getBindingKey()); - assertEquals("Wrong exchange bound", _exchange, - _queue.getBindings().get(0).getExchange()); - - _exchange.removeBinding(_routingKey, _queue, Collections.EMPTY_MAP); - assertFalse("Routing key was still bound", - _exchange.isBound(_routingKey)); - - } - - public void testRegisterConsumerThenEnqueueMessage() throws Exception - { - ServerMessage messageA = createMessage(new Long(24)); - - // Check adding a consumer adds it to the queue - _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); - assertEquals("Queue does not have consumer", 1, - _queue.getConsumerCount()); - assertEquals("Queue does not have active consumer", 1, - _queue.getActiveConsumerCount()); - - // Check sending a message ends up with the subscriber - _queue.enqueue(messageA, null); - try - { - Thread.sleep(2000L); - } - catch(InterruptedException e) - { - } - assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); - assertNull(_consumer.getQueueContext().getReleasedEntry()); - - // Check removing the consumer removes it's information from the queue - _consumer.close(); - assertTrue("Consumer still had queue", _consumerTarget.isClosed()); - assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount()); - assertFalse("Queue still has active consumer", - 1 == _queue.getActiveConsumerCount()); - - ServerMessage messageB = createMessage(new Long (25)); - _queue.enqueue(messageB, null); - assertNull(_consumer.getQueueContext()); - - } - - public void testEnqueueMessageThenRegisterConsumer() throws Exception, InterruptedException - { - ServerMessage messageA = createMessage(new Long(24)); - _queue.enqueue(messageA, null); - _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); - Thread.sleep(150); - assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); - assertNull("There should be no releasedEntry after an enqueue", - _consumer.getQueueContext().getReleasedEntry()); - } - - /** - * Tests enqueuing two messages. - */ - public void testEnqueueTwoMessagesThenRegisterConsumer() throws Exception - { - ServerMessage messageA = createMessage(new Long(24)); - ServerMessage messageB = createMessage(new Long(25)); - _queue.enqueue(messageA, null); - _queue.enqueue(messageB, null); - _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); - Thread.sleep(150); - assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage()); - assertNull("There should be no releasedEntry after enqueues", - _consumer.getQueueContext().getReleasedEntry()); - } - - /** - * Tests that a released queue entry is resent to the subscriber. Verifies also that the - * QueueContext._releasedEntry is reset to null after the entry has been reset. - */ - public void testReleasedMessageIsResentToSubscriber() throws Exception - { - - ServerMessage messageA = createMessage(new Long(24)); - ServerMessage messageB = createMessage(new Long(25)); - ServerMessage messageC = createMessage(new Long(26)); - - - _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); - - final ArrayList queueEntries = new ArrayList(); - EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); - - /* Enqueue three messages */ - - _queue.enqueue(messageA, postEnqueueAction); - _queue.enqueue(messageB, postEnqueueAction); - _queue.enqueue(messageC, postEnqueueAction); - - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - - assertEquals("Unexpected total number of messages sent to consumer", - 3, - _consumerTarget.getMessages().size()); - assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); - assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); - assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); - - /* Now release the first message only, causing it to be requeued */ - - queueEntries.get(0).release(); - - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - - assertEquals("Unexpected total number of messages sent to consumer", - 4, - _consumerTarget.getMessages().size()); - assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); - assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); - assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", - _consumer.getQueueContext().getReleasedEntry()); - } - - /** - * Tests that a released message that becomes expired is not resent to the subscriber. - * This tests ensures that SimpleAMQQueueEntry.getNextAvailableEntry avoids expired entries. - * Verifies also that the QueueContext._releasedEntry is reset to null after the entry has been reset. - */ - public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception - { - ServerMessage messageA = createMessage(new Long(24)); - - _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); - - final ArrayList queueEntries = new ArrayList(); - EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); - - /* Enqueue one message with expiration set for a short time in the future */ - - int messageExpirationOffset = 200; - final long expiration = System.currentTimeMillis() + messageExpirationOffset; - when(messageA.getExpiration()).thenReturn(expiration); - - _queue.enqueue(messageA, postEnqueueAction); - - int subFlushWaitTime = 150; - Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads - - assertEquals("Unexpected total number of messages sent to consumer", - 1, - _consumerTarget.getMessages().size()); - assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); - - /* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */ - Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10); - queueEntries.get(0).release(); - - Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads - - assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired()); - assertEquals("Total number of messages sent should not have changed", - 1, - _consumerTarget.getMessages().size()); - assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", - _consumer.getQueueContext().getReleasedEntry()); - - } - - /** - * Tests that if a client releases entries 'out of order' (the order - * used by QueueEntryImpl.compareTo) that messages are still resent - * successfully. Specifically this test ensures the {@see SimpleAMQQueue#requeue()} - * can correctly move the _releasedEntry to an earlier position in the QueueEntry list. - */ - public void testReleasedOutOfComparableOrderAreRedelivered() throws Exception - { - - ServerMessage messageA = createMessage(new Long(24)); - ServerMessage messageB = createMessage(new Long(25)); - ServerMessage messageC = createMessage(new Long(26)); - - _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); - - final ArrayList queueEntries = new ArrayList(); - EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); - - /* Enqueue three messages */ - - _queue.enqueue(messageA, postEnqueueAction); - _queue.enqueue(messageB, postEnqueueAction); - _queue.enqueue(messageC, postEnqueueAction); - - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - - assertEquals("Unexpected total number of messages sent to consumer", - 3, - _consumerTarget.getMessages().size()); - assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered()); - assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered()); - assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered()); - - /* Now release the third and first message only, causing it to be requeued */ - - queueEntries.get(2).release(); - queueEntries.get(0).release(); - - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - - assertEquals("Unexpected total number of messages sent to consumer", - 5, - _consumerTarget.getMessages().size()); - assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered()); - assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered()); - assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered()); - assertNull("releasedEntry should be cleared after requeue processed", - _consumer.getQueueContext().getReleasedEntry()); - } - - - /** - * Tests that a release requeues an entry for a queue with multiple consumers. Verifies that a - * requeue resends a message to a single subscriber. - */ - public void testReleaseForQueueWithMultipleConsumers() throws Exception - { - ServerMessage messageA = createMessage(new Long(24)); - ServerMessage messageB = createMessage(new Long(25)); - - MockConsumer target1 = new MockConsumer(); - MockConsumer target2 = new MockConsumer(); - - - QueueConsumer consumer1 = _queue.addConsumer(target1, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); - - QueueConsumer consumer2 = _queue.addConsumer(target2, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); - - - final ArrayList queueEntries = new ArrayList(); - EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); - - /* Enqueue two messages */ - - _queue.enqueue(messageA, postEnqueueAction); - _queue.enqueue(messageB, postEnqueueAction); - - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - - assertEquals("Unexpected total number of messages sent to both after enqueue", - 2, - target1.getMessages().size() + target2.getMessages().size()); - - /* Now release the first message only, causing it to be requeued */ - queueEntries.get(0).release(); - - Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads - - assertEquals("Unexpected total number of messages sent to both consumers after release", - 3, - target1.getMessages().size() + target2.getMessages().size()); - assertNull("releasedEntry should be cleared after requeue processed", - consumer1.getQueueContext().getReleasedEntry()); - assertNull("releasedEntry should be cleared after requeue processed", - consumer2.getQueueContext().getReleasedEntry()); - } - - public void testExclusiveConsumer() throws Exception - { - ServerMessage messageA = createMessage(new Long(24)); - // Check adding an exclusive consumer adds it to the queue - - _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.EXCLUSIVE, Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); - - assertEquals("Queue does not have consumer", 1, - _queue.getConsumerCount()); - assertEquals("Queue does not have active consumer", 1, - _queue.getActiveConsumerCount()); - - // Check sending a message ends up with the subscriber - _queue.enqueue(messageA, null); - try - { - Thread.sleep(2000L); - } - catch (InterruptedException e) - { - } - assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); - - // Check we cannot add a second subscriber to the queue - MockConsumer subB = new MockConsumer(); - Exception ex = null; - try - { - - _queue.addConsumer(subB, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); - - } - catch (MessageSource.ExistingExclusiveConsumer e) - { - ex = e; - } - assertNotNull(ex); - - // Check we cannot add an exclusive subscriber to a queue with an - // existing consumer - _consumer.close(); - _consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); - - try - { - - _consumer = _queue.addConsumer(subB, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.EXCLUSIVE)); - - } - catch (MessageSource.ExistingConsumerPreventsExclusive e) - { - ex = e; - } - assertNotNull(ex); - } - - - public void testResend() throws Exception - { - Long id = new Long(26); - ServerMessage message = createMessage(id); - - _consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); - - _queue.enqueue(message, new Action>() - { - @Override - public void performAction(final MessageInstance object) - { - QueueEntryImpl entry = (QueueEntryImpl) object; - entry.setRedelivered(); - _consumer.resend(entry); - - } - }); - - - - } - - public void testGetFirstMessageId() throws Exception - { - // Create message - Long messageId = new Long(23); - ServerMessage message = createMessage(messageId); - - // Put message on queue - _queue.enqueue(message, null); - // Get message id - Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0); - - // Check message id - assertEquals("Message ID was wrong", messageId, testmsgid); - } - - public void testGetFirstFiveMessageIds() throws Exception - { - for (int i = 0 ; i < 5; i++) - { - // Create message - Long messageId = new Long(i); - ServerMessage message = createMessage(messageId); - // Put message on queue - _queue.enqueue(message, null); - } - // Get message ids - List msgids = _queue.getMessagesOnTheQueue(5); - - // Check message id - for (int i = 0; i < 5; i++) - { - Long messageId = new Long(i); - assertEquals("Message ID was wrong", messageId, msgids.get(i)); - } - } - - public void testGetLastFiveMessageIds() throws Exception - { - for (int i = 0 ; i < 10; i++) - { - // Create message - Long messageId = new Long(i); - ServerMessage message = createMessage(messageId); - // Put message on queue - _queue.enqueue(message, null); - } - // Get message ids - List msgids = _queue.getMessagesOnTheQueue(5, 5); - - // Check message id - for (int i = 0; i < 5; i++) - { - Long messageId = new Long(i+5); - assertEquals("Message ID was wrong", messageId, msgids.get(i)); - } - } - - public void testGetMessagesRangeOnTheQueue() throws Exception - { - for (int i = 1 ; i <= 10; i++) - { - // Create message - Long messageId = new Long(i); - ServerMessage message = createMessage(messageId); - // Put message on queue - _queue.enqueue(message, null); - } - - // Get non-existent 0th QueueEntry & check returned list was empty - // (the position parameters in this method are indexed from 1) - List entries = _queue.getMessagesRangeOnTheQueue(0, 0); - assertTrue(entries.size() == 0); - - // Check that when 'from' is 0 it is ignored and the range continues from 1 - entries = _queue.getMessagesRangeOnTheQueue(0, 2); - assertTrue(entries.size() == 2); - long msgID = entries.get(0).getMessage().getMessageNumber(); - assertEquals("Message ID was wrong", msgID, 1L); - msgID = entries.get(1).getMessage().getMessageNumber(); - assertEquals("Message ID was wrong", msgID, 2L); - - // Check that when 'from' is greater than 'to' the returned list is empty - entries = _queue.getMessagesRangeOnTheQueue(5, 4); - assertTrue(entries.size() == 0); - - // Get first QueueEntry & check id - entries = _queue.getMessagesRangeOnTheQueue(1, 1); - assertTrue(entries.size() == 1); - msgID = entries.get(0).getMessage().getMessageNumber(); - assertEquals("Message ID was wrong", msgID, 1L); - - // Get 5th,6th,7th entries and check id's - entries = _queue.getMessagesRangeOnTheQueue(5, 7); - assertTrue(entries.size() == 3); - msgID = entries.get(0).getMessage().getMessageNumber(); - assertEquals("Message ID was wrong", msgID, 5L); - msgID = entries.get(1).getMessage().getMessageNumber(); - assertEquals("Message ID was wrong", msgID, 6L); - msgID = entries.get(2).getMessage().getMessageNumber(); - assertEquals("Message ID was wrong", msgID, 7L); - - // Get 10th QueueEntry & check id - entries = _queue.getMessagesRangeOnTheQueue(10, 10); - assertTrue(entries.size() == 1); - msgID = entries.get(0).getMessage().getMessageNumber(); - assertEquals("Message ID was wrong", msgID, 10L); - - // Get non-existent 11th QueueEntry & check returned set was empty - entries = _queue.getMessagesRangeOnTheQueue(11, 11); - assertTrue(entries.size() == 0); - - // Get 9th,10th, and non-existent 11th entries & check result is of size 2 with correct IDs - entries = _queue.getMessagesRangeOnTheQueue(9, 11); - assertTrue(entries.size() == 2); - msgID = entries.get(0).getMessage().getMessageNumber(); - assertEquals("Message ID was wrong", msgID, 9L); - msgID = entries.get(1).getMessage().getMessageNumber(); - assertEquals("Message ID was wrong", msgID, 10L); - } - - - /** - * processQueue() is used when asynchronously delivering messages to - * consumers which could not be delivered immediately during the - * enqueue() operation. - * - * A defect within the method would mean that delivery of these messages may - * not occur should the Runner stop before all messages have been processed. - * Such a defect was discovered when Selectors were used such that one and - * only one consumer can/will accept any given messages, but multiple - * consumers are present, and one of the earlier consumers receives - * more messages than the others. - * - * This test is to validate that the processQueue() method is able to - * correctly deliver all of the messages present for asynchronous delivery - * to consumers in such a scenario. - */ - public void testProcessQueueWithUniqueSelectors() throws Exception - { - SimpleAMQQueue testQueue = createNonAsyncDeliverQueue(); - - // retrieve the QueueEntryList the queue creates and insert the test - // messages, thus avoiding straight-through delivery attempts during - //enqueue() process. - QueueEntryList list = testQueue.getEntries(); - assertNotNull("QueueEntryList should have been created", list); - - QueueEntry msg1 = list.add(createMessage(1L)); - QueueEntry msg2 = list.add(createMessage(2L)); - QueueEntry msg3 = list.add(createMessage(3L)); - QueueEntry msg4 = list.add(createMessage(4L)); - QueueEntry msg5 = list.add(createMessage(5L)); - - // Create lists of the entries each consumer should be interested - // in.Bias over 50% of the messages to the first consumer so that - // the later consumers reject them and report being done before - // the first consumer as the processQueue method proceeds. - List msgListSub1 = createEntriesList(msg1, msg2, msg3); - List msgListSub2 = createEntriesList(msg4); - List msgListSub3 = createEntriesList(msg5); - - MockConsumer sub1 = new MockConsumer(msgListSub1); - MockConsumer sub2 = new MockConsumer(msgListSub2); - MockConsumer sub3 = new MockConsumer(msgListSub3); - - // register the consumers - testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); - testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); - testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); - - //check that no messages have been delivered to the - //consumers during registration - assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size()); - assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size()); - assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size()); - - // call processQueue to deliver the messages - testQueue.processQueue(new QueueRunner(testQueue) - { - @Override - public void run() - { - // we don't actually want/need this runner to do any work - // because we we are already doing it! - } - }); - - // check expected messages delivered to correct consumers - verifyReceivedMessages(Arrays.asList((MessageInstance)msg1,msg2,msg3), sub1.getMessages()); - verifyReceivedMessages(Collections.singletonList((MessageInstance)msg4), sub2.getMessages()); - verifyReceivedMessages(Collections.singletonList((MessageInstance)msg5), sub3.getMessages()); - } - - private SimpleAMQQueue createNonAsyncDeliverQueue() - { - TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory(); - return new NonAsyncDeliverQueue(factory, getVirtualHost()); - } - - /** - * Tests that dequeued message is not present in the list returned form - * {@link SimpleAMQQueue#getMessagesOnTheQueue()} - */ - public void testGetMessagesOnTheQueueWithDequeuedEntry() - { - int messageNumber = 4; - int dequeueMessageIndex = 1; - - // send test messages into a test queue - enqueueGivenNumberOfMessages(_queue, messageNumber); - - // dequeue message - dequeueMessage(_queue, dequeueMessageIndex); - - // get messages on the queue - List entries = _queue.getMessagesOnTheQueue(); - - // assert queue entries - assertEquals(messageNumber - 1, entries.size()); - int expectedId = 0; - for (int i = 0; i < messageNumber - 1; i++) - { - Long id = ( entries.get(i).getMessage()).getMessageNumber(); - if (i == dequeueMessageIndex) - { - assertFalse("Message with id " + dequeueMessageIndex - + " was dequeued and should not be returned by method getMessagesOnTheQueue!", - new Long(expectedId).equals(id)); - expectedId++; - } - assertEquals("Expected message with id " + expectedId + " but got message with id " + id, - new Long(expectedId), id); - expectedId++; - } - } - - /** - * Tests that dequeued message is not present in the list returned form - * {@link SimpleAMQQueue#getMessagesOnTheQueue(QueueEntryFilter)} - */ - public void testGetMessagesOnTheQueueByQueueEntryFilterWithDequeuedEntry() - { - int messageNumber = 4; - int dequeueMessageIndex = 1; - - // send test messages into a test queue - enqueueGivenNumberOfMessages(_queue, messageNumber); - - // dequeue message - dequeueMessage(_queue, dequeueMessageIndex); - - // get messages on the queue with filter accepting all available messages - List entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter() - { - public boolean accept(E entry) - { - return true; - } - - public boolean filterComplete() - { - return false; - } - }); - - // assert entries on the queue - assertEquals(messageNumber - 1, entries.size()); - int expectedId = 0; - for (int i = 0; i < messageNumber - 1; i++) - { - Long id = (entries.get(i).getMessage()).getMessageNumber(); - if (i == dequeueMessageIndex) - { - assertFalse("Message with id " + dequeueMessageIndex - + " was dequeued and should not be returned by method getMessagesOnTheQueue!", - new Long(expectedId).equals(id)); - expectedId++; - } - assertEquals("Expected message with id " + expectedId + " but got message with id " + id, - new Long(expectedId), id); - expectedId++; - } - } - - /** - * Tests that all messages including dequeued one are deleted from the queue - * on invocation of {@link SimpleAMQQueue#clearQueue()} - */ - public void testClearQueueWithDequeuedEntry() throws Exception - { - int messageNumber = 4; - int dequeueMessageIndex = 1; - - // put messages into a test queue - enqueueGivenNumberOfMessages(_queue, messageNumber); - - // dequeue message on a test queue - dequeueMessage(_queue, dequeueMessageIndex); - - // clean queue - _queue.clearQueue(); - - // get queue entries - List entries = _queue.getMessagesOnTheQueue(); - - // assert queue entries - assertNotNull(entries); - assertEquals(0, entries.size()); - } - - - public void testNotificationFiredOnEnqueue() throws Exception - { - AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class); - - _queue.setNotificationListener(listener); - _queue.setMaximumMessageCount(2); - - _queue.enqueue(createMessage(new Long(24)), null); - verifyZeroInteractions(listener); - - _queue.enqueue(createMessage(new Long(25)), null); - - verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold")); - } - - public void testNotificationFiredAsync() throws Exception - { - AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class); - - _queue.enqueue(createMessage(new Long(24)), null); - _queue.enqueue(createMessage(new Long(25)), null); - _queue.enqueue(createMessage(new Long(26)), null); - - _queue.setNotificationListener(listener); - _queue.setMaximumMessageCount(2); - - verifyZeroInteractions(listener); - - _queue.checkMessageStatus(); - - verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold")); - } - - /** - * A helper method to put given number of messages into queue - *

- * All messages are asserted that they are present on queue - * - * @param queue - * queue to put messages into - * @param messageNumber - * number of messages to put into queue - */ - protected List enqueueGivenNumberOfMessages(Q queue, int messageNumber) - { - putGivenNumberOfMessages(queue, messageNumber); - - // make sure that all enqueued messages are on the queue - List entries = queue.getMessagesOnTheQueue(); - assertEquals(messageNumber, entries.size()); - for (int i = 0; i < messageNumber; i++) - { - assertEquals((long)i, (entries.get(i).getMessage()).getMessageNumber()); - } - return entries; - } - - /** - * A helper method to put given number of messages into queue - *

- * Queue is not checked if messages are added into queue - * - * @param queue - * queue to put messages into - * @param messageNumber - * number of messages to put into queue - * @param queue - * @param messageNumber - */ - protected void putGivenNumberOfMessages(T queue, int messageNumber) - { - for (int i = 0; i < messageNumber; i++) - { - // Create message - ServerMessage message = null; - message = createMessage((long)i); - - // Put message on queue - queue.enqueue(message,null); - - } - try - { - Thread.sleep(2000L); - } - catch (InterruptedException e) - { - _logger.error("Thread interrupted", e); - } - } - - /** - * A helper method to dequeue an entry on queue with given index - * - * @param queue - * queue to dequeue message on - * @param dequeueMessageIndex - * entry index to dequeue. - */ - protected QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex) - { - List entries = queue.getMessagesOnTheQueue(); - QueueEntry entry = entries.get(dequeueMessageIndex); - entry.acquire(); - entry.delete(); - assertTrue(entry.isDeleted()); - return entry; - } - - private List createEntriesList(QueueEntry... entries) - { - ArrayList entriesList = new ArrayList(); - for (QueueEntry entry : entries) - { - entriesList.add(entry.getMessage().getMessageHeader().getMessageId()); - } - return entriesList; - } - - protected void verifyReceivedMessages(List expected, - List delivered) - { - assertEquals("Consumer did not receive the expected number of messages", - expected.size(), delivered.size()); - - for (MessageInstance msg : expected) - { - assertTrue("Consumer did not receive msg: " - + msg.getMessage().getMessageNumber(), delivered.contains(msg)); - } - } - - public Q getQueue() - { - return _queue; - } - - protected void setQueue(Q queue) - { - _queue = queue; - } - - public MockConsumer getConsumer() - { - return _consumerTarget; - } - - public Map getArguments() - { - return _arguments; - } - - public void setArguments(Map arguments) - { - _arguments = arguments; - } - - - protected ServerMessage createMessage(Long id) - { - AMQMessageHeader header = mock(AMQMessageHeader.class); - when(header.getMessageId()).thenReturn(String.valueOf(id)); - ServerMessage message = mock(ServerMessage.class); - when(message.getMessageNumber()).thenReturn(id); - when(message.getMessageHeader()).thenReturn(header); - - MessageReference ref = mock(MessageReference.class); - when(ref.getMessage()).thenReturn(message); - - - when(message.newReference()).thenReturn(ref); - - return message; - } - - private static class EntryListAddingAction implements Action> - { - private final ArrayList _queueEntries; - - public EntryListAddingAction(final ArrayList queueEntries) - { - _queueEntries = queueEntries; - } - - public void performAction(MessageInstance entry) - { - _queueEntries.add((QueueEntry) entry); - } - } - - - public VirtualHost getVirtualHost() - { - return _virtualHost; - } - - public String getQname() - { - return _qname; - } - - public String getOwner() - { - return _owner; - } - - public String getRoutingKey() - { - return _routingKey; - } - - public DirectExchange getExchange() - { - return _exchange; - } - - public MockConsumer getConsumerTarget() - { - return _consumerTarget; - } - - - static class TestSimpleQueueEntryListFactory implements QueueEntryListFactory - { - - @Override - public NonAsyncDeliverList createQueueEntryList(final NonAsyncDeliverQueue queue) - { - return new NonAsyncDeliverList(queue); - } - } - - private static class NonAsyncDeliverEntry extends OrderedQueueEntry - { - - public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList) - { - super(queueEntryList); - } - - public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList, - final ServerMessage message, - final long entryId) - { - super(queueEntryList, message, entryId); - } - - public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList, final ServerMessage message) - { - super(queueEntryList, message); - } - } - - private static class NonAsyncDeliverList extends OrderedQueueEntryList - { - - private static final HeadCreator HEAD_CREATOR = - new HeadCreator() - { - - @Override - public NonAsyncDeliverEntry createHead(final NonAsyncDeliverList list) - { - return new NonAsyncDeliverEntry(list); - } - }; - - public NonAsyncDeliverList(final NonAsyncDeliverQueue queue) - { - super(queue, HEAD_CREATOR); - } - - @Override - protected NonAsyncDeliverEntry createQueueEntry(final ServerMessage message) - { - return new NonAsyncDeliverEntry(this,message); - } - } - - - private static class NonAsyncDeliverQueue extends SimpleAMQQueue - { - public NonAsyncDeliverQueue(final TestSimpleQueueEntryListFactory factory, VirtualHost vhost) - { - super(vhost, attributes(), factory); - } - - private static Map attributes() - { - Map attributes = new HashMap(); - attributes.put(Queue.ID, UUID.randomUUID()); - attributes.put(Queue.NAME, "test"); - attributes.put(Queue.DURABLE, false); - attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT); - return attributes; - } - - @Override - public void deliverAsync(QueueConsumer sub) - { - // do nothing, i.e prevent deliveries by the SubFlushRunner - // when registering the new consumers - } - } -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java deleted file mode 100644 index 1d4bfa9960..0000000000 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.queue; - -import org.apache.qpid.pool.ReferenceCountingExecutorService; -import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.util.BrokerTestHelper; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.test.utils.QpidTestCase; - -import java.util.HashMap; -import java.util.Map; - -public class SimpleAMQQueueThreadPoolTest extends QpidTestCase -{ - - @Override - public void setUp() throws Exception - { - super.setUp(); - BrokerTestHelper.setUp(); - } - - @Override - public void tearDown() throws Exception - { - BrokerTestHelper.tearDown(); - super.tearDown(); - } - - public void test() throws Exception - { - int initialCount = ReferenceCountingExecutorService.getInstance().getReferenceCount(); - VirtualHost test = BrokerTestHelper.createVirtualHost("test"); - - try - { - Map attributes = new HashMap(); - attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID()); - attributes.put(Queue.NAME, "test"); - AMQQueue queue = test.createQueue(attributes); - - assertFalse("Creation did not start Pool.", ReferenceCountingExecutorService.getInstance().getPool().isShutdown()); - - assertEquals("References not increased", initialCount + 1, ReferenceCountingExecutorService.getInstance().getReferenceCount()); - - queue.stop(); - - assertEquals("References not decreased", initialCount , ReferenceCountingExecutorService.getInstance().getReferenceCount()); - } - finally - { - test.close(); - } - } -} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index 4dd0423c27..da69700f6b 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -40,7 +40,7 @@ import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.mock; -public class StandardQueueTest extends SimpleAMQQueueTestBase +public class StandardQueueTest extends AbstractQueueTestBase { public void testCreationFailsWithNoVhost() @@ -162,7 +162,7 @@ public class StandardQueueTest extends SimpleAMQQueueTestBase + private static class DequeuedQueue extends AbstractQueue { public DequeuedQueue(VirtualHost virtualHost) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java index 6801c50722..324a0a809e 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java @@ -53,8 +53,6 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; @@ -86,11 +84,11 @@ public class DurableConfigurationRecovererTest extends QpidTestCase _directExchange = mock(Exchange.class); - when(_directExchange.getType()).thenReturn(DirectExchange.TYPE); + when(_directExchange.getExchangeType()).thenReturn(DirectExchange.TYPE); _topicExchange = mock(Exchange.class); - when(_topicExchange.getType()).thenReturn(TopicExchange.TYPE); + when(_topicExchange.getExchangeType()).thenReturn(TopicExchange.TYPE); AMQQueue queue = mock(AMQQueue.class); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index c03daf20b3..8caf4b3ab5 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -926,7 +926,7 @@ public class ServerSessionDelegate extends SessionDelegate { exception(session, method, ExecutionErrorCode.NOT_FOUND, "Exchange: '" + method.getExchange() + "' not found"); } - else if(exchange.getType().equals(HeadersExchange.TYPE) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match"))) + else if(exchange.getExchangeType().equals(HeadersExchange.TYPE) && (!method.hasArguments() || method.getArguments() == null || !method.getArguments().containsKey("x-match"))) { exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, "Bindings to an exchange of type " + HeadersExchange.TYPE.getType() + " require an x-match header"); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java index 8eb6c63542..78278b09c8 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java @@ -119,7 +119,7 @@ public class QueueBindHandler implements StateAwareMethodListener if (!exch.isBound(bindingKey, arguments, queue)) { - if(!exch.addBinding(bindingKey, queue, arguments) && TopicExchange.TYPE.equals(exch.getType())) + if(!exch.addBinding(bindingKey, queue, arguments) && TopicExchange.TYPE.equals(exch.getExchangeType())) { Binding oldBinding = exch.getBinding(bindingKey, queue, arguments); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index f757137cf1..5d9cd4b80a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -249,7 +249,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { if(!hasBindingFilter && entry.getValue() instanceof ExactSubjectFilter - && exchange.getType() == DirectExchange.TYPE) + && exchange.getExchangeType() == DirectExchange.TYPE) { ExactSubjectFilter filter = (ExactSubjectFilter) filters.values().iterator().next(); source.setFilter(filters); @@ -259,7 +259,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS } else if(!hasBindingFilter && entry.getValue() instanceof MatchingSubjectFilter - && exchange.getType() == TopicExchange.TYPE) + && exchange.getExchangeType() == TopicExchange.TYPE) { MatchingSubjectFilter filter = (MatchingSubjectFilter) filters.values().iterator().next(); source.setFilter(filters); -- cgit v1.2.1