diff options
Diffstat (limited to 'cpp/src/qpid/broker/Queue.cpp')
-rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 70 |
1 files changed, 50 insertions, 20 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index c899a5befa..fb8bd1288f 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -51,6 +51,21 @@ using std::for_each; using std::mem_fun; namespace _qmf = qmf::org::apache::qpid::broker; + +namespace +{ + const std::string qpidMaxSize("qpid.max_size"); + const std::string qpidMaxCount("qpid.max_count"); + const std::string qpidNoLocal("no-local"); + const std::string qpidTraceIdentity("qpid.trace.id"); + const std::string qpidTraceExclude("qpid.trace.exclude"); + const std::string qpidLastValueQueue("qpid.last_value_queue"); + const std::string qpidOptimisticConsume("qpid.optimistic_consume"); + const std::string qpidPersistLastNode("qpid.persist_last_node"); + const std::string qpidVQMatchProperty("qpid.LVQ_key"); +} + + Queue::Queue(const string& _name, bool _autodelete, MessageStore* const _store, const OwnershipToken* const _owner, @@ -253,7 +268,7 @@ bool Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ptr c) if (c->filter(msg.payload)) { if (c->accept(msg.payload)) { m = msg; - messages.pop_front(); + popMsg(msg); return true; } else { //message(s) are available but consumer hasn't got enough credit @@ -371,7 +386,7 @@ QueuedMessage Queue::get(){ if(!messages.empty()){ msg = messages.front(); - messages.pop_front(); + popMsg(msg); } return msg; } @@ -406,22 +421,49 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) { QueuedMessage qmsg = messages.front(); boost::intrusive_ptr<Message> msg = qmsg.payload; destq->deliver(msg); // deliver message to the destination queue - messages.pop_front(); + popMsg(qmsg); dequeue(0, qmsg); count++; } return count; } +void Queue::popMsg(QueuedMessage& qmsg) +{ + if (lastValueQueue){ + const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders(); + string key = ft->getString(qpidVQMatchProperty); + lvq.erase(key); + } + messages.pop_front(); +} + void Queue::push(boost::intrusive_ptr<Message>& msg){ Listeners copy; { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); if (policy.get()) policy->tryEnqueue(qm); - - messages.push_back(qm); - listeners.swap(copy); + + //if (lastValueQueue && LVQinsert(qm) ) return; // LVQ update of existing message + LVQ::iterator i; + if (lastValueQueue){ + const framing::FieldTable* ft = msg->getApplicationHeaders(); + string key = ft->getString(qpidVQMatchProperty); + + i = lvq.find(key); + if (i == lvq.end()){ + messages.push_back(qm); + listeners.swap(copy); + lvq[key] = &messages.back(); + }else { + i->second->payload = msg; + } + }else { + + messages.push_back(qm); + listeners.swap(copy); + } } for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify)); } @@ -514,8 +556,8 @@ bool Queue::dequeue(TransactionContext* ctxt, const QueuedMessage& msg) void Queue::popAndDequeue() { QueuedMessage msg = messages.front(); - messages.pop_front(); - dequeue(0, msg); + popMsg(msg); + dequeue(0, msg); } /** @@ -529,18 +571,6 @@ void Queue::dequeued(const QueuedMessage& msg) } -namespace -{ - const std::string qpidMaxSize("qpid.max_size"); - const std::string qpidMaxCount("qpid.max_count"); - const std::string qpidNoLocal("no-local"); - const std::string qpidTraceIdentity("qpid.trace.id"); - const std::string qpidTraceExclude("qpid.trace.exclude"); - const std::string qpidLastValueQueue("qpid.last_value_queue"); - const std::string qpidOptimisticConsume("qpid.optimistic_consume"); - const std::string qpidPersistLastNode("qpid.persist_last_node"); -} - void Queue::create(const FieldTable& _settings) { settings = _settings; |