diff options
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/src/qpid/broker/Consumer.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 4 |
2 files changed, 5 insertions, 1 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h index d21dbb19f0..662b0f937d 100644 --- a/cpp/src/qpid/broker/Consumer.h +++ b/cpp/src/qpid/broker/Consumer.h @@ -86,6 +86,8 @@ class Consumer : public QueueCursor { */ virtual bool isCounted() { return true; } + QueueCursor getCursor() const { return *this; } + void setCursor(const QueueCursor& qc) { static_cast<QueueCursor&>(*this) = qc; } protected: //framing::SequenceNumber position; diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 9a0e4a96f4..81e26d0b48 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -366,7 +366,8 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) while (true) { //TODO: reduce lock scope Mutex::ScopedLock locker(messageLock); - Message* msg = messages->next(*c); + QueueCursor cursor = c->getCursor(); // Save current position. + Message* msg = messages->next(*c); // Advances c. if (msg) { if (msg->hasExpired()) { QPID_LOG(debug, "Message expired from queue '" << name << "'"); @@ -405,6 +406,7 @@ bool Queue::getNextMessage(Message& m, Consumer::shared_ptr& c) } else { //message(s) are available but consumer hasn't got enough credit QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'"); + c->setCursor(cursor); // Restore cursor, will try again with credit if (c->preAcquires()) { //let someone else try listeners.populate(set); |
