diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueuePolicy.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/QueuePolicy.cpp | 144 |
1 files changed, 70 insertions, 74 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp index 39afe90134..a8aa674c53 100644 --- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp @@ -28,8 +28,8 @@ using namespace qpid::broker; using namespace qpid::framing; -QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : - maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {} +QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : + maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {} void QueuePolicy::enqueued(uint64_t _size) { @@ -39,18 +39,15 @@ void QueuePolicy::enqueued(uint64_t _size) void QueuePolicy::dequeued(uint64_t _size) { - //Note: underflow detection is not reliable in the face of - //concurrent updates (at present locking in Queue.cpp prevents - //these anyway); updates are atomic and are safe regardless. if (maxCount) { - if (count.get() > 0) { + if (count > 0) { --count; } else { throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this)); } } if (maxSize) { - if (_size > size.get()) { + if (_size > size) { throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this)); } else { size -= _size; @@ -58,47 +55,47 @@ void QueuePolicy::dequeued(uint64_t _size) } } -bool QueuePolicy::checkLimit(const QueuedMessage& m) +bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) { - bool sizeExceeded = maxSize && (size.get() + m.payload->contentSize()) > maxSize; - bool countExceeded = maxCount && (count.get() + 1) > maxCount; + bool sizeExceeded = maxSize && (size + m->contentSize()) > maxSize; + bool countExceeded = maxCount && (count + 1) > maxCount; bool exceeded = sizeExceeded || countExceeded; if (exceeded) { if (!policyExceeded) { - policyExceeded = true; - if (m.queue) { - if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << m.queue->getName()); - if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << m.queue->getName()); - } + policyExceeded = true; + if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << name); + if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << name); } } else { if (policyExceeded) { policyExceeded = false; - if (m.queue) { - QPID_LOG(info, "Queue cumulative message size and message count within policy for " << m.queue->getName()); - } + QPID_LOG(info, "Queue cumulative message size and message count within policy for " << name); } } return !exceeded; } -void QueuePolicy::tryEnqueue(const QueuedMessage& m) +void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m) { if (checkLimit(m)) { - enqueued(m); + enqueued(m->contentSize()); } else { - std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue"); - throw ResourceLimitExceededException( - QPID_MSG("Policy exceeded on " << queue << " by message " << m.position - << " of size " << m.payload->contentSize() << " , policy: " << *this)); + throw ResourceLimitExceededException(QPID_MSG("Policy exceeded on " << name << ", policy: " << *this)); } } -void QueuePolicy::enqueued(const QueuedMessage& m) +void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m) { - enqueued(m.payload->contentSize()); + enqueued(m->contentSize()); } +void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m) +{ + dequeued(m->contentSize()); +} + +void QueuePolicy::enqueued(const QueuedMessage&) {} + void QueuePolicy::dequeued(const QueuedMessage& m) { dequeued(m.payload->contentSize()); @@ -132,7 +129,7 @@ std::string QueuePolicy::getType(const FieldTable& settings) std::transform(t.begin(), t.end(), t.begin(), tolower); if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t; } - return FLOW_TO_DISK; + return REJECT; } void QueuePolicy::setDefaultMaxSize(uint64_t s) @@ -140,6 +137,7 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s) defaultMaxSize = s; } +void QueuePolicy::getPendingDequeues(Messages&) {} @@ -148,8 +146,8 @@ void QueuePolicy::encode(Buffer& buffer) const { buffer.putLong(maxCount); buffer.putLongLong(maxSize); - buffer.putLong(count.get()); - buffer.putLongLong(size.get()); + buffer.putLong(count); + buffer.putLongLong(size); } void QueuePolicy::decode ( Buffer& buffer ) @@ -179,16 +177,18 @@ const std::string QueuePolicy::RING("ring"); const std::string QueuePolicy::RING_STRICT("ring_strict"); uint64_t QueuePolicy::defaultMaxSize(0); -FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) : - QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {} +FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize) : + QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK) {} -bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m) +bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m) { - return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m); + if (!QueuePolicy::checkLimit(m)) m->requestContentRelease(); + return true; } -RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : - QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} +RingQueuePolicy::RingQueuePolicy(const std::string& _name, + uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : + QueuePolicy(_name, _maxCount, _maxSize, _type), strict(_type == RING_STRICT) {} bool before(const QueuedMessage& a, const QueuedMessage& b) { @@ -197,15 +197,12 @@ bool before(const QueuedMessage& a, const QueuedMessage& b) void RingQueuePolicy::enqueued(const QueuedMessage& m) { - QueuePolicy::enqueued(m); - qpid::sys::Mutex::ScopedLock l(lock); //need to insert in correct location based on position queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m); } void RingQueuePolicy::dequeued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); //find and remove m from queue if (find(m, pendingDequeues, true) || find(m, queue, true)) { //now update count and size @@ -215,49 +212,32 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m) bool RingQueuePolicy::isEnqueued(const QueuedMessage& m) { - qpid::sys::Mutex::ScopedLock l(lock); //for non-strict ring policy, a message can be replaced (and //therefore dequeued) before it is accepted or released by //subscriber; need to detect this return find(m, pendingDequeues, false) || find(m, queue, false); } -bool RingQueuePolicy::checkLimit(const QueuedMessage& m) +bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m) { if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept QueuedMessage oldest; - { - qpid::sys::Mutex::ScopedLock l(lock); - if (queue.empty()) { - QPID_LOG(debug, "Message too large for ring queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) - << " [" << *this << "] " - << ": message size = " << m.payload->contentSize() << " bytes"); - return false; - } - oldest = queue.front(); + if (queue.empty()) { + QPID_LOG(debug, "Message too large for ring queue " << name + << " [" << *this << "] " + << ": message size = " << m->contentSize() << " bytes"); + return false; } + oldest = queue.front(); if (oldest.queue->acquire(oldest) || !strict) { - { - //TODO: fix this! In the current code, this method is - //only ever called with the Queue lock already taken. This - //should not be relied upon going forward however and - //clearly the locking in this class is insufficient as - //there is no guarantee that the message previously atthe - //front is still there. - qpid::sys::Mutex::ScopedLock l(lock); - queue.pop_front(); - pendingDequeues.push_back(oldest); - } - oldest.queue->addPendingDequeue(oldest); - QPID_LOG(debug, "Ring policy triggered in queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) - << ": removed message " << oldest.position << " to make way for " << m.position); + queue.pop_front(); + pendingDequeues.push_back(oldest); + QPID_LOG(debug, "Ring policy triggered in " << name + << ": removed message " << oldest.position << " to make way for new message"); return true; } else { - QPID_LOG(debug, "Ring policy could not be triggered in queue " - << (m.queue ? m.queue->getName() : std::string("unknown queue")) + QPID_LOG(debug, "Ring policy could not be triggered in " << name << ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued"); //in strict mode, if oldest message has been delivered (hence //cannot be acquired) but not yet acked, it should not be @@ -266,6 +246,11 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m) } } +void RingQueuePolicy::getPendingDequeues(Messages& result) +{ + result = pendingDequeues; +} + bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove) { for (Messages::iterator i = q.begin(); i != q.end(); i++) { @@ -277,25 +262,36 @@ bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove) return false; } +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type) +{ + return createQueuePolicy("<unspecified>", maxCount, maxSize, type); +} + std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings) { + return createQueuePolicy("<unspecified>", settings); +} + +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings) +{ uint32_t maxCount = getInt(settings, maxCountKey, 0); uint32_t maxSize = getInt(settings, maxSizeKey, defaultMaxSize); if (maxCount || maxSize) { - return createQueuePolicy(maxCount, maxSize, getType(settings)); + return createQueuePolicy(name, maxCount, maxSize, getType(settings)); } else { return std::auto_ptr<QueuePolicy>(); } } -std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type) +std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, + uint32_t maxCount, uint64_t maxSize, const std::string& type) { if (type == RING || type == RING_STRICT) { - return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(maxCount, maxSize, type)); + return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type)); } else if (type == FLOW_TO_DISK) { - return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(maxCount, maxSize)); + return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize)); } else { - return std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize, type)); + return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type)); } } @@ -305,10 +301,10 @@ namespace qpid { std::ostream& operator<<(std::ostream& out, const QueuePolicy& p) { - if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size.get(); + if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size; else out << "size: unlimited"; out << "; "; - if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get(); + if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count; else out << "count: unlimited"; out << "; type=" << p.type; return out; |