summaryrefslogtreecommitdiff
path: root/cpp/src/tests/QueueTest.cpp
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
committerKim van der Riet <kpvdr@apache.org>2012-08-03 12:13:32 +0000
commitd43d1912b376322e27fdcda551a73f9ff5487972 (patch)
treece493e10baa95f44be8beb5778ce51783463196d /cpp/src/tests/QueueTest.cpp
parent04877fec0c6346edec67072d7f2d247740cf2af5 (diff)
downloadqpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/QueueTest.cpp')
-rw-r--r--cpp/src/tests/QueueTest.cpp244
1 files changed, 193 insertions, 51 deletions
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index fb429ca981..3b4f74620f 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -31,6 +31,8 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/client/QueueOptions.h"
#include "qpid/framing/AMQFrame.h"
@@ -40,8 +42,11 @@
#include "qpid/broker/QueueFlowLimit.h"
#include <iostream>
-#include "boost/format.hpp"
+#include <vector>
+#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
+using namespace std;
using boost::intrusive_ptr;
using namespace qpid;
using namespace qpid::broker;
@@ -83,7 +88,7 @@ public:
Message& getMessage() { return *(msg.get()); }
};
-intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) {
+intrusive_ptr<Message> createMessage(std::string exchange, std::string routingKey, uint64_t ttl = 0) {
intrusive_ptr<Message> msg(new Message());
AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
AMQFrame header((AMQHeaderBody()));
@@ -94,6 +99,16 @@ intrusive_ptr<Message> create_message(std::string exchange, std::string routingK
return msg;
}
+intrusive_ptr<Message> contentMessage(string content) {
+ intrusive_ptr<Message> m(MessageUtils::createMessage());
+ MessageUtils::addContent(m, content);
+ return m;
+}
+
+string getContent(intrusive_ptr<Message> m) {
+ return m->getFrames().getContent();
+}
+
QPID_AUTO_TEST_SUITE(QueueTestSuite)
QPID_AUTO_TEST_CASE(testAsyncMessage) {
@@ -105,7 +120,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) {
//Test basic delivery:
- intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
msg1->enqueueAsync(queue, (MessageStore*)0);//this is done on enqueue which is not called from process
queue->process(msg1);
sleep(2);
@@ -120,7 +135,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) {
QPID_AUTO_TEST_CASE(testAsyncMessageCount){
Queue::shared_ptr queue(new Queue("my_test_queue", true));
- intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
msg1->enqueueAsync(queue, (MessageStore*)0);//this is done on enqueue which is not called from process
queue->process(msg1);
@@ -145,9 +160,9 @@ QPID_AUTO_TEST_CASE(testConsumers){
BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount());
//Test basic delivery:
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
queue->deliver(msg1);
BOOST_CHECK(queue->dispatch(c1));
@@ -191,9 +206,9 @@ QPID_AUTO_TEST_CASE(testRegistry){
QPID_AUTO_TEST_CASE(testDequeue){
Queue::shared_ptr queue(new Queue("my_queue", true));
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
intrusive_ptr<Message> received;
queue->deliver(msg1);
@@ -265,9 +280,9 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
Queue::shared_ptr queue(new Queue("my-queue", true));
queue->configure(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
//enqueue 2 messages
queue->deliver(msg1);
@@ -291,9 +306,9 @@ QPID_AUTO_TEST_CASE(testSeek){
Queue::shared_ptr queue(new Queue("my-queue", true));
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
//enqueue 2 messages
queue->deliver(msg1);
@@ -317,9 +332,9 @@ QPID_AUTO_TEST_CASE(testSearch){
Queue::shared_ptr queue(new Queue("my-queue", true));
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
//enqueue 2 messages
queue->deliver(msg1);
@@ -431,10 +446,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
- intrusive_ptr<Message> msg4 = create_message("e", "D");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
+ intrusive_ptr<Message> msg4 = createMessage("e", "D");
intrusive_ptr<Message> received;
//set deliever match for LVQ a,b,c,a
@@ -466,9 +481,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
received = queue->get().payload;
BOOST_CHECK_EQUAL(msg3.get(), received.get());
- intrusive_ptr<Message> msg5 = create_message("e", "A");
- intrusive_ptr<Message> msg6 = create_message("e", "B");
- intrusive_ptr<Message> msg7 = create_message("e", "C");
+ intrusive_ptr<Message> msg5 = createMessage("e", "A");
+ intrusive_ptr<Message> msg6 = createMessage("e", "B");
+ intrusive_ptr<Message> msg7 = createMessage("e", "C");
msg5->insertCustomProperty(key,"a");
msg6->insertCustomProperty(key,"b");
msg7->insertCustomProperty(key,"c");
@@ -498,8 +513,8 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
string key;
args.getLVQKey(key);
@@ -524,12 +539,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
- intrusive_ptr<Message> msg4 = create_message("e", "D");
- intrusive_ptr<Message> msg5 = create_message("e", "F");
- intrusive_ptr<Message> msg6 = create_message("e", "G");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
+ intrusive_ptr<Message> msg4 = createMessage("e", "D");
+ intrusive_ptr<Message> msg5 = createMessage("e", "F");
+ intrusive_ptr<Message> msg6 = createMessage("e", "G");
//set deliever match for LVQ a,b,c,a
@@ -601,8 +616,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){
queue1->configure(args);
queue2->configure(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "A");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "A");
string key;
args.getLVQKey(key);
@@ -645,8 +660,8 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
intrusive_ptr<Message> received;
queue1->create(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "A");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "A");
// 2
string key;
args.getLVQKey(key);
@@ -673,7 +688,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
{
for (uint i = 0; i < count; i++) {
- intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl);
+ intrusive_ptr<Message> m = createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
m->computeExpiration(new broker::ExpiryPolicy);
queue.deliver(m);
}
@@ -736,7 +751,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
std::string("b"), std::string("b"), std::string("b"),
std::string("c"), std::string("c"), std::string("c") };
for (int i = 0; i < 9; ++i) {
- intrusive_ptr<Message> msg = create_message("e", "A");
+ intrusive_ptr<Message> msg = createMessage("e", "A");
msg->insertCustomProperty("GROUP-ID", groups[i]);
msg->insertCustomProperty("MY-ID", i);
queue->deliver(msg);
@@ -883,7 +898,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
// Queue = a-2,
// Owners= ^C3,
- intrusive_ptr<Message> msg = create_message("e", "A");
+ intrusive_ptr<Message> msg = createMessage("e", "A");
msg->insertCustomProperty("GROUP-ID", "a");
msg->insertCustomProperty("MY-ID", 9);
queue->deliver(msg);
@@ -894,7 +909,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
gotOne = queue->dispatch(c2);
BOOST_CHECK( !gotOne );
- msg = create_message("e", "A");
+ msg = createMessage("e", "A");
msg->insertCustomProperty("GROUP-ID", "b");
msg->insertCustomProperty("MY-ID", 10);
queue->deliver(msg);
@@ -925,7 +940,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
queue->configure(args);
for (int i = 0; i < 3; ++i) {
- intrusive_ptr<Message> msg = create_message("e", "A");
+ intrusive_ptr<Message> msg = createMessage("e", "A");
// no "GROUP-ID" header
msg->insertCustomProperty("MY-ID", i);
queue->deliver(msg);
@@ -988,7 +1003,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
queue2->create(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
queue1->deliver(msg1);
queue2->deliver(msg1);
@@ -1004,7 +1019,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
queue2->setLastNodeFailure();
BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
- intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
queue1->deliver(msg2);
queue2->deliver(msg2);
@@ -1019,7 +1034,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
queue1->clearLastNodeFailure();
queue2->clearLastNodeFailure();
- intrusive_ptr<Message> msg3 = create_message("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "B");
queue1->deliver(msg3);
queue2->deliver(msg3);
BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
@@ -1033,8 +1048,8 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
* internal details not part of the queue abstraction.
// check requeue 1
- intrusive_ptr<Message> msg4 = create_message("e", "C");
- intrusive_ptr<Message> msg5 = create_message("e", "D");
+ intrusive_ptr<Message> msg4 = createMessage("e", "C");
+ intrusive_ptr<Message> msg5 = createMessage("e", "D");
framing::SequenceNumber sequence(1);
QueuedMessage qmsg1(queue1.get(), msg4, sequence);
@@ -1081,8 +1096,8 @@ not requeued to the store.
queue1->create(args);
// check requeue 1
- intrusive_ptr<Message> msg1 = create_message("e", "C");
- intrusive_ptr<Message> msg2 = create_message("e", "D");
+ intrusive_ptr<Message> msg1 = createMessage("e", "C");
+ intrusive_ptr<Message> msg2 = createMessage("e", "D");
queue1->recover(msg1);
@@ -1114,7 +1129,7 @@ simulate store exception going into last node standing
queue1->configure(args);
// check requeue 1
- intrusive_ptr<Message> msg1 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "C");
queue1->deliver(msg1);
testStore.createError();
@@ -1401,6 +1416,133 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
BOOST_CHECK_EQUAL(5u, tq9->getMessageCount());
}
+QPID_AUTO_TEST_CASE(testSetPositionFifo) {
+ Queue::shared_ptr q(new Queue("my-queue", true));
+ BOOST_CHECK_EQUAL(q->getPosition(), SequenceNumber(0));
+ for (int i = 0; i < 10; ++i)
+ q->deliver(contentMessage(boost::lexical_cast<string>(i+1)));
+
+ // Verify the front of the queue
+ TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(1u, c->last.position); // Numbered from 1
+ BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+ // Verify the back of the queue
+ QueuedMessage qm;
+ BOOST_CHECK_EQUAL(10u, q->getPosition());
+ BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
+ BOOST_CHECK_EQUAL("10", getContent(qm.payload));
+ BOOST_CHECK_EQUAL(10u, q->getMessageCount());
+
+ // Using setPosition to introduce a gap in sequence numbers.
+ q->setPosition(15);
+ BOOST_CHECK_EQUAL(10u, q->getMessageCount());
+ BOOST_CHECK_EQUAL(15u, q->getPosition());
+ BOOST_CHECK(q->find(10, qm)); // Back of the queue
+ BOOST_CHECK_EQUAL("10", getContent(qm.payload));
+ q->deliver(contentMessage("16"));
+ c->setPosition(9);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(10u, c->last.position);
+ BOOST_CHECK_EQUAL("10", getContent(c->last.payload));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(16u, c->last.position);
+ BOOST_CHECK_EQUAL("16", getContent(c->last.payload));
+
+ // Using setPosition to trunkcate the queue
+ q->setPosition(5);
+ BOOST_CHECK_EQUAL(5u, q->getMessageCount());
+ q->deliver(contentMessage("6a"));
+ c->setPosition(4);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(5u, c->last.position);
+ BOOST_CHECK_EQUAL("5", getContent(c->last.payload));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(6u, c->last.position);
+ BOOST_CHECK_EQUAL("6a", getContent(c->last.payload));
+ BOOST_CHECK(!q->dispatch(c)); // No more messages.
+}
+
+QPID_AUTO_TEST_CASE(testSetPositionLvq) {
+ Queue::shared_ptr q(new Queue("my-queue", true));
+ string key="key";
+ framing::FieldTable args;
+ args.setString("qpid.last_value_queue_key", "key");
+ q->configure(args);
+
+ const char* values[] = { "a", "b", "c", "a", "b", "c" };
+ for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
+ intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1));
+ m->insertCustomProperty(key, values[i]);
+ q->deliver(m);
+ }
+ BOOST_CHECK_EQUAL(3u, q->getMessageCount());
+ // Verify the front of the queue
+ TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(4u, c->last.position); // Numbered from 1
+ BOOST_CHECK_EQUAL("4", getContent(c->last.payload));
+ // Verify the back of the queue
+ QueuedMessage qm;
+ BOOST_CHECK_EQUAL(6u, q->getPosition());
+ BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
+ BOOST_CHECK_EQUAL("6", getContent(qm.payload));
+
+ q->setPosition(5);
+ c->setPosition(4);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(5u, c->last.position); // Numbered from 1
+ BOOST_CHECK(!q->dispatch(c));
+}
+
+QPID_AUTO_TEST_CASE(testSetPositionPriority) {
+ Queue::shared_ptr q(new Queue("my-queue", true));
+ framing::FieldTable args;
+ args.setInt("qpid.priorities", 10);
+ q->configure(args);
+
+ const int priorities[] = { 1, 2, 3, 2, 1, 3 };
+ for (size_t i = 0; i < sizeof(priorities)/sizeof(priorities[0]); ++i) {
+ intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1));
+ m->getFrames().getHeaders()->get<DeliveryProperties>(true)
+ ->setPriority(priorities[i]);
+ q->deliver(m);
+ }
+
+ // Truncation removes messages in fifo order, not priority order.
+ q->setPosition(3);
+ TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in FIFO order
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(1u, c->last.position);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(2u, c->last.position);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(3u, c->last.position);
+ BOOST_CHECK(!q->dispatch(c));
+
+ intrusive_ptr<Message> m = contentMessage("4a");
+ m->getFrames().getHeaders()->get<DeliveryProperties>(true)
+ ->setPriority(4);
+ q->deliver(m);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(4u, c->last.position);
+ BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+
+ // But consumers see priority order
+ c.reset(new TestConsumer("test", true));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(4u, c->last.position);
+ BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(3u, c->last.position);
+ BOOST_CHECK_EQUAL("3", getContent(c->last.payload));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(2u, c->last.position);
+ BOOST_CHECK_EQUAL("2", getContent(c->last.payload));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(1u, c->last.position);
+ BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+}
QPID_AUTO_TEST_SUITE_END()