diff options
author | Gordon Sim <gsim@apache.org> | 2016-06-15 08:47:59 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2016-06-15 08:47:59 +0000 |
commit | 7ab8ebde50308f76428359c0120473c4d491b55a (patch) | |
tree | 8992337581991eb83f728baaab0de351384bf17a | |
parent | ac0636922f3d15a55b5ede7c01b236c047f9603a (diff) | |
download | qpid-python-7ab8ebde50308f76428359c0120473c4d491b55a.tar.gz |
QPID-7302: Restart delayed auto-delete timer if the queue is declared
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1748523 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 29 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.h | 5 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.cpp | 43 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueueRegistry.h | 14 |
4 files changed, 62 insertions, 29 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index d90bd1110a..fea5946247 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1297,9 +1297,10 @@ boost::shared_ptr<Exchange> Queue::getAlternateExchange() struct AutoDeleteTask : qpid::sys::TimerTask { Queue::shared_ptr queue; + long expectedVersion; AutoDeleteTask(Queue::shared_ptr q, AbsTime fireTime) - : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), queue(q) {} + : qpid::sys::TimerTask(fireTime, "DelayedAutoDeletion:"+q->getName()), queue(q), expectedVersion(q->version) {} void fire() { @@ -1307,7 +1308,7 @@ struct AutoDeleteTask : qpid::sys::TimerTask //created, but then became unused again before the task fired; //in this case ignore this request as there will have already //been a later task added - queue->tryAutoDelete(); + queue->tryAutoDelete(expectedVersion); } }; @@ -1320,29 +1321,37 @@ void Queue::scheduleAutoDelete(bool immediate) broker->getTimer().add(autoDeleteTask); QPID_LOG(debug, "Timed auto-delete for " << getName() << " initiated"); } else { - tryAutoDelete(); + tryAutoDelete(version); } } } -void Queue::tryAutoDelete() +void Queue::tryAutoDelete(long expectedVersion) { bool proceed(false); { Mutex::ScopedLock locker(messageLock); if (!deleted && checkAutoDelete(locker)) { proceed = true; - deleted = true; } } if (proceed) { - broker->getQueues().destroy(name); - if (broker->getAcl()) - broker->getAcl()->recordDestroyQueue(name); + if (broker->getQueues().destroyIfUntouched(name, expectedVersion)) { + { + Mutex::ScopedLock locker(messageLock); + deleted = true; + } + if (broker->getAcl()) + broker->getAcl()->recordDestroyQueue(name); - QPID_LOG_CAT(debug, model, "Auto-delete queue deleted: " << name << " (" << deleted << ")"); - destroyed(); + QPID_LOG_CAT(debug, model, "Auto-delete queue deleted: " << name << " (" << deleted << ")"); + destroyed(); + } else { + //queue was accessed since the delayed auto-delete was scheduled, so try again + QPID_LOG_CAT(debug, model, "Auto-delete interrupted for queue: " << name); + scheduleAutoDelete(); + } } else { QPID_LOG_CAT(debug, model, "Auto-delete queue could not be deleted: " << name); } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 875b996637..150ad1ce12 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -37,6 +37,7 @@ #include "qpid/framing/FieldTable.h" #include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/AtomicCount.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Monitor.h" #include "qpid/management/Manageable.h" @@ -219,6 +220,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask; boost::shared_ptr<MessageDistributor> allocator; boost::scoped_ptr<Selector> selector; + qpid::sys::AtomicCount version; // Redirect source and target refer to each other. Only one is source. Queue::shared_ptr redirectPeer; @@ -271,7 +273,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, uint32_t maxTests=0); virtual bool checkDepth(const QueueDepth& increment, const Message&); - void tryAutoDelete(); + void tryAutoDelete(long expectedVersion); public: typedef std::vector<shared_ptr> vector; @@ -533,6 +535,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, static bool isExpired(const std::string& queueName, const Message&, qpid::sys::AbsTime); friend class QueueFactory; + friend class QueueRegistry; }; } } diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp index 1283a42e6d..2101d51fc2 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.cpp @@ -74,6 +74,7 @@ QueueRegistry::declare(const string& name, const QueueSettings& settings, result = std::pair<Queue::shared_ptr, bool>(queue, true); } else { result = std::pair<Queue::shared_ptr, bool>(i->second, false); + ++(i->second->version); } if (getBroker() && getBroker()->getManagementAgent()) { getBroker()->getManagementAgent()->raiseEvent( @@ -97,17 +98,41 @@ void QueueRegistry::destroy( QueueMap::iterator i = queues.find(name); if (i != queues.end()) { q = i->second; - queues.erase(i); - if (getBroker()) { - // NOTE: queueDestroy and raiseEvent must be called with the - // lock held in order to ensure events are generated - // in the correct order. - getBroker()->getBrokerObservers().queueDestroy(q); - if (getBroker()->getManagementAgent()) - getBroker()->getManagementAgent()->raiseEvent( - _qmf::EventQueueDelete(connectionId, userId, name)); + eraseLH(i, q, name, connectionId, userId); + } + } +} + +void QueueRegistry::eraseLH(QueueMap::iterator i, Queue::shared_ptr q, const string& name, const string& connectionId, const string& userId) +{ + queues.erase(i); + if (getBroker()) { + // NOTE: queueDestroy and raiseEvent must be called with the + // lock held in order to ensure events are generated + // in the correct order. + getBroker()->getBrokerObservers().queueDestroy(q); + if (getBroker()->getManagementAgent()) + getBroker()->getManagementAgent()->raiseEvent( + _qmf::EventQueueDelete(connectionId, userId, name)); + } +} + + +bool QueueRegistry::destroyIfUntouched(const string& name, long version, + const string& connectionId, const string& userId) +{ + Queue::shared_ptr q; + { + qpid::sys::RWlock::ScopedWlock locker(lock); + QueueMap::iterator i = queues.find(name); + if (i != queues.end()) { + q = i->second; + if (q->version == version) { + eraseLH(i, q, name, connectionId, userId); + return true; } } + return false; } } diff --git a/qpid/cpp/src/qpid/broker/QueueRegistry.h b/qpid/cpp/src/qpid/broker/QueueRegistry.h index af4e8e50fb..0ff96b6989 100644 --- a/qpid/cpp/src/qpid/broker/QueueRegistry.h +++ b/qpid/cpp/src/qpid/broker/QueueRegistry.h @@ -80,15 +80,9 @@ class QueueRegistry : private QueueFactory { const std::string& connectionId=std::string(), const std::string& userId=std::string()); - template <class Test> bool destroyIf(const std::string& name, Test test) - { - if (test()) { - destroy(name); - return true; - } else { - return false; - } - } + QPID_BROKER_EXTERN bool destroyIfUntouched(const std::string& name, long version, + const std::string& connectionId=std::string(), + const std::string& userId=std::string()); /** * Find the named queue. Return 0 if not found. @@ -126,6 +120,8 @@ private: typedef std::map<std::string, boost::shared_ptr<Queue> > QueueMap; QueueMap queues; mutable qpid::sys::RWlock lock; + + void eraseLH(QueueMap::iterator, boost::shared_ptr<Queue>, const std::string& name, const std::string& connectionId, const std::string& userId); }; |