summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-07-11 17:19:13 +0000
committerGordon Sim <gsim@apache.org>2014-07-11 17:19:13 +0000
commit5945995cf4de6e1a8bd9f31a3b612f87a909c598 (patch)
treee696f5809cdbeee2f7367c25a30482ce6762cf99 /qpid/cpp/src
parent99d8f7a3e50d86c68ee9a47e9af6dbe912535abd (diff)
downloadqpid-python-5945995cf4de6e1a8bd9f31a3b612f87a909c598.tar.gz
QPID-5887: abort transactional session on failover; added equivalent of txtest using messaging API
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1609748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp25
-rw-r--r--qpid/cpp/src/tests/CMakeLists.txt6
-rw-r--r--qpid/cpp/src/tests/qpid-txtest2.cpp314
-rwxr-xr-xqpid/cpp/src/tests/quick_txtest1
4 files changed, 336 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 8c6eef6273..b6ae9514b3 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -189,16 +189,23 @@ template <class T> void getFreeKey(std::string& key, T& map)
void SessionImpl::setSession(qpid::client::Session s)
{
ScopedLock l(lock);
- session = s;
- incoming.setSession(session);
- if (transactional) session.txSelect();
- for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
- getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver);
- }
- for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
- getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver);
+ if (session.isValid() && transactional) {
+ qpid::client::SessionBase_0_10Access ssn_ptr(session);
+ ssn_ptr.get()->setException(new TransactionAborted("Transaction aborted due to transport failure"));
+ } else {
+ session = s;
+ incoming.setSession(session);
+ if (transactional) {
+ session.txSelect();
+ }
+ for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+ getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver);
+ }
+ for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) {
+ getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver);
+ }
+ session.sync();
}
- session.sync();
}
struct SessionImpl::CreateReceiver : Command
diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt
index 912bbd5f37..9fa5fc88e5 100644
--- a/qpid/cpp/src/tests/CMakeLists.txt
+++ b/qpid/cpp/src/tests/CMakeLists.txt
@@ -110,11 +110,15 @@ target_link_libraries (qpid-txtest qpidclient qpidcommon qpidtypes)
#qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h
remember_location(qpid-txtest)
+add_executable (qpid-txtest2 qpid-txtest2.cpp ${platform_test_additions})
+target_link_libraries (qpid-txtest2 qpidmessaging qpidtypes qpidcommon)
+remember_location(qpid-txtest2)
+
install (TARGETS
qpid-perftest qpid-latency-test qpid-client-test
qpid-ping
qpid-topic-listener qpid-topic-publisher receiver sender
- qpid-txtest
+ qpid-txtest qpid-txtest2
RUNTIME DESTINATION ${QPID_INSTALL_TESTDIR})
# Only build test code if testing is turned on
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp
new file mode 100644
index 0000000000..a5e3f34a58
--- /dev/null
+++ b/qpid/cpp/src/tests/qpid-txtest2.cpp
@@ -0,0 +1,314 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <algorithm>
+#include <iomanip>
+#include <iostream>
+#include <memory>
+#include <sstream>
+#include <vector>
+
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Duration.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Session.h"
+#include <qpid/Options.h>
+#include <qpid/log/Logger.h>
+#include <qpid/log/Options.h>
+#include "qpid/sys/Runnable.h"
+#include "qpid/sys/Thread.h"
+
+using namespace qpid::messaging;
+using namespace qpid::sys;
+
+namespace qpid {
+namespace tests {
+
+typedef std::vector<std::string> StringSet;
+
+struct Options : public qpid::Options {
+ bool help;
+ bool init, transfer, check;//actions
+ uint size;
+ bool durable;
+ uint queues;
+ std::string base;
+ uint msgsPerTx;
+ uint txCount;
+ uint totalMsgCount;
+ bool dtx;
+ uint capacity;
+ std::string url;
+ std::string connectionOptions;
+ qpid::log::Options log;
+ bool quiet;
+
+ Options() : help(false), init(true), transfer(true), check(true),
+ size(256), durable(true), queues(2),
+ base("tx-test2"), msgsPerTx(1), txCount(5), totalMsgCount(10),
+ capacity(1000), url("localhost"), quiet(false)
+ {
+ addOptions()
+ ("init", qpid::optValue(init, "yes|no"), "Declare queues and populate one with the initial set of messages.")
+ ("transfer", qpid::optValue(transfer, "yes|no"), "'Move' messages from one queue to another using transactions to ensure no message loss.")
+ ("check", qpid::optValue(check, "yes|no"), "Check that the initial messages are all still available.")
+ ("size", qpid::optValue(size, "N"), "message size")
+ ("durable", qpid::optValue(durable, "yes|no"), "use durable messages")
+ ("queues", qpid::optValue(queues, "N"), "number of queues")
+ ("queue-base-name", qpid::optValue(base, "<name>"), "base name for queues")
+ ("messages-per-tx", qpid::optValue(msgsPerTx, "N"), "number of messages transferred per transaction")
+ ("tx-count", qpid::optValue(txCount, "N"), "number of transactions per 'agent'")
+ ("total-messages", qpid::optValue(totalMsgCount, "N"), "total number of messages in 'circulation'")
+ ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)")
+ ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
+ ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection")
+ ("quiet", qpid::optValue(quiet), "reduce output from test")
+ ("help", qpid::optValue(help), "print this usage statement");
+ add(log);
+ }
+ bool parse(int argc, char** argv)
+ {
+ try {
+ qpid::Options::parse(argc, argv);
+ qpid::log::Logger::instance().configure(log);
+ if (help) {
+ std::cout << *this << std::endl << std::endl
+ << "Transactionally moves messages between queues" << std::endl;
+ return false;
+ } else {
+ return true;
+ }
+ } catch (const std::exception& e) {
+ std::cerr << *this << std::endl << std::endl << e.what() << std::endl;
+ return false;
+ }
+ }
+};
+
+const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
+
+std::string generateData(uint size)
+{
+ if (size < chars.length()) {
+ return chars.substr(0, size);
+ }
+ std::string data;
+ for (uint i = 0; i < (size / chars.length()); i++) {
+ data += chars;
+ }
+ data += chars.substr(0, size % chars.length());
+ return data;
+}
+
+void generateSet(const std::string& base, uint count, StringSet& collection)
+{
+ for (uint i = 0; i < count; i++) {
+ std::ostringstream out;
+ out << base << "-" << (i+1);
+ collection.push_back(out.str());
+ }
+}
+
+struct Client
+{
+ const Options& opts;
+ Connection connection;
+ Session session;
+
+ Client(const Options& o, bool transactional=false) : opts(o), connection(opts.url, opts.connectionOptions)
+ {
+ connection.open();
+ session = transactional ? connection.createTransactionalSession() : connection.createSession();
+ }
+
+ virtual ~Client()
+ {
+ try {
+ session.close();
+ connection.close();
+ } catch(const std::exception& e) {
+ std::cout << e.what() << std::endl;
+ }
+ }
+};
+
+struct TransactionalClient : Client
+{
+ TransactionalClient(const Options& o) : Client(o, true) {}
+ virtual ~TransactionalClient() {}
+};
+
+struct Transfer : public TransactionalClient, public Runnable
+{
+ const std::string target;
+ const std::string source;
+ Thread thread;
+
+ Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from) {}
+
+ void run()
+ {
+ try {
+ Sender sender(session.createSender(target));
+ Receiver receiver(session.createReceiver(source));
+ receiver.setCapacity(opts.capacity);
+ for (uint t = 0; t < opts.txCount; t++) {
+ for (uint m = 0; m < opts.msgsPerTx; m++) {
+ Message msg = receiver.fetch(Duration::SECOND*30);
+ if (msg.getContentSize() != opts.size) {
+ std::ostringstream oss;
+ oss << "Message size incorrect: size=" << msg.getContentSize() << "; expected " << opts.size;
+ throw std::runtime_error(oss.str());
+ }
+ sender.send(msg);
+ }
+ QPID_LOG(info, "Moved " << opts.msgsPerTx << " from " << source << " to " << target);
+ session.commit();
+ }
+ sender.close();
+ receiver.close();
+ } catch(const std::exception& e) {
+ std::cout << "Transfer interrupted: " << e.what() << std::endl;
+ }
+ }
+};
+
+namespace {
+const std::string CREATE_DURABLE("; {create:always, node:{durable:True}}");
+const std::string CREATE_NON_DURABLE("; {create:always}");
+}
+
+struct Controller : public Client
+{
+ StringSet ids;
+ StringSet queues;
+
+ Controller(const Options& opts) : Client(opts)
+ {
+ generateSet(opts.base, opts.queues, queues);
+ generateSet("msg", opts.totalMsgCount, ids);
+ }
+
+ void init()
+ {
+ Message msg(generateData(opts.size));
+ msg.setDurable(opts.durable);
+
+ for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) {
+ std::string address = *i + (opts.durable ? CREATE_DURABLE : CREATE_NON_DURABLE);
+ Sender sender = session.createSender(address);
+ if (i == queues.begin()) {
+ for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) {
+ msg.setCorrelationId(*i);
+ sender.send(msg);
+ }
+ }
+ sender.close();
+ }
+ }
+
+ void transfer()
+ {
+ boost::ptr_vector<Transfer> agents(opts.queues);
+ //launch transfer agents
+ for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) {
+ StringSet::iterator next = i + 1;
+ if (next == queues.end()) next = queues.begin();
+
+ if (!opts.quiet) std::cout << "Transfering from " << *i << " to " << *next << std::endl;
+ agents.push_back(new Transfer(*i, *next, opts));
+ agents.back().thread = Thread(agents.back());
+ }
+
+ for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) {
+ i->thread.join();
+ }
+ }
+
+ int check()
+ {
+ StringSet drained;
+ //drain each queue and verify the correct set of messages are available
+ for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) {
+ Receiver receiver = session.createReceiver(*i);
+ uint count(0);
+ Message msg;
+ while (receiver.fetch(msg, Duration::IMMEDIATE)) {
+ //add correlation ids of received messages to drained
+ drained.push_back(msg.getCorrelationId());
+ ++count;
+ }
+ receiver.close();
+ if (!opts.quiet) std::cout << "Drained " << count << " messages from " << *i << std::endl;
+ }
+
+ sort(ids.begin(), ids.end());
+ sort(drained.begin(), drained.end());
+
+ //check that drained == ids
+ StringSet missing;
+ set_difference(ids.begin(), ids.end(), drained.begin(), drained.end(), back_inserter(missing));
+
+ StringSet extra;
+ set_difference(drained.begin(), drained.end(), ids.begin(), ids.end(), back_inserter(extra));
+
+ if (missing.empty() && extra.empty()) {
+ std::cout << "All expected messages were retrieved." << std::endl;
+ return 0;
+ } else {
+ if (!missing.empty()) {
+ std::cout << "The following ids were missing:" << std::endl;
+ for (StringSet::iterator i = missing.begin(); i != missing.end(); i++) {
+ std::cout << " '" << *i << "'" << std::endl;
+ }
+ }
+ if (!extra.empty()) {
+ std::cout << "The following extra ids were encountered:" << std::endl;
+ for (StringSet::iterator i = extra.begin(); i != extra.end(); i++) {
+ std::cout << " '" << *i << "'" << std::endl;
+ }
+ }
+ return 1;
+ }
+ }
+};
+}} // namespace qpid::tests
+
+using namespace qpid::tests;
+
+int main(int argc, char** argv)
+{
+ try {
+ Options opts;
+ if (opts.parse(argc, argv)) {
+ Controller controller(opts);
+ if (opts.init) controller.init();
+ if (opts.transfer) controller.transfer();
+ if (opts.check) return controller.check();
+ }
+ return 0;
+ } catch(const std::exception& e) {
+ std::cout << e.what() << std::endl;
+ }
+ return 2;
+}
diff --git a/qpid/cpp/src/tests/quick_txtest b/qpid/cpp/src/tests/quick_txtest
index 77e8556f1d..95cd4bfd18 100755
--- a/qpid/cpp/src/tests/quick_txtest
+++ b/qpid/cpp/src/tests/quick_txtest
@@ -20,3 +20,4 @@
#
exec `dirname $0`/run_test ./qpid-txtest --queues 4 --tx-count 10 --quiet
+exec `dirname $0`/run_test ./qpid-txtest2 --queues 4 --tx-count 10 --quiet