diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
| commit | 633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch) | |
| tree | 1391da89470593209466df68c0b40b89c14963b1 /cpp/src/qpid/broker/MessageMap.cpp | |
| parent | c73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff) | |
| download | qpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz | |
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/MessageMap.cpp')
| -rw-r--r-- | cpp/src/qpid/broker/MessageMap.cpp | 78 |
1 files changed, 52 insertions, 26 deletions
diff --git a/cpp/src/qpid/broker/MessageMap.cpp b/cpp/src/qpid/broker/MessageMap.cpp index 048df45434..9b164d4e5c 100644 --- a/cpp/src/qpid/broker/MessageMap.cpp +++ b/cpp/src/qpid/broker/MessageMap.cpp @@ -20,6 +20,7 @@ */ #include "qpid/broker/MessageMap.h" #include "qpid/broker/QueuedMessage.h" +#include "qpid/log/Statement.h" namespace qpid { namespace broker { @@ -27,7 +28,16 @@ namespace { const std::string EMPTY; } -bool MessageMap::deleted(const QueuedMessage&) { return true; } +bool MessageMap::deleted(const QueuedMessage& message) +{ + Ordering::iterator i = messages.find(message.position); + if (i != messages.end()) { + erase(i); + return true; + } else { + return false; + } +} std::string MessageMap::getKey(const QueuedMessage& message) { @@ -38,30 +48,32 @@ std::string MessageMap::getKey(const QueuedMessage& message) size_t MessageMap::size() { - return messages.size(); + size_t count(0); + for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->second.status == QueuedMessage::AVAILABLE) ++count; + } + return count; } bool MessageMap::empty() { - return messages.empty(); + return size() == 0;//TODO: more efficient implementation } void MessageMap::release(const QueuedMessage& message) { - std::string key = getKey(message); - Index::iterator i = index.find(key); - if (i == index.end()) { - index[key] = message; - messages[message.position] = message; - } //else message has already been replaced + Ordering::iterator i = messages.find(message.position); + if (i != messages.end() && i->second.status == QueuedMessage::ACQUIRED) { + i->second.status = QueuedMessage::AVAILABLE; + } } bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& message) { Ordering::iterator i = messages.find(position); - if (i != messages.end()) { + if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { + i->second.status = QueuedMessage::ACQUIRED; message = i->second; - erase(i); return true; } else { return false; @@ -71,7 +83,7 @@ bool MessageMap::acquire(const framing::SequenceNumber& position, QueuedMessage& bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& message) { Ordering::iterator i = messages.find(position); - if (i != messages.end()) { + if (i != messages.end() && i->second.status == QueuedMessage::AVAILABLE) { message = i->second; return true; } else { @@ -79,10 +91,10 @@ bool MessageMap::find(const framing::SequenceNumber& position, QueuedMessage& me } } -bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool) +bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& message, bool unacquired) { Ordering::iterator i = messages.lower_bound(position+1); - if (i != messages.end()) { + if (i != messages.end() && (i->second.status == QueuedMessage::AVAILABLE || (!unacquired && i->second.status == QueuedMessage::ACQUIRED))) { message = i->second; return true; } else { @@ -92,14 +104,14 @@ bool MessageMap::browse(const framing::SequenceNumber& position, QueuedMessage& bool MessageMap::consume(QueuedMessage& message) { - Ordering::iterator i = messages.begin(); - if (i != messages.end()) { - message = i->second; - erase(i); - return true; - } else { - return false; + for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->second.status == QueuedMessage::AVAILABLE) { + i->second.status = QueuedMessage::ACQUIRED; + message = i->second; + return true; + } } + return false; } const QueuedMessage& MessageMap::replace(const QueuedMessage& original, const QueuedMessage& update) @@ -115,12 +127,17 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed) if (result.second) { //there was no previous message for this key; nothing needs to //be removed, just add the message into its correct position - messages[added.position] = added; + QueuedMessage& a = messages[added.position]; + a = added; + a.status = QueuedMessage::AVAILABLE; + QPID_LOG(debug, "Added message at " << a.position); return false; } else { //there is already a message with that key which needs to be replaced removed = result.first->second; result.first->second = replace(result.first->second, added); + result.first->second.status = QueuedMessage::AVAILABLE; + QPID_LOG(debug, "Displaced message at " << removed.position << " with " << result.first->second.position << ": " << result.first->first); return true; } } @@ -128,15 +145,24 @@ bool MessageMap::push(const QueuedMessage& added, QueuedMessage& removed) void MessageMap::foreach(Functor f) { for (Ordering::iterator i = messages.begin(); i != messages.end(); ++i) { - f(i->second); + if (i->second.status == QueuedMessage::AVAILABLE) f(i->second); } } void MessageMap::removeIf(Predicate p) { - for (Ordering::iterator i = messages.begin(); i != messages.end(); i++) { - if (p(i->second)) { - erase(i); + for (Ordering::iterator i = messages.begin(); i != messages.end();) { + if (i->second.status == QueuedMessage::AVAILABLE && p(i->second)) { + index.erase(getKey(i->second)); + //Note: Removing from messages means that the subsequent + //call to deleted() for the same message will return + //false. At present that is not a problem. If this were + //changed to hold onto the message until dequeued + //(e.g. with REMOVED state), then the erase() below would + //need to take that into account. + messages.erase(i++); + } else { + ++i; } } } |
