summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-10-25 18:13:35 +0000
committerAlan Conway <aconway@apache.org>2013-10-25 18:13:35 +0000
commitcafc3478000e381db390bcd97615518cd1f02d00 (patch)
treeb3d5285f1cabff50492a3af2ee4b67d83dcd02f4 /qpid/cpp/src
parentde782d5e01159cb6a3283e917d47cab70c9730f9 (diff)
downloadqpid-python-cafc3478000e381db390bcd97615518cd1f02d00.tar.gz
QPID-4287: Poor performance when a priority queue with a ring queue policy has a large backlog
LossyQueue::checkDepth was performing an unintended linear search of its messages when a new message was added at capacity. Since the messages are in priority order, only the tail message on the queue needs to be compared with the new message to determine which of them should be dropped. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1535803 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/LossyQueue.cpp15
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp29
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.h17
3 files changed, 43 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/broker/LossyQueue.cpp b/qpid/cpp/src/qpid/broker/LossyQueue.cpp
index ba7dfd11a1..ee13d7733a 100644
--- a/qpid/cpp/src/qpid/broker/LossyQueue.cpp
+++ b/qpid/cpp/src/qpid/broker/LossyQueue.cpp
@@ -51,8 +51,19 @@ bool LossyQueue::checkDepth(const QueueDepth& increment, const Message& message)
while (settings.maxDepth && (settings.maxDepth - current < increment)) {
QPID_LOG(debug, "purging " << name << ": current depth is [" << current << "], max depth is [" << settings.maxDepth << "], new message has size " << increment.getSize());
qpid::sys::Mutex::ScopedUnlock u(messageLock);
- //TODO: arguably we should try and purge expired messages first but that is potentially expensive
- if (remove(1, settings.priorities ? boost::bind(&isLowerPriorityThan, message.getPriority(), _1) : MessagePredicate(), boost::bind(&reroute, alternateExchange, _1), PURGE, false)) {
+ //TODO: arguably we should try and purge expired messages first but that
+ //is potentially expensive
+
+ // Note: in the case of a priority queue we are only comparing the new mesage
+ // with single lowest-priority message, hence the final parameter maxTests
+ // is 1 in this case, so we only test one message for removal.
+ if (remove(1,
+ settings.priorities ?
+ boost::bind(&isLowerPriorityThan, message.getPriority(), _1) :
+ MessagePredicate(), boost::bind(&reroute, alternateExchange, _1),
+ PURGE, false,
+ settings.priorities ? 1 : 0))
+ {
if (mgmtObject) {
mgmtObject->inc_discardsRing(1);
if (brokerMgmtObject)
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 29e7c06e90..19b18e1b0e 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -706,30 +706,29 @@ namespace {
}
} // end namespace
-uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f, SubscriptionType type, bool triggerAutoDelete)
+uint32_t Queue::remove(const uint32_t maxCount, MessagePredicate p, MessageFunctor f,
+ SubscriptionType type, bool triggerAutoDelete, uint32_t maxTests)
{
ScopedAutoDelete autodelete(*this);
std::deque<Message> removed;
{
QueueCursor c(type);
- uint32_t count(0);
+ uint32_t count(0), tests(0);
Mutex::ScopedLock locker(messageLock);
Message* m = messages->next(c);
while (m){
+ if (maxTests && tests++ >= maxTests) break;
if (!p || p(*m)) {
- if (!maxCount || count++ < maxCount) {
- if (m->getState() == AVAILABLE) {
- //don't actually acquire, just act as if we did
- observeAcquire(*m, locker);
- }
- observeDequeue(*m, locker, triggerAutoDelete ? &autodelete : 0);
- removed.push_back(*m);//takes a copy of the message
- if (!messages->deleted(c)) {
- QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!");
- assert(false);
- }
- } else {
- break;
+ if (maxCount && count++ >= maxCount) break;
+ if (m->getState() == AVAILABLE) {
+ //don't actually acquire, just act as if we did
+ observeAcquire(*m, locker);
+ }
+ observeDequeue(*m, locker, triggerAutoDelete ? &autodelete : 0);
+ removed.push_back(*m);//takes a copy of the message
+ if (!messages->deleted(c)) {
+ QPID_LOG(warning, "Failed to correctly remove message from " << name << "; state is not consistent!");
+ assert(false);
}
}
m = messages->next(c);
diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h
index a7eb71c6bb..3622b06dbd 100644
--- a/qpid/cpp/src/qpid/broker/Queue.h
+++ b/qpid/cpp/src/qpid/broker/Queue.h
@@ -255,7 +255,22 @@ class Queue : public boost::enable_shared_from_this<Queue>,
void abandoned(const Message& message);
bool checkNotDeleted(const Consumer::shared_ptr&);
void notifyDeleted();
- uint32_t remove(uint32_t maxCount, MessagePredicate, MessageFunctor, SubscriptionType, bool triggerAutoDelete);
+
+ /** Remove messages from the queue:
+ *@param maxCount Maximum number of messages to remove, 0 means unlimited.
+ *@param p Only remove messages for which p(msg) is true.
+ *@param f Call f on each message that is removed.
+ *@param st Use a cursor of this SubscriptionType to iterate messages to remove.
+ *@param triggerAutoDelete If true removing messages may trigger aut-delete.
+ *@param maxTests Max number of messages to test for removal, 0 means unlimited.
+ *@return Number of messages removed.
+ */
+ uint32_t remove(uint32_t maxCount,
+ MessagePredicate p, MessageFunctor f,
+ SubscriptionType st,
+ bool triggerAutoDelete,
+ uint32_t maxTests=0);
+
virtual bool checkDepth(const QueueDepth& increment, const Message&);
void tryAutoDelete();
public: