path: root/cpp/src/tests
author      Kim van der Riet <kpvdr@apache.org>   2012-08-03 12:13:32 +0000
committer   Kim van der Riet <kpvdr@apache.org>   2012-08-03 12:13:32 +0000
commit      d43d1912b376322e27fdcda551a73f9ff5487972 (patch)
tree        ce493e10baa95f44be8beb5778ce51783463196d /cpp/src/tests
parent      04877fec0c6346edec67072d7f2d247740cf2af5 (diff)
download    qpid-python-d43d1912b376322e27fdcda551a73f9ff5487972.tar.gz
QPID-3858: Updated branch - merged from trunk r.1368650
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1368910 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests')
-rw-r--r--  cpp/src/tests/CMakeLists.txt | 1
-rw-r--r--  cpp/src/tests/ExchangeTest.cpp | 2
-rw-r--r--  cpp/src/tests/FieldTable.cpp | 9
-rw-r--r--  cpp/src/tests/Makefile.am | 12
-rw-r--r--  cpp/src/tests/MessageBuilderTest.cpp | 6
-rw-r--r--  cpp/src/tests/MessageTest.cpp | 9
-rw-r--r--  cpp/src/tests/MessageUtils.h | 4
-rw-r--r--  cpp/src/tests/MessagingSessionTests.cpp | 18
-rw-r--r--  cpp/src/tests/QueueTest.cpp | 244
-rw-r--r--  cpp/src/tests/RangeSet.cpp | 134
-rw-r--r--  cpp/src/tests/ReplicationTest.cpp | 4
-rw-r--r--  cpp/src/tests/SystemInfo.cpp | 52
-rw-r--r--  cpp/src/tests/TestMessageStore.h | 2
-rw-r--r--  cpp/src/tests/TimerTest.cpp | 2
-rw-r--r--  cpp/src/tests/TopicExchangeTest.cpp | 14
-rw-r--r--  cpp/src/tests/TxPublishTest.cpp | 4
-rw-r--r--  cpp/src/tests/Uuid.cpp | 12
-rwxr-xr-x  cpp/src/tests/acl.py | 148
-rw-r--r--  cpp/src/tests/asyncstore.cmake | 3
-rw-r--r--  cpp/src/tests/brokertest.py | 37
-rwxr-xr-x  cpp/src/tests/cluster_test_logs.py | 3
-rwxr-xr-x  cpp/src/tests/cluster_tests.py | 189
-rwxr-xr-x  cpp/src/tests/federation.py | 467
-rwxr-xr-x  cpp/src/tests/ha_tests.py | 539
-rwxr-xr-x  cpp/src/tests/ipv6_test | 13
-rw-r--r--  cpp/src/tests/logging.cpp | 10
-rwxr-xr-x  cpp/src/tests/ping_broker | 127
-rw-r--r--  cpp/src/tests/qpid-latency-test.cpp | 18
-rw-r--r--  cpp/src/tests/qpid-receive.cpp | 5
-rwxr-xr-x  cpp/src/tests/run_acl_tests | 4
-rwxr-xr-x  cpp/src/tests/run_federation_tests | 8
-rwxr-xr-x  cpp/src/tests/run_ha_tests | 29
-rwxr-xr-x  cpp/src/tests/sasl_test_setup.sh | 2
-rwxr-xr-x  cpp/src/tests/ssl_test | 5
-rwxr-xr-x  cpp/src/tests/storePerftools/asyncStorePerf_smoke_test.sh (renamed from cpp/src/tests/storePerftools/storePerftoolsSmokeTest.sh) | 29
-rwxr-xr-x  cpp/src/tests/storePerftools/jrnl2Perf_smoke_test.sh | 33
-rw-r--r--  cpp/src/tests/txjob.cpp | 6
-rw-r--r--  cpp/src/tests/txshift.cpp | 2
38 files changed, 1846 insertions, 360 deletions
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt
index 637442e128..29dfe3634f 100644
--- a/cpp/src/tests/CMakeLists.txt
+++ b/cpp/src/tests/CMakeLists.txt
@@ -149,6 +149,7 @@ set(unit_tests_to_build
PollableCondition
Variant
ClientMessage
+ SystemInfo
${xml_tests}
CACHE STRING "Which unit tests to build"
)
diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp
index 2fb284741a..66a16b9178 100644
--- a/cpp/src/tests/ExchangeTest.cpp
+++ b/cpp/src/tests/ExchangeTest.cpp
@@ -33,6 +33,8 @@
#include <iostream>
#include "MessageUtils.h"
+using std::string;
+
using boost::intrusive_ptr;
using namespace qpid::broker;
using namespace qpid::framing;
diff --git a/cpp/src/tests/FieldTable.cpp b/cpp/src/tests/FieldTable.cpp
index c79d110ae4..8aeeb031b3 100644
--- a/cpp/src/tests/FieldTable.cpp
+++ b/cpp/src/tests/FieldTable.cpp
@@ -20,6 +20,7 @@
*/
#include <iostream>
#include <algorithm>
+#include "qpid/sys/alloca.h"
#include "qpid/framing/Array.h"
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/FieldValue.h"
@@ -29,6 +30,8 @@
using namespace qpid::framing;
+using std::string;
+
namespace qpid {
namespace tests {
@@ -73,11 +76,11 @@ QPID_AUTO_TEST_CASE(testAssignment)
FieldTable c;
c = a;
- char* buff = static_cast<char*>(::alloca(c.encodedSize()));
- Buffer wbuffer(buff, c.encodedSize());
+ std::vector<char> buff(c.encodedSize());
+ Buffer wbuffer(&buff[0], c.encodedSize());
wbuffer.put(c);
- Buffer rbuffer(buff, c.encodedSize());
+ Buffer rbuffer(&buff[0], c.encodedSize());
rbuffer.get(d);
BOOST_CHECK_EQUAL(c, d);
BOOST_CHECK(string("CCCC") == c.getAsString("A"));
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 66d2cdd5d5..cdc7429f3b 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -67,7 +67,7 @@ tmodule_LTLIBRARIES=
TESTS+=unit_test
check_PROGRAMS+=unit_test
-unit_test_LDADD=-lboost_unit_test_framework \
+unit_test_LDADD=-lboost_unit_test_framework -lpthread \
$(lib_messaging) $(lib_broker) $(lib_console) $(lib_qmf2)
unit_test_SOURCES= unit_test.cpp unit_test.h \
@@ -124,7 +124,8 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \
Address.cpp \
ClientMessage.cpp \
Qmf2.cpp \
- BrokerOptions.cpp
+ BrokerOptions.cpp \
+ SystemInfo.cpp
if HAVE_XML
unit_test_SOURCES+= XmlClientSessionTest.cpp
@@ -150,7 +151,7 @@ endif
# Test programs that are installed and therefore built as part of make, not make check
qpidexectest_SCRIPTS += qpid-cpp-benchmark qpid-cluster-benchmark install_env.sh
-EXTRA_DIST += qpid-cpp-benchmark install_env.sh
+EXTRA_DIST += qpid-cpp-benchmark qpid-cluster-benchmark install_env.sh
qpidexectest_PROGRAMS += receiver
receiver_SOURCES = \
@@ -305,7 +306,7 @@ TESTS_ENVIRONMENT = \
system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest \
run_msg_group_tests
TESTS += start_broker $(system_tests) python_tests stop_broker \
- ha_tests.py run_federation_tests run_federation_sys_tests \
+ run_ha_tests run_federation_tests run_federation_sys_tests \
run_acl_tests run_cli_tests replication_test dynamic_log_level_test \
run_queue_flow_limit_tests ipv6_test
@@ -352,7 +353,8 @@ EXTRA_DIST += \
run_queue_flow_limit_tests \
run_msg_group_tests \
ipv6_test \
- ha_tests.py \
+ run_ha_tests \
+ ha_tests.py \
test_env.ps1.in
check_LTLIBRARIES += libdlclose_noop.la
diff --git a/cpp/src/tests/MessageBuilderTest.cpp b/cpp/src/tests/MessageBuilderTest.cpp
index c3d40ed88a..9adb133d40 100644
--- a/cpp/src/tests/MessageBuilderTest.cpp
+++ b/cpp/src/tests/MessageBuilderTest.cpp
@@ -40,7 +40,7 @@ class MockMessageStore : public NullMessageStore
uint64_t id;
boost::intrusive_ptr<PersistableMessage> expectedMsg;
- string expectedData;
+ std::string expectedData;
std::list<Op> ops;
void checkExpectation(Op actual)
@@ -58,7 +58,7 @@ class MockMessageStore : public NullMessageStore
ops.push_back(STAGE);
}
- void expectAppendContent(PersistableMessage& msg, const string& data)
+ void expectAppendContent(PersistableMessage& msg, const std::string& data)
{
expectedMsg = &msg;
expectedData = data;
@@ -73,7 +73,7 @@ class MockMessageStore : public NullMessageStore
}
void appendContent(const boost::intrusive_ptr<const PersistableMessage>& msg,
- const string& data)
+ const std::string& data)
{
checkExpectation(APPEND);
BOOST_CHECK_EQUAL(boost::static_pointer_cast<const PersistableMessage>(expectedMsg), msg);
diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp
index 7d67c92b37..3a3ed061f9 100644
--- a/cpp/src/tests/MessageTest.cpp
+++ b/cpp/src/tests/MessageTest.cpp
@@ -24,7 +24,6 @@
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/framing/Uuid.h"
-#include "qpid/sys/alloca.h"
#include "unit_test.h"
@@ -33,6 +32,8 @@
using namespace qpid::broker;
using namespace qpid::framing;
+using std::string;
+
namespace qpid {
namespace tests {
@@ -69,11 +70,11 @@ QPID_AUTO_TEST_CASE(testEncodeDecode)
dProps->setDeliveryMode(PERSISTENT);
BOOST_CHECK(msg->isPersistent());
- char* buff = static_cast<char*>(::alloca(msg->encodedSize()));
- Buffer wbuffer(buff, msg->encodedSize());
+ std::vector<char> buff(msg->encodedSize());
+ Buffer wbuffer(&buff[0], msg->encodedSize());
msg->encode(wbuffer);
- Buffer rbuffer(buff, msg->encodedSize());
+ Buffer rbuffer(&buff[0], msg->encodedSize());
msg = new Message();
msg->decodeHeader(rbuffer);
msg->decodeContent(rbuffer);
diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h
index a1b140d484..991e2a2714 100644
--- a/cpp/src/tests/MessageUtils.h
+++ b/cpp/src/tests/MessageUtils.h
@@ -33,7 +33,7 @@ namespace tests {
struct MessageUtils
{
- static boost::intrusive_ptr<Message> createMessage(const string& exchange="", const string& routingKey="",
+ static boost::intrusive_ptr<Message> createMessage(const std::string& exchange="", const std::string& routingKey="",
const bool durable = false, const Uuid& messageId=Uuid(true),
uint64_t contentSize = 0)
{
@@ -53,7 +53,7 @@ struct MessageUtils
return msg;
}
- static void addContent(boost::intrusive_ptr<Message> msg, const string& data)
+ static void addContent(boost::intrusive_ptr<Message> msg, const std::string& data)
{
AMQFrame content((AMQContentBody(data)));
msg->getFrames().append(content);
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp
index 968d55fd45..c8ee3aa401 100644
--- a/cpp/src/tests/MessagingSessionTests.cpp
+++ b/cpp/src/tests/MessagingSessionTests.cpp
@@ -1146,6 +1146,24 @@ QPID_AUTO_TEST_CASE(testLargeRoutingKey)
BOOST_CHECK_THROW(fix.session.createReceiver(address), qpid::messaging::MessagingException);
}
+QPID_AUTO_TEST_CASE(testAlternateExchangeInLinkDeclare)
+{
+ MessagingFixture fix;
+ Sender s = fix.session.createSender("amq.direct/key");
+ Receiver r1 = fix.session.createReceiver("amq.direct/key;{link:{x-declare:{alternate-exchange:'amq.fanout'}}}");
+ Receiver r2 = fix.session.createReceiver("amq.fanout");
+
+ for (uint i = 0; i < 10; ++i) {
+ s.send(Message((boost::format("Message_%1%") % (i+1)).str()), true);
+ }
+ r1.close();//orphans all messages in subscription queue, which should then be routed through alternate exchange
+ for (uint i = 0; i < 10; ++i) {
+ Message received;
+ BOOST_CHECK(r2.fetch(received, Duration::SECOND));
+ BOOST_CHECK_EQUAL(received.getContent(), (boost::format("Message_%1%") % (i+1)).str());
+ }
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index fb429ca981..3b4f74620f 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -31,6 +31,8 @@
#include "qpid/broker/QueueRegistry.h"
#include "qpid/broker/NullMessageStore.h"
#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/client/QueueOptions.h"
#include "qpid/framing/AMQFrame.h"
@@ -40,8 +42,11 @@
#include "qpid/broker/QueueFlowLimit.h"
#include <iostream>
-#include "boost/format.hpp"
+#include <vector>
+#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
+using namespace std;
using boost::intrusive_ptr;
using namespace qpid;
using namespace qpid::broker;
@@ -83,7 +88,7 @@ public:
Message& getMessage() { return *(msg.get()); }
};
-intrusive_ptr<Message> create_message(std::string exchange, std::string routingKey, uint64_t ttl = 0) {
+intrusive_ptr<Message> createMessage(std::string exchange, std::string routingKey, uint64_t ttl = 0) {
intrusive_ptr<Message> msg(new Message());
AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange, 0, 0)));
AMQFrame header((AMQHeaderBody()));
@@ -94,6 +99,16 @@ intrusive_ptr<Message> create_message(std::string exchange, std::string routingK
return msg;
}
+intrusive_ptr<Message> contentMessage(string content) {
+ intrusive_ptr<Message> m(MessageUtils::createMessage());
+ MessageUtils::addContent(m, content);
+ return m;
+}
+
+string getContent(intrusive_ptr<Message> m) {
+ return m->getFrames().getContent();
+}
+
QPID_AUTO_TEST_SUITE(QueueTestSuite)
QPID_AUTO_TEST_CASE(testAsyncMessage) {
@@ -105,7 +120,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) {
//Test basic delivery:
- intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
msg1->enqueueAsync(queue, (MessageStore*)0);//this is done on enqueue which is not called from process
queue->process(msg1);
sleep(2);
@@ -120,7 +135,7 @@ QPID_AUTO_TEST_CASE(testAsyncMessage) {
QPID_AUTO_TEST_CASE(testAsyncMessageCount){
Queue::shared_ptr queue(new Queue("my_test_queue", true));
- intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
msg1->enqueueAsync(queue, (MessageStore*)0);//this is done on enqueue which is not called from process
queue->process(msg1);
@@ -145,9 +160,9 @@ QPID_AUTO_TEST_CASE(testConsumers){
BOOST_CHECK_EQUAL(uint32_t(2), queue->getConsumerCount());
//Test basic delivery:
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
queue->deliver(msg1);
BOOST_CHECK(queue->dispatch(c1));
@@ -191,9 +206,9 @@ QPID_AUTO_TEST_CASE(testRegistry){
QPID_AUTO_TEST_CASE(testDequeue){
Queue::shared_ptr queue(new Queue("my_queue", true));
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
intrusive_ptr<Message> received;
queue->deliver(msg1);
@@ -265,9 +280,9 @@ QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
Queue::shared_ptr queue(new Queue("my-queue", true));
queue->configure(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
//enqueue 2 messages
queue->deliver(msg1);
@@ -291,9 +306,9 @@ QPID_AUTO_TEST_CASE(testSeek){
Queue::shared_ptr queue(new Queue("my-queue", true));
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
//enqueue 2 messages
queue->deliver(msg1);
@@ -317,9 +332,9 @@ QPID_AUTO_TEST_CASE(testSearch){
Queue::shared_ptr queue(new Queue("my-queue", true));
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
//enqueue 2 messages
queue->deliver(msg1);
@@ -431,10 +446,10 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
- intrusive_ptr<Message> msg4 = create_message("e", "D");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
+ intrusive_ptr<Message> msg4 = createMessage("e", "D");
intrusive_ptr<Message> received;
//set deliever match for LVQ a,b,c,a
@@ -466,9 +481,9 @@ QPID_AUTO_TEST_CASE(testLVQOrdering){
received = queue->get().payload;
BOOST_CHECK_EQUAL(msg3.get(), received.get());
- intrusive_ptr<Message> msg5 = create_message("e", "A");
- intrusive_ptr<Message> msg6 = create_message("e", "B");
- intrusive_ptr<Message> msg7 = create_message("e", "C");
+ intrusive_ptr<Message> msg5 = createMessage("e", "A");
+ intrusive_ptr<Message> msg6 = createMessage("e", "B");
+ intrusive_ptr<Message> msg7 = createMessage("e", "C");
msg5->insertCustomProperty(key,"a");
msg6->insertCustomProperty(key,"b");
msg7->insertCustomProperty(key,"c");
@@ -498,8 +513,8 @@ QPID_AUTO_TEST_CASE(testLVQEmptyKey){
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
string key;
args.getLVQKey(key);
@@ -524,12 +539,12 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){
Queue::shared_ptr queue(new Queue("my-queue", true ));
queue->configure(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "B");
- intrusive_ptr<Message> msg3 = create_message("e", "C");
- intrusive_ptr<Message> msg4 = create_message("e", "D");
- intrusive_ptr<Message> msg5 = create_message("e", "F");
- intrusive_ptr<Message> msg6 = create_message("e", "G");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "C");
+ intrusive_ptr<Message> msg4 = createMessage("e", "D");
+ intrusive_ptr<Message> msg5 = createMessage("e", "F");
+ intrusive_ptr<Message> msg6 = createMessage("e", "G");
//set deliever match for LVQ a,b,c,a
@@ -601,8 +616,8 @@ QPID_AUTO_TEST_CASE(testLVQMultiQueue){
queue1->configure(args);
queue2->configure(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "A");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "A");
string key;
args.getLVQKey(key);
@@ -645,8 +660,8 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
intrusive_ptr<Message> received;
queue1->create(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
- intrusive_ptr<Message> msg2 = create_message("e", "A");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
+ intrusive_ptr<Message> msg2 = createMessage("e", "A");
// 2
string key;
args.getLVQKey(key);
@@ -673,7 +688,7 @@ QPID_AUTO_TEST_CASE(testLVQRecover){
void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint evenTtl = 0)
{
for (uint i = 0; i < count; i++) {
- intrusive_ptr<Message> m = create_message("exchange", "key", i % 2 ? oddTtl : evenTtl);
+ intrusive_ptr<Message> m = createMessage("exchange", "key", i % 2 ? oddTtl : evenTtl);
m->computeExpiration(new broker::ExpiryPolicy);
queue.deliver(m);
}
@@ -736,7 +751,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
std::string("b"), std::string("b"), std::string("b"),
std::string("c"), std::string("c"), std::string("c") };
for (int i = 0; i < 9; ++i) {
- intrusive_ptr<Message> msg = create_message("e", "A");
+ intrusive_ptr<Message> msg = createMessage("e", "A");
msg->insertCustomProperty("GROUP-ID", groups[i]);
msg->insertCustomProperty("MY-ID", i);
queue->deliver(msg);
@@ -883,7 +898,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
// Queue = a-2,
// Owners= ^C3,
- intrusive_ptr<Message> msg = create_message("e", "A");
+ intrusive_ptr<Message> msg = createMessage("e", "A");
msg->insertCustomProperty("GROUP-ID", "a");
msg->insertCustomProperty("MY-ID", 9);
queue->deliver(msg);
@@ -894,7 +909,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumer) {
gotOne = queue->dispatch(c2);
BOOST_CHECK( !gotOne );
- msg = create_message("e", "A");
+ msg = createMessage("e", "A");
msg->insertCustomProperty("GROUP-ID", "b");
msg->insertCustomProperty("MY-ID", 10);
queue->deliver(msg);
@@ -925,7 +940,7 @@ QPID_AUTO_TEST_CASE(testGroupsMultiConsumerDefaults) {
queue->configure(args);
for (int i = 0; i < 3; ++i) {
- intrusive_ptr<Message> msg = create_message("e", "A");
+ intrusive_ptr<Message> msg = createMessage("e", "A");
// no "GROUP-ID" header
msg->insertCustomProperty("MY-ID", i);
queue->deliver(msg);
@@ -988,7 +1003,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
Queue::shared_ptr queue2(new Queue("queue2", true, &testStore ));
queue2->create(args);
- intrusive_ptr<Message> msg1 = create_message("e", "A");
+ intrusive_ptr<Message> msg1 = createMessage("e", "A");
queue1->deliver(msg1);
queue2->deliver(msg1);
@@ -1004,7 +1019,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
queue2->setLastNodeFailure();
BOOST_CHECK_EQUAL(testStore.enqCnt, 2u);
- intrusive_ptr<Message> msg2 = create_message("e", "B");
+ intrusive_ptr<Message> msg2 = createMessage("e", "B");
queue1->deliver(msg2);
queue2->deliver(msg2);
@@ -1019,7 +1034,7 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
queue1->clearLastNodeFailure();
queue2->clearLastNodeFailure();
- intrusive_ptr<Message> msg3 = create_message("e", "B");
+ intrusive_ptr<Message> msg3 = createMessage("e", "B");
queue1->deliver(msg3);
queue2->deliver(msg3);
BOOST_CHECK_EQUAL(testStore.enqCnt, 4u);
@@ -1033,8 +1048,8 @@ QPID_AUTO_TEST_CASE(testMultiQueueLastNode){
* internal details not part of the queue abstraction.
// check requeue 1
- intrusive_ptr<Message> msg4 = create_message("e", "C");
- intrusive_ptr<Message> msg5 = create_message("e", "D");
+ intrusive_ptr<Message> msg4 = createMessage("e", "C");
+ intrusive_ptr<Message> msg5 = createMessage("e", "D");
framing::SequenceNumber sequence(1);
QueuedMessage qmsg1(queue1.get(), msg4, sequence);
@@ -1081,8 +1096,8 @@ not requeued to the store.
queue1->create(args);
// check requeue 1
- intrusive_ptr<Message> msg1 = create_message("e", "C");
- intrusive_ptr<Message> msg2 = create_message("e", "D");
+ intrusive_ptr<Message> msg1 = createMessage("e", "C");
+ intrusive_ptr<Message> msg2 = createMessage("e", "D");
queue1->recover(msg1);
@@ -1114,7 +1129,7 @@ simulate store exception going into last node standing
queue1->configure(args);
// check requeue 1
- intrusive_ptr<Message> msg1 = create_message("e", "C");
+ intrusive_ptr<Message> msg1 = createMessage("e", "C");
queue1->deliver(msg1);
testStore.createError();
@@ -1401,6 +1416,133 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocking){
BOOST_CHECK_EQUAL(5u, tq9->getMessageCount());
}
+QPID_AUTO_TEST_CASE(testSetPositionFifo) {
+ Queue::shared_ptr q(new Queue("my-queue", true));
+ BOOST_CHECK_EQUAL(q->getPosition(), SequenceNumber(0));
+ for (int i = 0; i < 10; ++i)
+ q->deliver(contentMessage(boost::lexical_cast<string>(i+1)));
+
+ // Verify the front of the queue
+ TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(1u, c->last.position); // Numbered from 1
+ BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+ // Verify the back of the queue
+ QueuedMessage qm;
+ BOOST_CHECK_EQUAL(10u, q->getPosition());
+ BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
+ BOOST_CHECK_EQUAL("10", getContent(qm.payload));
+ BOOST_CHECK_EQUAL(10u, q->getMessageCount());
+
+ // Using setPosition to introduce a gap in sequence numbers.
+ q->setPosition(15);
+ BOOST_CHECK_EQUAL(10u, q->getMessageCount());
+ BOOST_CHECK_EQUAL(15u, q->getPosition());
+ BOOST_CHECK(q->find(10, qm)); // Back of the queue
+ BOOST_CHECK_EQUAL("10", getContent(qm.payload));
+ q->deliver(contentMessage("16"));
+ c->setPosition(9);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(10u, c->last.position);
+ BOOST_CHECK_EQUAL("10", getContent(c->last.payload));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(16u, c->last.position);
+ BOOST_CHECK_EQUAL("16", getContent(c->last.payload));
+
+ // Using setPosition to truncate the queue
+ q->setPosition(5);
+ BOOST_CHECK_EQUAL(5u, q->getMessageCount());
+ q->deliver(contentMessage("6a"));
+ c->setPosition(4);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(5u, c->last.position);
+ BOOST_CHECK_EQUAL("5", getContent(c->last.payload));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(6u, c->last.position);
+ BOOST_CHECK_EQUAL("6a", getContent(c->last.payload));
+ BOOST_CHECK(!q->dispatch(c)); // No more messages.
+}
+
+QPID_AUTO_TEST_CASE(testSetPositionLvq) {
+ Queue::shared_ptr q(new Queue("my-queue", true));
+ string key="key";
+ framing::FieldTable args;
+ args.setString("qpid.last_value_queue_key", "key");
+ q->configure(args);
+
+ const char* values[] = { "a", "b", "c", "a", "b", "c" };
+ for (size_t i = 0; i < sizeof(values)/sizeof(values[0]); ++i) {
+ intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1));
+ m->insertCustomProperty(key, values[i]);
+ q->deliver(m);
+ }
+ BOOST_CHECK_EQUAL(3u, q->getMessageCount());
+ // Verify the front of the queue
+ TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Don't acquire
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(4u, c->last.position); // Numbered from 1
+ BOOST_CHECK_EQUAL("4", getContent(c->last.payload));
+ // Verify the back of the queue
+ QueuedMessage qm;
+ BOOST_CHECK_EQUAL(6u, q->getPosition());
+ BOOST_CHECK(q->find(q->getPosition(), qm)); // Back of the queue
+ BOOST_CHECK_EQUAL("6", getContent(qm.payload));
+
+ q->setPosition(5);
+ c->setPosition(4);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(5u, c->last.position); // Numbered from 1
+ BOOST_CHECK(!q->dispatch(c));
+}
+
+QPID_AUTO_TEST_CASE(testSetPositionPriority) {
+ Queue::shared_ptr q(new Queue("my-queue", true));
+ framing::FieldTable args;
+ args.setInt("qpid.priorities", 10);
+ q->configure(args);
+
+ const int priorities[] = { 1, 2, 3, 2, 1, 3 };
+ for (size_t i = 0; i < sizeof(priorities)/sizeof(priorities[0]); ++i) {
+ intrusive_ptr<Message> m = contentMessage(boost::lexical_cast<string>(i+1));
+ m->getFrames().getHeaders()->get<DeliveryProperties>(true)
+ ->setPriority(priorities[i]);
+ q->deliver(m);
+ }
+
+ // Truncation removes messages in fifo order, not priority order.
+ q->setPosition(3);
+ TestConsumer::shared_ptr c(new TestConsumer("test", false)); // Browse in FIFO order
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(1u, c->last.position);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(2u, c->last.position);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(3u, c->last.position);
+ BOOST_CHECK(!q->dispatch(c));
+
+ intrusive_ptr<Message> m = contentMessage("4a");
+ m->getFrames().getHeaders()->get<DeliveryProperties>(true)
+ ->setPriority(4);
+ q->deliver(m);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(4u, c->last.position);
+ BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+
+ // But consumers see priority order
+ c.reset(new TestConsumer("test", true));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(4u, c->last.position);
+ BOOST_CHECK_EQUAL("4a", getContent(c->last.payload));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(3u, c->last.position);
+ BOOST_CHECK_EQUAL("3", getContent(c->last.payload));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(2u, c->last.position);
+ BOOST_CHECK_EQUAL("2", getContent(c->last.payload));
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK_EQUAL(1u, c->last.position);
+ BOOST_CHECK_EQUAL("1", getContent(c->last.payload));
+}
QPID_AUTO_TEST_SUITE_END()
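
The new testSetPositionLvq case above relies on last-value-queue (LVQ) semantics: when a message arrives with the same qpid.last_value_queue_key value as an earlier one, the earlier message is dropped and the replacement takes a fresh position at the back of the queue, so only the newest message per key remains. A minimal Python sketch of that behaviour (an illustration of the semantics the test exercises, not the broker code):

    # Sketch of LVQ replacement: keep only the newest message per key.
    from collections import OrderedDict

    def lvq_deliver(queue, key, content):
        queue.pop(key, None)   # drop any older message carrying the same key
        queue[key] = content   # the replacement goes to the back of the queue

    q = OrderedDict()
    for pos, key in enumerate(["a", "b", "c", "a", "b", "c"], start=1):
        lvq_deliver(q, key, str(pos))
    print(list(q.values()))    # ['4', '5', '6'] -- the three messages the test browses
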
diff --git a/cpp/src/tests/RangeSet.cpp b/cpp/src/tests/RangeSet.cpp
index db3a964086..285f432bf7 100644
--- a/cpp/src/tests/RangeSet.cpp
+++ b/cpp/src/tests/RangeSet.cpp
@@ -29,63 +29,71 @@ namespace tests {
QPID_AUTO_TEST_SUITE(RangeSetTestSuite)
-typedef qpid::Range<int> TestRange;
-typedef qpid::RangeSet<int> TestRangeSet;
+typedef qpid::Range<int> TR; // Test Range
+typedef RangeSet<int> TRSet;
QPID_AUTO_TEST_CASE(testEmptyRange) {
- TestRange r;
+ TR r;
+ BOOST_CHECK_EQUAL(r, TR(0,0));
BOOST_CHECK(r.empty());
BOOST_CHECK(!r.contains(0));
- // BOOST_CHECK(r.contiguous(0));
}
QPID_AUTO_TEST_CASE(testRangeSetAddPoint) {
- TestRangeSet r;
+ TRSet r;
BOOST_CHECK(r.empty());
r += 3;
BOOST_CHECK_MESSAGE(r.contains(3), r);
- BOOST_CHECK_MESSAGE(r.contains(TestRange(3,4)), r);
+ BOOST_CHECK_MESSAGE(r.contains(TR(3,4)), r);
BOOST_CHECK(!r.empty());
r += 5;
BOOST_CHECK_MESSAGE(r.contains(5), r);
- BOOST_CHECK_MESSAGE(r.contains(TestRange(5,6)), r);
- BOOST_CHECK_MESSAGE(!r.contains(TestRange(3,6)), r);
+ BOOST_CHECK_MESSAGE(r.contains(TR(5,6)), r);
+ BOOST_CHECK_MESSAGE(!r.contains(TR(3,6)), r);
r += 4;
- BOOST_CHECK_MESSAGE(r.contains(TestRange(3,6)), r);
+ BOOST_CHECK_MESSAGE(r.contains(TR(3,6)), r);
}
QPID_AUTO_TEST_CASE(testRangeSetAddRange) {
- TestRangeSet r;
- r += TestRange(0,3);
- BOOST_CHECK(r.contains(TestRange(0,3)));
- r += TestRange(4,6);
- BOOST_CHECK_MESSAGE(r.contains(TestRange(4,6)), r);
+ TRSet r;
+ r += TR(0,3);
+ BOOST_CHECK(r.contains(TR(0,3)));
+ BOOST_CHECK(r.contiguous());
+ r += TR(4,6);
+ BOOST_CHECK(!r.contiguous());
+ BOOST_CHECK_MESSAGE(r.contains(TR(4,6)), r);
r += 3;
- BOOST_CHECK_MESSAGE(r.contains(TestRange(0,6)), r);
+ BOOST_CHECK_MESSAGE(r.contains(TR(0,6)), r);
BOOST_CHECK(r.front() == 0);
BOOST_CHECK(r.back() == 6);
+
+ // Merging additions
+ r = TRSet(0,3)+TR(5,6);
+ TRSet e(0,6);
+ BOOST_CHECK_EQUAL(r + TR(3,5), e);
+ BOOST_CHECK(e.contiguous());
+ r = TRSet(0,5)+TR(10,15)+TR(20,25)+TR(30,35)+TR(40,45);
+ BOOST_CHECK_EQUAL(r + TR(11,37), TRSet(0,5)+TR(11,37)+TR(40,45));
}
QPID_AUTO_TEST_CASE(testRangeSetAddSet) {
- TestRangeSet r;
- TestRangeSet s = TestRangeSet(0,3)+TestRange(5,10);
+ TRSet r;
+ TRSet s = TRSet(0,3)+TR(5,10);
r += s;
BOOST_CHECK_EQUAL(r,s);
- r += TestRangeSet(3,5) + TestRange(7,12) + 15;
- BOOST_CHECK_EQUAL(r, TestRangeSet(0,12) + 15);
+ r += TRSet(3,5) + TR(7,12) + 15;
+ BOOST_CHECK_EQUAL(r, TRSet(0,12) + 15);
r.clear();
BOOST_CHECK(r.empty());
- r += TestRange::makeClosed(6,10);
- BOOST_CHECK_EQUAL(r, TestRangeSet(6,11));
- r += TestRangeSet(2,6)+8;
- BOOST_CHECK_EQUAL(r, TestRangeSet(2,11));
+ r += TR::makeClosed(6,10);
+ BOOST_CHECK_EQUAL(r, TRSet(6,11));
+ r += TRSet(2,6)+8;
+ BOOST_CHECK_EQUAL(r, TRSet(2,11));
}
QPID_AUTO_TEST_CASE(testRangeSetIterate) {
- TestRangeSet r;
- (((r += 1) += 10) += TestRange(4,7)) += 2;
- BOOST_MESSAGE(r);
+ TRSet r = TRSet(1,3)+TR(4,7)+TR(10,11);
std::vector<int> actual;
std::copy(r.begin(), r.end(), std::back_inserter(actual));
std::vector<int> expect = boost::assign::list_of(1)(2)(4)(5)(6)(10);
@@ -94,51 +102,51 @@ QPID_AUTO_TEST_CASE(testRangeSetIterate) {
QPID_AUTO_TEST_CASE(testRangeSetRemove) {
// points
- BOOST_CHECK_EQUAL(TestRangeSet(0,5)-3, TestRangeSet(0,3)+TestRange(4,5));
- BOOST_CHECK_EQUAL(TestRangeSet(1,5)-5, TestRangeSet(1,5));
- BOOST_CHECK_EQUAL(TestRangeSet(1,5)-0, TestRangeSet(1,5));
+ BOOST_CHECK_EQUAL(TRSet(0,5)-3, TRSet(0,3)+TR(4,5));
+ BOOST_CHECK_EQUAL(TRSet(1,5)-5, TRSet(1,5));
+ BOOST_CHECK_EQUAL(TRSet(1,5)-0, TRSet(1,5));
- TestRangeSet r(TestRangeSet(0,5)+TestRange(10,15)+TestRange(20,25));
+ TRSet r(TRSet(0,5)+TR(10,15)+TR(20,25));
- // TestRanges
- BOOST_CHECK_EQUAL(r-TestRange(0,5), TestRangeSet(10,15)+TestRange(20,25));
- BOOST_CHECK_EQUAL(r-TestRange(10,15), TestRangeSet(0,5)+TestRange(20,25));
- BOOST_CHECK_EQUAL(r-TestRange(20,25), TestRangeSet(0,5)+TestRange(10,15));
+ // TRs
+ BOOST_CHECK_EQUAL(r-TR(0,5), TRSet(10,15)+TR(20,25));
+ BOOST_CHECK_EQUAL(r-TR(10,15), TRSet(0,5)+TR(20,25));
+ BOOST_CHECK_EQUAL(r-TR(20,25), TRSet(0,5)+TR(10,15));
- BOOST_CHECK_EQUAL(r-TestRange(-5, 30), TestRangeSet());
+ BOOST_CHECK_EQUAL(r-TR(-5, 30), TRSet());
- BOOST_CHECK_EQUAL(r-TestRange(-5, 7), TestRangeSet(10,15)+TestRange(20,25));
- BOOST_CHECK_EQUAL(r-TestRange(8,19), TestRangeSet(0,5)+TestRange(20,25));
- BOOST_CHECK_EQUAL(r-TestRange(17,30), TestRangeSet(0,5)+TestRange(10,15));
- BOOST_CHECK_EQUAL(r-TestRange(17,30), TestRangeSet(0,5)+TestRange(10,15));
+ BOOST_CHECK_EQUAL(r-TR(-5, 7), TRSet(10,15)+TR(20,25));
+ BOOST_CHECK_EQUAL(r-TR(8,19), TRSet(0,5)+TR(20,25));
+ BOOST_CHECK_EQUAL(r-TR(17,30), TRSet(0,5)+TR(10,15));
- BOOST_CHECK_EQUAL(r-TestRange(-5, 5), TestRangeSet(10,15)+TestRange(20,25));
- BOOST_CHECK_EQUAL(r-TestRange(10,19), TestRangeSet(0,5)+TestRange(20,25));
- BOOST_CHECK_EQUAL(r-TestRange(18,25), TestRangeSet(0,5)+TestRange(10,15));
- BOOST_CHECK_EQUAL(r-TestRange(23,25), TestRangeSet(0,5)+TestRange(10,15)+TestRange(20,23));
+ BOOST_CHECK_EQUAL(r-TR(-5, 5), TRSet(10,15)+TR(20,25));
+ BOOST_CHECK_EQUAL(r-TR(10,19), TRSet(0,5)+TR(20,25));
+ BOOST_CHECK_EQUAL(r-TR(18,25), TRSet(0,5)+TR(10,15));
+ BOOST_CHECK_EQUAL(r-TR(23,25), TRSet(0,5)+TR(10,15)+TR(20,23));
- BOOST_CHECK_EQUAL(r-TestRange(-3, 3), TestRangeSet(3,5)+TestRange(10,15)+TestRange(20,25));
- BOOST_CHECK_EQUAL(r-TestRange(3, 7), TestRangeSet(0,2)+TestRange(10,15)+TestRange(20,25));
- BOOST_CHECK_EQUAL(r-TestRange(3, 12), TestRangeSet(0,3)+TestRange(12,15)+TestRange(20,25));
- BOOST_CHECK_EQUAL(r-TestRange(3, 22), TestRangeSet(12,15)+TestRange(22,25));
- BOOST_CHECK_EQUAL(r-TestRange(12, 22), TestRangeSet(0,5)+TestRange(10,11)+TestRange(22,25));
+ BOOST_CHECK_EQUAL(r-TR(-3, 3), TRSet(3,5)+TR(10,15)+TR(20,25));
+ BOOST_CHECK_EQUAL(r-TR(3, 7), TRSet(0,2)+TR(10,15)+TR(20,25));
+ BOOST_CHECK_EQUAL(r-TR(3, 12), TRSet(0,3)+TR(12,15)+TR(20,25));
+ BOOST_CHECK_EQUAL(r-TR(3, 22), TRSet(12,15)+TR(22,25));
+ BOOST_CHECK_EQUAL(r-TR(12, 22), TRSet(0,5)+TR(10,11)+TR(22,25));
// Sets
- BOOST_CHECK_EQUAL(r-(TestRangeSet(-1,6)+TestRange(11,14)+TestRange(23,25)),
- TestRangeSet(10,11)+TestRange(14,15)+TestRange(20,23));
-}
-
-QPID_AUTO_TEST_CASE(testRangeContaining) {
- TestRangeSet r;
- (((r += 1) += TestRange(3,5)) += 7);
- BOOST_CHECK_EQUAL(r.rangeContaining(0), TestRange(0,0));
- BOOST_CHECK_EQUAL(r.rangeContaining(1), TestRange(1,2));
- BOOST_CHECK_EQUAL(r.rangeContaining(2), TestRange(2,2));
- BOOST_CHECK_EQUAL(r.rangeContaining(3), TestRange(3,5));
- BOOST_CHECK_EQUAL(r.rangeContaining(4), TestRange(3,5));
- BOOST_CHECK_EQUAL(r.rangeContaining(5), TestRange(5,5));
- BOOST_CHECK_EQUAL(r.rangeContaining(6), TestRange(6,6));
- BOOST_CHECK_EQUAL(r.rangeContaining(7), TestRange(7,8));
+ BOOST_CHECK_EQUAL(r-(TRSet(-1,6)+TR(11,14)+TR(23,25)),
+ TRSet(10,11)+TR(14,15)+TR(20,23));
+ // Split the ranges
+ BOOST_CHECK_EQUAL(r-(TRSet(2,3)+TR(11,13)+TR(21,23)),
+ TRSet(0,2)+TR(4,5)+
+ TR(10,11)+TR(14,15)+
+ TR(20,21)+TR(23,25));
+ // Truncate the ranges
+ BOOST_CHECK_EQUAL(r-(TRSet(0,3)+TR(13,15)+TR(19,23)),
+ TRSet(3,5)+TR(10,13)+TR(20,23));
+ // Remove multiple ranges with truncation
+ BOOST_CHECK_EQUAL(r-(TRSet(3,23)), TRSet(0,3)+TR(23,25));
+ // Remove multiple ranges in middle
+ TRSet r2 = TRSet(0,5)+TR(10,15)+TR(20,25)+TR(30,35);
+ BOOST_CHECK_EQUAL(r2-TRSet(11,24),
+ TRSet(0,5)+TR(10,11)+TR(24,25)+TR(30,35));
}
QPID_AUTO_TEST_SUITE_END()
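
The rewritten RangeSet tests above depend on Range being a half-open interval [begin, end): TR(0,0) is empty, TR(3,6) covers 3, 4 and 5 but not 6, and TR::makeClosed(6,10) is therefore equal to the half-open TRSet(6,11). A small Python sketch of that convention (illustrative only, assuming the same half-open reading):

    # Half-open interval check: begin is included, end is excluded.
    def contains(begin, end, n):
        return begin <= n < end

    def make_closed(begin, end):
        # A closed interval [begin, end] expressed as a half-open pair.
        return (begin, end + 1)

    print(contains(3, 6, 5), contains(3, 6, 6))  # True False
    print(make_closed(6, 10))                    # (6, 11), matching TRSet(6,11)
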
diff --git a/cpp/src/tests/ReplicationTest.cpp b/cpp/src/tests/ReplicationTest.cpp
index 1219a6b59e..055f06579f 100644
--- a/cpp/src/tests/ReplicationTest.cpp
+++ b/cpp/src/tests/ReplicationTest.cpp
@@ -62,7 +62,7 @@ qpid::sys::Shlib plugin(getLibPath("REPLICATING_LISTENER_LIB", default_shlib));
qpid::broker::Broker::Options getBrokerOpts(const std::vector<std::string>& args)
{
std::vector<const char*> argv(args.size());
- transform(args.begin(), args.end(), argv.begin(), boost::bind(&string::c_str, _1));
+ transform(args.begin(), args.end(), argv.begin(), boost::bind(&std::string::c_str, _1));
qpid::broker::Broker::Options opts;
qpid::Plugin::addOptions(opts);
@@ -72,7 +72,7 @@ qpid::broker::Broker::Options getBrokerOpts(const std::vector<std::string>& args
QPID_AUTO_TEST_CASE(testReplicationExchange)
{
- qpid::broker::Broker::Options brokerOpts(getBrokerOpts(list_of<string>("qpidd")
+ qpid::broker::Broker::Options brokerOpts(getBrokerOpts(list_of<std::string>("qpidd")
("--replication-exchange-name=qpid.replication")));
SessionFixture f(brokerOpts);
diff --git a/cpp/src/tests/SystemInfo.cpp b/cpp/src/tests/SystemInfo.cpp
new file mode 100644
index 0000000000..12d8d3dba8
--- /dev/null
+++ b/cpp/src/tests/SystemInfo.cpp
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+#include "unit_test.h"
+#include "test_tools.h"
+#include "qpid/sys/SystemInfo.h"
+#include <boost/assign.hpp>
+
+using namespace std;
+using namespace qpid::sys;
+using namespace boost::assign;
+
+namespace qpid {
+namespace tests {
+
+QPID_AUTO_TEST_SUITE(SystemInfoTestSuite)
+
+QPID_AUTO_TEST_CASE(TestIsLocalHost) {
+ // Test that local hostname and addresses are considered local
+ Address a;
+ BOOST_ASSERT(SystemInfo::getLocalHostname(a));
+ BOOST_ASSERT(SystemInfo::isLocalHost(a.host));
+ std::vector<Address> addrs;
+ SystemInfo::getLocalIpAddresses(0, addrs);
+ for (std::vector<Address>::iterator i = addrs.begin(); i != addrs.end(); ++i)
+ BOOST_ASSERT(SystemInfo::isLocalHost(i->host));
+ // Check some non-local addresses
+ BOOST_ASSERT(!SystemInfo::isLocalHost("123.4.5.6"));
+ BOOST_ASSERT(!SystemInfo::isLocalHost("nosuchhost"));
+ BOOST_ASSERT(SystemInfo::isLocalHost("127.0.0.1"));
+ BOOST_ASSERT(SystemInfo::isLocalHost("::1"));
+}
+
+QPID_AUTO_TEST_SUITE_END()
+
+}} // namespace qpid::tests
diff --git a/cpp/src/tests/TestMessageStore.h b/cpp/src/tests/TestMessageStore.h
index 20e0b755b2..0b63bc9c15 100644
--- a/cpp/src/tests/TestMessageStore.h
+++ b/cpp/src/tests/TestMessageStore.h
@@ -31,7 +31,7 @@ using namespace qpid::framing;
namespace qpid {
namespace tests {
-typedef std::pair<string, boost::intrusive_ptr<PersistableMessage> > msg_queue_pair;
+typedef std::pair<std::string, boost::intrusive_ptr<PersistableMessage> > msg_queue_pair;
class TestMessageStore : public NullMessageStore
{
diff --git a/cpp/src/tests/TimerTest.cpp b/cpp/src/tests/TimerTest.cpp
index 6a0a196f4e..fc5004dcb0 100644
--- a/cpp/src/tests/TimerTest.cpp
+++ b/cpp/src/tests/TimerTest.cpp
@@ -81,6 +81,8 @@ class TestTask : public TimerTask
uint64_t difference = _abs64(expected - actual);
#elif defined(_WIN32)
uint64_t difference = labs(expected - actual);
+#elif defined(__SUNPRO_CC)
+ uint64_t difference = llabs(expected - actual);
#else
uint64_t difference = abs(expected - actual);
#endif
diff --git a/cpp/src/tests/TopicExchangeTest.cpp b/cpp/src/tests/TopicExchangeTest.cpp
index ff8931f9c9..d57951ea3f 100644
--- a/cpp/src/tests/TopicExchangeTest.cpp
+++ b/cpp/src/tests/TopicExchangeTest.cpp
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+#include "qpid/broker/TopicKeyNode.h"
#include "qpid/broker/TopicExchange.h"
#include "unit_test.h"
#include "test_tools.h"
@@ -32,14 +33,15 @@ class TopicExchange::TopicExchangeTester {
public:
typedef std::vector<std::string> BindingVec;
+ typedef TopicKeyNode<TopicExchange::BindingKey> TestBindingNode;
private:
// binding node iterator that collects all routes that are bound
- class TestFinder : public TopicExchange::BindingNode::TreeIterator {
+ class TestFinder : public TestBindingNode::TreeIterator {
public:
TestFinder(BindingVec& m) : bv(m) {};
~TestFinder() {};
- bool visit(BindingNode& node) {
+ bool visit(TestBindingNode& node) {
if (!node.bindings.bindingVector.empty())
bv.push_back(node.routePattern);
return true;
@@ -53,7 +55,7 @@ public:
~TopicExchangeTester() {};
bool addBindingKey(const std::string& bKey) {
string routingPattern = normalize(bKey);
- BindingKey *bk = bindingTree.addBindingKey(routingPattern);
+ BindingKey *bk = bindingTree.add(routingPattern);
if (bk) {
// push a dummy binding to mark this node as "non-leaf"
bk->bindingVector.push_back(Binding::shared_ptr());
@@ -64,12 +66,12 @@ public:
bool removeBindingKey(const std::string& bKey){
string routingPattern = normalize(bKey);
- BindingKey *bk = bindingTree.getBindingKey(routingPattern);
+ BindingKey *bk = bindingTree.get(routingPattern);
if (bk) {
bk->bindingVector.pop_back();
if (bk->bindingVector.empty()) {
// no more bindings - remove this node
- bindingTree.removeBindingKey(routingPattern);
+ bindingTree.remove(routingPattern);
}
return true;
}
@@ -87,7 +89,7 @@ public:
}
private:
- TopicExchange::BindingNode bindingTree;
+ TestBindingNode bindingTree;
};
} // namespace broker
diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp
index 152581e4ba..a636646035 100644
--- a/cpp/src/tests/TxPublishTest.cpp
+++ b/cpp/src/tests/TxPublishTest.cpp
@@ -69,9 +69,9 @@ QPID_AUTO_TEST_CASE(testPrepare)
//ensure messages are enqueued in store
t.op.prepare(0);
BOOST_CHECK_EQUAL((size_t) 2, t.store.enqueued.size());
- BOOST_CHECK_EQUAL(string("queue1"), t.store.enqueued[0].first);
+ BOOST_CHECK_EQUAL(std::string("queue1"), t.store.enqueued[0].first);
BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[0].second);
- BOOST_CHECK_EQUAL(string("queue2"), t.store.enqueued[1].first);
+ BOOST_CHECK_EQUAL(std::string("queue2"), t.store.enqueued[1].first);
BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[1].second);
BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isIngressComplete());
}
diff --git a/cpp/src/tests/Uuid.cpp b/cpp/src/tests/Uuid.cpp
index f85a297adc..aa9580e25e 100644
--- a/cpp/src/tests/Uuid.cpp
+++ b/cpp/src/tests/Uuid.cpp
@@ -19,7 +19,6 @@
#include "qpid/framing/Uuid.h"
#include "qpid/framing/Buffer.h"
#include "qpid/types/Uuid.h"
-#include "qpid/sys/alloca.h"
#include "unit_test.h"
@@ -52,6 +51,11 @@ boost::array<uint8_t, 16> sample = {{0x1b, 0x4e, 0x28, 0xba, 0x2f, 0xa1, 0x11,
const string sampleStr("1b4e28ba-2fa1-11d2-883f-b9a761bde3fb");
const string zeroStr("00000000-0000-0000-0000-000000000000");
+QPID_AUTO_TEST_CASE(testUuidStr) {
+ Uuid uuid(sampleStr);
+ BOOST_CHECK(uuid == sample);
+}
+
QPID_AUTO_TEST_CASE(testUuidIstream) {
Uuid uuid;
istringstream in(sampleStr);
@@ -92,12 +96,12 @@ QPID_AUTO_TEST_CASE(testUuidIOstream) {
}
QPID_AUTO_TEST_CASE(testUuidEncodeDecode) {
- char* buff = static_cast<char*>(::alloca(Uuid::size()));
- Buffer wbuf(buff, Uuid::size());
+ std::vector<char> buff(Uuid::size());
+ Buffer wbuf(&buff[0], Uuid::size());
Uuid uuid(sample.c_array());
uuid.encode(wbuf);
- Buffer rbuf(buff, Uuid::size());
+ Buffer rbuf(&buff[0], Uuid::size());
Uuid decoded;
decoded.decode(rbuf);
BOOST_CHECK_EQUAL(string(sample.begin(), sample.end()),
diff --git a/cpp/src/tests/acl.py b/cpp/src/tests/acl.py
index 720b3b4216..0e096a6f5b 100755
--- a/cpp/src/tests/acl.py
+++ b/cpp/src/tests/acl.py
@@ -285,10 +285,38 @@ class ACLTests(TestBase010):
if (result):
self.fail(result)
+ def test_nested_groups(self):
+ """
+ Test nested groups
+ """
+
+ aclf = self.get_acl_file()
+ aclf.write('group user-consume martin@QPID ted@QPID\n')
+ aclf.write('group group2 kim@QPID user-consume rob@QPID \n')
+ aclf.write('acl allow anonymous all all \n')
+ aclf.write('acl allow group2 create queue \n')
+ aclf.write('acl deny all all')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ session = self.get_session('rob','rob')
+ try:
+ session.queue_declare(queue="rob_queue")
+ except qpid.session.SessionException, e:
+ if (403 == e.args[0].error_code):
+ self.fail("ACL should allow queue create request");
+ self.fail("Error during queue create request");
+
+
+
def test_user_realm(self):
"""
Test a user defined without a realm
Ex. group admin rajith
+ Note: a user name without a realm is interpreted as a group name
"""
aclf = self.get_acl_file()
aclf.write('group admin bob\n') # shouldn't be allowed
@@ -297,7 +325,7 @@ class ACLTests(TestBase010):
aclf.close()
result = self.reload_acl()
- if (result.find("Username 'bob' must contain a realm",0,len(result)) == -1):
+ if (result.find("not defined yet.",0,len(result)) == -1):
self.fail(result)
def test_allowed_chars_for_username(self):
@@ -1509,6 +1537,124 @@ class ACLTests(TestBase010):
#=====================================
+ # QMF Topic Exchange tests
+ #=====================================
+
+ def test_qmf_topic_exchange_tests(self):
+ """
+ Test using QMF method hooks into ACL logic
+ """
+ aclf = self.get_acl_file()
+ aclf.write('# begin hack alert: allow anonymous to access the lookup debug functions\n')
+ aclf.write('acl allow-log anonymous create queue\n')
+ aclf.write('acl allow-log anonymous all exchange name=qmf.*\n')
+ aclf.write('acl allow-log anonymous all exchange name=amq.direct\n')
+ aclf.write('acl allow-log anonymous all exchange name=qpid.management\n')
+ aclf.write('acl allow-log anonymous access method name=*\n')
+ aclf.write('# end hack alert\n')
+ aclf.write('acl allow-log uPlain1@COMPANY publish exchange name=X routingkey=ab.cd.e\n')
+ aclf.write('acl allow-log uPlain2@COMPANY publish exchange name=X routingkey=.\n')
+ aclf.write('acl allow-log uStar1@COMPANY publish exchange name=X routingkey=a.*.b\n')
+ aclf.write('acl allow-log uStar2@COMPANY publish exchange name=X routingkey=*.x\n')
+ aclf.write('acl allow-log uStar3@COMPANY publish exchange name=X routingkey=x.x.*\n')
+ aclf.write('acl allow-log uHash1@COMPANY publish exchange name=X routingkey=a.#.b\n')
+ aclf.write('acl allow-log uHash2@COMPANY publish exchange name=X routingkey=a.#\n')
+ aclf.write('acl allow-log uHash3@COMPANY publish exchange name=X routingkey=#.a\n')
+ aclf.write('acl allow-log uHash4@COMPANY publish exchange name=X routingkey=a.#.b.#.c\n')
+ aclf.write('acl allow-log uMixed1@COMPANY publish exchange name=X routingkey=*.x.#.y\n')
+ aclf.write('acl allow-log uMixed2@COMPANY publish exchange name=X routingkey=a.#.b.*\n')
+ aclf.write('acl allow-log uMixed3@COMPANY publish exchange name=X routingkey=*.*.*.#\n')
+
+ aclf.write('acl allow-log all publish exchange name=X routingkey=MN.OP.Q\n')
+ aclf.write('acl allow-log all publish exchange name=X routingkey=M.*.N\n')
+ aclf.write('acl allow-log all publish exchange name=X routingkey=M.#.N\n')
+ aclf.write('acl allow-log all publish exchange name=X routingkey=*.M.#.N\n')
+
+ aclf.write('acl deny-log all all\n')
+ aclf.close()
+
+ result = self.reload_acl()
+ if (result):
+ self.fail(result)
+
+ # aclKey: "ab.cd.e"
+ self.LookupPublish("uPlain1@COMPANY", "X", "ab.cd.e", "allow-log")
+ self.LookupPublish("uPlain1@COMPANY", "X", "abx.cd.e", "deny-log")
+ self.LookupPublish("uPlain1@COMPANY", "X", "ab.cd", "deny-log")
+ self.LookupPublish("uPlain1@COMPANY", "X", "ab.cd..e.", "deny-log")
+ self.LookupPublish("uPlain1@COMPANY", "X", "ab.cd.e.", "deny-log")
+ self.LookupPublish("uPlain1@COMPANY", "X", ".ab.cd.e", "deny-log")
+ # aclKey: "."
+ self.LookupPublish("uPlain2@COMPANY", "X", ".", "allow-log")
+
+ # aclKey: "a.*.b"
+ self.LookupPublish("uStar1@COMPANY", "X", "a.xx.b", "allow-log")
+ self.LookupPublish("uStar1@COMPANY", "X", "a.b", "deny-log")
+ # aclKey: "*.x"
+ self.LookupPublish("uStar2@COMPANY", "X", "y.x", "allow-log")
+ self.LookupPublish("uStar2@COMPANY", "X", ".x", "allow-log")
+ self.LookupPublish("uStar2@COMPANY", "X", "x", "deny-log")
+ # aclKey: "x.x.*"
+ self.LookupPublish("uStar3@COMPANY", "X", "x.x.y", "allow-log")
+ self.LookupPublish("uStar3@COMPANY", "X", "x.x.", "allow-log")
+ self.LookupPublish("uStar3@COMPANY", "X", "x.x", "deny-log")
+ self.LookupPublish("uStar3@COMPANY", "X", "q.x.y", "deny-log")
+
+ # aclKey: "a.#.b"
+ self.LookupPublish("uHash1@COMPANY", "X", "a.b", "allow-log")
+ self.LookupPublish("uHash1@COMPANY", "X", "a.x.b", "allow-log")
+ self.LookupPublish("uHash1@COMPANY", "X", "a..x.y.zz.b", "allow-log")
+ self.LookupPublish("uHash1@COMPANY", "X", "a.b.", "deny-log")
+ self.LookupPublish("uHash1@COMPANY", "X", "q.x.b", "deny-log")
+
+ # aclKey: "a.#"
+ self.LookupPublish("uHash2@COMPANY", "X", "a", "allow-log")
+ self.LookupPublish("uHash2@COMPANY", "X", "a.b", "allow-log")
+ self.LookupPublish("uHash2@COMPANY", "X", "a.b.c", "allow-log")
+
+ # aclKey: "#.a"
+ self.LookupPublish("uHash3@COMPANY", "X", "a", "allow-log")
+ self.LookupPublish("uHash3@COMPANY", "X", "x.y.a", "allow-log")
+
+ # aclKey: "a.#.b.#.c"
+ self.LookupPublish("uHash4@COMPANY", "X", "a.b.c", "allow-log")
+ self.LookupPublish("uHash4@COMPANY", "X", "a.x.b.y.c", "allow-log")
+ self.LookupPublish("uHash4@COMPANY", "X", "a.x.x.b.y.y.c", "allow-log")
+
+ # aclKey: "*.x.#.y"
+ self.LookupPublish("uMixed1@COMPANY", "X", "a.x.y", "allow-log")
+ self.LookupPublish("uMixed1@COMPANY", "X", "a.x.p.qq.y", "allow-log")
+ self.LookupPublish("uMixed1@COMPANY", "X", "a.a.x.y", "deny-log")
+ self.LookupPublish("uMixed1@COMPANY", "X", "aa.x.b.c", "deny-log")
+
+ # aclKey: "a.#.b.*"
+ self.LookupPublish("uMixed2@COMPANY", "X", "a.b.x", "allow-log")
+ self.LookupPublish("uMixed2@COMPANY", "X", "a.x.x.x.b.x", "allow-log")
+
+ # aclKey: "*.*.*.#"
+ self.LookupPublish("uMixed3@COMPANY", "X", "x.y.z", "allow-log")
+ self.LookupPublish("uMixed3@COMPANY", "X", "x.y.z.a.b.c", "allow-log")
+ self.LookupPublish("uMixed3@COMPANY", "X", "x.y", "deny-log")
+ self.LookupPublish("uMixed3@COMPANY", "X", "x", "deny-log")
+
+ # Repeat the keys with wildcard user spec
+ self.LookupPublish("uPlain1@COMPANY", "X", "MN.OP.Q", "allow-log")
+ self.LookupPublish("uStar1@COMPANY" , "X", "M.xx.N", "allow-log")
+ self.LookupPublish("uHash1@COMPANY" , "X", "M.N", "allow-log")
+ self.LookupPublish("uHash1@COMPANY" , "X", "M.x.N", "allow-log")
+ self.LookupPublish("uHash1@COMPANY" , "X", "M..x.y.zz.N", "allow-log")
+ self.LookupPublish("uMixed1@COMPANY", "X", "a.M.N", "allow-log")
+ self.LookupPublish("uMixed1@COMPANY", "X", "a.M.p.qq.N", "allow-log")
+
+ self.LookupPublish("dev@QPID", "X", "MN.OP.Q", "allow-log")
+ self.LookupPublish("dev@QPID", "X", "M.xx.N", "allow-log")
+ self.LookupPublish("dev@QPID", "X", "M.N", "allow-log")
+ self.LookupPublish("dev@QPID", "X", "M.x.N", "allow-log")
+ self.LookupPublish("dev@QPID", "X", "M..x.y.zz.N", "allow-log")
+ self.LookupPublish("dev@QPID", "X", "a.M.N", "allow-log")
+ self.LookupPublish("dev@QPID", "X", "a.M.p.qq.N", "allow-log")
+
+ #=====================================
# Connection limits
 #=====================================
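
The QMF topic-exchange lookups added above exercise AMQP topic-key wildcards: '*' stands for exactly one dot-separated field (which may be empty) and '#' stands for zero or more fields. The sketch below is a small Python model of that matching rule, written only to illustrate the allow/deny results listed in the test; it is an assumption-level illustration, not the broker's ACL matcher:

    # Field-by-field topic key matching: '*' = one field, '#' = zero or more fields.
    def _match_fields(pat, key):
        if not pat:
            return not key
        head, rest = pat[0], pat[1:]
        if head == '#':
            # '#' may absorb any number of leading key fields, including none.
            return any(_match_fields(rest, key[i:]) for i in range(len(key) + 1))
        if not key:
            return False
        if head == '*' or head == key[0]:
            return _match_fields(rest, key[1:])
        return False

    def topic_key_matches(pattern, key):
        return _match_fields(pattern.split('.'), key.split('.'))

    # Mirrors a few of the lookups above:
    print(topic_key_matches("a.#.b", "a..x.y.zz.b"))  # True  (allow-log)
    print(topic_key_matches("*.x", ".x"))             # True  (allow-log)
    print(topic_key_matches("x.x.*", "x.x"))          # False (deny-log)
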
diff --git a/cpp/src/tests/asyncstore.cmake b/cpp/src/tests/asyncstore.cmake
index 795ea55cf7..51efa88bd3 100644
--- a/cpp/src/tests/asyncstore.cmake
+++ b/cpp/src/tests/asyncstore.cmake
@@ -45,7 +45,7 @@ if (UNIX)
qpidbroker
rt
)
- add_test (Store_Perftools_Smoke_Test ${CMAKE_CURRENT_SOURCE_DIR}/storePerftools/storePerftoolsSmokeTest.sh)
+ add_test (jrnl2Perf_smoke_test ${CMAKE_CURRENT_SOURCE_DIR}/storePerftools/jrnl2Perf_smoke_test.sh)
endif (UNIX)
# Async store perf test (asyncPerf)
@@ -77,4 +77,5 @@ if (UNIX)
qpidtypes
rt
)
+ add_test (asyncStorePerf_smoke_test ${CMAKE_CURRENT_SOURCE_DIR}/storePerftools/asyncStorePerf_smoke_test.sh)
endif (UNIX)
diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py
index 8255fbe9ac..aea4460e5a 100644
--- a/cpp/src/tests/brokertest.py
+++ b/cpp/src/tests/brokertest.py
@@ -76,18 +76,20 @@ def error_line(filename, n=1):
except: return ""
return ":\n" + "".join(result)
-def retry(function, timeout=10, delay=.01):
- """Call function until it returns True or timeout expires.
- Double the delay for each retry. Return True if function
- returns true, False if timeout expires."""
+def retry(function, timeout=10, delay=.01, max_delay=1):
+ """Call function until it returns a true value or timeout expires.
+ Double the delay for each retry up to max_delay.
+ Returns what function returns if true, None if timeout expires."""
deadline = time.time() + timeout
- while not function():
+ ret = None
+ while True:
+ ret = function()
+ if ret: return ret
remaining = deadline - time.time()
if remaining <= 0: return False
delay = min(delay, remaining)
time.sleep(delay)
- delay *= 2
- return True
+ delay = min(delay*2, max_delay)
class AtomicCounter:
def __init__(self):
@@ -239,15 +241,13 @@ def find_in_file(str, filename):
class Broker(Popen):
"A broker process. Takes care of start, stop and logging."
_broker_count = 0
+ _log_count = 0
- def __str__(self): return "Broker<%s %s>"%(self.name, self.pname)
+ def __str__(self): return "Broker<%s %s :%d>"%(self.log, self.pname, self.port())
def find_log(self):
- self.log = "%s.log" % self.name
- i = 1
- while (os.path.exists(self.log)):
- self.log = "%s-%d.log" % (self.name, i)
- i += 1
+ self.log = "%03d:%s.log" % (Broker._log_count, self.name)
+ Broker._log_count += 1
def get_log(self):
return os.path.abspath(self.log)
@@ -298,9 +298,9 @@ class Broker(Popen):
# Read port from broker process stdout if not already read.
if (self._port == 0):
try: self._port = int(self.stdout.readline())
- except ValueError:
- raise Exception("Can't get port for broker %s (%s)%s" %
- (self.name, self.pname, error_line(self.log,5)))
+ except ValueError, e:
+ raise Exception("Can't get port for broker %s (%s)%s: %s" %
+ (self.name, self.pname, error_line(self.log,5), e))
return self._port
def unexpected(self,msg):
@@ -572,7 +572,7 @@ class NumberedSender(Thread):
"""
Thread.__init__(self)
cmd = ["qpid-send",
- "--broker", url or broker.host_port(),
+ "--broker", url or broker.host_port(),
"--address", "%s;{create:always}"%queue,
"--connection-options", "{%s}"%(connection_options),
"--content-stdin"
@@ -647,6 +647,7 @@ class NumberedReceiver(Thread):
self.error = None
self.sender = sender
self.received = 0
+ self.queue = queue
def read_message(self):
n = int(self.receiver.stdout.readline())
@@ -657,7 +658,7 @@ class NumberedReceiver(Thread):
m = self.read_message()
while m != -1:
self.receiver.assert_running()
- assert(m <= self.received) # Check for missing messages
+ assert m <= self.received, "%s missing message %s>%s"%(self.queue, m, self.received)
if (m == self.received): # Ignore duplicates
self.received += 1
if self.sender:
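
The revised retry() helper above now returns whatever truthy value the probe function produced, doubling its polling delay between attempts up to max_delay. Below is a self-contained restatement of that capped-backoff pattern, following the documented semantics (truthy result, or None once the timeout expires); the names and the demo probe are illustrative, not the brokertest.py API:

    import time

    def retry(function, timeout=10, delay=.01, max_delay=1):
        # Poll until `function` returns a truthy value or `timeout` expires,
        # doubling the sleep between attempts but never exceeding `max_delay`.
        deadline = time.time() + timeout
        while True:
            ret = function()
            if ret:
                return ret
            remaining = deadline - time.time()
            if remaining <= 0:
                return None
            time.sleep(min(delay, remaining))
            delay = min(delay * 2, max_delay)

    start = time.time()
    # The probe becomes truthy after ~0.3s; retry() hands back its value.
    print(retry(lambda: time.time() - start > 0.3 and "ready", timeout=5))
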
diff --git a/cpp/src/tests/cluster_test_logs.py b/cpp/src/tests/cluster_test_logs.py
index 003d82c619..22f2470590 100755
--- a/cpp/src/tests/cluster_test_logs.py
+++ b/cpp/src/tests/cluster_test_logs.py
@@ -66,7 +66,8 @@ def filter_log(log):
'debug Sending keepalive signal to watchdog', # Watchdog timer thread
'last broker standing joined by 1 replicas, updating queue policies.',
'Connection .* timed out: closing', # heartbeat connection close
- "org.apache.qpid.broker:bridge:" # ignore bridge index
+ "org.apache.qpid.broker:bridge:", # ignore bridge index
+ "closed connection"
])
# Regex to match a UUID
uuid='\w\w\w\w\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w-\w\w\w\w\w\w\w\w\w\w\w\w'
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 8952f5de7b..3c96b252df 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -227,6 +227,18 @@ acl deny all all
self.assertEqual("x", cluster[0].get_message("q").content)
self.assertEqual("y", cluster[1].get_message("q").content)
+ def test_other_mech(self):
+ """Test using a mechanism other than PLAIN/ANONYMOUS for cluster update authentication.
+ Regression test for https://issues.apache.org/jira/browse/QPID-3849"""
+ sasl_config=os.path.join(self.rootdir, "sasl_config")
+ cluster = self.cluster(2, args=["--auth", "yes", "--sasl-config", sasl_config,
+ "--cluster-username=zig",
+ "--cluster-password=zig",
+ "--cluster-mechanism=DIGEST-MD5"])
+ cluster[0].connect()
+ cluster.start() # Before the fix this broker failed to join the cluster.
+ cluster[2].connect()
+
def test_link_events(self):
"""Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=611543"""
args = ["--mgmt-pub-interval", 1] # Publish management information every second.
@@ -768,6 +780,35 @@ acl deny all all
fetch(cluster[2])
+ def _verify_federation(self, src_broker, src, dst_broker, dst, timeout=30):
+ """ Prove that traffic can pass between two federated brokers.
+ """
+ tot_time = 0
+ active = False
+ send_session = src_broker.connect().session()
+ sender = send_session.sender(src)
+ receive_session = dst_broker.connect().session()
+ receiver = receive_session.receiver(dst)
+ while not active and tot_time < timeout:
+ sender.send(Message("Hello from Source!"))
+ try:
+ receiver.fetch(timeout = 1)
+ receive_session.acknowledge()
+ # Get this far without Empty exception, and the link is good!
+ active = True
+ while True:
+ # Keep receiving msgs, as several may have accumulated
+ receiver.fetch(timeout = 1)
+ receive_session.acknowledge()
+ except Empty:
+ if not active:
+ tot_time += 1
+ receiver.close()
+ receive_session.close()
+ sender.close()
+ send_session.close()
+ return active
+
def test_federation_failover(self):
"""
Verify that federation operates across failures occurring in a cluster.
@@ -778,38 +819,6 @@ acl deny all all
cluster to newly-added members
"""
- TIMEOUT = 30
- def verify(src_broker, src, dst_broker, dst, timeout=TIMEOUT):
- """ Prove that traffic can pass from source fed broker to
- destination fed broker
- """
- tot_time = 0
- active = False
- send_session = src_broker.connect().session()
- sender = send_session.sender(src)
- receive_session = dst_broker.connect().session()
- receiver = receive_session.receiver(dst)
- while not active and tot_time < timeout:
- sender.send(Message("Hello from Source!"))
- try:
- receiver.fetch(timeout = 1)
- receive_session.acknowledge()
- # Get this far without Empty exception, and the link is good!
- active = True
- while True:
- # Keep receiving msgs, as several may have accumulated
- receiver.fetch(timeout = 1)
- receive_session.acknowledge()
- except Empty:
- if not active:
- tot_time += 1
- receiver.close()
- receive_session.close()
- sender.close()
- send_session.close()
- self.assertTrue(active, "Bridge failed to become active")
-
-
# 2 node cluster source, 2 node cluster destination
src_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
src_cluster.ready();
@@ -848,43 +857,145 @@ acl deny all all
self.assertEqual(result.status, 0, result)
# check that traffic passes
- verify(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
# add src[2] broker to source cluster
src_cluster.start(expect=EXPECT_EXIT_FAIL);
src_cluster.ready();
- verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
# Kill src[0]. dst[0] should fail over to src[1]
src_cluster[0].kill()
for b in src_cluster[1:]: b.ready()
- verify(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
# Kill src[1], dst[0] should fail over to src[2]
src_cluster[1].kill()
for b in src_cluster[2:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
# Kill dest[0], force failover to dest[1]
dst_cluster[0].kill()
for b in dst_cluster[1:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
# Add dest[2]
# dest[1] syncs dest[2] to current remote state
dst_cluster.start(expect=EXPECT_EXIT_FAIL);
for b in dst_cluster[1:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
# Kill dest[1], force failover to dest[2]
dst_cluster[1].kill()
for b in dst_cluster[2:]: b.ready()
- verify(src_cluster[2], "srcQ", dst_cluster[2], "destQ")
+ assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[2], "destQ")
for i in range(2, len(src_cluster)): src_cluster[i].kill()
for i in range(2, len(dst_cluster)): dst_cluster[i].kill()
+ def test_federation_multilink_failover(self):
+ """
+ Verify that multi-link federation operates across failures occurring in
+ a cluster.
+ """
+
+ # 1 node cluster source, 1 node cluster destination
+ src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+ src_cluster.ready();
+ dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+ dst_cluster.ready();
+
+ # federate a direct binding across two separate links
+
+ # first, create a direct exchange bound to two queues using different
+ # bindings
+ cmd = self.popen(["qpid-config",
+ "--broker", src_cluster[0].host_port(),
+ "add", "exchange", "direct", "FedX"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "exchange", "direct", "FedX"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "queue", "destQ1"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "bind", "FedX", "destQ1", "one"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "add", "queue", "destQ2"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ cmd = self.popen(["qpid-config",
+ "--broker", dst_cluster[0].host_port(),
+ "bind", "FedX", "destQ2", "two"],
+ EXPECT_EXIT_OK)
+ cmd.wait()
+
+ # Create two separate links between the dst and source brokers, bind
+ # each to different keys
+ dst_cluster[0].startQmf()
+ dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0]
+
+ for _l in [("link1", "bridge1", "one"),
+ ("link2", "bridge2", "two")]:
+ result = dst_broker.create("link", _l[0],
+ {"host":src_cluster[0].host(),
+ "port":src_cluster[0].port()},
+ False)
+ self.assertEqual(result.status, 0, result);
+ result = dst_broker.create("bridge", _l[1],
+ {"link":_l[0],
+ "src":"FedX",
+ "dest":"FedX",
+ "key":_l[2]}, False)
+ self.assertEqual(result.status, 0);
+
+ # check that traffic passes
+ assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1")
+ assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2")
+
+ # add new member, verify traffic
+ src_cluster.start(expect=EXPECT_EXIT_FAIL);
+ src_cluster.ready();
+
+ dst_cluster.start(expect=EXPECT_EXIT_FAIL);
+ dst_cluster.ready();
+
+ assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1")
+ assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2")
+
+ src_cluster[0].kill()
+ for b in src_cluster[1:]: b.ready()
+
+ assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[0], "destQ1")
+ assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[0], "destQ2")
+
+ dst_cluster[0].kill()
+ for b in dst_cluster[1:]: b.ready()
+
+ assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[1], "destQ1")
+ assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[1], "destQ2")
+
+ for i in range(1, len(src_cluster)): src_cluster[i].kill()
+ for i in range(1, len(dst_cluster)): dst_cluster[i].kill()
+
+
+
# Some utility code for transaction tests
XA_RBROLLBACK = 1
XA_RBTIMEOUT = 2
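
A note on the pattern used by the new multilink tests above: links and bridges are created through the broker's QMF management object rather than via qpid-config or the older link.bridge() call. A minimal sketch of that pattern, assuming a QMF broker object obtained via startQmf()/getObjects() as in the tests (the helper name make_route is purely illustrative):

    def make_route(dst_qmf_broker, src_host, src_port, link_name, bridge_name, key):
        # Create a named link from the destination broker back to the source broker.
        result = dst_qmf_broker.create("link", link_name,
                                       {"host": src_host, "port": src_port}, False)
        assert result.status == 0, result
        # Attach a bridge to that link, federating one binding key of exchange FedX.
        result = dst_qmf_broker.create("bridge", bridge_name,
                                       {"link": link_name,
                                        "src": "FedX", "dest": "FedX",
                                        "key": key}, False)
        assert result.status == 0, result

test_federation_multilink_failover uses this create("link")/create("bridge") sequence once per routing key, which is what gives it two independent connections between the same pair of brokers.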
diff --git a/cpp/src/tests/federation.py b/cpp/src/tests/federation.py
index 7d613b98ce..dcd074eda9 100755
--- a/cpp/src/tests/federation.py
+++ b/cpp/src/tests/federation.py
@@ -23,6 +23,7 @@ from qpid.testlib import TestBase010
from qpid.datatypes import Message
from qpid.queue import Empty
from qpid.util import URL
+import qpid.messaging
from time import sleep, time
@@ -94,6 +95,11 @@ class FederationTests(TestBase010):
break
self._brokers.append(_b)
+ # add a new-style messaging connection to each broker
+ for _b in self._brokers:
+ _b.connection = qpid.messaging.Connection(_b.url)
+ _b.connection.open()
+
def _teardown_brokers(self):
""" Un-does _setup_brokers()
"""
@@ -103,7 +109,7 @@ class FederationTests(TestBase010):
if not _b.client_session.error():
_b.client_session.close(timeout=10)
_b.client_conn.close(timeout=10)
-
+ _b.connection.close()
def test_bridge_create_and_close(self):
self.startQmf();
@@ -127,18 +133,28 @@ class FederationTests(TestBase010):
self.verify_cleanup()
def test_pull_from_exchange(self):
+ """ This test uses an alternative method to manage links and bridges
+ via the broker object.
+ """
session = self.session
-
+
self.startQmf()
qmf = self.qmf
broker = qmf.getObjects(_class="broker")[0]
- result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
- self.assertEqual(result.status, 0, result)
- link = qmf.getObjects(_class="link")[0]
- result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0)
+ # create link
+ link_args = {"host":self.remote_host(), "port":self.remote_port(), "durable":False,
+ "authMechanism":"PLAIN", "username":"guest", "password":"guest",
+ "transport":"tcp"}
+ result = broker.create("link", "test-link-1", link_args, False)
self.assertEqual(result.status, 0, result)
+ link = qmf.getObjects(_class="link")[0]
+ # create bridge
+ bridge_args = {"link":"test-link-1", "src":"amq.direct", "dest":"amq.fanout",
+ "key":"my-key"}
+ result = broker.create("bridge", "test-bridge-1", bridge_args, False);
+ self.assertEqual(result.status, 0, result)
bridge = qmf.getObjects(_class="bridge")[0]
#setup queue to receive messages from local broker
@@ -164,9 +180,11 @@ class FederationTests(TestBase010):
self.fail("Got unexpected message in queue: " + extra.body)
except Empty: None
- result = bridge.close()
+
+ result = broker.delete("bridge", "test-bridge-1", {})
self.assertEqual(result.status, 0, result)
- result = link.close()
+
+ result = broker.delete("link", "test-link-1", {})
self.assertEqual(result.status, 0, result)
self.verify_cleanup()
@@ -376,6 +394,9 @@ class FederationTests(TestBase010):
for i in range(1, 11):
try:
msg = queue.get(timeout=5)
+ mp = msg.get("message_properties").application_headers
+ self.assertEqual(mp.__class__, dict)
+ self.assertEqual(mp['x-qpid.trace'], 'REMOTE') # check that the federation-tag override works
self.assertEqual("Message %d" % i, msg.body)
except Empty:
self.fail("Failed to find expected message containing 'Message %d'" % i)
@@ -2153,3 +2174,433 @@ class FederationTests(TestBase010):
self.verify_cleanup()
+ def test_multilink_direct(self):
+ """ Verify that two distinct links can be created between federated
+ brokers.
+ """
+ self.startQmf()
+ qmf = self.qmf
+ self._setup_brokers()
+ src_broker = self._brokers[0]
+ dst_broker = self._brokers[1]
+
+ # create a direct exchange on each broker
+ for _b in [src_broker, dst_broker]:
+ _b.client_session.exchange_declare(exchange="fedX.direct", type="direct")
+ self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type,
+ "direct", "exchange_declare failed!")
+
+ # create destination queues
+ for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]:
+ dst_broker.client_session.queue_declare(queue=_q[0], auto_delete=True)
+ dst_broker.client_session.exchange_bind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1])
+
+ # create two connections, one for high priority traffic
+ for _q in ["HiPri", "Traffic"]:
+ result = dst_broker.qmf_object.create("link", _q,
+ {"host":src_broker.host,
+ "port":src_broker.port},
+ False)
+ self.assertEqual(result.status, 0);
+
+ links = qmf.getObjects(_broker=dst_broker.qmf_broker, _class="link")
+ for _l in links:
+ if _l.name == "HiPri":
+ hi_link = _l
+ elif _l.name == "Traffic":
+ data_link = _l
+ else:
+ self.fail("Unexpected Link found: " + _l.name)
+
+ # now create a route for messages sent with key "high" to use the
+ # hi_link
+ result = dst_broker.qmf_object.create("bridge", "HiPriBridge",
+ {"link":hi_link.name,
+ "src":"fedX.direct",
+ "dest":"fedX.direct",
+ "key":"high"}, False)
+ self.assertEqual(result.status, 0);
+
+
+ # create routes for the "medium" and "low" links to use the normal
+ # data_link
+ for _b in [("MediumBridge", "medium"), ("LowBridge", "low")]:
+ result = dst_broker.qmf_object.create("bridge", _b[0],
+ {"link":data_link.name,
+ "src":"fedX.direct",
+ "dest":"fedX.direct",
+ "key":_b[1]}, False)
+ self.assertEqual(result.status, 0);
+
+ # now wait for the links to become operational
+ for _l in [hi_link, data_link]:
+ expire_time = time() + 30
+ while _l.state != "Operational" and time() < expire_time:
+ _l.update()
+ self.assertEqual(_l.state, "Operational", "Link failed to become operational")
+
+ # verify each link uses a different connection
+ self.assertNotEqual(hi_link.connectionRef, data_link.connectionRef,
+ "Different links using the same connection")
+
+ hi_conn = qmf.getObjects(_broker=dst_broker.qmf_broker,
+ _objectId=hi_link.connectionRef)[0]
+ data_conn = qmf.getObjects(_broker=dst_broker.qmf_broker,
+ _objectId=data_link.connectionRef)[0]
+
+
+ # send hi data, verify only goes over hi link
+
+ r_ssn = dst_broker.connection.session()
+ hi_receiver = r_ssn.receiver("HiQ");
+ med_receiver = r_ssn.receiver("MedQ");
+ low_receiver = r_ssn.receiver("LoQ");
+
+ for _c in [hi_conn, data_conn]:
+ _c.update()
+ self.assertEqual(_c.msgsToClient, 0, "Unexpected messages received")
+
+ s_ssn = src_broker.connection.session()
+ hi_sender = s_ssn.sender("fedX.direct/high")
+ med_sender = s_ssn.sender("fedX.direct/medium")
+ low_sender = s_ssn.sender("fedX.direct/low")
+
+ try:
+ hi_sender.send(qpid.messaging.Message(content="hi priority"))
+ msg = hi_receiver.fetch(timeout=10)
+ r_ssn.acknowledge()
+ self.assertEqual(msg.content, "hi priority");
+ except:
+ self.fail("Hi Pri message failure")
+
+ hi_conn.update()
+ data_conn.update()
+ self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message")
+ self.assertEqual(data_conn.msgsToClient, 0, "Expected 0 data messages")
+
+ # send low and medium, verify it does not go over hi link
+
+ try:
+ med_sender.send(qpid.messaging.Message(content="medium priority"))
+ msg = med_receiver.fetch(timeout=10)
+ r_ssn.acknowledge()
+ self.assertEqual(msg.content, "medium priority");
+ except:
+ self.fail("Medium Pri message failure")
+
+ hi_conn.update()
+ data_conn.update()
+ self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message")
+ self.assertEqual(data_conn.msgsToClient, 1, "Expected 1 data message")
+
+ try:
+ low_sender.send(qpid.messaging.Message(content="low priority"))
+ msg = low_receiver.fetch(timeout=10)
+ r_ssn.acknowledge()
+ self.assertEqual(msg.content, "low priority");
+ except:
+ self.fail("Low Pri message failure")
+
+ hi_conn.update()
+ data_conn.update()
+ self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message")
+ self.assertEqual(data_conn.msgsToClient, 2, "Expected 2 data message")
+
+ # cleanup
+
+ for _b in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]:
+ dst_broker.client_session.exchange_unbind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1])
+ dst_broker.client_session.queue_delete(queue=_q[0])
+
+ for _b in [src_broker, dst_broker]:
+ _b.client_session.exchange_delete(exchange="fedX.direct")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
+
+
+ def test_multilink_shared_queue(self):
+ """ Verify that two distinct links can be created between federated
+ brokers.
+ """
+ self.startQmf()
+ qmf = self.qmf
+ self._setup_brokers()
+ src_broker = self._brokers[0]
+ dst_broker = self._brokers[1]
+
+ # create a topic exchange on the destination broker
+ dst_broker.client_session.exchange_declare(exchange="fedX.topic", type="topic")
+ self.assertEqual(dst_broker.client_session.exchange_query(name="fedX.topic").type,
+ "topic", "exchange_declare failed!")
+
+ # create a destination queue
+ dst_broker.client_session.queue_declare(queue="destQ", auto_delete=True)
+ dst_broker.client_session.exchange_bind(queue="destQ", exchange="fedX.topic", binding_key="srcQ")
+
+ # create a single source queue
+ src_broker.client_session.queue_declare(queue="srcQ", auto_delete=True)
+
+ # create two connections
+ for _q in ["Link1", "Link2"]:
+ result = dst_broker.qmf_object.create("link", _q,
+ {"host":src_broker.host,
+ "port":src_broker.port},
+ False)
+ self.assertEqual(result.status, 0);
+
+ links = qmf.getObjects(_broker=dst_broker.qmf_broker, _class="link")
+ self.assertEqual(len(links), 2)
+
+ # now create two "parallel" queue routes from the source queue to the
+ # destination exchange.
+ result = dst_broker.qmf_object.create("bridge", "Bridge1",
+ {"link":"Link1",
+ "src":"srcQ",
+ "dest":"fedX.topic",
+ "srcIsQueue": True},
+ False)
+ self.assertEqual(result.status, 0);
+ result = dst_broker.qmf_object.create("bridge", "Bridge2",
+ {"link":"Link2",
+ "src":"srcQ",
+ "dest":"fedX.topic",
+ "srcIsQueue": True},
+ False)
+ self.assertEqual(result.status, 0);
+
+
+ # now wait for the links to become operational
+ for _l in links:
+ expire_time = time() + 30
+ while _l.state != "Operational" and time() < expire_time:
+ _l.update()
+ self.assertEqual(_l.state, "Operational", "Link failed to become operational")
+
+ # verify each link uses a different connection
+ self.assertNotEqual(links[0].connectionRef, links[1].connectionRef,
+ "Different links using the same connection")
+
+ conn1 = qmf.getObjects(_broker=dst_broker.qmf_broker,
+ _objectId=links[0].connectionRef)[0]
+ conn2 = qmf.getObjects(_broker=dst_broker.qmf_broker,
+ _objectId=links[1].connectionRef)[0]
+
+ # verify messages sent to the queue are pulled by each connection
+
+ r_ssn = dst_broker.connection.session()
+ receiver = r_ssn.receiver("destQ");
+
+ for _c in [conn1, conn2]:
+ _c.update()
+ self.assertEqual(_c.msgsToClient, 0, "Unexpected messages received")
+
+ s_ssn = src_broker.connection.session()
+ sender = s_ssn.sender("srcQ")
+
+ try:
+ for x in range(5):
+ sender.send(qpid.messaging.Message(content="hello"))
+ for x in range(5):
+ msg = receiver.fetch(timeout=10)
+ self.assertEqual(msg.content, "hello");
+ r_ssn.acknowledge()
+ except:
+ self.fail("Message failure")
+
+ # expect messages to be split over each connection.
+ conn1.update()
+ conn2.update()
+ self.assertNotEqual(conn1.msgsToClient, 0, "No messages sent")
+ self.assertNotEqual(conn2.msgsToClient, 0, "No messages sent")
+ self.assertEqual(conn2.msgsToClient + conn1.msgsToClient, 5,
+ "Expected 5 messages total")
+
+ for _b in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ dst_broker.client_session.exchange_unbind(queue="destQ", exchange="fedX.topic", binding_key="srcQ")
+ dst_broker.client_session.exchange_delete(exchange="fedX.topic")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
+
+
+ def test_dynamic_direct_shared_queue(self):
+ """
+ Route Topology:
+
+ +<--- B1
+ B0 <---+<--- B2
+ +<--- B3
+ """
+ session = self.session
+
+ # create the federation
+
+ self.startQmf()
+ qmf = self.qmf
+
+ self._setup_brokers()
+
+ # create direct exchange on each broker, and retrieve the corresponding
+ # management object for that exchange
+
+ exchanges=[]
+ for _b in self._brokers:
+ _b.client_session.exchange_declare(exchange="fedX.direct", type="direct")
+ self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type,
+ "direct", "exchange_declare failed!")
+ # pull the exchange out of qmf...
+ retries = 0
+ my_exchange = None
+ while my_exchange is None:
+ objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+ for ooo in objs:
+ if ooo.name == "fedX.direct":
+ my_exchange = ooo
+ break
+ if my_exchange is None:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "QMF failed to find new exchange!")
+ sleep(1)
+ exchanges.append(my_exchange)
+
+ self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!")
+
+ # Create 2 links on the downstream broker 0 to each source broker
+ # (1,2,3):
+ for _b in range(1,4):
+ for _l in ["dynamic", "queue"]:
+ result = self._brokers[0].qmf_object.create( "link",
+ "Link-%d-%s" % (_b, _l),
+ {"host":self._brokers[_b].host,
+ "port":self._brokers[_b].port}, False)
+ self.assertEqual(result.status, 0)
+
+ # create queue on source brokers for use by the dynamic route
+ self._brokers[_b].client_session.queue_declare(queue="fedSrcQ", exclusive=False, auto_delete=True)
+
+ for _l in range(1,4):
+ # for each dynamic link, create a dynamic bridge for the "fedX.direct"
+ # exchanges, using the fedSrcQ on each upstream source broker
+ result = self._brokers[0].qmf_object.create("bridge",
+ "Bridge-%d-dynamic" % _l,
+ {"link":"Link-%d-dynamic" % _l,
+ "src":"fedX.direct",
+ "dest":"fedX.direct",
+ "dynamic":True,
+ "queue":"fedSrcQ"}, False)
+ self.assertEqual(result.status, 0)
+
+ # create a queue route that shares the queue used by the dynamic route
+ result = self._brokers[0].qmf_object.create("bridge",
+ "Bridge-%d-queue" % _l,
+ {"link":"Link-%d-queue" % _l,
+ "src":"fedSrcQ",
+ "dest":"fedX.direct",
+ "srcIsQueue":True}, False)
+ self.assertEqual(result.status, 0)
+
+
+ # wait for the inter-broker links to become operational
+ retries = 0
+ operational = False
+ while not operational:
+ operational = True
+ for _l in qmf.getObjects(_class="link"):
+ #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state)))
+ if _l.state != "Operational":
+ operational = False
+ if not operational:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "inter-broker links failed to become operational.")
+ sleep(1)
+
+ # @todo - There is no way to determine when the bridge objects become
+ # active. Hopefully, this is long enough!
+ sleep(6)
+
+ # create a queue on B0, bound to "spudboy"
+ self._brokers[0].client_session.queue_declare(queue="DestQ", exclusive=True, auto_delete=True)
+ self._brokers[0].client_session.exchange_bind(queue="DestQ", exchange="fedX.direct", binding_key="spudboy")
+
+ # subscribe to messages arriving on B0's queue
+ self.subscribe(self._brokers[0].client_session, queue="DestQ", destination="f1")
+ queue = self._brokers[0].client_session.incoming("f1")
+
+ # wait until the binding key has propagated to each broker
+
+ binding_counts = [1, 1, 1, 1]
+ self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+ for i in range(3,-1,-1):
+ retries = 0
+ exchanges[i].update()
+ while exchanges[i].bindingCount < binding_counts[i]:
+ retries += 1
+ self.failIfEqual(retries, 10,
+ "binding failed to propagate to broker %d"
+ % i)
+ sleep(3)
+ exchanges[i].update()
+
+ for _b in range(1,4):
+ # send 3 msgs from each source broker
+ for i in range(3):
+ dp = self._brokers[_b].client_session.delivery_properties(routing_key="spudboy")
+ self._brokers[_b].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message_drp %d" % i))
+
+ # get exactly 9 (3 per broker) on B0
+ for i in range(9):
+ msg = queue.get(timeout=5)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
+ # verify that messages went across every link
+ for _l in qmf.getObjects(_broker=self._brokers[0].qmf_broker,
+ _class="link"):
+ for _c in qmf.getObjects(_broker=self._brokers[0].qmf_broker,
+ _objectId=_l.connectionRef):
+ self.assertNotEqual(_c.msgsToClient, 0, "Messages did not pass over link as expected.")
+
+ # cleanup
+
+ self._brokers[0].client_session.exchange_unbind(queue="DestQ", exchange="fedX.direct", binding_key="spudboy")
+ self._brokers[0].client_session.message_cancel(destination="f1")
+ self._brokers[0].client_session.queue_delete(queue="DestQ")
+
+ for _b in qmf.getObjects(_class="bridge"):
+ result = _b.close()
+ self.assertEqual(result.status, 0)
+
+ for _l in qmf.getObjects(_class="link"):
+ result = _l.close()
+ self.assertEqual(result.status, 0)
+
+ for _b in self._brokers:
+ _b.client_session.exchange_delete(exchange="fedX.direct")
+
+ self._teardown_brokers()
+
+ self.verify_cleanup()
+
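The federation tests above wait for each link's QMF object to report the Operational state before sending traffic. A small standalone sketch of that polling loop, assuming a QMF link object exposing update() and state as used above (wait_link_operational is an illustrative name; the tests inline variations of this loop with a 30 second budget):

    from time import time, sleep

    def wait_link_operational(link, timeout=30):
        # Refresh the management object until it reports "Operational" or time runs out.
        expire_time = time() + timeout
        while link.state != "Operational" and time() < expire_time:
            sleep(1)        # the dynamic-route test also pauses between polls
            link.update()   # re-read the link's attributes from the broker
        return link.state == "Operational"
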
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index 827cb7dca9..d25281eed5 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -18,59 +18,123 @@
# under the License.
#
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
+import traceback
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED
from qpid.datatypes import uuid4
from brokertest import *
from threading import Thread, Lock, Condition
-from logging import getLogger, WARN, ERROR, DEBUG
+from logging import getLogger, WARN, ERROR, DEBUG, INFO
from qpidtoollibs import BrokerAgent
+from uuid import UUID
log = getLogger(__name__)
+class QmfAgent(object):
+ """Access to a QMF broker agent."""
+ def __init__(self, address, **kwargs):
+ self._connection = Connection.establish(
+ address, client_properties={"qpid.ha-admin":1}, **kwargs)
+ self._agent = BrokerAgent(self._connection)
+ assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address)
+
+ def __getattr__(self, name):
+ a = getattr(self._agent, name)
+ return a
+
+class Credentials(object):
+ """SASL credentials: username, password, and mechanism"""
+ def __init__(self, username, password, mechanism):
+ (self.username, self.password, self.mechanism) = (username, password, mechanism)
+
+ def __str__(self): return "Credentials%s"%(self.tuple(),)
+
+ def tuple(self): return (self.username, self.password, self.mechanism)
+
+ def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
+
class HaBroker(Broker):
- def __init__(self, test, args=[], broker_url=None, ha_cluster=True,
- ha_replicate="all", **kwargs):
+ """Start a broker with HA enabled
+ @param client_credentials: (user, password, mechanism) for admin clients started by the HaBroker.
+ """
+ def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
+ client_credentials=None, **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=info+", "--log-enable=debug+:ha::",
+ "--log-enable=debug+:ha::",
# FIXME aconway 2012-02-13: workaround slow link failover.
"--link-maintenace-interval=0.1",
"--ha-cluster=%s"%ha_cluster]
if ha_replicate is not None:
args += [ "--ha-replicate=%s"%ha_replicate ]
- if broker_url: args.extend([ "--ha-brokers", broker_url ])
+ if brokers_url: args += [ "--ha-brokers-url", brokers_url ]
Broker.__init__(self, test, args, **kwargs)
- self.commands=os.getenv("PYTHON_COMMANDS")
- assert os.path.isdir(self.commands)
+ self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
+ assert os.path.exists(self.qpid_ha_path)
+ self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
+ assert os.path.exists(self.qpid_config_path)
getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
-
- def promote(self):
- assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0
-
- def set_client_url(self, url):
- assert os.system(
- "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0
-
- def set_broker_url(self, url):
- assert os.system(
- "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0
-
- def replicate(self, from_broker, queue):
- assert os.system(
- "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+ self.qpid_ha_script=import_script(self.qpid_ha_path)
+ self._agent = None
+ self.client_credentials = client_credentials
+
+ def __str__(self): return Broker.__str__(self)
+
+ def qpid_ha(self, args):
+ cred = self.client_credentials
+ url = self.host_port()
+ if cred:
+ url = cred.add_user(url)
+ args = args + ["--sasl-mechanism", cred.mechanism]
+ self.qpid_ha_script.main_except(["", "-b", url]+args)
+
+ def promote(self): self.qpid_ha(["promote"])
+ def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
+ def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
+ def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
+
+ def agent(self):
+ if not self._agent:
+ cred = self.client_credentials
+ if cred:
+ self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism)
+ else:
+ self._agent = QmfAgent(self.host_port())
+ return self._agent
+
+ def ha_status(self):
+ hb = self.agent().getHaBroker()
+ hb.update()
+ return hb.status
+
+ def wait_status(self, status):
+ def try_get_status():
+ # Ignore ConnectionError, the broker may not be up yet.
+ try: return self.ha_status() == status;
+ except ConnectionError: return False
+ assert retry(try_get_status, timeout=20), "%s status != %r"%(self, status)
+
+ # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
+ def qpid_config(self, args):
+ assert subprocess.call(
+ [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
def config_replicate(self, from_broker, queue):
- assert os.system(
- "%s/qpid-config --broker=%s add queue --start-replica %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+ self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
def config_declare(self, queue, replication):
- assert os.system(
- "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0
+ self.qpid_config(["add", "queue", queue, "--replicate", replication])
def connect_admin(self, **kwargs):
- return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
+ cred = self.client_credentials
+ if cred:
+ return Broker.connect(
+ self, client_properties={"qpid.ha-admin":1},
+ username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism,
+ **kwargs)
+ else:
+ return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
def wait_backup(self, address):
"""Wait for address to become valid on a backup broker."""
@@ -78,6 +142,14 @@ class HaBroker(Broker):
try: wait_address(bs, address)
finally: bs.connection.close()
+ def assert_browse(self, queue, expected, **kwargs):
+ """Verify queue contents by browsing."""
+ bs = self.connect().session()
+ try:
+ wait_address(bs, queue)
+ assert_browse_retry(bs, queue, expected, **kwargs)
+ finally: bs.connection.close()
+
def assert_browse_backup(self, queue, expected, **kwargs):
"""Combines wait_backup and assert_browse_retry."""
bs = self.connect_admin().session()
@@ -86,33 +158,70 @@ class HaBroker(Broker):
assert_browse_retry(bs, queue, expected, **kwargs)
finally: bs.connection.close()
+ def assert_connect_fail(self):
+ try:
+ self.connect()
+ self.test.fail("Expected ConnectionError")
+ except ConnectionError: pass
+
+ def try_connect(self):
+ try: return self.connect()
+ except ConnectionError: return None
+
class HaCluster(object):
_cluster_count = 0
- def __init__(self, test, n, **kwargs):
+ def __init__(self, test, n, promote=True, **kwargs):
"""Start a cluster of n brokers"""
self.test = test
- self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)]
+ self.kwargs = kwargs
+ self._brokers = []
+ self.id = HaCluster._cluster_count
+ self.broker_id = 0
HaCluster._cluster_count += 1
- self.url = ",".join([b.host_port() for b in self])
- for b in self: b.set_broker_url(self.url)
+ for i in xrange(n): self.start(False)
+ self.update_urls()
self[0].promote()
+ def next_name(self):
+ name="cluster%s-%s"%(self.id, self.broker_id)
+ self.broker_id += 1
+ return name
+
+ def start(self, update_urls=True, args=[]):
+ """Start a new broker in the cluster"""
+ b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
+ self._brokers.append(b)
+ if update_urls: self.update_urls()
+ return b
+
+ def update_urls(self):
+ self.url = ",".join([b.host_port() for b in self])
+ if len(self) > 1: # No failover addresses in a 1-broker cluster.
+ for b in self: b.set_brokers_url(self.url)
+
def connect(self, i):
"""Connect with reconnect_urls"""
return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
- def kill(self, i):
+ def kill(self, i, promote_next=True):
"""Kill broker i, promote broker i+1"""
- self[i].kill()
self[i].expect = EXPECT_EXIT_FAIL
- self[(i+1) % len(self)].promote()
+ self[i].kill()
+ if promote_next: self[(i+1) % len(self)].promote()
+
+ def restart(self, i):
+ """Start a broker with the same port, name and data directory. It will get
+ a separate log file: foo.n.log"""
+ b = self._brokers[i]
+ self._brokers[i] = HaBroker(
+ self.test, name=b.name, port=b.port(), brokers_url=self.url,
+ **self.kwargs)
- def bounce(self, i):
+ def bounce(self, i, promote_next=True):
"""Stop and restart a broker in a cluster."""
- self.kill(i)
- b = self[i]
- self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url)
+ self.kill(i, promote_next)
+ self.restart(i)
# Behave like a list of brokers.
def __len__(self): return len(self._brokers)
@@ -128,12 +237,12 @@ def wait_address(session, address):
except NotFound: return False
assert retry(check), "Timed out waiting for address %s"%(address)
-def assert_missing(session, address):
- """Assert that the address is _not_ valid"""
+def valid_address(session, address):
+ """Test if an address is valid"""
try:
session.receiver(address)
- self.fail("Expected NotFound: %s"%(address))
- except NotFound: pass
+ return True
+ except NotFound: return False
class ReplicationTests(BrokerTest):
"""Correctness tests for HA replication."""
@@ -180,7 +289,7 @@ class ReplicationTests(BrokerTest):
self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
self.assert_browse_retry(b, prefix+"q2", []) # configuration only
- assert_missing(b, prefix+"q3")
+ assert not valid_address(b, prefix+"q3")
b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration
@@ -195,7 +304,7 @@ class ReplicationTests(BrokerTest):
# Create config, send messages before starting the backup, to test catch-up replication.
setup(p, "1", primary)
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
# Create config, send messages after starting the backup, to test steady-state replication.
setup(p, "2", primary)
@@ -233,10 +342,10 @@ class ReplicationTests(BrokerTest):
s = p.sender("q;{create:always}")
for m in [str(i) for i in range(0,10)]: s.send(m)
s.sync()
- backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+ backup1 = HaBroker(self, name="backup1", brokers_url=primary.host_port())
for m in [str(i) for i in range(10,20)]: s.send(m)
s.sync()
- backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+ backup2 = HaBroker(self, name="backup2", brokers_url=primary.host_port())
for m in [str(i) for i in range(20,30)]: s.send(m)
s.sync()
@@ -276,7 +385,7 @@ class ReplicationTests(BrokerTest):
"""Verify that backups rejects connections and that fail-over works in python client"""
primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
# Check that backup rejects normal connections
try:
backup.connect().session()
@@ -294,14 +403,15 @@ class ReplicationTests(BrokerTest):
primary.kill()
assert retry(lambda: not is_running(primary.pid))
backup.promote()
- self.assert_browse_retry(s, "q", ["foo"])
+ sender.send("bar")
+ self.assert_browse_retry(s, "q", ["foo", "bar"])
c.close()
def test_failover_cpp(self):
"""Verify that failover works in the C++ client."""
primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
url="%s,%s"%(primary.host_port(), backup.host_port())
primary.connect().session().sender("q;{create:always}")
backup.wait_backup("q")
@@ -344,6 +454,7 @@ class ReplicationTests(BrokerTest):
def test_standalone_queue_replica(self):
"""Test replication of individual queues outside of cluster mode"""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary", ha_cluster=False)
pc = primary.connect()
ps = pc.session().sender("q;{create:always}")
@@ -393,7 +504,7 @@ class ReplicationTests(BrokerTest):
"""Verify that we replicate to an LVQ correctly"""
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}")
def send(key,value): s.send(Message(content=value,properties={"lvq-key":key}))
for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
@@ -410,7 +521,7 @@ class ReplicationTests(BrokerTest):
"""Test replication with the ring queue policy"""
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}")
for i in range(10): s.send(Message(str(i)))
backup.assert_browse_backup("q", [str(i) for i in range(5,10)])
@@ -419,18 +530,20 @@ class ReplicationTests(BrokerTest):
"""Test replication with the reject queue policy"""
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}")
try:
for i in range(10): s.send(Message(str(i)), sync=False)
except qpid.messaging.exceptions.TargetCapacityExceeded: pass
backup.assert_browse_backup("q", [str(i) for i in range(0,5)])
+ # Detach, don't close as there is a broken session
+ s.session.connection.detach()
def test_priority(self):
"""Verify priority queues replicate correctly"""
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
session = primary.connect().session()
s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
@@ -445,7 +558,7 @@ class ReplicationTests(BrokerTest):
"""Verify priority queues replicate correctly"""
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
session = primary.connect().session()
levels = 8
priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3]
@@ -464,7 +577,7 @@ class ReplicationTests(BrokerTest):
def test_priority_ring(self):
primary = HaBroker(self, name="primary")
primary.promote()
- backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+ backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}")
priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
for p in priorities: s.send(Message(priority=p))
@@ -475,8 +588,10 @@ class ReplicationTests(BrokerTest):
# correct result, the uncommented one is for the actually buggy
# result. See https://issues.apache.org/jira/browse/QPID-3866
#
- # backup.assert_browse_backup("q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
- backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: m.priority)
+ # expect = sorted(priorities,reverse=True)[0:5]
+ expect = [9,9,9,9,2]
+ primary.assert_browse("q", expect, transform=lambda m: m.priority)
+ backup.assert_browse_backup("q", expect, transform=lambda m: m.priority)
def test_backup_acquired(self):
"""Verify that acquired messages are backed up, for all queue types."""
@@ -509,11 +624,11 @@ class ReplicationTests(BrokerTest):
primary = HaBroker(self, name="primary")
primary.promote()
- backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+ backup1 = HaBroker(self, name="backup1", brokers_url=primary.host_port())
c = primary.connect()
for t in tests: t.send(c) # Send messages, leave one unacknowledged.
- backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+ backup2 = HaBroker(self, name="backup2", brokers_url=primary.host_port())
# Wait for backups to catch up.
for t in tests:
t.wait(self, backup1)
@@ -538,11 +653,13 @@ class ReplicationTests(BrokerTest):
self.fail("Excpected no-such-queue exception")
except NotFound: pass
- def test_invalid_default(self):
- """Verify that a queue with an invalid qpid.replicate gets default treatment"""
- cluster = HaCluster(self, 2, ha_replicate="all")
- c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
- cluster[1].wait_backup("q")
+ def test_invalid_replication(self):
+ """Verify that we reject an attempt to declare a queue with invalid replication value."""
+ cluster = HaCluster(self, 1, ha_replicate="all")
+ try:
+ c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
+ self.fail("Expected ConnectionError")
+ except ConnectionError: pass
def test_exclusive_queue(self):
"""Ensure that we can back-up exclusive queues, i.e. the replicating
@@ -559,6 +676,136 @@ class ReplicationTests(BrokerTest):
test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")
+ def test_auto_delete_exclusive(self):
+ """Verify that we ignore auto-delete, exclusive, non-auto-delete-timeout queues"""
+ cluster = HaCluster(self,2)
+ s = cluster[0].connect().session()
+ s.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}")
+ s.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}")
+ s.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}")
+ s.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+ s.receiver("q;{create:always}")
+
+ s = cluster[1].connect_admin().session()
+ cluster[1].wait_backup("q")
+ assert not valid_address(s, "exad")
+ assert valid_address(s, "ex")
+ assert valid_address(s, "ad")
+ assert valid_address(s, "time")
+
+ def test_broker_info(self):
+ """Check that broker information is correctly published via management"""
+ cluster = HaCluster(self, 3)
+
+ for broker in cluster: # Make sure HA system-id matches broker's
+ qmf = broker.agent().getHaBroker()
+ self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef))
+
+ cluster_ports = map(lambda b: b.port(), cluster)
+ cluster_ports.sort()
+ def ports(qmf):
+ qmf.update()
+ return sorted(map(lambda b: b["port"], qmf.members))
+ # Check that all brokers have the same membership as the cluster
+ for broker in cluster:
+ qmf = broker.agent().getHaBroker()
+ assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker)
+ # Add a new broker, check it is updated everywhere
+ b = cluster.start()
+ cluster_ports.append(b.port())
+ cluster_ports.sort()
+ for broker in cluster:
+ qmf = broker.agent().getHaBroker()
+ assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+
+ def test_auth(self):
+ """Verify that authentication does not interfere with replication."""
+ # FIXME aconway 2012-07-09: generate test sasl config portably for cmake
+ sasl_config=os.path.join(self.rootdir, "sasl_config")
+ if not os.path.exists(sasl_config):
+ print "WARNING: Skipping test, SASL test configuration %s not found."%sasl_config
+ return
+ acl=os.path.join(os.getcwd(), "policy.acl")
+ aclf=file(acl,"w")
+ # Verify that replication works with auth=yes and HA user has at least the following
+ # privileges:
+ aclf.write("""
+acl allow zag@QPID access queue
+acl allow zag@QPID create queue
+acl allow zag@QPID consume queue
+acl allow zag@QPID delete queue
+acl allow zag@QPID access exchange
+acl allow zag@QPID create exchange
+acl allow zag@QPID bind exchange
+acl allow zag@QPID publish exchange
+acl allow zag@QPID delete exchange
+acl allow zag@QPID access method
+acl allow zag@QPID create link
+acl deny all all
+ """)
+ aclf.close()
+ cluster = HaCluster(
+ self, 2,
+ args=["--auth", "yes", "--sasl-config", sasl_config,
+ "--acl-file", acl, "--load-module", os.getenv("ACL_LIB"),
+ "--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN"
+ ],
+ client_credentials=Credentials("zag", "zag", "PLAIN"))
+ s0 = cluster[0].connect(username="zag", password="zag").session();
+ s0.receiver("q;{create:always}")
+ s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
+ cluster[1].wait_backup("q")
+ cluster[1].wait_backup("ex")
+ s1 = cluster[1].connect_admin().session(); # Uses Credentials above.
+ s1.sender("ex").send("foo");
+ self.assertEqual(s1.receiver("q").fetch().content, "foo")
+
+ def test_alternate_exchange(self):
+ """Verify that alternate-exchange on exchanges and queues is propagated
+ to new members of a cluster. """
+ cluster = HaCluster(self, 2)
+ s = cluster[0].connect().session()
+ # altex exchange: acts as alternate exchange
+ s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
+ # altq queue bound to altex, collect re-routed messages.
+ s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
+ # 0ex exchange with alternate-exchange altex and no queues bound
+ s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
+ # create queue q with alternate-exchange altex
+ s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}")
+ # create a bunch of exchanges to ensure we don't clean up prematurely if the
+ # response comes in multiple fragments.
+ for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i)
+
+ def verify(broker):
+ s = broker.connect().session()
+ # Verify unmatched message goes to ex's alternate.
+ s.sender("0ex").send("foo")
+ altq = s.receiver("altq")
+ self.assertEqual("foo", altq.fetch(timeout=0).content)
+ s.acknowledge()
+ # Verify rejected message goes to q's alternate.
+ s.sender("q").send("bar")
+ msg = s.receiver("q").fetch(timeout=0)
+ self.assertEqual("bar", msg.content)
+ s.acknowledge(msg, Disposition(REJECTED)) # Reject the message
+ self.assertEqual("bar", altq.fetch(timeout=0).content)
+ s.acknowledge()
+
+ # Sanity check: alternate exchanges on original broker
+ verify(cluster[0])
+ # Check backup that was connected during setup.
+ cluster[1].wait_backup("0ex")
+ cluster[1].wait_backup("q")
+ cluster.bounce(0)
+ verify(cluster[1])
+ # Check a newly started backup.
+ cluster.start()
+ cluster[2].wait_backup("0ex")
+ cluster[2].wait_backup("q")
+ cluster.bounce(1)
+ verify(cluster[2])
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit
@@ -601,49 +848,135 @@ class LongTests(BrokerTest):
if d: return float(d)*60
else: return 3 # Default is to be quick
-
- def disable_test_failover(self):
+ def test_failover_send_receive(self):
"""Test failover with continuous send-receive"""
- # FIXME aconway 2012-02-03: fails due to dropped messages,
- # known issue: sending messages to new primary before
- # backups are ready. Enable when fixed.
-
- # Start a cluster, all members will be killed during the test.
- brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
- for name in ["ha0","ha1","ha2"] ]
- url = ",".join([b.host_port() for b in brokers])
- for b in brokers: b.set_broker_url(url)
- brokers[0].promote()
+ brokers = HaCluster(self, 3)
# Start sender and receiver threads
- sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False)
- receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
- receiver.start()
- sender.start()
- # Wait for sender & receiver to get up and running
- assert retry(lambda: receiver.received > 100)
+ n = 10
+ senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False,
+ queue="test%s"%(i)) for i in xrange(n)]
+ receivers = [NumberedReceiver(brokers[0], sender=senders[i],
+ failover_updates=False,
+ queue="test%s"%(i)) for i in xrange(n)]
+ for r in receivers: r.start()
+ for s in senders: s.start()
+
+ def wait_passed(r, n):
+ """Wait for receiver r to pass n"""
+ def check():
+ r.check() # Verify no exceptions
+ return r.received > n
+ assert retry(check), "Stalled %s at %s"%(r.queue, n)
+
+ for r in receivers: wait_passed(r, 0)
+
# Kill and restart brokers in a cycle:
endtime = time.time() + self.duration()
i = 0
- while time.time() < endtime or i < 3: # At least 3 iterations
- sender.sender.assert_running()
- receiver.receiver.assert_running()
- port = brokers[i].port()
- brokers[i].kill()
- brokers.append(
- HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port,
- expect=EXPECT_EXIT_FAIL))
- i += 1
- brokers[i].promote()
- n = receiver.received # Verify we're still running
- def enough():
- receiver.check() # Verify no exceptions
- return receiver.received > n + 100
- assert retry(enough, timeout=5)
-
- sender.stop()
- receiver.stop()
- for b in brokers[i:]: b.kill()
+ try:
+ while time.time() < endtime or i < 3: # At least 3 iterations
+ for s in senders: s.sender.assert_running()
+ for r in receivers: r.receiver.assert_running()
+ checkpoint = [ r.received for r in receivers ]
+ # Don't kill primary till it is active and the next
+ # backup is ready, otherwise we can lose messages.
+ brokers[i%3].wait_status("active")
+ brokers[(i+1)%3].wait_status("ready")
+ brokers.bounce(i%3)
+ i += 1
+ map(wait_passed, receivers, checkpoint) # Wait for all receivers
+ except:
+ traceback.print_exc()
+ raise
+ finally:
+ for s in senders: s.stop()
+ for r in receivers: r.stop()
+ dead = []
+ for i in xrange(3):
+ if not brokers[i].is_running(): dead.append(i)
+ brokers.kill(i, False)
+ if dead: raise Exception("Brokers not running: %s"%dead)
+
+class RecoveryTests(BrokerTest):
+ """Tests for recovery after a failure."""
+
+ def test_queue_hold(self):
+ """Verify that the broker holds queues without sufficient backup,
+ i.e. does not complete messages sent to those queues."""
+
+ # We don't want backups to time out for this test, set long timeout.
+ cluster = HaCluster(self, 4, args=["--ha-backup-timeout=100000"]);
+ # Wait for the primary to be ready
+ cluster[0].wait_status("active")
+ # Create a queue before the failure.
+ s1 = cluster.connect(0).session().sender("q1;{create:always}")
+ for b in cluster: b.wait_backup("q1")
+ for i in xrange(100): s1.send(str(i))
+ # Kill primary and 2 backups
+ for i in [0,1,2]: cluster.kill(i, False)
+ cluster[3].promote() # New primary, backups will be 1 and 2
+ cluster[3].wait_status("recovering")
+
+ def assertSyncTimeout(s):
+ try:
+ s.sync(timeout=.01)
+ self.fail("Expected Timeout exception")
+ except Timeout: pass
+
+ # Create a queue after the failure
+ s2 = cluster.connect(3).session().sender("q2;{create:always}")
+
+ # Verify that messages sent are not completed
+ for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
+ assertSyncTimeout(s1)
+ self.assertEqual(s1.unsettled(), 100)
+ assertSyncTimeout(s2)
+ self.assertEqual(s2.unsettled(), 100)
+
+ # Verify we can receive even if sending is on hold:
+ cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
+
+ # Restart backups, verify queues are released only when both backups are up
+ cluster.restart(1)
+ assertSyncTimeout(s1)
+ self.assertEqual(s1.unsettled(), 100)
+ assertSyncTimeout(s2)
+ self.assertEqual(s2.unsettled(), 100)
+ self.assertEqual(cluster[3].ha_status(), "recovering")
+ cluster.restart(2)
+
+ # Verify everything is up to date and active
+ def settled(sender): sender.sync(); return sender.unsettled() == 0;
+ assert retry(lambda: settled(s1)), "Unsettled=%s"%(s1.unsettled())
+ assert retry(lambda: settled(s2)), "Unsettled=%s"%(s2.unsettled())
+ cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
+ cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
+ cluster[3].wait_status("active"),
+ s1.session.connection.close()
+ s2.session.connection.close()
+
+ def test_expected_backup_timeout(self):
+ """Verify that we time-out expected backups and release held queues
+ after a configured interval. Verify backup is demoted to catch-up,
+ but can still rejoin.
+ """
+ cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]);
+ cluster[0].wait_status("active") # Primary ready
+ for b in cluster[1:4]: b.wait_status("ready") # Backups ready
+ for i in [0,1]: cluster.kill(i, False)
+ cluster[2].promote() # New primary, backups will be 1 and 2
+ cluster[2].wait_status("recovering")
+ # Should not go active till the expected backup connects or times out.
+ self.assertEqual(cluster[2].ha_status(), "recovering")
+ # Messages should be held until the expected backup times out
+ s = cluster[2].connect().session().sender("q;{create:always}")
+ for i in xrange(100): s.send(str(i), sync=False)
+ # Verify message held initially.
+ try: s.sync(timeout=.01); self.fail("Expected Timeout exception")
+ except Timeout: pass
+ s.sync(timeout=1) # And released after the timeout.
+ self.assertEqual(cluster[2].ha_status(), "active")
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
diff --git a/cpp/src/tests/ipv6_test b/cpp/src/tests/ipv6_test
index 6becfd8c96..9d1cb2acdd 100755
--- a/cpp/src/tests/ipv6_test
+++ b/cpp/src/tests/ipv6_test
@@ -19,6 +19,19 @@
# under the License.
#
+# Check whether we have any globally configured IPv6 addresses
+# - if not then we can't run the tests because ipv6 lookups won't
+# work within the qpid code. This is a deliberate feature to avoid
+# getting addresses that can't be routed by the machine.
+
+if ip -f inet6 -o addr | cut -f 9 -s -d' ' | grep global > /dev/null ; then
+ echo "IPv6 addresses configured continuing"
+else
+ echo "No global IPv6 addresses configured - skipping test"
+ exit 0
+fi
+
+
# Run a simple test over IPv6
source ./test_env.sh
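
For harnesses that are not shell based, the same pre-flight check could be approximated in Python; a rough sketch, not a byte-for-byte port of the ip/cut/grep pipeline above, and assuming the iproute2 ip command is available:

    import subprocess

    def have_global_ipv6():
        # Look for any IPv6 address with global scope, mirroring the intent of
        # the shell check in ipv6_test.
        out = subprocess.check_output(["ip", "-f", "inet6", "-o", "addr"],
                                      universal_newlines=True)
        return "scope global" in out
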
diff --git a/cpp/src/tests/logging.cpp b/cpp/src/tests/logging.cpp
index 5d5bb1feef..a29714c002 100644
--- a/cpp/src/tests/logging.cpp
+++ b/cpp/src/tests/logging.cpp
@@ -258,7 +258,7 @@ QPID_AUTO_TEST_CASE(testOverhead) {
Statement statement(
Level level, const char* file="", int line=0, const char* fn=0)
{
- Statement s={0, file, line, fn, level};
+ Statement s={0, file, line, fn, level, ::qpid::log::unspecified};
return s;
}
@@ -347,11 +347,11 @@ QPID_AUTO_TEST_CASE(testLoggerStateure) {
};
opts.parse(ARGC(argv), const_cast<char**>(argv));
l.configure(opts);
- QPID_LOG(critical, "foo"); int srcline=__LINE__;
+ QPID_LOG_CAT(critical, test, "foo"); int srcline=__LINE__;
ifstream log("logging.tmp");
string line;
getline(log, line);
- string expect=(format("critical %s:%d: foo")%__FILE__%srcline).str();
+ string expect=(format("[Test] critical %s:%d: foo")%__FILE__%srcline).str();
BOOST_CHECK_EQUAL(expect, line);
log.close();
unlink("logging.tmp");
@@ -375,11 +375,11 @@ QPID_AUTO_TEST_CASE(testQuoteNonPrintable) {
char s[] = "null\0tab\tspace newline\nret\r\x80\x99\xff";
string str(s, sizeof(s));
- QPID_LOG(critical, str);
+ QPID_LOG_CAT(critical, test, str);
ifstream log("logging.tmp");
string line;
getline(log, line, '\0');
- string expect="critical null\\x00tab\tspace newline\nret\r\\x80\\x99\\xFF\\x00\n";
+ string expect="[Test] critical null\\x00tab\tspace newline\nret\r\\x80\\x99\\xFF\\x00\n";
BOOST_CHECK_EQUAL(expect, line);
log.close();
unlink("logging.tmp");
diff --git a/cpp/src/tests/ping_broker b/cpp/src/tests/ping_broker
new file mode 100755
index 0000000000..6c391027a3
--- /dev/null
+++ b/cpp/src/tests/ping_broker
@@ -0,0 +1,127 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+from optparse import OptionParser, OptionGroup
+import sys
+import locale
+import socket
+import re
+from qpid.messaging import Connection
+
+home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
+sys.path.append(os.path.join(home, "python"))
+
+from qpidtoollibs import BrokerAgent
+from qpidtoollibs import Display, Header, Sorter, YN, Commas, TimeLong
+
+
+class Config:
+ def __init__(self):
+ self._host = "localhost"
+ self._connTimeout = 10
+
+config = Config()
+conn_options = {}
+
+def OptionsAndArguments(argv):
+ """ Set global variables for options, return arguments """
+
+ global config
+ global conn_options
+
+ usage = "%prog [options]"
+
+ parser = OptionParser(usage=usage)
+
+ parser.add_option("-b", "--broker", action="store", type="string", default="localhost", metavar="<url>",
+ help="URL of the broker to query")
+ parser.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>",
+ help="Maximum time to wait for broker connection (in seconds)")
+ parser.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>",
+                      help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
+ parser.add_option("--ssl-certificate", action="store", type="string", metavar="<cert>", help="Client SSL certificate (PEM Format)")
+ parser.add_option("--ssl-key", action="store", type="string", metavar="<key>", help="Client SSL private key (PEM Format)")
+ parser.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
+
+ opts, args = parser.parse_args(args=argv)
+
+ config._host = opts.broker
+ config._connTimeout = opts.timeout
+
+ if opts.sasl_mechanism:
+ conn_options['sasl_mechanisms'] = opts.sasl_mechanism
+ if opts.ssl_certificate:
+ conn_options['ssl_certfile'] = opts.ssl_certificate
+ if opts.ssl_key:
+ conn_options['ssl_key'] = opts.ssl_key
+ if opts.ha_admin:
+ conn_options['client_properties'] = {'qpid.ha-admin' : 1}
+ return args
+
+class BrokerManager:
+ def __init__(self):
+ self.brokerName = None
+ self.connection = None
+ self.broker = None
+ self.cluster = None
+
+ def SetBroker(self, brokerUrl):
+ self.url = brokerUrl
+ self.connection = Connection.establish(self.url, **conn_options)
+ self.broker = BrokerAgent(self.connection)
+
+ def Disconnect(self):
+ """ Release any allocated brokers. Ignore any failures as the tool is
+ shutting down.
+ """
+ try:
+            self.connection.close()
+ except:
+ pass
+
+ def Ping(self, args):
+ for sequence in range(10):
+ result = self.broker.echo(sequence, "ECHO BODY")
+ if result['sequence'] != sequence:
+ raise Exception("Invalid Sequence")
+
+
+def main(argv=None):
+
+ args = OptionsAndArguments(argv)
+ bm = BrokerManager()
+
+ try:
+ bm.SetBroker(config._host)
+ bm.Ping(args)
+ bm.Disconnect()
+ return 0
+ except KeyboardInterrupt:
+ print
+ except Exception,e:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
+
+ bm.Disconnect() # try to deallocate brokers
+ return 1
+
+if __name__ == "__main__":
+ sys.exit(main())
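Note: ping_broker exits 0 once ten echo round-trips through the broker's management agent succeed, and 1 otherwise, so shell tests can use it as a simple liveness probe (ssl_test below does exactly that). A usage sketch follows; the URLs and certificate paths are placeholders, not values from this patch:

    # Plain connection to a local broker.
    ./ping_broker -b localhost:5672 && echo "broker is answering"

    # SSL connection presenting a client certificate.
    ./ping_broker -b amqps://broker.example.com:5671 \
        --ssl-certificate client-cert.pem --ssl-key client-key.pem
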
diff --git a/cpp/src/tests/qpid-latency-test.cpp b/cpp/src/tests/qpid-latency-test.cpp
index 20eb4568f3..2343cb1d77 100644
--- a/cpp/src/tests/qpid-latency-test.cpp
+++ b/cpp/src/tests/qpid-latency-test.cpp
@@ -359,19 +359,29 @@ void Sender::sendByRate()
}
uint64_t interval = TIME_SEC/opts.rate;
int64_t timeLimit = opts.timeLimit * TIME_SEC;
- uint64_t sent = 0, missedRate = 0;
+ uint64_t sent = 0;
AbsTime start = now();
+ AbsTime last = start;
while (true) {
AbsTime sentAt=now();
msg.getDeliveryProperties().setTimestamp(Duration(EPOCH, sentAt));
async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
if (opts.sync) session.sync();
++sent;
+ if (Duration(last, sentAt) > TIME_SEC*2) {
+ Duration t(start, now());
+ //check rate actually achieved thus far
+ uint actualRate = sent / (t/TIME_SEC);
+ //report inability to stay within 1% of desired rate
+ if (actualRate < opts.rate && opts.rate - actualRate > opts.rate/100) {
+ std::cerr << "WARNING: Desired send rate: " << opts.rate << ", actual send rate: " << actualRate << std::endl;
+ }
+ last = sentAt;
+ }
+
AbsTime waitTill(start, sent*interval);
Duration delay(sentAt, waitTill);
- if (delay < 0)
- ++missedRate;
- else
+ if (delay > 0)
sys::usleep(delay / TIME_USEC);
if (timeLimit != 0 && Duration(start, now()) > timeLimit) {
session.sync();
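Note: the block added above checks, at most once every two seconds, whether the achieved send rate has dropped more than 1% below the requested rate and warns if so. The same arithmetic with made-up numbers, as a shell sketch (all values are illustrative):

    rate=10000                       # requested messages/second (opts.rate)
    sent=39000                       # messages sent so far
    elapsed=4                        # whole seconds since the start (t/TIME_SEC)
    actual=$(( sent / elapsed ))     # 9750 msg/s achieved so far
    shortfall=$(( rate - actual ))   # 250 msg/s below the requested rate
    tolerance=$(( rate / 100 ))      # 1% of the requested rate = 100
    if (( actual < rate && shortfall > tolerance )); then
        echo "WARNING: Desired send rate: $rate, actual send rate: $actual"
    fi
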
diff --git a/cpp/src/tests/qpid-receive.cpp b/cpp/src/tests/qpid-receive.cpp
index 6deeb566dc..7a02b871db 100644
--- a/cpp/src/tests/qpid-receive.cpp
+++ b/cpp/src/tests/qpid-receive.cpp
@@ -68,6 +68,7 @@ struct Options : public qpid::Options
bool reportHeader;
string readyAddress;
uint receiveRate;
+ std::string replyto;
Options(const std::string& argv0=std::string())
: qpid::Options("Options"),
@@ -114,6 +115,7 @@ struct Options : public qpid::Options
("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.")
("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive")
("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.")
+ ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address on response messages")
("help", qpid::optValue(help), "print this usage statement");
add(log);
}
@@ -246,6 +248,9 @@ int main(int argc, char ** argv)
s = session.createSender(msg.getReplyTo());
s.setCapacity(opts.capacity);
}
+ if (!opts.replyto.empty()) {
+ msg.setReplyTo(Address(opts.replyto));
+ }
s.send(msg);
}
if (opts.receiveRate) {
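Note: when qpid-receive echoes a received message back to that message's reply-to address, the new --reply-to option stamps the echoed copy with a reply-to of its own. An invocation sketch; the URL and address names are placeholders, and the remaining options are the tool's existing ones:

    # Consume requests and give each echoed response a reply-to address.
    ./qpid-receive -b localhost:5672 -a 'requests;{create:always}' \
        --messages 10 --reply-to 'responses;{create:always}'
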
diff --git a/cpp/src/tests/run_acl_tests b/cpp/src/tests/run_acl_tests
index 3a8c03eda6..25241ad75e 100755
--- a/cpp/src/tests/run_acl_tests
+++ b/cpp/src/tests/run_acl_tests
@@ -30,9 +30,9 @@ trap stop_brokers INT TERM QUIT
start_brokers() {
../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIR --load-module $ACL_LIB --acl-file policy.acl --auth no --log-to-file local.log > qpidd.port
LOCAL_PORT=`cat qpidd.port`
- ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRI --load-module $ACL_LIB --acl-file policy.acl --auth no --acl-max-connect-per-ip 2 --log-to-file locali.log > qpiddi.port
+ ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRI --load-module $ACL_LIB --acl-file policy.acl --auth no --max-connections-per-ip 2 --log-to-file locali.log > qpiddi.port
LOCAL_PORTI=`cat qpiddi.port`
- ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRU --load-module $ACL_LIB --acl-file policy.acl --auth no --acl-max-connect-per-user 2 --log-to-file localu.log > qpiddu.port
+ ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRU --load-module $ACL_LIB --acl-file policy.acl --auth no --max-connections-per-user 2 --log-to-file localu.log > qpiddu.port
LOCAL_PORTU=`cat qpiddu.port`
}
diff --git a/cpp/src/tests/run_federation_tests b/cpp/src/tests/run_federation_tests
index 7735b559cf..c2ee550226 100755
--- a/cpp/src/tests/run_federation_tests
+++ b/cpp/src/tests/run_federation_tests
@@ -36,10 +36,10 @@ fi
QPIDD_CMD="../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no --log-enable=info+ --log-enable=debug+:Bridge --log-to-file"
start_brokers() {
rm -f fed_local.log fed_remote.log fed_b1.log fed_b2.log
- LOCAL_PORT=$($QPIDD_CMD fed_local.log)
- REMOTE_PORT=$($QPIDD_CMD fed_remote.log)
- REMOTE_B1=$($QPIDD_CMD fed_b1.log)
- REMOTE_B2=$($QPIDD_CMD fed_b2.log)
+ LOCAL_PORT=$($QPIDD_CMD fed_local.log --federation-tag LOCAL)
+ REMOTE_PORT=$($QPIDD_CMD fed_remote.log --federation-tag REMOTE)
+ REMOTE_B1=$($QPIDD_CMD fed_b1.log --federation-tag B1)
+ REMOTE_B2=$($QPIDD_CMD fed_b2.log --federation-tag B2)
}
stop_brokers() {
diff --git a/cpp/src/tests/run_ha_tests b/cpp/src/tests/run_ha_tests
new file mode 100755
index 0000000000..1a469646c9
--- /dev/null
+++ b/cpp/src/tests/run_ha_tests
@@ -0,0 +1,29 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# Make sure the python tools are available. They will be if we are building in
+# a checkout; they may not be in a distribution.
+test -d $PYTHON_COMMANDS -a -x $PYTHON_COMMANDS/qpid-ha -a -x $PYTHON_COMMANDS/qpid-config || { echo "Skipping HA tests, qpid-ha or qpid-config not available."; exit 0; }
+
+srcdir=`dirname $0`
+$srcdir/ha_tests.py
+
diff --git a/cpp/src/tests/sasl_test_setup.sh b/cpp/src/tests/sasl_test_setup.sh
index 3e69c0f02b..3947986517 100755
--- a/cpp/src/tests/sasl_test_setup.sh
+++ b/cpp/src/tests/sasl_test_setup.sh
@@ -30,7 +30,7 @@ pwcheck_method: auxprop
auxprop_plugin: sasldb
sasldb_path: $PWD/sasl_config/qpidd.sasldb
sql_select: dummy select
-mech_list: ANONYMOUS PLAIN DIGEST-MD5 EXTERNAL
+mech_list: ANONYMOUS PLAIN DIGEST-MD5 EXTERNAL CRAM-MD5
EOF
# Populate temporary sasl db.
diff --git a/cpp/src/tests/ssl_test b/cpp/src/tests/ssl_test
index 91ff8eec1e..19a316a483 100755
--- a/cpp/src/tests/ssl_test
+++ b/cpp/src/tests/ssl_test
@@ -148,6 +148,11 @@ URL=$TEST_HOSTNAME:$PORT
MSG=`./qpid-receive -b $URL --connection-options '{transport:ssl,heartbeat:2}' -a "foo;{create:always}" --messages 1`
test "$MSG" = "hello again" || { echo "receive failed '$MSG' != 'hello again'"; exit 1; }
+## Test using the Python client
+echo "Testing Non-Authenticating with Python Client..."
+URL=amqps://$TEST_HOSTNAME:$PORT
+if `$top_srcdir/src/tests/ping_broker -b $URL`; then echo " Passed"; else { echo " Failed"; exit 1; }; fi
+
#### Client Authentication tests
start_authenticating_broker
diff --git a/cpp/src/tests/storePerftools/storePerftoolsSmokeTest.sh b/cpp/src/tests/storePerftools/asyncStorePerf_smoke_test.sh
index 592e1e60a0..9e8880e128 100755
--- a/cpp/src/tests/storePerftools/storePerftoolsSmokeTest.sh
+++ b/cpp/src/tests/storePerftools/asyncStorePerf_smoke_test.sh
@@ -9,11 +9,16 @@ run_test() {
fi
}
-NUM_MSGS=1000
+NUM_MSGS=10000
TEST_PROG="./asyncStorePerf"
+# Default (no args)
run_test "${TEST_PROG}"
+
+# Help
run_test "${TEST_PROG} --help"
+
+# Limited combinations of major params
for q in 1 2; do
for p in 1 2; do
for c in 1 2; do
@@ -27,25 +32,3 @@ for q in 1 2; do
done
done
done
-
-
-NUM_MSGS=1000
-TEST_PROG="./jrnl2Perf"
-
-
-run_test "${TEST_PROG}"
-
-# This test returns 1, don't use run_test until this is fixed.
-cmd="${TEST_PROG} --help"
-echo $cmd
-$cmd
-
-for q in 1 2; do
- for p in 1 2; do
- for c in 1; do # BUG - this will fail for c > 1
- run_test "./jrnl2Perf --num_queues $q --num_msgs ${NUM_MSGS} --num_enq_threads_per_queue $p --num_deq_threads_per_queue $c"
- done
- done
-done
-
-#exit 0
diff --git a/cpp/src/tests/storePerftools/jrnl2Perf_smoke_test.sh b/cpp/src/tests/storePerftools/jrnl2Perf_smoke_test.sh
new file mode 100755
index 0000000000..23fe0fea9b
--- /dev/null
+++ b/cpp/src/tests/storePerftools/jrnl2Perf_smoke_test.sh
@@ -0,0 +1,33 @@
+#!/bin/bash
+
+run_test() {
+ local cmd=$1
+ echo $cmd
+ $cmd
+ if (( $? != 0 )); then
+ exit 1
+ fi
+}
+
+NUM_MSGS=10000
+TEST_PROG="./jrnl2Perf"
+
+# Default (no args)
+run_test "${TEST_PROG}"
+
+# Help
+# This test returns 1; don't use run_test until this is fixed.
+cmd="${TEST_PROG} --help"
+echo $cmd
+$cmd
+
+# Limited combinations of major params
+for q in 1 2; do
+ for p in 1 2; do
+ for c in 1; do # BUG - this will fail for c > 1
+ run_test "./jrnl2Perf --num_queues $q --num_msgs ${NUM_MSGS} --num_enq_threads_per_queue $p --num_deq_threads_per_queue $c"
+ done
+ done
+done
+
+#exit 0
diff --git a/cpp/src/tests/txjob.cpp b/cpp/src/tests/txjob.cpp
index a7a905c1b7..29394c3415 100644
--- a/cpp/src/tests/txjob.cpp
+++ b/cpp/src/tests/txjob.cpp
@@ -38,9 +38,9 @@ namespace tests {
struct Args : public qpid::TestOptions
{
- string workQueue;
- string source;
- string dest;
+ std::string workQueue;
+ std::string source;
+ std::string dest;
uint messages;
uint jobs;
bool quit;
diff --git a/cpp/src/tests/txshift.cpp b/cpp/src/tests/txshift.cpp
index 882d3716d8..bf85bee986 100644
--- a/cpp/src/tests/txshift.cpp
+++ b/cpp/src/tests/txshift.cpp
@@ -39,7 +39,7 @@ namespace tests {
struct Args : public qpid::TestOptions
{
- string workQueue;
+ std::string workQueue;
size_t workers;
Args() : workQueue("txshift-control"), workers(1)