diff options
author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
---|---|---|
committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
tree | ce493e10baa95f44be8beb5778ce51783463196d /cpp/src/tests/QueueTest.cpp | |
parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/QueueTest.cpp')
-rw-r--r-- | cpp/src/tests/QueueTest.cpp | 244 |
1 files changed, 193 insertions, 51 deletions
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index fb429ca981..3b4f74620f 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -31,6 +31,8 @@ #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/ExpiryPolicy.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/framing/FieldTable.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/client/QueueOptions.h" #include "qpid/framing/AMQFrame.h" @@ -40,8 +42,11 @@ #include "qpid/broker/QueueFlowLimit.h" #include <iostream> -#include "boost/format.hpp" +#include <vector> +#include <boost/format.hpp> +#include <boost/lexical_cast.hpp> +using namespace std; using boost::intrusive_ptr; using namespace qpid; using namespace qpid::broker; @@ -83,7 +88,7 @@ public: Message& getMessage() { return *(msg.get()); } }; -intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) { +intrusive_ptr<Message> createMessage(std::string exchange, std::string routingKey, uint64_t ttl = 0) { intrusive_ptr<Message> msg(new Message()); AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); AMQFrame header((AMQHeaderBody())); @@ -94,6 +99,16 @@ intrusive_ptr<Message> create_message(std::string exchange, std::string routingK return msg; } +intrusive_ptr<Message> contentMessage(string content) { + intrusive_ptr<Message> m(MessageUtils::createMessage()); + MessageUtils::addContent(m, content); + return m; +} + +string getContent(intrusive_ptr<Message> m) { + return m->getFrames().getContent(); +} + QPID_AUTO_TEST_SUITE(QueueTestSuite) QPID_AUTO_TEST_CASE(testAsyncMessage) { @@ -105,7 +120,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) { //Test basic delivery: - intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); msg1->enqueueAsync(queue, (MessageStore*)0);//this is done on enqueue which is not called from process queue->process(msg1); sleep(2); @@ -120,7 +135,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) { QPID_AUTO_TEST_CASE(testAsyncMessageCount){ Queue::shared_ptr queue(new Queue("my_test_queue", true)); - intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); msg1->enqueueAsync(queue, (MessageStore*)0);//this is done on enqueue which is not called from process queue->process(msg1); @@ -145,9 +160,9 @@ QPID_AUTO_TEST_CASE(testConsumers){ BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount()); //Test basic delivery: - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); queue->deliver(msg1); BOOST_CHECK(queue->dispatch(c1)); @@ -191,9 +206,9 @@ QPID_AUTO_TEST_CASE(testRegistry){ QPID_AUTO_TEST_CASE(testDequeue){ Queue::shared_ptr queue(new Queue("my_queue", true)); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); intrusive_ptr<Message> received; queue->deliver(msg1); @@ -265,9 +280,9 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ Queue::shared_ptr queue(new Queue("my-queue", true)); queue->configure(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); //enqueue 2 messages queue->deliver(msg1); @@ -291,9 +306,9 @@ QPID_AUTO_TEST_CASE(testSeek){ Queue::shared_ptr queue(new Queue("my-queue", true)); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); //enqueue 2 messages queue->deliver(msg1); @@ -317,9 +332,9 @@ QPID_AUTO_TEST_CASE(testSearch){ Queue::shared_ptr queue(new Queue("my-queue", true)); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); //enqueue 2 messages queue->deliver(msg1); @@ -431,10 +446,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); - intrusive_ptr<Message> msg4 = create_message("e", "D"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); + intrusive_ptr<Message> msg4 = createMessage("e", "D"); intrusive_ptr<Message> received; //set deliever match for LVQ a,b,c,a @@ -466,9 +481,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ received = queue->get().payload; BOOST_CHECK_EQUAL(msg3.get(), received.get()); - intrusive_ptr<Message> msg5 = create_message("e", "A"); - intrusive_ptr<Message> msg6 = create_message("e", "B"); - intrusive_ptr<Message> msg7 = create_message("e", "C"); + intrusive_ptr<Message> msg5 = createMessage("e", "A"); + intrusive_ptr<Message> msg6 = createMessage("e", "B"); + intrusive_ptr<Message> msg7 = createMessage("e", "C"); msg5->insertCustomProperty(key,"a"); msg6->insertCustomProperty(key,"b"); msg7->insertCustomProperty(key,"c"); @@ -498,8 +513,8 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){ Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); string key; args.getLVQKey(key); @@ -524,12 +539,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); - intrusive_ptr<Message> msg4 = create_message("e", "D"); - intrusive_ptr<Message> msg5 = create_message("e", "F"); - intrusive_ptr<Message> msg6 = create_message("e", "G"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); + intrusive_ptr<Message> msg4 = createMessage("e", "D"); + intrusive_ptr<Message> msg5 = createMessage("e", "F"); + intrusive_ptr<Message> msg6 = createMessage("e", "G"); //set deliever match for LVQ a,b,c,a @@ -601,8 +616,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){ queue1->configure(args); queue2->configure(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "A"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "A"); string key; args.getLVQKey(key); @@ -645,8 +660,8 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ intrusive_ptr<Message> received; queue1->create(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "A"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "A"); // 2 string key; args.getLVQKey(key); @@ -673,7 +688,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) { for (uint i = 0; i < count; i++) { - intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl); + intrusive_ptr<Message> m = createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl); m->computeExpiration(new broker::ExpiryPolicy); queue.deliver(m); } @@ -736,7 +751,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { std::string("b"), std::string("b"), std::string("b"), std::string("c"), std::string("c"), std::string("c") }; for (int i = 0; i < 9; ++i) { - intrusive_ptr<Message> msg = create_message("e", "A"); + intrusive_ptr<Message> msg = createMessage("e", "A"); msg->insertCustomProperty("GROUP-ID", groups[i]); msg->insertCustomProperty("MY-ID", i); queue->deliver(msg); @@ -883,7 +898,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { // Queue = a-2, // Owners= ^C3, - intrusive_ptr<Message> msg = create_message("e", "A"); + intrusive_ptr<Message> msg = createMessage("e", "A"); msg->insertCustomProperty("GROUP-ID", "a"); msg->insertCustomProperty("MY-ID", 9); queue->deliver(msg); @@ -894,7 +909,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { gotOne = queue->dispatch(c2); BOOST_CHECK( !gotOne ); - msg = create_message("e", "A"); + msg = createMessage("e", "A"); msg->insertCustomProperty("GROUP-ID", "b"); msg->insertCustomProperty("MY-ID", 10); queue->deliver(msg); @@ -925,7 +940,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { queue->configure(args); for (int i = 0; i < 3; ++i) { - intrusive_ptr<Message> msg = create_message("e", "A"); + intrusive_ptr<Message> msg = createMessage("e", "A"); // no "GROUP-ID" header msg->insertCustomProperty("MY-ID", i); queue->deliver(msg); @@ -988,7 +1003,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ Queue::shared_ptr queue2(new Queue("queue2", true, &testStore )); queue2->create(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); queue1->deliver(msg1); queue2->deliver(msg1); @@ -1004,7 +1019,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ queue2->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 2u); - intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); queue1->deliver(msg2); queue2->deliver(msg2); @@ -1019,7 +1034,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ queue1->clearLastNodeFailure(); queue2->clearLastNodeFailure(); - intrusive_ptr<Message> msg3 = create_message("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "B"); queue1->deliver(msg3); queue2->deliver(msg3); BOOST_CHECK_EQUAL(testStore.enqCnt, 4u); @@ -1033,8 +1048,8 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ * internal details not part of the queue abstraction. // check requeue 1 - intrusive_ptr<Message> msg4 = create_message("e", "C"); - intrusive_ptr<Message> msg5 = create_message("e", "D"); + intrusive_ptr<Message> msg4 = createMessage("e", "C"); + intrusive_ptr<Message> msg5 = createMessage("e", "D"); framing::SequenceNumber sequence(1); QueuedMessage qmsg1(queue1.get(), msg4, sequence); @@ -1081,8 +1096,8 @@ not requeued to the store. queue1->create(args); // check requeue 1 - intrusive_ptr<Message> msg1 = create_message("e", "C"); - intrusive_ptr<Message> msg2 = create_message("e", "D"); + intrusive_ptr<Message> msg1 = createMessage("e", "C"); + intrusive_ptr<Message> msg2 = createMessage("e", "D"); queue1->recover(msg1); @@ -1114,7 +1129,7 @@ simulate store exception going into last node standing queue1->configure(args); // check requeue 1 - intrusive_ptr<Message> msg1 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "C"); queue1->deliver(msg1); testStore.createError(); @@ -1401,6 +1416,133 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ BOOST_CHECK_EQUAL(5u, tq9->getMessageCount()); } +QPID_AUTO_TEST_CASE(testSetPositionFifo) { + Queue::shared_ptr q(new Queue("my-queue", true)); + BOOST_CHECK_EQUAL(q->getPosition(), SequenceNumber(0)); + for (int i = 0; i < 10; ++i) + q->deliver(contentMessage(boost::lexical_cast<string>(i+1))); + + // Verify the front of the queue + TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(1u, c->last.position); // Numbered from 1 + BOOST_CHECK_EQUAL("1", getContent(c->last.payload)); + // Verify the back of the queue + QueuedMessage qm; + BOOST_CHECK_EQUAL(10u, q->getPosition()); + BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue + BOOST_CHECK_EQUAL("10", getContent(qm.payload)); + BOOST_CHECK_EQUAL(10u, q->getMessageCount()); + + // Using setPosition to introduce a gap in sequence numbers. + q->setPosition(15); + BOOST_CHECK_EQUAL(10u, q->getMessageCount()); + BOOST_CHECK_EQUAL(15u, q->getPosition()); + BOOST_CHECK(q->find(10, qm)); // Back of the queue + BOOST_CHECK_EQUAL("10", getContent(qm.payload)); + q->deliver(contentMessage("16")); + c->setPosition(9); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(10u, c->last.position); + BOOST_CHECK_EQUAL("10", getContent(c->last.payload)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(16u, c->last.position); + BOOST_CHECK_EQUAL("16", getContent(c->last.payload)); + + // Using setPosition to trunkcate the queue + q->setPosition(5); + BOOST_CHECK_EQUAL(5u, q->getMessageCount()); + q->deliver(contentMessage("6a")); + c->setPosition(4); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(5u, c->last.position); + BOOST_CHECK_EQUAL("5", getContent(c->last.payload)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(6u, c->last.position); + BOOST_CHECK_EQUAL("6a", getContent(c->last.payload)); + BOOST_CHECK(!q->dispatch(c)); // No more messages. +} + +QPID_AUTO_TEST_CASE(testSetPositionLvq) { + Queue::shared_ptr q(new Queue("my-queue", true)); + string key="key"; + framing::FieldTable args; + args.setString("qpid.last_value_queue_key", "key"); + q->configure(args); + + const char* values[] = { "a", "b", "c", "a", "b", "c" }; + for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) { + intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1)); + m->insertCustomProperty(key, values[i]); + q->deliver(m); + } + BOOST_CHECK_EQUAL(3u, q->getMessageCount()); + // Verify the front of the queue + TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(4u, c->last.position); // Numbered from 1 + BOOST_CHECK_EQUAL("4", getContent(c->last.payload)); + // Verify the back of the queue + QueuedMessage qm; + BOOST_CHECK_EQUAL(6u, q->getPosition()); + BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue + BOOST_CHECK_EQUAL("6", getContent(qm.payload)); + + q->setPosition(5); + c->setPosition(4); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(5u, c->last.position); // Numbered from 1 + BOOST_CHECK(!q->dispatch(c)); +} + +QPID_AUTO_TEST_CASE(testSetPositionPriority) { + Queue::shared_ptr q(new Queue("my-queue", true)); + framing::FieldTable args; + args.setInt("qpid.priorities", 10); + q->configure(args); + + const int priorities[] = { 1, 2, 3, 2, 1, 3 }; + for (size_t i = 0; i < sizeof(priorities)/sizeof(priorities[0]); ++i) { + intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1)); + m->getFrames().getHeaders()->get<DeliveryProperties>(true) + ->setPriority(priorities[i]); + q->deliver(m); + } + + // Truncation removes messages in fifo order, not priority order. + q->setPosition(3); + TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in FIFO order + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(1u, c->last.position); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(2u, c->last.position); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(3u, c->last.position); + BOOST_CHECK(!q->dispatch(c)); + + intrusive_ptr<Message> m = contentMessage("4a"); + m->getFrames().getHeaders()->get<DeliveryProperties>(true) + ->setPriority(4); + q->deliver(m); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(4u, c->last.position); + BOOST_CHECK_EQUAL("4a", getContent(c->last.payload)); + + // But consumers see priority order + c.reset(new TestConsumer("test", true)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(4u, c->last.position); + BOOST_CHECK_EQUAL("4a", getContent(c->last.payload)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(3u, c->last.position); + BOOST_CHECK_EQUAL("3", getContent(c->last.payload)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(2u, c->last.position); + BOOST_CHECK_EQUAL("2", getContent(c->last.payload)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(1u, c->last.position); + BOOST_CHECK_EQUAL("1", getContent(c->last.payload)); +} QPID_AUTO_TEST_SUITE_END() |