diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-08-03 12:13:32 +0000 |
| commit | d43d1912b376322e27fdcda551a73f9ff5487972 (patch) | |
| tree | ce493e10baa95f44be8beb5778ce51783463196d /cpp/src/tests | |
| parent | 04877fec0c6346edec67072d7f2d247740cf2af5 (diff) | |
| download | qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz | |
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
38 files changed, 1846 insertions, 360 deletions
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index 637442e128..29dfe3634f 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -149,6 +149,7 @@ set(unit_tests_to_build PollableCondition Variant ClientMessage + SystemInfo ${xml_tests} CACHE STRING "Which unit tests to build" ) diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp index 2fb284741a..66a16b9178 100644 --- a/cpp/src/tests/ExchangeTest.cpp +++ b/cpp/src/tests/ExchangeTest.cpp @@ -33,6 +33,8 @@ #include <iostream> #include "MessageUtils.h" +using std::string; + using boost::intrusive_ptr; using namespace qpid::broker; using namespace qpid::framing; diff --git a/cpp/src/tests/FieldTable.cpp b/cpp/src/tests/FieldTable.cpp index c79d110ae4..8aeeb031b3 100644 --- a/cpp/src/tests/FieldTable.cpp +++ b/cpp/src/tests/FieldTable.cpp @@ -20,6 +20,7 @@ */ #include <iostream> #include <algorithm> +#include "qpid/sys/alloca.h" #include "qpid/framing/Array.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/FieldValue.h" @@ -29,6 +30,8 @@ using namespace qpid::framing; +using std::string; + namespace qpid { namespace tests { @@ -73,11 +76,11 @@ QPID_AUTO_TEST_CASE(testAssignment) FieldTable c; c = a; - char* buff = static_cast<char*>(::alloca(c.encodedSize())); - Buffer wbuffer(buff, c.encodedSize()); + std::vector<char> buff(c.encodedSize()); + Buffer wbuffer(&buff[0], c.encodedSize()); wbuffer.put(c); - Buffer rbuffer(buff, c.encodedSize()); + Buffer rbuffer(&buff[0], c.encodedSize()); rbuffer.get(d); BOOST_CHECK_EQUAL(c, d); BOOST_CHECK(string("CCCC") == c.getAsString("A")); diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 66d2cdd5d5..cdc7429f3b 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -67,7 +67,7 @@ tmodule_LTLIBRARIES= TESTS+=unit_test check_PROGRAMS+=unit_test -unit_test_LDADD=-lboost_unit_test_framework \ +unit_test_LDADD=-lboost_unit_test_framework -lpthread \ $(lib_messaging) $(lib_broker) $(lib_console) $(lib_qmf2) unit_test_SOURCES= unit_test.cpp unit_test.h \ @@ -124,7 +124,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ Address.cpp \ ClientMessage.cpp \ Qmf2.cpp \ - BrokerOptions.cpp + BrokerOptions.cpp \ + SystemInfo.cpp if HAVE_XML unit_test_SOURCES+= XmlClientSessionTest.cpp @@ -150,7 +151,7 @@ endif # Test programs that are installed and therefore built as part of make, not make check qpidexectest_SCRIPTS += qpid-cpp-benchmark qpid-cluster-benchmark install_env.sh -EXTRA_DIST += qpid-cpp-benchmark install_env.sh +EXTRA_DIST += qpid-cpp-benchmark qpid-cluster-benchmark install_env.sh qpidexectest_PROGRAMS += receiver receiver_SOURCES = \ @@ -305,7 +306,7 @@ TESTS_ENVIRONMENT = \ system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest \ run_msg_group_tests TESTS += start_broker $(system_tests) python_tests stop_broker \ - ha_tests.py run_federation_tests run_federation_sys_tests \ + run_ha_tests run_federation_tests run_federation_sys_tests \ run_acl_tests run_cli_tests replication_test dynamic_log_level_test \ run_queue_flow_limit_tests ipv6_test @@ -352,7 +353,8 @@ EXTRA_DIST += \ run_queue_flow_limit_tests \ run_msg_group_tests \ ipv6_test \ - ha_tests.py \ + run_ha_tests \ + ha_tests.py \ test_env.ps1.in check_LTLIBRARIES += libdlclose_noop.la diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp index c3d40ed88a..9adb133d40 100644 --- a/cpp/src/tests/MessageBuilderTest.cpp +++ b/cpp/src/tests/MessageBuilderTest.cpp @@ -40,7 +40,7 @@ class MockMessageStore : public NullMessageStore uint64_t id; boost::intrusive_ptr<PersistableMessage> expectedMsg; - string expectedData; + std::string expectedData; std::list<Op> ops; void checkExpectation(Op actual) @@ -58,7 +58,7 @@ class MockMessageStore : public NullMessageStore ops.push_back(STAGE); } - void expectAppendContent(PersistableMessage& msg, const string& data) + void expectAppendContent(PersistableMessage& msg, const std::string& data) { expectedMsg = &msg; expectedData = data; @@ -73,7 +73,7 @@ class MockMessageStore : public NullMessageStore } void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg, - const string& data) + const std::string& data) { checkExpectation(APPEND); BOOST_CHECK_EQUAL(boost::static_pointer_cast<const PersistableMessage>(expectedMsg), msg); diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp index 7d67c92b37..3a3ed061f9 100644 --- a/cpp/src/tests/MessageTest.cpp +++ b/cpp/src/tests/MessageTest.cpp @@ -24,7 +24,6 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/FieldValue.h" #include "qpid/framing/Uuid.h" -#include "qpid/sys/alloca.h" #include "unit_test.h" @@ -33,6 +32,8 @@ using namespace qpid::broker; using namespace qpid::framing; +using std::string; + namespace qpid { namespace tests { @@ -69,11 +70,11 @@ QPID_AUTO_TEST_CASE(testEncodeDecode) dProps->setDeliveryMode(PERSISTENT); BOOST_CHECK(msg->isPersistent()); - char* buff = static_cast<char*>(::alloca(msg->encodedSize())); - Buffer wbuffer(buff, msg->encodedSize()); + std::vector<char> buff(msg->encodedSize()); + Buffer wbuffer(&buff[0], msg->encodedSize()); msg->encode(wbuffer); - Buffer rbuffer(buff, msg->encodedSize()); + Buffer rbuffer(&buff[0], msg->encodedSize()); msg = new Message(); msg->decodeHeader(rbuffer); msg->decodeContent(rbuffer); diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h index a1b140d484..991e2a2714 100644 --- a/cpp/src/tests/MessageUtils.h +++ b/cpp/src/tests/MessageUtils.h @@ -33,7 +33,7 @@ namespace tests { struct MessageUtils { - static boost::intrusive_ptr<Message> createMessage(const string& exchange="", const string& routingKey="", + static boost::intrusive_ptr<Message> createMessage(const std::string& exchange="", const std::string& routingKey="", const bool durable = false, const Uuid& messageId=Uuid(true), uint64_t contentSize = 0) { @@ -53,7 +53,7 @@ struct MessageUtils return msg; } - static void addContent(boost::intrusive_ptr<Message> msg, const string& data) + static void addContent(boost::intrusive_ptr<Message> msg, const std::string& data) { AMQFrame content((AMQContentBody(data))); msg->getFrames().append(content); diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index 968d55fd45..c8ee3aa401 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -1146,6 +1146,24 @@ QPID_AUTO_TEST_CASE(testLargeRoutingKey) BOOST_CHECK_THROW(fix.session.createReceiver(address), qpid::messaging::MessagingException); } +QPID_AUTO_TEST_CASE(testAlternateExchangeInLinkDeclare) +{ + MessagingFixture fix; + Sender s = fix.session.createSender("amq.direct/key"); + Receiver r1 = fix.session.createReceiver("amq.direct/key;{link:{x-declare:{alternate-exchange:'amq.fanout'}}}"); + Receiver r2 = fix.session.createReceiver("amq.fanout"); + + for (uint i = 0; i < 10; ++i) { + s.send(Message((boost::format("Message_%1%") % (i+1)).str()), true); + } + r1.close();//orphans all messages in subscription queue, which should then be routed through alternate exchange + for (uint i = 0; i < 10; ++i) { + Message received; + BOOST_CHECK(r2.fetch(received, Duration::SECOND)); + BOOST_CHECK_EQUAL(received.getContent(), (boost::format("Message_%1%") % (i+1)).str()); + } +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index fb429ca981..3b4f74620f 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -31,6 +31,8 @@ #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/ExpiryPolicy.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/framing/FieldTable.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/client/QueueOptions.h" #include "qpid/framing/AMQFrame.h" @@ -40,8 +42,11 @@ #include "qpid/broker/QueueFlowLimit.h" #include <iostream> -#include "boost/format.hpp" +#include <vector> +#include <boost/format.hpp> +#include <boost/lexical_cast.hpp> +using namespace std; using boost::intrusive_ptr; using namespace qpid; using namespace qpid::broker; @@ -83,7 +88,7 @@ public: Message& getMessage() { return *(msg.get()); } }; -intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) { +intrusive_ptr<Message> createMessage(std::string exchange, std::string routingKey, uint64_t ttl = 0) { intrusive_ptr<Message> msg(new Message()); AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0))); AMQFrame header((AMQHeaderBody())); @@ -94,6 +99,16 @@ intrusive_ptr<Message> create_message(std::string exchange, std::string routingK return msg; } +intrusive_ptr<Message> contentMessage(string content) { + intrusive_ptr<Message> m(MessageUtils::createMessage()); + MessageUtils::addContent(m, content); + return m; +} + +string getContent(intrusive_ptr<Message> m) { + return m->getFrames().getContent(); +} + QPID_AUTO_TEST_SUITE(QueueTestSuite) QPID_AUTO_TEST_CASE(testAsyncMessage) { @@ -105,7 +120,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) { //Test basic delivery: - intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); msg1->enqueueAsync(queue, (MessageStore*)0);//this is done on enqueue which is not called from process queue->process(msg1); sleep(2); @@ -120,7 +135,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) { QPID_AUTO_TEST_CASE(testAsyncMessageCount){ Queue::shared_ptr queue(new Queue("my_test_queue", true)); - intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); msg1->enqueueAsync(queue, (MessageStore*)0);//this is done on enqueue which is not called from process queue->process(msg1); @@ -145,9 +160,9 @@ QPID_AUTO_TEST_CASE(testConsumers){ BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount()); //Test basic delivery: - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); queue->deliver(msg1); BOOST_CHECK(queue->dispatch(c1)); @@ -191,9 +206,9 @@ QPID_AUTO_TEST_CASE(testRegistry){ QPID_AUTO_TEST_CASE(testDequeue){ Queue::shared_ptr queue(new Queue("my_queue", true)); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); intrusive_ptr<Message> received; queue->deliver(msg1); @@ -265,9 +280,9 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){ Queue::shared_ptr queue(new Queue("my-queue", true)); queue->configure(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); //enqueue 2 messages queue->deliver(msg1); @@ -291,9 +306,9 @@ QPID_AUTO_TEST_CASE(testSeek){ Queue::shared_ptr queue(new Queue("my-queue", true)); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); //enqueue 2 messages queue->deliver(msg1); @@ -317,9 +332,9 @@ QPID_AUTO_TEST_CASE(testSearch){ Queue::shared_ptr queue(new Queue("my-queue", true)); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); //enqueue 2 messages queue->deliver(msg1); @@ -431,10 +446,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); - intrusive_ptr<Message> msg4 = create_message("e", "D"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); + intrusive_ptr<Message> msg4 = createMessage("e", "D"); intrusive_ptr<Message> received; //set deliever match for LVQ a,b,c,a @@ -466,9 +481,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){ received = queue->get().payload; BOOST_CHECK_EQUAL(msg3.get(), received.get()); - intrusive_ptr<Message> msg5 = create_message("e", "A"); - intrusive_ptr<Message> msg6 = create_message("e", "B"); - intrusive_ptr<Message> msg7 = create_message("e", "C"); + intrusive_ptr<Message> msg5 = createMessage("e", "A"); + intrusive_ptr<Message> msg6 = createMessage("e", "B"); + intrusive_ptr<Message> msg7 = createMessage("e", "C"); msg5->insertCustomProperty(key,"a"); msg6->insertCustomProperty(key,"b"); msg7->insertCustomProperty(key,"c"); @@ -498,8 +513,8 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){ Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); string key; args.getLVQKey(key); @@ -524,12 +539,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "B"); - intrusive_ptr<Message> msg3 = create_message("e", "C"); - intrusive_ptr<Message> msg4 = create_message("e", "D"); - intrusive_ptr<Message> msg5 = create_message("e", "F"); - intrusive_ptr<Message> msg6 = create_message("e", "G"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "C"); + intrusive_ptr<Message> msg4 = createMessage("e", "D"); + intrusive_ptr<Message> msg5 = createMessage("e", "F"); + intrusive_ptr<Message> msg6 = createMessage("e", "G"); //set deliever match for LVQ a,b,c,a @@ -601,8 +616,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){ queue1->configure(args); queue2->configure(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "A"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "A"); string key; args.getLVQKey(key); @@ -645,8 +660,8 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ intrusive_ptr<Message> received; queue1->create(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); - intrusive_ptr<Message> msg2 = create_message("e", "A"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); + intrusive_ptr<Message> msg2 = createMessage("e", "A"); // 2 string key; args.getLVQKey(key); @@ -673,7 +688,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){ void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0) { for (uint i = 0; i < count; i++) { - intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl); + intrusive_ptr<Message> m = createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl); m->computeExpiration(new broker::ExpiryPolicy); queue.deliver(m); } @@ -736,7 +751,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { std::string("b"), std::string("b"), std::string("b"), std::string("c"), std::string("c"), std::string("c") }; for (int i = 0; i < 9; ++i) { - intrusive_ptr<Message> msg = create_message("e", "A"); + intrusive_ptr<Message> msg = createMessage("e", "A"); msg->insertCustomProperty("GROUP-ID", groups[i]); msg->insertCustomProperty("MY-ID", i); queue->deliver(msg); @@ -883,7 +898,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { // Queue = a-2, // Owners= ^C3, - intrusive_ptr<Message> msg = create_message("e", "A"); + intrusive_ptr<Message> msg = createMessage("e", "A"); msg->insertCustomProperty("GROUP-ID", "a"); msg->insertCustomProperty("MY-ID", 9); queue->deliver(msg); @@ -894,7 +909,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) { gotOne = queue->dispatch(c2); BOOST_CHECK( !gotOne ); - msg = create_message("e", "A"); + msg = createMessage("e", "A"); msg->insertCustomProperty("GROUP-ID", "b"); msg->insertCustomProperty("MY-ID", 10); queue->deliver(msg); @@ -925,7 +940,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) { queue->configure(args); for (int i = 0; i < 3; ++i) { - intrusive_ptr<Message> msg = create_message("e", "A"); + intrusive_ptr<Message> msg = createMessage("e", "A"); // no "GROUP-ID" header msg->insertCustomProperty("MY-ID", i); queue->deliver(msg); @@ -988,7 +1003,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ Queue::shared_ptr queue2(new Queue("queue2", true, &testStore )); queue2->create(args); - intrusive_ptr<Message> msg1 = create_message("e", "A"); + intrusive_ptr<Message> msg1 = createMessage("e", "A"); queue1->deliver(msg1); queue2->deliver(msg1); @@ -1004,7 +1019,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ queue2->setLastNodeFailure(); BOOST_CHECK_EQUAL(testStore.enqCnt, 2u); - intrusive_ptr<Message> msg2 = create_message("e", "B"); + intrusive_ptr<Message> msg2 = createMessage("e", "B"); queue1->deliver(msg2); queue2->deliver(msg2); @@ -1019,7 +1034,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ queue1->clearLastNodeFailure(); queue2->clearLastNodeFailure(); - intrusive_ptr<Message> msg3 = create_message("e", "B"); + intrusive_ptr<Message> msg3 = createMessage("e", "B"); queue1->deliver(msg3); queue2->deliver(msg3); BOOST_CHECK_EQUAL(testStore.enqCnt, 4u); @@ -1033,8 +1048,8 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){ * internal details not part of the queue abstraction. // check requeue 1 - intrusive_ptr<Message> msg4 = create_message("e", "C"); - intrusive_ptr<Message> msg5 = create_message("e", "D"); + intrusive_ptr<Message> msg4 = createMessage("e", "C"); + intrusive_ptr<Message> msg5 = createMessage("e", "D"); framing::SequenceNumber sequence(1); QueuedMessage qmsg1(queue1.get(), msg4, sequence); @@ -1081,8 +1096,8 @@ not requeued to the store. queue1->create(args); // check requeue 1 - intrusive_ptr<Message> msg1 = create_message("e", "C"); - intrusive_ptr<Message> msg2 = create_message("e", "D"); + intrusive_ptr<Message> msg1 = createMessage("e", "C"); + intrusive_ptr<Message> msg2 = createMessage("e", "D"); queue1->recover(msg1); @@ -1114,7 +1129,7 @@ simulate store exception going into last node standing queue1->configure(args); // check requeue 1 - intrusive_ptr<Message> msg1 = create_message("e", "C"); + intrusive_ptr<Message> msg1 = createMessage("e", "C"); queue1->deliver(msg1); testStore.createError(); @@ -1401,6 +1416,133 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){ BOOST_CHECK_EQUAL(5u, tq9->getMessageCount()); } +QPID_AUTO_TEST_CASE(testSetPositionFifo) { + Queue::shared_ptr q(new Queue("my-queue", true)); + BOOST_CHECK_EQUAL(q->getPosition(), SequenceNumber(0)); + for (int i = 0; i < 10; ++i) + q->deliver(contentMessage(boost::lexical_cast<string>(i+1))); + + // Verify the front of the queue + TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(1u, c->last.position); // Numbered from 1 + BOOST_CHECK_EQUAL("1", getContent(c->last.payload)); + // Verify the back of the queue + QueuedMessage qm; + BOOST_CHECK_EQUAL(10u, q->getPosition()); + BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue + BOOST_CHECK_EQUAL("10", getContent(qm.payload)); + BOOST_CHECK_EQUAL(10u, q->getMessageCount()); + + // Using setPosition to introduce a gap in sequence numbers. + q->setPosition(15); + BOOST_CHECK_EQUAL(10u, q->getMessageCount()); + BOOST_CHECK_EQUAL(15u, q->getPosition()); + BOOST_CHECK(q->find(10, qm)); // Back of the queue + BOOST_CHECK_EQUAL("10", getContent(qm.payload)); + q->deliver(contentMessage("16")); + c->setPosition(9); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(10u, c->last.position); + BOOST_CHECK_EQUAL("10", getContent(c->last.payload)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(16u, c->last.position); + BOOST_CHECK_EQUAL("16", getContent(c->last.payload)); + + // Using setPosition to trunkcate the queue + q->setPosition(5); + BOOST_CHECK_EQUAL(5u, q->getMessageCount()); + q->deliver(contentMessage("6a")); + c->setPosition(4); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(5u, c->last.position); + BOOST_CHECK_EQUAL("5", getContent(c->last.payload)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(6u, c->last.position); + BOOST_CHECK_EQUAL("6a", getContent(c->last.payload)); + BOOST_CHECK(!q->dispatch(c)); // No more messages. +} + +QPID_AUTO_TEST_CASE(testSetPositionLvq) { + Queue::shared_ptr q(new Queue("my-queue", true)); + string key="key"; + framing::FieldTable args; + args.setString("qpid.last_value_queue_key", "key"); + q->configure(args); + + const char* values[] = { "a", "b", "c", "a", "b", "c" }; + for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) { + intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1)); + m->insertCustomProperty(key, values[i]); + q->deliver(m); + } + BOOST_CHECK_EQUAL(3u, q->getMessageCount()); + // Verify the front of the queue + TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(4u, c->last.position); // Numbered from 1 + BOOST_CHECK_EQUAL("4", getContent(c->last.payload)); + // Verify the back of the queue + QueuedMessage qm; + BOOST_CHECK_EQUAL(6u, q->getPosition()); + BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue + BOOST_CHECK_EQUAL("6", getContent(qm.payload)); + + q->setPosition(5); + c->setPosition(4); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(5u, c->last.position); // Numbered from 1 + BOOST_CHECK(!q->dispatch(c)); +} + +QPID_AUTO_TEST_CASE(testSetPositionPriority) { + Queue::shared_ptr q(new Queue("my-queue", true)); + framing::FieldTable args; + args.setInt("qpid.priorities", 10); + q->configure(args); + + const int priorities[] = { 1, 2, 3, 2, 1, 3 }; + for (size_t i = 0; i < sizeof(priorities)/sizeof(priorities[0]); ++i) { + intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1)); + m->getFrames().getHeaders()->get<DeliveryProperties>(true) + ->setPriority(priorities[i]); + q->deliver(m); + } + + // Truncation removes messages in fifo order, not priority order. + q->setPosition(3); + TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in FIFO order + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(1u, c->last.position); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(2u, c->last.position); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(3u, c->last.position); + BOOST_CHECK(!q->dispatch(c)); + + intrusive_ptr<Message> m = contentMessage("4a"); + m->getFrames().getHeaders()->get<DeliveryProperties>(true) + ->setPriority(4); + q->deliver(m); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(4u, c->last.position); + BOOST_CHECK_EQUAL("4a", getContent(c->last.payload)); + + // But consumers see priority order + c.reset(new TestConsumer("test", true)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(4u, c->last.position); + BOOST_CHECK_EQUAL("4a", getContent(c->last.payload)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(3u, c->last.position); + BOOST_CHECK_EQUAL("3", getContent(c->last.payload)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(2u, c->last.position); + BOOST_CHECK_EQUAL("2", getContent(c->last.payload)); + BOOST_CHECK(q->dispatch(c)); + BOOST_CHECK_EQUAL(1u, c->last.position); + BOOST_CHECK_EQUAL("1", getContent(c->last.payload)); +} QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/RangeSet.cpp b/cpp/src/tests/RangeSet.cpp index db3a964086..285f432bf7 100644 --- a/cpp/src/tests/RangeSet.cpp +++ b/cpp/src/tests/RangeSet.cpp @@ -29,63 +29,71 @@ namespace tests { QPID_AUTO_TEST_SUITE(RangeSetTestSuite) -typedef qpid::Range<int> TestRange; -typedef qpid::RangeSet<int> TestRangeSet; +typedef qpid::Range<int> TR; // Test Range +typedef RangeSet<int> TRSet; QPID_AUTO_TEST_CASE(testEmptyRange) { - TestRange r; + TR r; + BOOST_CHECK_EQUAL(r, TR(0,0)); BOOST_CHECK(r.empty()); BOOST_CHECK(!r.contains(0)); - // BOOST_CHECK(r.contiguous(0)); } QPID_AUTO_TEST_CASE(testRangeSetAddPoint) { - TestRangeSet r; + TRSet r; BOOST_CHECK(r.empty()); r += 3; BOOST_CHECK_MESSAGE(r.contains(3), r); - BOOST_CHECK_MESSAGE(r.contains(TestRange(3,4)), r); + BOOST_CHECK_MESSAGE(r.contains(TR(3,4)), r); BOOST_CHECK(!r.empty()); r += 5; BOOST_CHECK_MESSAGE(r.contains(5), r); - BOOST_CHECK_MESSAGE(r.contains(TestRange(5,6)), r); - BOOST_CHECK_MESSAGE(!r.contains(TestRange(3,6)), r); + BOOST_CHECK_MESSAGE(r.contains(TR(5,6)), r); + BOOST_CHECK_MESSAGE(!r.contains(TR(3,6)), r); r += 4; - BOOST_CHECK_MESSAGE(r.contains(TestRange(3,6)), r); + BOOST_CHECK_MESSAGE(r.contains(TR(3,6)), r); } QPID_AUTO_TEST_CASE(testRangeSetAddRange) { - TestRangeSet r; - r += TestRange(0,3); - BOOST_CHECK(r.contains(TestRange(0,3))); - r += TestRange(4,6); - BOOST_CHECK_MESSAGE(r.contains(TestRange(4,6)), r); + TRSet r; + r += TR(0,3); + BOOST_CHECK(r.contains(TR(0,3))); + BOOST_CHECK(r.contiguous()); + r += TR(4,6); + BOOST_CHECK(!r.contiguous()); + BOOST_CHECK_MESSAGE(r.contains(TR(4,6)), r); r += 3; - BOOST_CHECK_MESSAGE(r.contains(TestRange(0,6)), r); + BOOST_CHECK_MESSAGE(r.contains(TR(0,6)), r); BOOST_CHECK(r.front() == 0); BOOST_CHECK(r.back() == 6); + + // Merging additions + r = TRSet(0,3)+TR(5,6); + TRSet e(0,6); + BOOST_CHECK_EQUAL(r + TR(3,5), e); + BOOST_CHECK(e.contiguous()); + r = TRSet(0,5)+TR(10,15)+TR(20,25)+TR(30,35)+TR(40,45); + BOOST_CHECK_EQUAL(r + TR(11,37), TRSet(0,5)+TR(11,37)+TR(40,45)); } QPID_AUTO_TEST_CASE(testRangeSetAddSet) { - TestRangeSet r; - TestRangeSet s = TestRangeSet(0,3)+TestRange(5,10); + TRSet r; + TRSet s = TRSet(0,3)+TR(5,10); r += s; BOOST_CHECK_EQUAL(r,s); - r += TestRangeSet(3,5) + TestRange(7,12) + 15; - BOOST_CHECK_EQUAL(r, TestRangeSet(0,12) + 15); + r += TRSet(3,5) + TR(7,12) + 15; + BOOST_CHECK_EQUAL(r, TRSet(0,12) + 15); r.clear(); BOOST_CHECK(r.empty()); - r += TestRange::makeClosed(6,10); - BOOST_CHECK_EQUAL(r, TestRangeSet(6,11)); - r += TestRangeSet(2,6)+8; - BOOST_CHECK_EQUAL(r, TestRangeSet(2,11)); + r += TR::makeClosed(6,10); + BOOST_CHECK_EQUAL(r, TRSet(6,11)); + r += TRSet(2,6)+8; + BOOST_CHECK_EQUAL(r, TRSet(2,11)); } QPID_AUTO_TEST_CASE(testRangeSetIterate) { - TestRangeSet r; - (((r += 1) += 10) += TestRange(4,7)) += 2; - BOOST_MESSAGE(r); + TRSet r = TRSet(1,3)+TR(4,7)+TR(10,11); std::vector<int> actual; std::copy(r.begin(), r.end(), std::back_inserter(actual)); std::vector<int> expect = boost::assign::list_of(1)(2)(4)(5)(6)(10); @@ -94,51 +102,51 @@ QPID_AUTO_TEST_CASE(testRangeSetIterate) { QPID_AUTO_TEST_CASE(testRangeSetRemove) { // points - BOOST_CHECK_EQUAL(TestRangeSet(0,5)-3, TestRangeSet(0,3)+TestRange(4,5)); - BOOST_CHECK_EQUAL(TestRangeSet(1,5)-5, TestRangeSet(1,5)); - BOOST_CHECK_EQUAL(TestRangeSet(1,5)-0, TestRangeSet(1,5)); + BOOST_CHECK_EQUAL(TRSet(0,5)-3, TRSet(0,3)+TR(4,5)); + BOOST_CHECK_EQUAL(TRSet(1,5)-5, TRSet(1,5)); + BOOST_CHECK_EQUAL(TRSet(1,5)-0, TRSet(1,5)); - TestRangeSet r(TestRangeSet(0,5)+TestRange(10,15)+TestRange(20,25)); + TRSet r(TRSet(0,5)+TR(10,15)+TR(20,25)); - // TestRanges - BOOST_CHECK_EQUAL(r-TestRange(0,5), TestRangeSet(10,15)+TestRange(20,25)); - BOOST_CHECK_EQUAL(r-TestRange(10,15), TestRangeSet(0,5)+TestRange(20,25)); - BOOST_CHECK_EQUAL(r-TestRange(20,25), TestRangeSet(0,5)+TestRange(10,15)); + // TRs + BOOST_CHECK_EQUAL(r-TR(0,5), TRSet(10,15)+TR(20,25)); + BOOST_CHECK_EQUAL(r-TR(10,15), TRSet(0,5)+TR(20,25)); + BOOST_CHECK_EQUAL(r-TR(20,25), TRSet(0,5)+TR(10,15)); - BOOST_CHECK_EQUAL(r-TestRange(-5, 30), TestRangeSet()); + BOOST_CHECK_EQUAL(r-TR(-5, 30), TRSet()); - BOOST_CHECK_EQUAL(r-TestRange(-5, 7), TestRangeSet(10,15)+TestRange(20,25)); - BOOST_CHECK_EQUAL(r-TestRange(8,19), TestRangeSet(0,5)+TestRange(20,25)); - BOOST_CHECK_EQUAL(r-TestRange(17,30), TestRangeSet(0,5)+TestRange(10,15)); - BOOST_CHECK_EQUAL(r-TestRange(17,30), TestRangeSet(0,5)+TestRange(10,15)); + BOOST_CHECK_EQUAL(r-TR(-5, 7), TRSet(10,15)+TR(20,25)); + BOOST_CHECK_EQUAL(r-TR(8,19), TRSet(0,5)+TR(20,25)); + BOOST_CHECK_EQUAL(r-TR(17,30), TRSet(0,5)+TR(10,15)); - BOOST_CHECK_EQUAL(r-TestRange(-5, 5), TestRangeSet(10,15)+TestRange(20,25)); - BOOST_CHECK_EQUAL(r-TestRange(10,19), TestRangeSet(0,5)+TestRange(20,25)); - BOOST_CHECK_EQUAL(r-TestRange(18,25), TestRangeSet(0,5)+TestRange(10,15)); - BOOST_CHECK_EQUAL(r-TestRange(23,25), TestRangeSet(0,5)+TestRange(10,15)+TestRange(20,23)); + BOOST_CHECK_EQUAL(r-TR(-5, 5), TRSet(10,15)+TR(20,25)); + BOOST_CHECK_EQUAL(r-TR(10,19), TRSet(0,5)+TR(20,25)); + BOOST_CHECK_EQUAL(r-TR(18,25), TRSet(0,5)+TR(10,15)); + BOOST_CHECK_EQUAL(r-TR(23,25), TRSet(0,5)+TR(10,15)+TR(20,23)); - BOOST_CHECK_EQUAL(r-TestRange(-3, 3), TestRangeSet(3,5)+TestRange(10,15)+TestRange(20,25)); - BOOST_CHECK_EQUAL(r-TestRange(3, 7), TestRangeSet(0,2)+TestRange(10,15)+TestRange(20,25)); - BOOST_CHECK_EQUAL(r-TestRange(3, 12), TestRangeSet(0,3)+TestRange(12,15)+TestRange(20,25)); - BOOST_CHECK_EQUAL(r-TestRange(3, 22), TestRangeSet(12,15)+TestRange(22,25)); - BOOST_CHECK_EQUAL(r-TestRange(12, 22), TestRangeSet(0,5)+TestRange(10,11)+TestRange(22,25)); + BOOST_CHECK_EQUAL(r-TR(-3, 3), TRSet(3,5)+TR(10,15)+TR(20,25)); + BOOST_CHECK_EQUAL(r-TR(3, 7), TRSet(0,2)+TR(10,15)+TR(20,25)); + BOOST_CHECK_EQUAL(r-TR(3, 12), TRSet(0,3)+TR(12,15)+TR(20,25)); + BOOST_CHECK_EQUAL(r-TR(3, 22), TRSet(12,15)+TR(22,25)); + BOOST_CHECK_EQUAL(r-TR(12, 22), TRSet(0,5)+TR(10,11)+TR(22,25)); // Sets - BOOST_CHECK_EQUAL(r-(TestRangeSet(-1,6)+TestRange(11,14)+TestRange(23,25)), - TestRangeSet(10,11)+TestRange(14,15)+TestRange(20,23)); -} - -QPID_AUTO_TEST_CASE(testRangeContaining) { - TestRangeSet r; - (((r += 1) += TestRange(3,5)) += 7); - BOOST_CHECK_EQUAL(r.rangeContaining(0), TestRange(0,0)); - BOOST_CHECK_EQUAL(r.rangeContaining(1), TestRange(1,2)); - BOOST_CHECK_EQUAL(r.rangeContaining(2), TestRange(2,2)); - BOOST_CHECK_EQUAL(r.rangeContaining(3), TestRange(3,5)); - BOOST_CHECK_EQUAL(r.rangeContaining(4), TestRange(3,5)); - BOOST_CHECK_EQUAL(r.rangeContaining(5), TestRange(5,5)); - BOOST_CHECK_EQUAL(r.rangeContaining(6), TestRange(6,6)); - BOOST_CHECK_EQUAL(r.rangeContaining(7), TestRange(7,8)); + BOOST_CHECK_EQUAL(r-(TRSet(-1,6)+TR(11,14)+TR(23,25)), + TRSet(10,11)+TR(14,15)+TR(20,23)); + // Split the ranges + BOOST_CHECK_EQUAL(r-(TRSet(2,3)+TR(11,13)+TR(21,23)), + TRSet(0,2)+TR(4,5)+ + TR(10,11)+TR(14,15)+ + TR(20,21)+TR(23,25)); + // Truncate the ranges + BOOST_CHECK_EQUAL(r-(TRSet(0,3)+TR(13,15)+TR(19,23)), + TRSet(3,5)+TR(10,13)+TR(20,23)); + // Remove multiple ranges with truncation + BOOST_CHECK_EQUAL(r-(TRSet(3,23)), TRSet(0,3)+TR(23,25)); + // Remove multiple ranges in middle + TRSet r2 = TRSet(0,5)+TR(10,15)+TR(20,25)+TR(30,35); + BOOST_CHECK_EQUAL(r2-TRSet(11,24), + TRSet(0,5)+TR(10,11)+TR(24,25)+TR(30,35)); } QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/ReplicationTest.cpp b/cpp/src/tests/ReplicationTest.cpp index 1219a6b59e..055f06579f 100644 --- a/cpp/src/tests/ReplicationTest.cpp +++ b/cpp/src/tests/ReplicationTest.cpp @@ -62,7 +62,7 @@ qpid::sys::Shlib plugin(getLibPath("REPLICATING_LISTENER_LIB", default_shlib)); qpid::broker::Broker::Options getBrokerOpts(const std::vector<std::string>& args) { std::vector<const char*> argv(args.size()); - transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1)); + transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1)); qpid::broker::Broker::Options opts; qpid::Plugin::addOptions(opts); @@ -72,7 +72,7 @@ qpid::broker::Broker::Options getBrokerOpts(const std::vector<std::string>& args QPID_AUTO_TEST_CASE(testReplicationExchange) { - qpid::broker::Broker::Options brokerOpts(getBrokerOpts(list_of<string>("qpidd") + qpid::broker::Broker::Options brokerOpts(getBrokerOpts(list_of<std::string>("qpidd") ("--replication-exchange-name=qpid.replication"))); SessionFixture f(brokerOpts); diff --git a/cpp/src/tests/SystemInfo.cpp b/cpp/src/tests/SystemInfo.cpp new file mode 100644 index 0000000000..12d8d3dba8 --- /dev/null +++ b/cpp/src/tests/SystemInfo.cpp @@ -0,0 +1,52 @@ +/* + * + * Copyright (c) 2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + + +#include "unit_test.h" +#include "test_tools.h" +#include "qpid/sys/SystemInfo.h" +#include <boost/assign.hpp> + +using namespace std; +using namespace qpid::sys; +using namespace boost::assign; + +namespace qpid { +namespace tests { + +QPID_AUTO_TEST_SUITE(SystemInfoTestSuite) + +QPID_AUTO_TEST_CASE(TestIsLocalHost) { + // Test that local hostname and addresses are considered local + Address a; + BOOST_ASSERT(SystemInfo::getLocalHostname(a)); + BOOST_ASSERT(SystemInfo::isLocalHost(a.host)); + std::vector<Address> addrs; + SystemInfo::getLocalIpAddresses(0, addrs); + for (std::vector<Address>::iterator i = addrs.begin(); i != addrs.end(); ++i) + BOOST_ASSERT(SystemInfo::isLocalHost(i->host)); + // Check some non-local addresses + BOOST_ASSERT(!SystemInfo::isLocalHost("123.4.5.6")); + BOOST_ASSERT(!SystemInfo::isLocalHost("nosuchhost")); + BOOST_ASSERT(SystemInfo::isLocalHost("127.0.0.1")); + BOOST_ASSERT(SystemInfo::isLocalHost("::1")); +} + +QPID_AUTO_TEST_SUITE_END() + +}} // namespace qpid::tests diff --git a/cpp/src/tests/TestMessageStore.h b/cpp/src/tests/TestMessageStore.h index 20e0b755b2..0b63bc9c15 100644 --- a/cpp/src/tests/TestMessageStore.h +++ b/cpp/src/tests/TestMessageStore.h @@ -31,7 +31,7 @@ using namespace qpid::framing; namespace qpid { namespace tests { -typedef std::pair<string, boost::intrusive_ptr<PersistableMessage> > msg_queue_pair; +typedef std::pair<std::string, boost::intrusive_ptr<PersistableMessage> > msg_queue_pair; class TestMessageStore : public NullMessageStore { diff --git a/cpp/src/tests/TimerTest.cpp b/cpp/src/tests/TimerTest.cpp index 6a0a196f4e..fc5004dcb0 100644 --- a/cpp/src/tests/TimerTest.cpp +++ b/cpp/src/tests/TimerTest.cpp @@ -81,6 +81,8 @@ class TestTask : public TimerTask uint64_t difference = _abs64(expected - actual); #elif defined(_WIN32) uint64_t difference = labs(expected - actual); +#elif defined(__SUNPRO_CC) + uint64_t difference = llabs(expected - actual); #else uint64_t difference = abs(expected - actual); #endif diff --git a/cpp/src/tests/TopicExchangeTest.cpp b/cpp/src/tests/TopicExchangeTest.cpp index ff8931f9c9..d57951ea3f 100644 --- a/cpp/src/tests/TopicExchangeTest.cpp +++ b/cpp/src/tests/TopicExchangeTest.cpp @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ +#include "qpid/broker/TopicKeyNode.h" #include "qpid/broker/TopicExchange.h" #include "unit_test.h" #include "test_tools.h" @@ -32,14 +33,15 @@ class TopicExchange::TopicExchangeTester { public: typedef std::vector<std::string> BindingVec; + typedef TopicKeyNode<TopicExchange::BindingKey> TestBindingNode; private: // binding node iterator that collects all routes that are bound - class TestFinder : public TopicExchange::BindingNode::TreeIterator { + class TestFinder : public TestBindingNode::TreeIterator { public: TestFinder(BindingVec& m) : bv(m) {}; ~TestFinder() {}; - bool visit(BindingNode& node) { + bool visit(TestBindingNode& node) { if (!node.bindings.bindingVector.empty()) bv.push_back(node.routePattern); return true; @@ -53,7 +55,7 @@ public: ~TopicExchangeTester() {}; bool addBindingKey(const std::string& bKey) { string routingPattern = normalize(bKey); - BindingKey *bk = bindingTree.addBindingKey(routingPattern); + BindingKey *bk = bindingTree.add(routingPattern); if (bk) { // push a dummy binding to mark this node as "non-leaf" bk->bindingVector.push_back(Binding::shared_ptr()); @@ -64,12 +66,12 @@ public: bool removeBindingKey(const std::string& bKey){ string routingPattern = normalize(bKey); - BindingKey *bk = bindingTree.getBindingKey(routingPattern); + BindingKey *bk = bindingTree.get(routingPattern); if (bk) { bk->bindingVector.pop_back(); if (bk->bindingVector.empty()) { // no more bindings - remove this node - bindingTree.removeBindingKey(routingPattern); + bindingTree.remove(routingPattern); } return true; } @@ -87,7 +89,7 @@ public: } private: - TopicExchange::BindingNode bindingTree; + TestBindingNode bindingTree; }; } // namespace broker diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp index 152581e4ba..a636646035 100644 --- a/cpp/src/tests/TxPublishTest.cpp +++ b/cpp/src/tests/TxPublishTest.cpp @@ -69,9 +69,9 @@ QPID_AUTO_TEST_CASE(testPrepare) //ensure messages are enqueued in store t.op.prepare(0); BOOST_CHECK_EQUAL((size_t) 2, t.store.enqueued.size()); - BOOST_CHECK_EQUAL(string("queue1"), t.store.enqueued[0].first); + BOOST_CHECK_EQUAL(std::string("queue1"), t.store.enqueued[0].first); BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[0].second); - BOOST_CHECK_EQUAL(string("queue2"), t.store.enqueued[1].first); + BOOST_CHECK_EQUAL(std::string("queue2"), t.store.enqueued[1].first); BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[1].second); BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isIngressComplete()); } diff --git a/cpp/src/tests/Uuid.cpp b/cpp/src/tests/Uuid.cpp index f85a297adc..aa9580e25e 100644 --- a/cpp/src/tests/Uuid.cpp +++ b/cpp/src/tests/Uuid.cpp @@ -19,7 +19,6 @@ #include "qpid/framing/Uuid.h" #include "qpid/framing/Buffer.h" #include "qpid/types/Uuid.h" -#include "qpid/sys/alloca.h" #include "unit_test.h" @@ -52,6 +51,11 @@ boost::array<uint8_t, 16> sample = {{0x1b, 0x4e, 0x28, 0xba, 0x2f, 0xa1, 0x11, const string sampleStr("1b4e28ba-2fa1-11d2-883f-b9a761bde3fb"); const string zeroStr("00000000-0000-0000-0000-000000000000"); +QPID_AUTO_TEST_CASE(testUuidStr) { + Uuid uuid(sampleStr); + BOOST_CHECK(uuid == sample); +} + QPID_AUTO_TEST_CASE(testUuidIstream) { Uuid uuid; istringstream in(sampleStr); @@ -92,12 +96,12 @@ QPID_AUTO_TEST_CASE(testUuidIOstream) { } QPID_AUTO_TEST_CASE(testUuidEncodeDecode) { - char* buff = static_cast<char*>(::alloca(Uuid::size())); - Buffer wbuf(buff, Uuid::size()); + std::vector<char> buff(Uuid::size()); + Buffer wbuf(&buff[0], Uuid::size()); Uuid uuid(sample.c_array()); uuid.encode(wbuf); - Buffer rbuf(buff, Uuid::size()); + Buffer rbuf(&buff[0], Uuid::size()); Uuid decoded; decoded.decode(rbuf); BOOST_CHECK_EQUAL(string(sample.begin(), sample.end()), diff --git a/cpp/src/tests/acl.py b/cpp/src/tests/acl.py index 720b3b4216..0e096a6f5b 100755 --- a/cpp/src/tests/acl.py +++ b/cpp/src/tests/acl.py @@ -285,10 +285,38 @@ class ACLTests(TestBase010): if (result): self.fail(result) + def test_nested_groups(self): + """ + Test nested groups + """ + + aclf = self.get_acl_file() + aclf.write('group user-consume martin@QPID ted@QPID\n') + aclf.write('group group2 kim@QPID user-consume rob@QPID \n') + aclf.write('acl allow anonymous all all \n') + aclf.write('acl allow group2 create queue \n') + aclf.write('acl deny all all') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + + session = self.get_session('rob','rob') + try: + session.queue_declare(queue="rob_queue") + except qpid.session.SessionException, e: + if (403 == e.args[0].error_code): + self.fail("ACL should allow queue create request"); + self.fail("Error during queue create request"); + + + def test_user_realm(self): """ Test a user defined without a realm Ex. group admin rajith + Note: a user name without a realm is interpreted as a group name """ aclf = self.get_acl_file() aclf.write('group admin bob\n') # shouldn't be allowed @@ -297,7 +325,7 @@ class ACLTests(TestBase010): aclf.close() result = self.reload_acl() - if (result.find("Username 'bob' must contain a realm",0,len(result)) == -1): + if (result.find("not defined yet.",0,len(result)) == -1): self.fail(result) def test_allowed_chars_for_username(self): @@ -1509,6 +1537,124 @@ class ACLTests(TestBase010): #===================================== + # QMF Topic Exchange tests + #===================================== + + def test_qmf_topic_exchange_tests(self): + """ + Test using QMF method hooks into ACL logic + """ + aclf = self.get_acl_file() + aclf.write('# begin hack alert: allow anonymous to access the lookup debug functions\n') + aclf.write('acl allow-log anonymous create queue\n') + aclf.write('acl allow-log anonymous all exchange name=qmf.*\n') + aclf.write('acl allow-log anonymous all exchange name=amq.direct\n') + aclf.write('acl allow-log anonymous all exchange name=qpid.management\n') + aclf.write('acl allow-log anonymous access method name=*\n') + aclf.write('# end hack alert\n') + aclf.write('acl allow-log uPlain1@COMPANY publish exchange name=X routingkey=ab.cd.e\n') + aclf.write('acl allow-log uPlain2@COMPANY publish exchange name=X routingkey=.\n') + aclf.write('acl allow-log uStar1@COMPANY publish exchange name=X routingkey=a.*.b\n') + aclf.write('acl allow-log uStar2@COMPANY publish exchange name=X routingkey=*.x\n') + aclf.write('acl allow-log uStar3@COMPANY publish exchange name=X routingkey=x.x.*\n') + aclf.write('acl allow-log uHash1@COMPANY publish exchange name=X routingkey=a.#.b\n') + aclf.write('acl allow-log uHash2@COMPANY publish exchange name=X routingkey=a.#\n') + aclf.write('acl allow-log uHash3@COMPANY publish exchange name=X routingkey=#.a\n') + aclf.write('acl allow-log uHash4@COMPANY publish exchange name=X routingkey=a.#.b.#.c\n') + aclf.write('acl allow-log uMixed1@COMPANY publish exchange name=X routingkey=*.x.#.y\n') + aclf.write('acl allow-log uMixed2@COMPANY publish exchange name=X routingkey=a.#.b.*\n') + aclf.write('acl allow-log uMixed3@COMPANY publish exchange name=X routingkey=*.*.*.#\n') + + aclf.write('acl allow-log all publish exchange name=X routingkey=MN.OP.Q\n') + aclf.write('acl allow-log all publish exchange name=X routingkey=M.*.N\n') + aclf.write('acl allow-log all publish exchange name=X routingkey=M.#.N\n') + aclf.write('acl allow-log all publish exchange name=X routingkey=*.M.#.N\n') + + aclf.write('acl deny-log all all\n') + aclf.close() + + result = self.reload_acl() + if (result): + self.fail(result) + + # aclKey: "ab.cd.e" + self.LookupPublish("uPlain1@COMPANY", "X", "ab.cd.e", "allow-log") + self.LookupPublish("uPlain1@COMPANY", "X", "abx.cd.e", "deny-log") + self.LookupPublish("uPlain1@COMPANY", "X", "ab.cd", "deny-log") + self.LookupPublish("uPlain1@COMPANY", "X", "ab.cd..e.", "deny-log") + self.LookupPublish("uPlain1@COMPANY", "X", "ab.cd.e.", "deny-log") + self.LookupPublish("uPlain1@COMPANY", "X", ".ab.cd.e", "deny-log") + # aclKey: "." + self.LookupPublish("uPlain2@COMPANY", "X", ".", "allow-log") + + # aclKey: "a.*.b" + self.LookupPublish("uStar1@COMPANY", "X", "a.xx.b", "allow-log") + self.LookupPublish("uStar1@COMPANY", "X", "a.b", "deny-log") + # aclKey: "*.x" + self.LookupPublish("uStar2@COMPANY", "X", "y.x", "allow-log") + self.LookupPublish("uStar2@COMPANY", "X", ".x", "allow-log") + self.LookupPublish("uStar2@COMPANY", "X", "x", "deny-log") + # aclKey: "x.x.*" + self.LookupPublish("uStar3@COMPANY", "X", "x.x.y", "allow-log") + self.LookupPublish("uStar3@COMPANY", "X", "x.x.", "allow-log") + self.LookupPublish("uStar3@COMPANY", "X", "x.x", "deny-log") + self.LookupPublish("uStar3@COMPANY", "X", "q.x.y", "deny-log") + + # aclKey: "a.#.b" + self.LookupPublish("uHash1@COMPANY", "X", "a.b", "allow-log") + self.LookupPublish("uHash1@COMPANY", "X", "a.x.b", "allow-log") + self.LookupPublish("uHash1@COMPANY", "X", "a..x.y.zz.b", "allow-log") + self.LookupPublish("uHash1@COMPANY", "X", "a.b.", "deny-log") + self.LookupPublish("uHash1@COMPANY", "X", "q.x.b", "deny-log") + + # aclKey: "a.#" + self.LookupPublish("uHash2@COMPANY", "X", "a", "allow-log") + self.LookupPublish("uHash2@COMPANY", "X", "a.b", "allow-log") + self.LookupPublish("uHash2@COMPANY", "X", "a.b.c", "allow-log") + + # aclKey: "#.a" + self.LookupPublish("uHash3@COMPANY", "X", "a", "allow-log") + self.LookupPublish("uHash3@COMPANY", "X", "x.y.a", "allow-log") + + # aclKey: "a.#.b.#.c" + self.LookupPublish("uHash4@COMPANY", "X", "a.b.c", "allow-log") + self.LookupPublish("uHash4@COMPANY", "X", "a.x.b.y.c", "allow-log") + self.LookupPublish("uHash4@COMPANY", "X", "a.x.x.b.y.y.c", "allow-log") + + # aclKey: "*.x.#.y" + self.LookupPublish("uMixed1@COMPANY", "X", "a.x.y", "allow-log") + self.LookupPublish("uMixed1@COMPANY", "X", "a.x.p.qq.y", "allow-log") + self.LookupPublish("uMixed1@COMPANY", "X", "a.a.x.y", "deny-log") + self.LookupPublish("uMixed1@COMPANY", "X", "aa.x.b.c", "deny-log") + + # aclKey: "a.#.b.*" + self.LookupPublish("uMixed2@COMPANY", "X", "a.b.x", "allow-log") + self.LookupPublish("uMixed2@COMPANY", "X", "a.x.x.x.b.x", "allow-log") + + # aclKey: "*.*.*.#" + self.LookupPublish("uMixed3@COMPANY", "X", "x.y.z", "allow-log") + self.LookupPublish("uMixed3@COMPANY", "X", "x.y.z.a.b.c", "allow-log") + self.LookupPublish("uMixed3@COMPANY", "X", "x.y", "deny-log") + self.LookupPublish("uMixed3@COMPANY", "X", "x", "deny-log") + + # Repeat the keys with wildcard user spec + self.LookupPublish("uPlain1@COMPANY", "X", "MN.OP.Q", "allow-log") + self.LookupPublish("uStar1@COMPANY" , "X", "M.xx.N", "allow-log") + self.LookupPublish("uHash1@COMPANY" , "X", "M.N", "allow-log") + self.LookupPublish("uHash1@COMPANY" , "X", "M.x.N", "allow-log") + self.LookupPublish("uHash1@COMPANY" , "X", "M..x.y.zz.N", "allow-log") + self.LookupPublish("uMixed1@COMPANY", "X", "a.M.N", "allow-log") + self.LookupPublish("uMixed1@COMPANY", "X", "a.M.p.qq.N", "allow-log") + + self.LookupPublish("dev@QPID", "X", "MN.OP.Q", "allow-log") + self.LookupPublish("dev@QPID", "X", "M.xx.N", "allow-log") + self.LookupPublish("dev@QPID", "X", "M.N", "allow-log") + self.LookupPublish("dev@QPID", "X", "M.x.N", "allow-log") + self.LookupPublish("dev@QPID", "X", "M..x.y.zz.N", "allow-log") + self.LookupPublish("dev@QPID", "X", "a.M.N", "allow-log") + self.LookupPublish("dev@QPID", "X", "a.M.p.qq.N", "allow-log") + + #===================================== # Connection limits #===================================== diff --git a/cpp/src/tests/asyncstore.cmake b/cpp/src/tests/asyncstore.cmake index 795ea55cf7..51efa88bd3 100644 --- a/cpp/src/tests/asyncstore.cmake +++ b/cpp/src/tests/asyncstore.cmake @@ -45,7 +45,7 @@ if (UNIX) qpidbroker rt ) - add_test (Store_Perftools_Smoke_Test ${CMAKE_CURRENT_SOURCE_DIR}/storePerftools/storePerftoolsSmokeTest.sh) + add_test (jrnl2Perf_smoke_test ${CMAKE_CURRENT_SOURCE_DIR}/storePerftools/jrnl2Perf_smoke_test.sh) endif (UNIX) # Async store perf test (asyncPerf) @@ -77,4 +77,5 @@ if (UNIX) qpidtypes rt ) + add_test (asyncStorePerf_smoke_test ${CMAKE_CURRENT_SOURCE_DIR}/storePerftools/asyncStorePerf_smoke_test.sh) endif (UNIX) diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index 8255fbe9ac..aea4460e5a 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -76,18 +76,20 @@ def error_line(filename, n=1): except: return "" return ":\n" + "".join(result) -def retry(function, timeout=10, delay=.01): - """Call function until it returns True or timeout expires. - Double the delay for each retry. Return True if function - returns true, False if timeout expires.""" +def retry(function, timeout=10, delay=.01, max_delay=1): + """Call function until it returns a true value or timeout expires. + Double the delay for each retry up to max_delay. + Returns what function returns if true, None if timeout expires.""" deadline = time.time() + timeout - while not function(): + ret = None + while True: + ret = function() + if ret: return ret remaining = deadline - time.time() if remaining <= 0: return False delay = min(delay, remaining) time.sleep(delay) - delay *= 2 - return True + delay = min(delay*2, max_delay) class AtomicCounter: def __init__(self): @@ -239,15 +241,13 @@ def find_in_file(str, filename): class Broker(Popen): "A broker process. Takes care of start, stop and logging." _broker_count = 0 + _log_count = 0 - def __str__(self): return "Broker<%s %s>"%(self.name, self.pname) + def __str__(self): return "Broker<%s %s :%d>"%(self.log, self.pname, self.port()) def find_log(self): - self.log = "%s.log" % self.name - i = 1 - while (os.path.exists(self.log)): - self.log = "%s-%d.log" % (self.name, i) - i += 1 + self.log = "%03d:%s.log" % (Broker._log_count, self.name) + Broker._log_count += 1 def get_log(self): return os.path.abspath(self.log) @@ -298,9 +298,9 @@ class Broker(Popen): # Read port from broker process stdout if not already read. if (self._port == 0): try: self._port = int(self.stdout.readline()) - except ValueError: - raise Exception("Can't get port for broker %s (%s)%s" % - (self.name, self.pname, error_line(self.log,5))) + except ValueError, e: + raise Exception("Can't get port for broker %s (%s)%s: %s" % + (self.name, self.pname, error_line(self.log,5), e)) return self._port def unexpected(self,msg): @@ -572,7 +572,7 @@ class NumberedSender(Thread): """ Thread.__init__(self) cmd = ["qpid-send", - "--broker", url or broker.host_port(), + "--broker", url or broker.host_port(), "--address", "%s;{create:always}"%queue, "--connection-options", "{%s}"%(connection_options), "--content-stdin" @@ -647,6 +647,7 @@ class NumberedReceiver(Thread): self.error = None self.sender = sender self.received = 0 + self.queue = queue def read_message(self): n = int(self.receiver.stdout.readline()) @@ -657,7 +658,7 @@ class NumberedReceiver(Thread): m = self.read_message() while m != -1: self.receiver.assert_running() - assert(m <= self.received) # Check for missing messages + assert m <= self.received, "%s missing message %s>%s"%(self.queue, m, self.received) if (m == self.received): # Ignore duplicates self.received += 1 if self.sender: diff --git a/cpp/src/tests/cluster_test_logs.py b/cpp/src/tests/cluster_test_logs.py index 003d82c619..22f2470590 100755 --- a/cpp/src/tests/cluster_test_logs.py +++ b/cpp/src/tests/cluster_test_logs.py @@ -66,7 +66,8 @@ def filter_log(log): 'debug Sending keepalive signal to watchdog', # Watchdog timer thread 'last broker standing joined by 1 replicas, updating queue policies.', 'Connection .* timed out: closing', # heartbeat connection close - "org.apache.qpid.broker:bridge:" # ignore bridge index + "org.apache.qpid.broker:bridge:", # ignore bridge index + "closed connection" ]) # Regex to match a UUID uuid='\w\w\w\w\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w\w\w\w\w\w\w\w\w' diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index 8952f5de7b..3c96b252df 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -227,6 +227,18 @@ acl deny all all self.assertEqual("x", cluster[0].get_message("q").content) self.assertEqual("y", cluster[1].get_message("q").content) + def test_other_mech(self): + """Test using a mechanism other than PLAIN/ANONYMOUS for cluster update authentication. + Regression test for https://issues.apache.org/jira/browse/QPID-3849""" + sasl_config=os.path.join(self.rootdir, "sasl_config") + cluster = self.cluster(2, args=["--auth", "yes", "--sasl-config", sasl_config, + "--cluster-username=zig", + "--cluster-password=zig", + "--cluster-mechanism=DIGEST-MD5"]) + cluster[0].connect() + cluster.start() # Before the fix this broker falied to join the cluster. + cluster[2].connect() + def test_link_events(self): """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=611543""" args = ["--mgmt-pub-interval", 1] # Publish management information every second. @@ -768,6 +780,35 @@ acl deny all all fetch(cluster[2]) + def _verify_federation(self, src_broker, src, dst_broker, dst, timeout=30): + """ Prove that traffic can pass between two federated brokers. + """ + tot_time = 0 + active = False + send_session = src_broker.connect().session() + sender = send_session.sender(src) + receive_session = dst_broker.connect().session() + receiver = receive_session.receiver(dst) + while not active and tot_time < timeout: + sender.send(Message("Hello from Source!")) + try: + receiver.fetch(timeout = 1) + receive_session.acknowledge() + # Get this far without Empty exception, and the link is good! + active = True + while True: + # Keep receiving msgs, as several may have accumulated + receiver.fetch(timeout = 1) + receive_session.acknowledge() + except Empty: + if not active: + tot_time += 1 + receiver.close() + receive_session.close() + sender.close() + send_session.close() + return active + def test_federation_failover(self): """ Verify that federation operates across failures occuring in a cluster. @@ -778,38 +819,6 @@ acl deny all all cluster to newly-added members """ - TIMEOUT = 30 - def verify(src_broker, src, dst_broker, dst, timeout=TIMEOUT): - """ Prove that traffic can pass from source fed broker to - destination fed broker - """ - tot_time = 0 - active = False - send_session = src_broker.connect().session() - sender = send_session.sender(src) - receive_session = dst_broker.connect().session() - receiver = receive_session.receiver(dst) - while not active and tot_time < timeout: - sender.send(Message("Hello from Source!")) - try: - receiver.fetch(timeout = 1) - receive_session.acknowledge() - # Get this far without Empty exception, and the link is good! - active = True - while True: - # Keep receiving msgs, as several may have accumulated - receiver.fetch(timeout = 1) - receive_session.acknowledge() - except Empty: - if not active: - tot_time += 1 - receiver.close() - receive_session.close() - sender.close() - send_session.close() - self.assertTrue(active, "Bridge failed to become active") - - # 2 node cluster source, 2 node cluster destination src_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL) src_cluster.ready(); @@ -848,43 +857,145 @@ acl deny all all self.assertEqual(result.status, 0, result) # check that traffic passes - verify(src_cluster[0], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[0], "srcQ", dst_cluster[0], "destQ") # add src[2] broker to source cluster src_cluster.start(expect=EXPECT_EXIT_FAIL); src_cluster.ready(); - verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ") # Kill src[0]. dst[0] should fail over to src[1] src_cluster[0].kill() for b in src_cluster[1:]: b.ready() - verify(src_cluster[1], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[1], "srcQ", dst_cluster[0], "destQ") # Kill src[1], dst[0] should fail over to src[2] src_cluster[1].kill() for b in src_cluster[2:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ") # Kill dest[0], force failover to dest[1] dst_cluster[0].kill() for b in dst_cluster[1:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ") # Add dest[2] # dest[1] syncs dest[2] to current remote state dst_cluster.start(expect=EXPECT_EXIT_FAIL); for b in dst_cluster[1:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ") # Kill dest[1], force failover to dest[2] dst_cluster[1].kill() for b in dst_cluster[2:]: b.ready() - verify(src_cluster[2], "srcQ", dst_cluster[2], "destQ") + assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[2], "destQ") for i in range(2, len(src_cluster)): src_cluster[i].kill() for i in range(2, len(dst_cluster)): dst_cluster[i].kill() + def test_federation_multilink_failover(self): + """ + Verify that multi-link federation operates across failures occuring in + a cluster. + """ + + # 1 node cluster source, 1 node cluster destination + src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL) + src_cluster.ready(); + dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL) + dst_cluster.ready(); + + # federate a direct binding across two separate links + + # first, create a direct exchange bound to two queues using different + # bindings + cmd = self.popen(["qpid-config", + "--broker", src_cluster[0].host_port(), + "add", "exchange", "direct", "FedX"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "add", "exchange", "direct", "FedX"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "add", "queue", "destQ1"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "bind", "FedX", "destQ1", "one"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "add", "queue", "destQ2"], + EXPECT_EXIT_OK) + cmd.wait() + + cmd = self.popen(["qpid-config", + "--broker", dst_cluster[0].host_port(), + "bind", "FedX", "destQ2", "two"], + EXPECT_EXIT_OK) + cmd.wait() + + # Create two separate links between the dst and source brokers, bind + # each to different keys + dst_cluster[0].startQmf() + dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0] + + for _l in [("link1", "bridge1", "one"), + ("link2", "bridge2", "two")]: + result = dst_broker.create("link", _l[0], + {"host":src_cluster[0].host(), + "port":src_cluster[0].port()}, + False) + self.assertEqual(result.status, 0, result); + result = dst_broker.create("bridge", _l[1], + {"link":_l[0], + "src":"FedX", + "dest":"FedX", + "key":_l[2]}, False) + self.assertEqual(result.status, 0); + + # check that traffic passes + assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1") + assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2") + + # add new member, verify traffic + src_cluster.start(expect=EXPECT_EXIT_FAIL); + src_cluster.ready(); + + dst_cluster.start(expect=EXPECT_EXIT_FAIL); + dst_cluster.ready(); + + assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1") + assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2") + + src_cluster[0].kill() + for b in src_cluster[1:]: b.ready() + + assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[0], "destQ1") + assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[0], "destQ2") + + dst_cluster[0].kill() + for b in dst_cluster[1:]: b.ready() + + assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[1], "destQ1") + assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[1], "destQ2") + + for i in range(1, len(src_cluster)): src_cluster[i].kill() + for i in range(1, len(dst_cluster)): dst_cluster[i].kill() + + + # Some utility code for transaction tests XA_RBROLLBACK = 1 XA_RBTIMEOUT = 2 diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py index 7d613b98ce..dcd074eda9 100755 --- a/cpp/src/tests/federation.py +++ b/cpp/src/tests/federation.py @@ -23,6 +23,7 @@ from qpid.testlib import TestBase010 from qpid.datatypes import Message from qpid.queue import Empty from qpid.util import URL +import qpid.messaging from time import sleep, time @@ -94,6 +95,11 @@ class FederationTests(TestBase010): break self._brokers.append(_b) + # add a new-style messaging connection to each broker + for _b in self._brokers: + _b.connection = qpid.messaging.Connection(_b.url) + _b.connection.open() + def _teardown_brokers(self): """ Un-does _setup_brokers() """ @@ -103,7 +109,7 @@ class FederationTests(TestBase010): if not _b.client_session.error(): _b.client_session.close(timeout=10) _b.client_conn.close(timeout=10) - + _b.connection.close() def test_bridge_create_and_close(self): self.startQmf(); @@ -127,18 +133,28 @@ class FederationTests(TestBase010): self.verify_cleanup() def test_pull_from_exchange(self): + """ This test uses an alternative method to manage links and bridges + via the broker object. + """ session = self.session - + self.startQmf() qmf = self.qmf broker = qmf.getObjects(_class="broker")[0] - result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp") - self.assertEqual(result.status, 0, result) - link = qmf.getObjects(_class="link")[0] - result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0) + # create link + link_args = {"host":self.remote_host(), "port":self.remote_port(), "durable":False, + "authMechanism":"PLAIN", "username":"guest", "password":"guest", + "transport":"tcp"} + result = broker.create("link", "test-link-1", link_args, False) self.assertEqual(result.status, 0, result) + link = qmf.getObjects(_class="link")[0] + # create bridge + bridge_args = {"link":"test-link-1", "src":"amq.direct", "dest":"amq.fanout", + "key":"my-key"} + result = broker.create("bridge", "test-bridge-1", bridge_args, False); + self.assertEqual(result.status, 0, result) bridge = qmf.getObjects(_class="bridge")[0] #setup queue to receive messages from local broker @@ -164,9 +180,11 @@ class FederationTests(TestBase010): self.fail("Got unexpected message in queue: " + extra.body) except Empty: None - result = bridge.close() + + result = broker.delete("bridge", "test-bridge-1", {}) self.assertEqual(result.status, 0, result) - result = link.close() + + result = broker.delete("link", "test-link-1", {}) self.assertEqual(result.status, 0, result) self.verify_cleanup() @@ -376,6 +394,9 @@ class FederationTests(TestBase010): for i in range(1, 11): try: msg = queue.get(timeout=5) + mp = msg.get("message_properties").application_headers + self.assertEqual(mp.__class__, dict) + self.assertEqual(mp['x-qpid.trace'], 'REMOTE') # check that the federation-tag override works self.assertEqual("Message %d" % i, msg.body) except Empty: self.fail("Failed to find expected message containing 'Message %d'" % i) @@ -2153,3 +2174,433 @@ class FederationTests(TestBase010): self.verify_cleanup() + def test_multilink_direct(self): + """ Verify that two distinct links can be created between federated + brokers. + """ + self.startQmf() + qmf = self.qmf + self._setup_brokers() + src_broker = self._brokers[0] + dst_broker = self._brokers[1] + + # create a direct exchange on each broker + for _b in [src_broker, dst_broker]: + _b.client_session.exchange_declare(exchange="fedX.direct", type="direct") + self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type, + "direct", "exchange_declare failed!") + + # create destination queues + for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]: + dst_broker.client_session.queue_declare(queue=_q[0], auto_delete=True) + dst_broker.client_session.exchange_bind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1]) + + # create two connections, one for high priority traffic + for _q in ["HiPri", "Traffic"]: + result = dst_broker.qmf_object.create("link", _q, + {"host":src_broker.host, + "port":src_broker.port}, + False) + self.assertEqual(result.status, 0); + + links = qmf.getObjects(_broker=dst_broker.qmf_broker, _class="link") + for _l in links: + if _l.name == "HiPri": + hi_link = _l + elif _l.name == "Traffic": + data_link = _l + else: + self.fail("Unexpected Link found: " + _l.name) + + # now create a route for messages sent with key "high" to use the + # hi_link + result = dst_broker.qmf_object.create("bridge", "HiPriBridge", + {"link":hi_link.name, + "src":"fedX.direct", + "dest":"fedX.direct", + "key":"high"}, False) + self.assertEqual(result.status, 0); + + + # create routes for the "medium" and "low" links to use the normal + # data_link + for _b in [("MediumBridge", "medium"), ("LowBridge", "low")]: + result = dst_broker.qmf_object.create("bridge", _b[0], + {"link":data_link.name, + "src":"fedX.direct", + "dest":"fedX.direct", + "key":_b[1]}, False) + self.assertEqual(result.status, 0); + + # now wait for the links to become operational + for _l in [hi_link, data_link]: + expire_time = time() + 30 + while _l.state != "Operational" and time() < expire_time: + _l.update() + self.assertEqual(_l.state, "Operational", "Link failed to become operational") + + # verify each link uses a different connection + self.assertNotEqual(hi_link.connectionRef, data_link.connectionRef, + "Different links using the same connection") + + hi_conn = qmf.getObjects(_broker=dst_broker.qmf_broker, + _objectId=hi_link.connectionRef)[0] + data_conn = qmf.getObjects(_broker=dst_broker.qmf_broker, + _objectId=data_link.connectionRef)[0] + + + # send hi data, verify only goes over hi link + + r_ssn = dst_broker.connection.session() + hi_receiver = r_ssn.receiver("HiQ"); + med_receiver = r_ssn.receiver("MedQ"); + low_receiver = r_ssn.receiver("LoQ"); + + for _c in [hi_conn, data_conn]: + _c.update() + self.assertEqual(_c.msgsToClient, 0, "Unexpected messages received") + + s_ssn = src_broker.connection.session() + hi_sender = s_ssn.sender("fedX.direct/high") + med_sender = s_ssn.sender("fedX.direct/medium") + low_sender = s_ssn.sender("fedX.direct/low") + + try: + hi_sender.send(qpid.messaging.Message(content="hi priority")) + msg = hi_receiver.fetch(timeout=10) + r_ssn.acknowledge() + self.assertEqual(msg.content, "hi priority"); + except: + self.fail("Hi Pri message failure") + + hi_conn.update() + data_conn.update() + self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message") + self.assertEqual(data_conn.msgsToClient, 0, "Expected 0 data messages") + + # send low and medium, verify it does not go over hi link + + try: + med_sender.send(qpid.messaging.Message(content="medium priority")) + msg = med_receiver.fetch(timeout=10) + r_ssn.acknowledge() + self.assertEqual(msg.content, "medium priority"); + except: + self.fail("Medium Pri message failure") + + hi_conn.update() + data_conn.update() + self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message") + self.assertEqual(data_conn.msgsToClient, 1, "Expected 1 data message") + + try: + low_sender.send(qpid.messaging.Message(content="low priority")) + msg = low_receiver.fetch(timeout=10) + r_ssn.acknowledge() + self.assertEqual(msg.content, "low priority"); + except: + self.fail("Low Pri message failure") + + hi_conn.update() + data_conn.update() + self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message") + self.assertEqual(data_conn.msgsToClient, 2, "Expected 2 data message") + + # cleanup + + for _b in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]: + dst_broker.client_session.exchange_unbind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1]) + dst_broker.client_session.queue_delete(queue=_q[0]) + + for _b in [src_broker, dst_broker]: + _b.client_session.exchange_delete(exchange="fedX.direct") + + self._teardown_brokers() + + self.verify_cleanup() + + + def test_multilink_shared_queue(self): + """ Verify that two distinct links can be created between federated + brokers. + """ + self.startQmf() + qmf = self.qmf + self._setup_brokers() + src_broker = self._brokers[0] + dst_broker = self._brokers[1] + + # create a topic exchange on the destination broker + dst_broker.client_session.exchange_declare(exchange="fedX.topic", type="topic") + self.assertEqual(dst_broker.client_session.exchange_query(name="fedX.topic").type, + "topic", "exchange_declare failed!") + + # create a destination queue + dst_broker.client_session.queue_declare(queue="destQ", auto_delete=True) + dst_broker.client_session.exchange_bind(queue="destQ", exchange="fedX.topic", binding_key="srcQ") + + # create a single source queue + src_broker.client_session.queue_declare(queue="srcQ", auto_delete=True) + + # create two connections + for _q in ["Link1", "Link2"]: + result = dst_broker.qmf_object.create("link", _q, + {"host":src_broker.host, + "port":src_broker.port}, + False) + self.assertEqual(result.status, 0); + + links = qmf.getObjects(_broker=dst_broker.qmf_broker, _class="link") + self.assertEqual(len(links), 2) + + # now create two "parallel" queue routes from the source queue to the + # destination exchange. + result = dst_broker.qmf_object.create("bridge", "Bridge1", + {"link":"Link1", + "src":"srcQ", + "dest":"fedX.topic", + "srcIsQueue": True}, + False) + self.assertEqual(result.status, 0); + result = dst_broker.qmf_object.create("bridge", "Bridge2", + {"link":"Link2", + "src":"srcQ", + "dest":"fedX.topic", + "srcIsQueue": True}, + False) + self.assertEqual(result.status, 0); + + + # now wait for the links to become operational + for _l in links: + expire_time = time() + 30 + while _l.state != "Operational" and time() < expire_time: + _l.update() + self.assertEqual(_l.state, "Operational", "Link failed to become operational") + + # verify each link uses a different connection + self.assertNotEqual(links[0].connectionRef, links[1].connectionRef, + "Different links using the same connection") + + conn1 = qmf.getObjects(_broker=dst_broker.qmf_broker, + _objectId=links[0].connectionRef)[0] + conn2 = qmf.getObjects(_broker=dst_broker.qmf_broker, + _objectId=links[1].connectionRef)[0] + + # verify messages sent to the queue are pulled by each connection + + r_ssn = dst_broker.connection.session() + receiver = r_ssn.receiver("destQ"); + + for _c in [conn1, conn2]: + _c.update() + self.assertEqual(_c.msgsToClient, 0, "Unexpected messages received") + + s_ssn = src_broker.connection.session() + sender = s_ssn.sender("srcQ") + + try: + for x in range(5): + sender.send(qpid.messaging.Message(content="hello")) + for x in range(5): + msg = receiver.fetch(timeout=10) + self.assertEqual(msg.content, "hello"); + r_ssn.acknowledge() + except: + self.fail("Message failure") + + # expect messages to be split over each connection. + conn1.update() + conn2.update() + self.assertNotEqual(conn1.msgsToClient, 0, "No messages sent") + self.assertNotEqual(conn2.msgsToClient, 0, "No messages sent") + self.assertEqual(conn2.msgsToClient + conn1.msgsToClient, 5, + "Expected 5 messages total") + + for _b in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + dst_broker.client_session.exchange_unbind(queue="destQ", exchange="fedX.topic", binding_key="srcQ") + dst_broker.client_session.exchange_delete(exchange="fedX.topic") + + self._teardown_brokers() + + self.verify_cleanup() + + + def test_dynamic_direct_shared_queue(self): + """ + Route Topology: + + +<--- B1 + B0 <---+<--- B2 + +<--- B3 + """ + session = self.session + + # create the federation + + self.startQmf() + qmf = self.qmf + + self._setup_brokers() + + # create direct exchange on each broker, and retrieve the corresponding + # management object for that exchange + + exchanges=[] + for _b in self._brokers: + _b.client_session.exchange_declare(exchange="fedX.direct", type="direct") + self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type, + "direct", "exchange_declare failed!") + # pull the exchange out of qmf... + retries = 0 + my_exchange = None + while my_exchange is None: + objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange") + for ooo in objs: + if ooo.name == "fedX.direct": + my_exchange = ooo + break + if my_exchange is None: + retries += 1 + self.failIfEqual(retries, 10, + "QMF failed to find new exchange!") + sleep(1) + exchanges.append(my_exchange) + + self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!") + + # Create 2 links per each source broker (1,2,3) to the downstream + # broker 0: + for _b in range(1,4): + for _l in ["dynamic", "queue"]: + result = self._brokers[0].qmf_object.create( "link", + "Link-%d-%s" % (_b, _l), + {"host":self._brokers[_b].host, + "port":self._brokers[_b].port}, False) + self.assertEqual(result.status, 0) + + # create queue on source brokers for use by the dynamic route + self._brokers[_b].client_session.queue_declare(queue="fedSrcQ", exclusive=False, auto_delete=True) + + for _l in range(1,4): + # for each dynamic link, create a dynamic bridge for the "fedX.direct" + # exchanges, using the fedSrcQ on each upstream source broker + result = self._brokers[0].qmf_object.create("bridge", + "Bridge-%d-dynamic" % _l, + {"link":"Link-%d-dynamic" % _l, + "src":"fedX.direct", + "dest":"fedX.direct", + "dynamic":True, + "queue":"fedSrcQ"}, False) + self.assertEqual(result.status, 0) + + # create a queue route that shares the queue used by the dynamic route + result = self._brokers[0].qmf_object.create("bridge", + "Bridge-%d-queue" % _l, + {"link":"Link-%d-queue" % _l, + "src":"fedSrcQ", + "dest":"fedX.direct", + "srcIsQueue":True}, False) + self.assertEqual(result.status, 0) + + + # wait for the inter-broker links to become operational + retries = 0 + operational = False + while not operational: + operational = True + for _l in qmf.getObjects(_class="link"): + #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state))) + if _l.state != "Operational": + operational = False + if not operational: + retries += 1 + self.failIfEqual(retries, 10, + "inter-broker links failed to become operational.") + sleep(1) + + # @todo - There is no way to determine when the bridge objects become + # active. Hopefully, this is long enough! + sleep(6) + + # create a queue on B0, bound to "spudboy" + self._brokers[0].client_session.queue_declare(queue="DestQ", exclusive=True, auto_delete=True) + self._brokers[0].client_session.exchange_bind(queue="DestQ", exchange="fedX.direct", binding_key="spudboy") + + # subscribe to messages arriving on B2's queue + self.subscribe(self._brokers[0].client_session, queue="DestQ", destination="f1") + queue = self._brokers[0].client_session.incoming("f1") + + # wait until the binding key has propagated to each broker + + binding_counts = [1, 1, 1, 1] + self.assertEqual(len(binding_counts), len(exchanges), "Update Test!") + for i in range(3,-1,-1): + retries = 0 + exchanges[i].update() + while exchanges[i].bindingCount < binding_counts[i]: + retries += 1 + self.failIfEqual(retries, 10, + "binding failed to propagate to broker %d" + % i) + sleep(3) + exchanges[i].update() + + for _b in range(1,4): + # send 3 msgs from each source broker + for i in range(3): + dp = self._brokers[_b].client_session.delivery_properties(routing_key="spudboy") + self._brokers[_b].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message_drp %d" % i)) + + # get exactly 9 (3 per broker) on B0 + for i in range(9): + msg = queue.get(timeout=5) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in queue: " + extra.body) + except Empty: None + + # verify that messages went across every link + for _l in qmf.getObjects(_broker=self._brokers[0].qmf_broker, + _class="link"): + for _c in qmf.getObjects(_broker=self._brokers[0].qmf_broker, + _objectId=_l.connectionRef): + self.assertNotEqual(_c.msgsToClient, 0, "Messages did not pass over link as expected.") + + # cleanup + + self._brokers[0].client_session.exchange_unbind(queue="DestQ", exchange="fedX.direct", binding_key="spudboy") + self._brokers[0].client_session.message_cancel(destination="f1") + self._brokers[0].client_session.queue_delete(queue="DestQ") + + for _b in qmf.getObjects(_class="bridge"): + result = _b.close() + self.assertEqual(result.status, 0) + + for _l in qmf.getObjects(_class="link"): + result = _l.close() + self.assertEqual(result.status, 0) + + for _b in self._brokers: + _b.client_session.exchange_delete(exchange="fedX.direct") + + self._teardown_brokers() + + self.verify_cleanup() + diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py index 827cb7dca9..d25281eed5 100755 --- a/cpp/src/tests/ha_tests.py +++ b/cpp/src/tests/ha_tests.py @@ -18,59 +18,123 @@ # under the License. # -import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math -from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection +import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest +import traceback +from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED from qpid.datatypes import uuid4 from brokertest import * from threading import Thread, Lock, Condition -from logging import getLogger, WARN, ERROR, DEBUG +from logging import getLogger, WARN, ERROR, DEBUG, INFO from qpidtoollibs import BrokerAgent +from uuid import UUID log = getLogger(__name__) +class QmfAgent(object): + """Access to a QMF broker agent.""" + def __init__(self, address, **kwargs): + self._connection = Connection.establish( + address, client_properties={"qpid.ha-admin":1}, **kwargs) + self._agent = BrokerAgent(self._connection) + assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address) + + def __getattr__(self, name): + a = getattr(self._agent, name) + return a + +class Credentials(object): + """SASL credentials: username, password, and mechanism""" + def __init__(self, username, password, mechanism): + (self.username, self.password, self.mechanism) = (username, password, mechanism) + + def __str__(self): return "Credentials%s"%(self.tuple(),) + + def tuple(self): return (self.username, self.password, self.mechanism) + + def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url) + class HaBroker(Broker): - def __init__(self, test, args=[], broker_url=None, ha_cluster=True, - ha_replicate="all", **kwargs): + """Start a broker with HA enabled + @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker. + """ + def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all", + client_credentials=None, **kwargs): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" args = copy(args) args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=info+", "--log-enable=debug+:ha::", + "--log-enable=debug+:ha::", # FIXME aconway 2012-02-13: workaround slow link failover. "--link-maintenace-interval=0.1", "--ha-cluster=%s"%ha_cluster] if ha_replicate is not None: args += [ "--ha-replicate=%s"%ha_replicate ] - if broker_url: args.extend([ "--ha-brokers", broker_url ]) + if brokers_url: args += [ "--ha-brokers-url", brokers_url ] Broker.__init__(self, test, args, **kwargs) - self.commands=os.getenv("PYTHON_COMMANDS") - assert os.path.isdir(self.commands) + self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha") + assert os.path.exists(self.qpid_ha_path) + self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config") + assert os.path.exists(self.qpid_config_path) getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. - - def promote(self): - assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0 - - def set_client_url(self, url): - assert os.system( - "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0 - - def set_broker_url(self, url): - assert os.system( - "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0 - - def replicate(self, from_broker, queue): - assert os.system( - "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 + self.qpid_ha_script=import_script(self.qpid_ha_path) + self._agent = None + self.client_credentials = client_credentials + + def __str__(self): return Broker.__str__(self) + + def qpid_ha(self, args): + cred = self.client_credentials + url = self.host_port() + if cred: + url =cred.add_user(url) + args = args + ["--sasl-mechanism", cred.mechanism] + self.qpid_ha_script.main_except(["", "-b", url]+args) + + def promote(self): self.qpid_ha(["promote"]) + def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url]) + def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url]) + def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) + + def agent(self): + if not self._agent: + cred = self.client_credentials + if cred: + self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism) + else: + self._agent = QmfAgent(self.host_port()) + return self._agent + + def ha_status(self): + hb = self.agent().getHaBroker() + hb.update() + return hb.status + + def wait_status(self, status): + def try_get_status(): + # Ignore ConnectionError, the broker may not be up yet. + try: return self.ha_status() == status; + except ConnectionError: return False + assert retry(try_get_status, timeout=20), "%s status != %r"%(self, status) + + # FIXME aconway 2012-05-01: do direct python call to qpid-config code. + def qpid_config(self, args): + assert subprocess.call( + [self.qpid_config_path, "--broker", self.host_port()]+args) == 0 def config_replicate(self, from_broker, queue): - assert os.system( - "%s/qpid-config --broker=%s add queue --start-replica %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 + self.qpid_config(["add", "queue", "--start-replica", from_broker, queue]) def config_declare(self, queue, replication): - assert os.system( - "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0 + self.qpid_config(["add", "queue", queue, "--replicate", replication]) def connect_admin(self, **kwargs): - return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) + cred = self.client_credentials + if cred: + return Broker.connect( + self, client_properties={"qpid.ha-admin":1}, + username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism, + **kwargs) + else: + return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) def wait_backup(self, address): """Wait for address to become valid on a backup broker.""" @@ -78,6 +142,14 @@ class HaBroker(Broker): try: wait_address(bs, address) finally: bs.connection.close() + def assert_browse(self, queue, expected, **kwargs): + """Verify queue contents by browsing.""" + bs = self.connect().session() + try: + wait_address(bs, queue) + assert_browse_retry(bs, queue, expected, **kwargs) + finally: bs.connection.close() + def assert_browse_backup(self, queue, expected, **kwargs): """Combines wait_backup and assert_browse_retry.""" bs = self.connect_admin().session() @@ -86,33 +158,70 @@ class HaBroker(Broker): assert_browse_retry(bs, queue, expected, **kwargs) finally: bs.connection.close() + def assert_connect_fail(self): + try: + self.connect() + self.test.fail("Expected ConnectionError") + except ConnectionError: pass + + def try_connect(self): + try: return self.connect() + except ConnectionError: return None + class HaCluster(object): _cluster_count = 0 - def __init__(self, test, n, **kwargs): + def __init__(self, test, n, promote=True, **kwargs): """Start a cluster of n brokers""" self.test = test - self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)] + self.kwargs = kwargs + self._brokers = [] + self.id = HaCluster._cluster_count + self.broker_id = 0 HaCluster._cluster_count += 1 - self.url = ",".join([b.host_port() for b in self]) - for b in self: b.set_broker_url(self.url) + for i in xrange(n): self.start(False) + self.update_urls() self[0].promote() + def next_name(self): + name="cluster%s-%s"%(self.id, self.broker_id) + self.broker_id += 1 + return name + + def start(self, update_urls=True, args=[]): + """Start a new broker in the cluster""" + b = HaBroker(self.test, name=self.next_name(), **self.kwargs) + self._brokers.append(b) + if update_urls: self.update_urls() + return b + + def update_urls(self): + self.url = ",".join([b.host_port() for b in self]) + if len(self) > 1: # No failover addresses on a 1 cluster. + for b in self: b.set_brokers_url(self.url) + def connect(self, i): """Connect with reconnect_urls""" return self[i].connect(reconnect=True, reconnect_urls=self.url.split(",")) - def kill(self, i): + def kill(self, i, promote_next=True): """Kill broker i, promote broker i+1""" - self[i].kill() self[i].expect = EXPECT_EXIT_FAIL - self[(i+1) % len(self)].promote() + self[i].kill() + if promote_next: self[(i+1) % len(self)].promote() + + def restart(self, i): + """Start a broker with the same port, name and data directory. It will get + a separate log file: foo.n.log""" + b = self._brokers[i] + self._brokers[i] = HaBroker( + self.test, name=b.name, port=b.port(), brokers_url=self.url, + **self.kwargs) - def bounce(self, i): + def bounce(self, i, promote_next=True): """Stop and restart a broker in a cluster.""" - self.kill(i) - b = self[i] - self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url) + self.kill(i, promote_next) + self.restart(i) # Behave like a list of brokers. def __len__(self): return len(self._brokers) @@ -128,12 +237,12 @@ def wait_address(session, address): except NotFound: return False assert retry(check), "Timed out waiting for address %s"%(address) -def assert_missing(session, address): - """Assert that the address is _not_ valid""" +def valid_address(session, address): + """Test if an address is valid""" try: session.receiver(address) - self.fail("Expected NotFound: %s"%(address)) - except NotFound: pass + return True + except NotFound: return False class ReplicationTests(BrokerTest): """Correctness tests for HA replication.""" @@ -180,7 +289,7 @@ class ReplicationTests(BrokerTest): self.assert_browse_retry(b, prefix+"q1", ["1", "4"]) self.assert_browse_retry(b, prefix+"q2", []) # configuration only - assert_missing(b, prefix+"q3") + assert not valid_address(b, prefix+"q3") b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"]) b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration @@ -195,7 +304,7 @@ class ReplicationTests(BrokerTest): # Create config, send messages before starting the backup, to test catch-up replication. setup(p, "1", primary) - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) # Create config, send messages after starting the backup, to test steady-state replication. setup(p, "2", primary) @@ -233,10 +342,10 @@ class ReplicationTests(BrokerTest): s = p.sender("q;{create:always}") for m in [str(i) for i in range(0,10)]: s.send(m) s.sync() - backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) + backup1 = HaBroker(self, name="backup1", brokers_url=primary.host_port()) for m in [str(i) for i in range(10,20)]: s.send(m) s.sync() - backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) + backup2 = HaBroker(self, name="backup2", brokers_url=primary.host_port()) for m in [str(i) for i in range(20,30)]: s.send(m) s.sync() @@ -276,7 +385,7 @@ class ReplicationTests(BrokerTest): """Verify that backups rejects connections and that fail-over works in python client""" primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) # Check that backup rejects normal connections try: backup.connect().session() @@ -294,14 +403,15 @@ class ReplicationTests(BrokerTest): primary.kill() assert retry(lambda: not is_running(primary.pid)) backup.promote() - self.assert_browse_retry(s, "q", ["foo"]) + sender.send("bar") + self.assert_browse_retry(s, "q", ["foo", "bar"]) c.close() def test_failover_cpp(self): """Verify that failover works in the C++ client.""" primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL) primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) url="%s,%s"%(primary.host_port(), backup.host_port()) primary.connect().session().sender("q;{create:always}") backup.wait_backup("q") @@ -344,6 +454,7 @@ class ReplicationTests(BrokerTest): def test_standalone_queue_replica(self): """Test replication of individual queues outside of cluster mode""" + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. primary = HaBroker(self, name="primary", ha_cluster=False) pc = primary.connect() ps = pc.session().sender("q;{create:always}") @@ -393,7 +504,7 @@ class ReplicationTests(BrokerTest): """Verify that we replicate to an LVQ correctly""" primary = HaBroker(self, name="primary") primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}") def send(key,value): s.send(Message(content=value,properties={"lvq-key":key})) for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]: @@ -410,7 +521,7 @@ class ReplicationTests(BrokerTest): """Test replication with the ring queue policy""" primary = HaBroker(self, name="primary") primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}") for i in range(10): s.send(Message(str(i))) backup.assert_browse_backup("q", [str(i) for i in range(5,10)]) @@ -419,18 +530,20 @@ class ReplicationTests(BrokerTest): """Test replication with the reject queue policy""" primary = HaBroker(self, name="primary") primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}") try: for i in range(10): s.send(Message(str(i)), sync=False) except qpid.messaging.exceptions.TargetCapacityExceeded: pass backup.assert_browse_backup("q", [str(i) for i in range(0,5)]) + # Detach, don't close as there is a broken session + s.session.connection.detach() def test_priority(self): """Verify priority queues replicate correctly""" primary = HaBroker(self, name="primary") primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) session = primary.connect().session() s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}") priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] @@ -445,7 +558,7 @@ class ReplicationTests(BrokerTest): """Verify priority queues replicate correctly""" primary = HaBroker(self, name="primary") primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) session = primary.connect().session() levels = 8 priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3] @@ -464,7 +577,7 @@ class ReplicationTests(BrokerTest): def test_priority_ring(self): primary = HaBroker(self, name="primary") primary.promote() - backup = HaBroker(self, name="backup", broker_url=primary.host_port()) + backup = HaBroker(self, name="backup", brokers_url=primary.host_port()) s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}") priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2] for p in priorities: s.send(Message(priority=p)) @@ -475,8 +588,10 @@ class ReplicationTests(BrokerTest): # correct result, the uncommented one is for the actualy buggy # result. See https://issues.apache.org/jira/browse/QPID-3866 # - # backup.assert_browse_backup("q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority) - backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: m.priority) + # expect = sorted(priorities,reverse=True)[0:5] + expect = [9,9,9,9,2] + primary.assert_browse("q", expect, transform=lambda m: m.priority) + backup.assert_browse_backup("q", expect, transform=lambda m: m.priority) def test_backup_acquired(self): """Verify that acquired messages are backed up, for all queue types.""" @@ -509,11 +624,11 @@ class ReplicationTests(BrokerTest): primary = HaBroker(self, name="primary") primary.promote() - backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port()) + backup1 = HaBroker(self, name="backup1", brokers_url=primary.host_port()) c = primary.connect() for t in tests: t.send(c) # Send messages, leave one unacknowledged. - backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port()) + backup2 = HaBroker(self, name="backup2", brokers_url=primary.host_port()) # Wait for backups to catch up. for t in tests: t.wait(self, backup1) @@ -538,11 +653,13 @@ class ReplicationTests(BrokerTest): self.fail("Excpected no-such-queue exception") except NotFound: pass - def test_invalid_default(self): - """Verify that a queue with an invalid qpid.replicate gets default treatment""" - cluster = HaCluster(self, 2, ha_replicate="all") - c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}") - cluster[1].wait_backup("q") + def test_invalid_replication(self): + """Verify that we reject an attempt to declare a queue with invalid replication value.""" + cluster = HaCluster(self, 1, ha_replicate="all") + try: + c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}") + self.fail("Expected ConnectionError") + except ConnectionError: pass def test_exclusive_queue(self): """Ensure that we can back-up exclusive queues, i.e. the replicating @@ -559,6 +676,136 @@ class ReplicationTests(BrokerTest): test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}"); test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}") + def test_auto_delete_exclusive(self): + """Verify that we ignore auto-delete, exclusive, non-auto-delete-timeout queues""" + cluster = HaCluster(self,2) + s = cluster[0].connect().session() + s.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}") + s.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}") + s.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}") + s.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}") + s.receiver("q;{create:always}") + + s = cluster[1].connect_admin().session() + cluster[1].wait_backup("q") + assert not valid_address(s, "exad") + assert valid_address(s, "ex") + assert valid_address(s, "ad") + assert valid_address(s, "time") + + def test_broker_info(self): + """Check that broker information is correctly published via management""" + cluster = HaCluster(self, 3) + + for broker in cluster: # Make sure HA system-id matches broker's + qmf = broker.agent().getHaBroker() + self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef)) + + cluster_ports = map(lambda b: b.port(), cluster) + cluster_ports.sort() + def ports(qmf): + qmf.update() + return sorted(map(lambda b: b["port"], qmf.members)) + # Check that all brokers have the same membership as the cluster + for broker in cluster: + qmf = broker.agent().getHaBroker() + assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker) + # Add a new broker, check it is updated everywhere + b = cluster.start() + cluster_ports.append(b.port()) + cluster_ports.sort() + for broker in cluster: + qmf = broker.agent().getHaBroker() + assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf)) + + def test_auth(self): + """Verify that authentication does not interfere with replication.""" + # FIXME aconway 2012-07-09: generate test sasl config portably for cmake + sasl_config=os.path.join(self.rootdir, "sasl_config") + if not os.path.exists(sasl_config): + print "WARNING: Skipping test, SASL test configuration %s not found."%sasl_config + return + acl=os.path.join(os.getcwd(), "policy.acl") + aclf=file(acl,"w") + # Verify that replication works with auth=yes and HA user has at least the following + # privileges: + aclf.write(""" +acl allow zag@QPID access queue +acl allow zag@QPID create queue +acl allow zag@QPID consume queue +acl allow zag@QPID delete queue +acl allow zag@QPID access exchange +acl allow zag@QPID create exchange +acl allow zag@QPID bind exchange +acl allow zag@QPID publish exchange +acl allow zag@QPID delete exchange +acl allow zag@QPID access method +acl allow zag@QPID create link +acl deny all all + """) + aclf.close() + cluster = HaCluster( + self, 2, + args=["--auth", "yes", "--sasl-config", sasl_config, + "--acl-file", acl, "--load-module", os.getenv("ACL_LIB"), + "--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN" + ], + client_credentials=Credentials("zag", "zag", "PLAIN")) + s0 = cluster[0].connect(username="zag", password="zag").session(); + s0.receiver("q;{create:always}") + s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}") + cluster[1].wait_backup("q") + cluster[1].wait_backup("ex") + s1 = cluster[1].connect_admin().session(); # Uses Credentials above. + s1.sender("ex").send("foo"); + self.assertEqual(s1.receiver("q").fetch().content, "foo") + + def test_alternate_exchange(self): + """Verify that alternate-exchange on exchanges and queues is propagated + to new members of a cluster. """ + cluster = HaCluster(self, 2) + s = cluster[0].connect().session() + # altex exchange: acts as alternate exchange + s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}") + # altq queue bound to altex, collect re-routed messages. + s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}") + # 0ex exchange with alternate-exchange altex and no queues bound + s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}") + # create queue q with alternate-exchange altex + s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}") + # create a bunch of exchanges to ensure we don't clean up prematurely if the + # response comes in multiple fragments. + for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i) + + def verify(broker): + s = broker.connect().session() + # Verify unmatched message goes to ex's alternate. + s.sender("0ex").send("foo") + altq = s.receiver("altq") + self.assertEqual("foo", altq.fetch(timeout=0).content) + s.acknowledge() + # Verify rejected message goes to q's alternate. + s.sender("q").send("bar") + msg = s.receiver("q").fetch(timeout=0) + self.assertEqual("bar", msg.content) + s.acknowledge(msg, Disposition(REJECTED)) # Reject the message + self.assertEqual("bar", altq.fetch(timeout=0).content) + s.acknowledge() + + # Sanity check: alternate exchanges on original broker + verify(cluster[0]) + # Check backup that was connected during setup. + cluster[1].wait_backup("0ex") + cluster[1].wait_backup("q") + cluster.bounce(0) + verify(cluster[1]) + # Check a newly started backup. + cluster.start() + cluster[2].wait_backup("0ex") + cluster[2].wait_backup("q") + cluster.bounce(1) + verify(cluster[2]) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit @@ -601,49 +848,135 @@ class LongTests(BrokerTest): if d: return float(d)*60 else: return 3 # Default is to be quick - - def disable_test_failover(self): + def test_failover_send_receive(self): """Test failover with continuous send-receive""" - # FIXME aconway 2012-02-03: fails due to dropped messages, - # known issue: sending messages to new primary before - # backups are ready. Enable when fixed. - - # Start a cluster, all members will be killed during the test. - brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL) - for name in ["ha0","ha1","ha2"] ] - url = ",".join([b.host_port() for b in brokers]) - for b in brokers: b.set_broker_url(url) - brokers[0].promote() + brokers = HaCluster(self, 3) # Start sender and receiver threads - sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False) - receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False) - receiver.start() - sender.start() - # Wait for sender & receiver to get up and running - assert retry(lambda: receiver.received > 100) + n = 10 + senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False, + queue="test%s"%(i)) for i in xrange(n)] + receivers = [NumberedReceiver(brokers[0], sender=senders[i], + failover_updates=False, + queue="test%s"%(i)) for i in xrange(n)] + for r in receivers: r.start() + for s in senders: s.start() + + def wait_passed(r, n): + """Wait for receiver r to pass n""" + def check(): + r.check() # Verify no exceptions + return r.received > n + assert retry(check), "Stalled %s at %s"%(r.queue, n) + + for r in receivers: wait_passed(r, 0) + # Kill and restart brokers in a cycle: endtime = time.time() + self.duration() i = 0 - while time.time() < endtime or i < 3: # At least 3 iterations - sender.sender.assert_running() - receiver.receiver.assert_running() - port = brokers[i].port() - brokers[i].kill() - brokers.append( - HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port, - expect=EXPECT_EXIT_FAIL)) - i += 1 - brokers[i].promote() - n = receiver.received # Verify we're still running - def enough(): - receiver.check() # Verify no exceptions - return receiver.received > n + 100 - assert retry(enough, timeout=5) - - sender.stop() - receiver.stop() - for b in brokers[i:]: b.kill() + try: + while time.time() < endtime or i < 3: # At least 3 iterations + for s in senders: s.sender.assert_running() + for r in receivers: r.receiver.assert_running() + checkpoint = [ r.received for r in receivers ] + # Don't kill primary till it is active and the next + # backup is ready, otherwise we can lose messages. + brokers[i%3].wait_status("active") + brokers[(i+1)%3].wait_status("ready") + brokers.bounce(i%3) + i += 1 + map(wait_passed, receivers, checkpoint) # Wait for all receivers + except: + traceback.print_exc() + raise + finally: + for s in senders: s.stop() + for r in receivers: r.stop() + dead = [] + for i in xrange(3): + if not brokers[i].is_running(): dead.append(i) + brokers.kill(i, False) + if dead: raise Exception("Brokers not running: %s"%dead) + +class RecoveryTests(BrokerTest): + """Tests for recovery after a failure.""" + + def test_queue_hold(self): + """Verify that the broker holds queues without sufficient backup, + i.e. does not complete messages sent to those queues.""" + + # We don't want backups to time out for this test, set long timeout. + cluster = HaCluster(self, 4, args=["--ha-backup-timeout=100000"]); + # Wait for the primary to be ready + cluster[0].wait_status("active") + # Create a queue before the failure. + s1 = cluster.connect(0).session().sender("q1;{create:always}") + for b in cluster: b.wait_backup("q1") + for i in xrange(100): s1.send(str(i)) + # Kill primary and 2 backups + for i in [0,1,2]: cluster.kill(i, False) + cluster[3].promote() # New primary, backups will be 1 and 2 + cluster[3].wait_status("recovering") + + def assertSyncTimeout(s): + try: + s.sync(timeout=.01) + self.fail("Expected Timeout exception") + except Timeout: pass + + # Create a queue after the failure + s2 = cluster.connect(3).session().sender("q2;{create:always}") + + # Verify that messages sent are not completed + for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False) + assertSyncTimeout(s1) + self.assertEqual(s1.unsettled(), 100) + assertSyncTimeout(s2) + self.assertEqual(s2.unsettled(), 100) + + # Verify we can receive even if sending is on hold: + cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)]) + + # Restart backups, verify queues are released only when both backups are up + cluster.restart(1) + assertSyncTimeout(s1) + self.assertEqual(s1.unsettled(), 100) + assertSyncTimeout(s2) + self.assertEqual(s2.unsettled(), 100) + self.assertEqual(cluster[3].ha_status(), "recovering") + cluster.restart(2) + + # Verify everything is up to date and active + def settled(sender): sender.sync(); return sender.unsettled() == 0; + assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled()) + assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled()) + cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)]) + cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)]) + cluster[3].wait_status("active"), + s1.session.connection.close() + s2.session.connection.close() + + def test_expected_backup_timeout(self): + """Verify that we time-out expected backups and release held queues + after a configured interval. Verify backup is demoted to catch-up, + but can still rejoin. + """ + cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]); + cluster[0].wait_status("active") # Primary ready + for b in cluster[1:4]: b.wait_status("ready") # Backups ready + for i in [0,1]: cluster.kill(i, False) + cluster[2].promote() # New primary, backups will be 1 and 2 + cluster[2].wait_status("recovering") + # Should not go active till the expected backup connects or times out. + self.assertEqual(cluster[2].ha_status(), "recovering") + # Messages should be held expected backup times out + s = cluster[2].connect().session().sender("q;{create:always}") + for i in xrange(100): s.send(str(i), sync=False) + # Verify message held initially. + try: s.sync(timeout=.01); self.fail("Expected Timeout exception") + except Timeout: pass + s.sync(timeout=1) # And released after the timeout. + self.assertEqual(cluster[2].ha_status(), "active") if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) diff --git a/cpp/src/tests/ipv6_test b/cpp/src/tests/ipv6_test index 6becfd8c96..9d1cb2acdd 100755 --- a/cpp/src/tests/ipv6_test +++ b/cpp/src/tests/ipv6_test @@ -19,6 +19,19 @@ # under the License. # +# Check whether we have any globally configured IPv6 addresses +# - if not then we can't run the tests because ipv6 lookups won't +# work within the qpid code. This is a deliberate feature to avoid +# getting addresses that can't be routed by the machine. + +if ip -f inet6 -o addr | cut -f 9 -s -d' ' | grep global > /dev/null ; then + echo "IPv6 addresses configured continuing" +else + echo "No global IPv6 addresses configured - skipping test" + exit 0 +fi + + # Run a simple test over IPv6 source ./test_env.sh diff --git a/cpp/src/tests/logging.cpp b/cpp/src/tests/logging.cpp index 5d5bb1feef..a29714c002 100644 --- a/cpp/src/tests/logging.cpp +++ b/cpp/src/tests/logging.cpp @@ -258,7 +258,7 @@ QPID_AUTO_TEST_CASE(testOverhead) { Statement statement( Level level, const char* file="", int line=0, const char* fn=0) { - Statement s={0, file, line, fn, level}; + Statement s={0, file, line, fn, level, ::qpid::log::unspecified}; return s; } @@ -347,11 +347,11 @@ QPID_AUTO_TEST_CASE(testLoggerStateure) { }; opts.parse(ARGC(argv), const_cast<char**>(argv)); l.configure(opts); - QPID_LOG(critical, "foo"); int srcline=__LINE__; + QPID_LOG_CAT(critical, test, "foo"); int srcline=__LINE__; ifstream log("logging.tmp"); string line; getline(log, line); - string expect=(format("critical %s:%d: foo")%__FILE__%srcline).str(); + string expect=(format("[Test] critical %s:%d: foo")%__FILE__%srcline).str(); BOOST_CHECK_EQUAL(expect, line); log.close(); unlink("logging.tmp"); @@ -375,11 +375,11 @@ QPID_AUTO_TEST_CASE(testQuoteNonPrintable) { char s[] = "null\0tab\tspace newline\nret\r\x80\x99\xff"; string str(s, sizeof(s)); - QPID_LOG(critical, str); + QPID_LOG_CAT(critical, test, str); ifstream log("logging.tmp"); string line; getline(log, line, '\0'); - string expect="critical null\\x00tab\tspace newline\nret\r\\x80\\x99\\xFF\\x00\n"; + string expect="[Test] critical null\\x00tab\tspace newline\nret\r\\x80\\x99\\xFF\\x00\n"; BOOST_CHECK_EQUAL(expect, line); log.close(); unlink("logging.tmp"); diff --git a/cpp/src/tests/ping_broker b/cpp/src/tests/ping_broker new file mode 100755 index 0000000000..6c391027a3 --- /dev/null +++ b/cpp/src/tests/ping_broker @@ -0,0 +1,127 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os +from optparse import OptionParser, OptionGroup +import sys +import locale +import socket +import re +from qpid.messaging import Connection + +home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools")) +sys.path.append(os.path.join(home, "python")) + +from qpidtoollibs import BrokerAgent +from qpidtoollibs import Display, Header, Sorter, YN, Commas, TimeLong + + +class Config: + def __init__(self): + self._host = "localhost" + self._connTimeout = 10 + +config = Config() +conn_options = {} + +def OptionsAndArguments(argv): + """ Set global variables for options, return arguments """ + + global config + global conn_options + + usage = "%prog [options]" + + parser = OptionParser(usage=usage) + + parser.add_option("-b", "--broker", action="store", type="string", default="localhost", metavar="<url>", + help="URL of the broker to query") + parser.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", + help="Maximum time to wait for broker connection (in seconds)") + parser.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", + help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.") + parser.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)") + parser.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)") + parser.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.") + + opts, args = parser.parse_args(args=argv) + + config._host = opts.broker + config._connTimeout = opts.timeout + + if opts.sasl_mechanism: + conn_options['sasl_mechanisms'] = opts.sasl_mechanism + if opts.ssl_certificate: + conn_options['ssl_certfile'] = opts.ssl_certificate + if opts.ssl_key: + conn_options['ssl_key'] = opts.ssl_key + if opts.ha_admin: + conn_options['client_properties'] = {'qpid.ha-admin' : 1} + return args + +class BrokerManager: + def __init__(self): + self.brokerName = None + self.connection = None + self.broker = None + self.cluster = None + + def SetBroker(self, brokerUrl): + self.url = brokerUrl + self.connection = Connection.establish(self.url, **conn_options) + self.broker = BrokerAgent(self.connection) + + def Disconnect(self): + """ Release any allocated brokers. Ignore any failures as the tool is + shutting down. + """ + try: + connection.close() + except: + pass + + def Ping(self, args): + for sequence in range(10): + result = self.broker.echo(sequence, "ECHO BODY") + if result['sequence'] != sequence: + raise Exception("Invalid Sequence") + + +def main(argv=None): + + args = OptionsAndArguments(argv) + bm = BrokerManager() + + try: + bm.SetBroker(config._host) + bm.Ping(args) + bm.Disconnect() + return 0 + except KeyboardInterrupt: + print + except Exception,e: + print "Failed: %s - %s" % (e.__class__.__name__, e) + + bm.Disconnect() # try to deallocate brokers + return 1 + +if __name__ == "__main__": + sys.exit(main()) diff --git a/cpp/src/tests/qpid-latency-test.cpp b/cpp/src/tests/qpid-latency-test.cpp index 20eb4568f3..2343cb1d77 100644 --- a/cpp/src/tests/qpid-latency-test.cpp +++ b/cpp/src/tests/qpid-latency-test.cpp @@ -359,19 +359,29 @@ void Sender::sendByRate() } uint64_t interval = TIME_SEC/opts.rate; int64_t timeLimit = opts.timeLimit * TIME_SEC; - uint64_t sent = 0, missedRate = 0; + uint64_t sent = 0; AbsTime start = now(); + AbsTime last = start; while (true) { AbsTime sentAt=now(); msg.getDeliveryProperties().setTimestamp(Duration(EPOCH, sentAt)); async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); if (opts.sync) session.sync(); ++sent; + if (Duration(last, sentAt) > TIME_SEC*2) { + Duration t(start, now()); + //check rate actually achieved thus far + uint actualRate = sent / (t/TIME_SEC); + //report inability to stay within 1% of desired rate + if (actualRate < opts.rate && opts.rate - actualRate > opts.rate/100) { + std::cerr << "WARNING: Desired send rate: " << opts.rate << ", actual send rate: " << actualRate << std::endl; + } + last = sentAt; + } + AbsTime waitTill(start, sent*interval); Duration delay(sentAt, waitTill); - if (delay < 0) - ++missedRate; - else + if (delay > 0) sys::usleep(delay / TIME_USEC); if (timeLimit != 0 && Duration(start, now()) > timeLimit) { session.sync(); diff --git a/cpp/src/tests/qpid-receive.cpp b/cpp/src/tests/qpid-receive.cpp index 6deeb566dc..7a02b871db 100644 --- a/cpp/src/tests/qpid-receive.cpp +++ b/cpp/src/tests/qpid-receive.cpp @@ -68,6 +68,7 @@ struct Options : public qpid::Options bool reportHeader; string readyAddress; uint receiveRate; + std::string replyto; Options(const std::string& argv0=std::string()) : qpid::Options("Options"), @@ -114,6 +115,7 @@ struct Options : public qpid::Options ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.") ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive") ("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.") + ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address on response messages") ("help", qpid::optValue(help), "print this usage statement"); add(log); } @@ -246,6 +248,9 @@ int main(int argc, char ** argv) s = session.createSender(msg.getReplyTo()); s.setCapacity(opts.capacity); } + if (!opts.replyto.empty()) { + msg.setReplyTo(Address(opts.replyto)); + } s.send(msg); } if (opts.receiveRate) { diff --git a/cpp/src/tests/run_acl_tests b/cpp/src/tests/run_acl_tests index 3a8c03eda6..25241ad75e 100755 --- a/cpp/src/tests/run_acl_tests +++ b/cpp/src/tests/run_acl_tests @@ -30,9 +30,9 @@ trap stop_brokers INT TERM QUIT start_brokers() { ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIR --load-module $ACL_LIB --acl-file policy.acl --auth no --log-to-file local.log > qpidd.port LOCAL_PORT=`cat qpidd.port` - ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRI --load-module $ACL_LIB --acl-file policy.acl --auth no --acl-max-connect-per-ip 2 --log-to-file locali.log > qpiddi.port + ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRI --load-module $ACL_LIB --acl-file policy.acl --auth no --max-connections-per-ip 2 --log-to-file locali.log > qpiddi.port LOCAL_PORTI=`cat qpiddi.port` - ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRU --load-module $ACL_LIB --acl-file policy.acl --auth no --acl-max-connect-per-user 2 --log-to-file localu.log > qpiddu.port + ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRU --load-module $ACL_LIB --acl-file policy.acl --auth no --max-connections-per-user 2 --log-to-file localu.log > qpiddu.port LOCAL_PORTU=`cat qpiddu.port` } diff --git a/cpp/src/tests/run_federation_tests b/cpp/src/tests/run_federation_tests index 7735b559cf..c2ee550226 100755 --- a/cpp/src/tests/run_federation_tests +++ b/cpp/src/tests/run_federation_tests @@ -36,10 +36,10 @@ fi QPIDD_CMD="../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no --log-enable=info+ --log-enable=debug+:Bridge --log-to-file" start_brokers() { rm -f fed_local.log fed_remote.log fed_b1.log fed_b2.log - LOCAL_PORT=$($QPIDD_CMD fed_local.log) - REMOTE_PORT=$($QPIDD_CMD fed_remote.log) - REMOTE_B1=$($QPIDD_CMD fed_b1.log) - REMOTE_B2=$($QPIDD_CMD fed_b2.log) + LOCAL_PORT=$($QPIDD_CMD fed_local.log --federation-tag LOCAL) + REMOTE_PORT=$($QPIDD_CMD fed_remote.log --federation-tag REMOTE) + REMOTE_B1=$($QPIDD_CMD fed_b1.log --federation-tag B1) + REMOTE_B2=$($QPIDD_CMD fed_b2.log --federation-tag B2) } stop_brokers() { diff --git a/cpp/src/tests/run_ha_tests b/cpp/src/tests/run_ha_tests new file mode 100755 index 0000000000..1a469646c9 --- /dev/null +++ b/cpp/src/tests/run_ha_tests @@ -0,0 +1,29 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +# Make sure the python tools are available. They will be if we are building in +# a checkoug, they may not be in a distribution. +test -d $PYTHON_COMMANDS -a -x $PYTHON_COMMANDS/qpid-ha -a -x $PYTHON_COMMANDS/qpid-config || { echo "Skipping HA tests, qpid-ha or qpid-config not available."; exit 0; } + +srcdir=`dirname $0` +$srcdir/ha_tests.py + diff --git a/cpp/src/tests/sasl_test_setup.sh b/cpp/src/tests/sasl_test_setup.sh index 3e69c0f02b..3947986517 100755 --- a/cpp/src/tests/sasl_test_setup.sh +++ b/cpp/src/tests/sasl_test_setup.sh @@ -30,7 +30,7 @@ pwcheck_method: auxprop auxprop_plugin: sasldb sasldb_path: $PWD/sasl_config/qpidd.sasldb sql_select: dummy select -mech_list: ANONYMOUS PLAIN DIGEST-MD5 EXTERNAL +mech_list: ANONYMOUS PLAIN DIGEST-MD5 EXTERNAL CRAM-MD5 EOF # Populate temporary sasl db. diff --git a/cpp/src/tests/ssl_test b/cpp/src/tests/ssl_test index 91ff8eec1e..19a316a483 100755 --- a/cpp/src/tests/ssl_test +++ b/cpp/src/tests/ssl_test @@ -148,6 +148,11 @@ URL=$TEST_HOSTNAME:$PORT MSG=`./qpid-receive -b $URL --connection-options '{transport:ssl,heartbeat:2}' -a "foo;{create:always}" --messages 1` test "$MSG" = "hello again" || { echo "receive failed '$MSG' != 'hello again'"; exit 1; } +## Test using the Python client +echo "Testing Non-Authenticating with Python Client..." +URL=amqps://$TEST_HOSTNAME:$PORT +if `$top_srcdir/src/tests/ping_broker -b $URL`; then echo " Passed"; else { echo " Failed"; exit 1; }; fi + #### Client Authentication tests start_authenticating_broker diff --git a/cpp/src/tests/storePerftools/storePerftoolsSmokeTest.sh b/cpp/src/tests/storePerftools/asyncStorePerf_smoke_test.sh index 592e1e60a0..9e8880e128 100755 --- a/cpp/src/tests/storePerftools/storePerftoolsSmokeTest.sh +++ b/cpp/src/tests/storePerftools/asyncStorePerf_smoke_test.sh @@ -9,11 +9,16 @@ run_test() { fi } -NUM_MSGS=1000 +NUM_MSGS=10000 TEST_PROG="./asyncStorePerf" +# Default (no args) run_test "${TEST_PROG}" + +# Help run_test "${TEST_PROG} --help" + +# Limited combinations of major params for q in 1 2; do for p in 1 2; do for c in 1 2; do @@ -27,25 +32,3 @@ for q in 1 2; do done done done - - -NUM_MSGS=1000 -TEST_PROG="./jrnl2Perf" - - -run_test "${TEST_PROG}" - -# This test returns 1, don't use run_test until this is fixed. -cmd="${TEST_PROG} --help" -echo $cmd -$cmd - -for q in 1 2; do - for p in 1 2; do - for c in 1; do # BUG - this will fail for c > 1 - run_test "./jrnl2Perf --num_queues $q --num_msgs ${NUM_MSGS} --num_enq_threads_per_queue $p --num_deq_threads_per_queue $c" - done - done -done - -#exit 0 diff --git a/cpp/src/tests/storePerftools/jrnl2Perf_smoke_test.sh b/cpp/src/tests/storePerftools/jrnl2Perf_smoke_test.sh new file mode 100755 index 0000000000..23fe0fea9b --- /dev/null +++ b/cpp/src/tests/storePerftools/jrnl2Perf_smoke_test.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +run_test() { + local cmd=$1 + echo $cmd + $cmd + if (( $? != 0 )); then + exit 1 + fi +} + +NUM_MSGS=10000 +TEST_PROG="./jrnl2Perf" + +# Default (no args) +run_test "${TEST_PROG}" + +# Help +# This test returns 1, don't use run_test until this is fixed. +cmd="${TEST_PROG} --help" +echo $cmd +$cmd + +# Limited combinations of major params +for q in 1 2; do + for p in 1 2; do + for c in 1; do # BUG - this will fail for c > 1 + run_test "./jrnl2Perf --num_queues $q --num_msgs ${NUM_MSGS} --num_enq_threads_per_queue $p --num_deq_threads_per_queue $c" + done + done +done + +#exit 0 diff --git a/cpp/src/tests/txjob.cpp b/cpp/src/tests/txjob.cpp index a7a905c1b7..29394c3415 100644 --- a/cpp/src/tests/txjob.cpp +++ b/cpp/src/tests/txjob.cpp @@ -38,9 +38,9 @@ namespace tests { struct Args : public qpid::TestOptions { - string workQueue; - string source; - string dest; + std::string workQueue; + std::string source; + std::string dest; uint messages; uint jobs; bool quit; diff --git a/cpp/src/tests/txshift.cpp b/cpp/src/tests/txshift.cpp index 882d3716d8..bf85bee986 100644 --- a/cpp/src/tests/txshift.cpp +++ b/cpp/src/tests/txshift.cpp @@ -39,7 +39,7 @@ namespace tests { struct Args : public qpid::TestOptions { - string workQueue; + std::string workQueue; size_t workers; Args() : workQueue("txshift-control"), workers(1) |
