summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2009-01-19 19:25:29 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2009-01-19 19:25:29 +0000
commit141b05b3e7fbe0f49c586a7b4265ad2cb088b27b (patch)
treea057a4aa636e345417cc657d366211d4785f1746 /cpp/src
parent68aa95e82601c220d2fd13806615b04a37da8ec9 (diff)
downloadqpid-python-141b05b3e7fbe0f49c586a7b4265ad2cb088b27b.tar.gz
Put messages into LVQ FIFO if no key is specified.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@735776 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Queue.cpp14
-rw-r--r--cpp/src/tests/QueueTest.cpp24
2 files changed, 31 insertions, 7 deletions
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index 5acc474aa1..6e0d777276 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -206,8 +206,8 @@ void Queue::requeue(const QueuedMessage& msg){
}
void Queue::clearLVQIndex(const QueuedMessage& msg){
- if (lastValueQueue){
- const framing::FieldTable* ft = msg.payload->getApplicationHeaders();
+ const framing::FieldTable* ft = msg.payload->getApplicationHeaders();
+ if (lastValueQueue && ft){
string key = ft->getAsString(qpidVQMatchProperty);
lvq.erase(key);
}
@@ -512,8 +512,8 @@ uint32_t Queue::move(const Queue::shared_ptr destq, uint32_t qty) {
void Queue::popMsg(QueuedMessage& qmsg)
{
- if (lastValueQueue){
- const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
+ const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
+ if (lastValueQueue && ft){
string key = ft->getAsString(qpidVQMatchProperty);
lvq.erase(key);
}
@@ -529,15 +529,15 @@ void Queue::push(boost::intrusive_ptr<Message>& msg){
if (policy.get()) policy->tryEnqueue(qm);
LVQ::iterator i;
- if (lastValueQueue){
- const framing::FieldTable* ft = msg->getApplicationHeaders();
+ const framing::FieldTable* ft = msg->getApplicationHeaders();
+ if (lastValueQueue && ft){
string key = ft->getAsString(qpidVQMatchProperty);
i = lvq.find(key);
if (i == lvq.end()){
messages.push_back(qm);
listeners.populate(copy);
- lvq[key] = msg;
+ lvq[key] = msg;
}else {
i->second->setReplacementMessage(msg,this);
dequeued(QueuedMessage(qm.queue, i->second, qm.position));
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index f1771e26cd..dee6de83a7 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -356,6 +356,30 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
}
+QPID_AUTO_TEST_CASE(testLVQEmptyKey){
+
+ client::QueueOptions args;
+ // set queue mode
+ args.setOrdering(client::LVQ);
+
+ Queue::shared_ptr queue(new Queue("my-queue", true ));
+ queue->configure(args);
+
+ intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg2 = create_message("e", "B");
+
+ string key;
+ args.getLVQKey(key);
+ BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
+
+
+ msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+ queue->deliver(msg1);
+ queue->deliver(msg2);
+ BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
+
+}
+
QPID_AUTO_TEST_CASE(testLVQAcquire){
client::QueueOptions args;