summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/QueuePolicy.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/QueuePolicy.cpp144
1 files changed, 70 insertions, 74 deletions
diff --git a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
index 39afe90134..a8aa674c53 100644
--- a/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/qpid/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -28,8 +28,8 @@
using namespace qpid::broker;
using namespace qpid::framing;
-QueuePolicy::QueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
- maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false) {}
+QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {}
void QueuePolicy::enqueued(uint64_t _size)
{
@@ -39,18 +39,15 @@ void QueuePolicy::enqueued(uint64_t _size)
void QueuePolicy::dequeued(uint64_t _size)
{
- //Note: underflow detection is not reliable in the face of
- //concurrent updates (at present locking in Queue.cpp prevents
- //these anyway); updates are atomic and are safe regardless.
if (maxCount) {
- if (count.get() > 0) {
+ if (count > 0) {
--count;
} else {
throw Exception(QPID_MSG("Attempted count underflow on dequeue(" << _size << "): " << *this));
}
}
if (maxSize) {
- if (_size > size.get()) {
+ if (_size > size) {
throw Exception(QPID_MSG("Attempted size underflow on dequeue(" << _size << "): " << *this));
} else {
size -= _size;
@@ -58,47 +55,47 @@ void QueuePolicy::dequeued(uint64_t _size)
}
}
-bool QueuePolicy::checkLimit(const QueuedMessage& m)
+bool QueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
- bool sizeExceeded = maxSize && (size.get() + m.payload->contentSize()) > maxSize;
- bool countExceeded = maxCount && (count.get() + 1) > maxCount;
+ bool sizeExceeded = maxSize && (size + m->contentSize()) > maxSize;
+ bool countExceeded = maxCount && (count + 1) > maxCount;
bool exceeded = sizeExceeded || countExceeded;
if (exceeded) {
if (!policyExceeded) {
- policyExceeded = true;
- if (m.queue) {
- if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << m.queue->getName());
- if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << m.queue->getName());
- }
+ policyExceeded = true;
+ if (sizeExceeded) QPID_LOG(info, "Queue cumulative message size exceeded policy for " << name);
+ if (countExceeded) QPID_LOG(info, "Queue message count exceeded policy for " << name);
}
} else {
if (policyExceeded) {
policyExceeded = false;
- if (m.queue) {
- QPID_LOG(info, "Queue cumulative message size and message count within policy for " << m.queue->getName());
- }
+ QPID_LOG(info, "Queue cumulative message size and message count within policy for " << name);
}
}
return !exceeded;
}
-void QueuePolicy::tryEnqueue(const QueuedMessage& m)
+void QueuePolicy::tryEnqueue(boost::intrusive_ptr<Message> m)
{
if (checkLimit(m)) {
- enqueued(m);
+ enqueued(m->contentSize());
} else {
- std::string queue = m.queue ? m.queue->getName() : std::string("unknown queue");
- throw ResourceLimitExceededException(
- QPID_MSG("Policy exceeded on " << queue << " by message " << m.position
- << " of size " << m.payload->contentSize() << " , policy: " << *this));
+ throw ResourceLimitExceededException(QPID_MSG("Policy exceeded on " << name << ", policy: " << *this));
}
}
-void QueuePolicy::enqueued(const QueuedMessage& m)
+void QueuePolicy::recoverEnqueued(boost::intrusive_ptr<Message> m)
{
- enqueued(m.payload->contentSize());
+ enqueued(m->contentSize());
}
+void QueuePolicy::enqueueAborted(boost::intrusive_ptr<Message> m)
+{
+ dequeued(m->contentSize());
+}
+
+void QueuePolicy::enqueued(const QueuedMessage&) {}
+
void QueuePolicy::dequeued(const QueuedMessage& m)
{
dequeued(m.payload->contentSize());
@@ -132,7 +129,7 @@ std::string QueuePolicy::getType(const FieldTable& settings)
std::transform(t.begin(), t.end(), t.begin(), tolower);
if (t == REJECT || t == FLOW_TO_DISK || t == RING || t == RING_STRICT) return t;
}
- return FLOW_TO_DISK;
+ return REJECT;
}
void QueuePolicy::setDefaultMaxSize(uint64_t s)
@@ -140,6 +137,7 @@ void QueuePolicy::setDefaultMaxSize(uint64_t s)
defaultMaxSize = s;
}
+void QueuePolicy::getPendingDequeues(Messages&) {}
@@ -148,8 +146,8 @@ void QueuePolicy::encode(Buffer& buffer) const
{
buffer.putLong(maxCount);
buffer.putLongLong(maxSize);
- buffer.putLong(count.get());
- buffer.putLongLong(size.get());
+ buffer.putLong(count);
+ buffer.putLongLong(size);
}
void QueuePolicy::decode ( Buffer& buffer )
@@ -179,16 +177,18 @@ const std::string QueuePolicy::RING("ring");
const std::string QueuePolicy::RING_STRICT("ring_strict");
uint64_t QueuePolicy::defaultMaxSize(0);
-FlowToDiskPolicy::FlowToDiskPolicy(uint32_t _maxCount, uint64_t _maxSize) :
- QueuePolicy(_maxCount, _maxSize, FLOW_TO_DISK) {}
+FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize) :
+ QueuePolicy(_name, _maxCount, _maxSize, FLOW_TO_DISK) {}
-bool FlowToDiskPolicy::checkLimit(const QueuedMessage& m)
+bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
- return QueuePolicy::checkLimit(m) || m.queue->releaseMessageContent(m);
+ if (!QueuePolicy::checkLimit(m)) m->requestContentRelease();
+ return true;
}
-RingQueuePolicy::RingQueuePolicy(uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
- QueuePolicy(_maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
+RingQueuePolicy::RingQueuePolicy(const std::string& _name,
+ uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
+ QueuePolicy(_name, _maxCount, _maxSize, _type), strict(_type == RING_STRICT) {}
bool before(const QueuedMessage& a, const QueuedMessage& b)
{
@@ -197,15 +197,12 @@ bool before(const QueuedMessage& a, const QueuedMessage& b)
void RingQueuePolicy::enqueued(const QueuedMessage& m)
{
- QueuePolicy::enqueued(m);
- qpid::sys::Mutex::ScopedLock l(lock);
//need to insert in correct location based on position
queue.insert(lower_bound(queue.begin(), queue.end(), m, before), m);
}
void RingQueuePolicy::dequeued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//find and remove m from queue
if (find(m, pendingDequeues, true) || find(m, queue, true)) {
//now update count and size
@@ -215,49 +212,32 @@ void RingQueuePolicy::dequeued(const QueuedMessage& m)
bool RingQueuePolicy::isEnqueued(const QueuedMessage& m)
{
- qpid::sys::Mutex::ScopedLock l(lock);
//for non-strict ring policy, a message can be replaced (and
//therefore dequeued) before it is accepted or released by
//subscriber; need to detect this
return find(m, pendingDequeues, false) || find(m, queue, false);
}
-bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
+bool RingQueuePolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
if (QueuePolicy::checkLimit(m)) return true;//if haven't hit limit, ok to accept
QueuedMessage oldest;
- {
- qpid::sys::Mutex::ScopedLock l(lock);
- if (queue.empty()) {
- QPID_LOG(debug, "Message too large for ring queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
- << " [" << *this << "] "
- << ": message size = " << m.payload->contentSize() << " bytes");
- return false;
- }
- oldest = queue.front();
+ if (queue.empty()) {
+ QPID_LOG(debug, "Message too large for ring queue " << name
+ << " [" << *this << "] "
+ << ": message size = " << m->contentSize() << " bytes");
+ return false;
}
+ oldest = queue.front();
if (oldest.queue->acquire(oldest) || !strict) {
- {
- //TODO: fix this! In the current code, this method is
- //only ever called with the Queue lock already taken. This
- //should not be relied upon going forward however and
- //clearly the locking in this class is insufficient as
- //there is no guarantee that the message previously atthe
- //front is still there.
- qpid::sys::Mutex::ScopedLock l(lock);
- queue.pop_front();
- pendingDequeues.push_back(oldest);
- }
- oldest.queue->addPendingDequeue(oldest);
- QPID_LOG(debug, "Ring policy triggered in queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
- << ": removed message " << oldest.position << " to make way for " << m.position);
+ queue.pop_front();
+ pendingDequeues.push_back(oldest);
+ QPID_LOG(debug, "Ring policy triggered in " << name
+ << ": removed message " << oldest.position << " to make way for new message");
return true;
} else {
- QPID_LOG(debug, "Ring policy could not be triggered in queue "
- << (m.queue ? m.queue->getName() : std::string("unknown queue"))
+ QPID_LOG(debug, "Ring policy could not be triggered in " << name
<< ": oldest message (seq-no=" << oldest.position << ") has been delivered but not yet acknowledged or requeued");
//in strict mode, if oldest message has been delivered (hence
//cannot be acquired) but not yet acked, it should not be
@@ -266,6 +246,11 @@ bool RingQueuePolicy::checkLimit(const QueuedMessage& m)
}
}
+void RingQueuePolicy::getPendingDequeues(Messages& result)
+{
+ result = pendingDequeues;
+}
+
bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove)
{
for (Messages::iterator i = q.begin(); i != q.end(); i++) {
@@ -277,25 +262,36 @@ bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove)
return false;
}
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type)
+{
+ return createQueuePolicy("<unspecified>", maxCount, maxSize, type);
+}
+
std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings)
{
+ return createQueuePolicy("<unspecified>", settings);
+}
+
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings)
+{
uint32_t maxCount = getInt(settings, maxCountKey, 0);
uint32_t maxSize = getInt(settings, maxSizeKey, defaultMaxSize);
if (maxCount || maxSize) {
- return createQueuePolicy(maxCount, maxSize, getType(settings));
+ return createQueuePolicy(name, maxCount, maxSize, getType(settings));
} else {
return std::auto_ptr<QueuePolicy>();
}
}
-std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(uint32_t maxCount, uint64_t maxSize, const std::string& type)
+std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const std::string& name,
+ uint32_t maxCount, uint64_t maxSize, const std::string& type)
{
if (type == RING || type == RING_STRICT) {
- return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(maxCount, maxSize, type));
+ return std::auto_ptr<QueuePolicy>(new RingQueuePolicy(name, maxCount, maxSize, type));
} else if (type == FLOW_TO_DISK) {
- return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(maxCount, maxSize));
+ return std::auto_ptr<QueuePolicy>(new FlowToDiskPolicy(name, maxCount, maxSize));
} else {
- return std::auto_ptr<QueuePolicy>(new QueuePolicy(maxCount, maxSize, type));
+ return std::auto_ptr<QueuePolicy>(new QueuePolicy(name, maxCount, maxSize, type));
}
}
@@ -305,10 +301,10 @@ namespace qpid {
std::ostream& operator<<(std::ostream& out, const QueuePolicy& p)
{
- if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size.get();
+ if (p.maxSize) out << "size: max=" << p.maxSize << ", current=" << p.size;
else out << "size: unlimited";
out << "; ";
- if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count.get();
+ if (p.maxCount) out << "count: max=" << p.maxCount << ", current=" << p.count;
else out << "count: unlimited";
out << "; type=" << p.type;
return out;