summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/Queue.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp70
1 files changed, 50 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index c899a5befa..fb8bd1288f 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -51,6 +51,21 @@ using std::for_each;
using std::mem_fun;
namespace _qmf = qmf::org::apache::qpid::broker;
+
+namespace
+{
+ const std::string qpidMaxSize("qpid.max_size");
+ const std::string qpidMaxCount("qpid.max_count");
+ const std::string qpidNoLocal("no-local");
+ const std::string qpidTraceIdentity("qpid.trace.id");
+ const std::string qpidTraceExclude("qpid.trace.exclude");
+ const std::string qpidLastValueQueue("qpid.last_value_queue");
+ const std::string qpidOptimisticConsume("qpid.optimistic_consume");
+ const std::string qpidPersistLastNode("qpid.persist_last_node");
+ const std::string qpidVQMatchProperty("qpid.LVQ_key");
+}
+
+
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
const OwnershipToken* const _owner,
@@ -253,7 +268,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c)
if (c->filter(msg.payload)) {
if (c->accept(msg.payload)) {
m = msg;
- messages.pop_front();
+ popMsg(msg);
return true;
} else {
//message(s) are available but consumer hasn't got enough credit
@@ -371,7 +386,7 @@ QueuedMessage Queue::get(){
if(!messages.empty()){
msg = messages.front();
- messages.pop_front();
+ popMsg(msg);
}
return msg;
}
@@ -406,22 +421,49 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
QueuedMessage qmsg = messages.front();
boost::intrusive_ptr<Message> msg = qmsg.payload;
destq->deliver(msg); // deliver message to the destination queue
- messages.pop_front();
+ popMsg(qmsg);
dequeue(0, qmsg);
count++;
}
return count;
}
+void Queue::popMsg(QueuedMessage& qmsg)
+{
+ if (lastValueQueue){
+ const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
+ string key = ft->getString(qpidVQMatchProperty);
+ lvq.erase(key);
+ }
+ messages.pop_front();
+}
+
void Queue::push(boost::intrusive_ptr<Message>& msg){
Listeners copy;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
if (policy.get()) policy->tryEnqueue(qm);
-
- messages.push_back(qm);
- listeners.swap(copy);
+
+ //if (lastValueQueue && LVQinsert(qm) ) return; // LVQ update of existing message
+ LVQ::iterator i;
+ if (lastValueQueue){
+ const framing::FieldTable* ft = msg->getApplicationHeaders();
+ string key = ft->getString(qpidVQMatchProperty);
+
+ i = lvq.find(key);
+ if (i == lvq.end()){
+ messages.push_back(qm);
+ listeners.swap(copy);
+ lvq[key] = &messages.back();
+ }else {
+ i->second->payload = msg;
+ }
+ }else {
+
+ messages.push_back(qm);
+ listeners.swap(copy);
+ }
}
for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
}
@@ -514,8 +556,8 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg)
void Queue::popAndDequeue()
{
QueuedMessage msg = messages.front();
- messages.pop_front();
- dequeue(0, msg);
+ popMsg(msg);
+ dequeue(0, msg);
}
/**
@@ -529,18 +571,6 @@ void Queue::dequeued(const QueuedMessage& msg)
}
-namespace
-{
- const std::string qpidMaxSize("qpid.max_size");
- const std::string qpidMaxCount("qpid.max_count");
- const std::string qpidNoLocal("no-local");
- const std::string qpidTraceIdentity("qpid.trace.id");
- const std::string qpidTraceExclude("qpid.trace.exclude");
- const std::string qpidLastValueQueue("qpid.last_value_queue");
- const std::string qpidOptimisticConsume("qpid.optimistic_consume");
- const std::string qpidPersistLastNode("qpid.persist_last_node");
-}
-
void Queue::create(const FieldTable& _settings)
{
settings = _settings;