diff options
| author | Gordon Sim <gsim@apache.org> | 2014-07-11 17:19:13 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2014-07-11 17:19:13 +0000 |
| commit | 5945995cf4de6e1a8bd9f31a3b612f87a909c598 (patch) | |
| tree | e696f5809cdbeee2f7367c25a30482ce6762cf99 /qpid/cpp/src | |
| parent | 99d8f7a3e50d86c68ee9a47e9af6dbe912535abd (diff) | |
| download | qpid-python-5945995cf4de6e1a8bd9f31a3b612f87a909c598.tar.gz | |
QPID-5887: abort transactional session on failover; added equivalent of txtest using messaging API
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1609748 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 25 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/CMakeLists.txt | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/qpid-txtest2.cpp | 314 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/quick_txtest | 1 |
4 files changed, 336 insertions, 10 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 8c6eef6273..b6ae9514b3 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -189,16 +189,23 @@ template <class T> void getFreeKey(std::string& key, T& map) void SessionImpl::setSession(qpid::client::Session s) { ScopedLock l(lock); - session = s; - incoming.setSession(session); - if (transactional) session.txSelect(); - for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { - getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver); - } - for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { - getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver); + if (session.isValid() && transactional) { + qpid::client::SessionBase_0_10Access ssn_ptr(session); + ssn_ptr.get()->setException(new TransactionAborted("Transaction aborted due to transport failure")); + } else { + session = s; + incoming.setSession(session); + if (transactional) { + session.txSelect(); + } + for (Receivers::iterator i = receivers.begin(); i != receivers.end(); ++i) { + getImplPtr<Receiver, ReceiverImpl>(i->second)->init(session, resolver); + } + for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { + getImplPtr<Sender, SenderImpl>(i->second)->init(session, resolver); + } + session.sync(); } - session.sync(); } struct SessionImpl::CreateReceiver : Command diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt index 912bbd5f37..9fa5fc88e5 100644 --- a/qpid/cpp/src/tests/CMakeLists.txt +++ b/qpid/cpp/src/tests/CMakeLists.txt @@ -110,11 +110,15 @@ target_link_libraries (qpid-txtest qpidclient qpidcommon qpidtypes) #qpid_txtest_SOURCES=qpid-txtest.cpp TestOptions.h ConnectionOptions.h remember_location(qpid-txtest) +add_executable (qpid-txtest2 qpid-txtest2.cpp ${platform_test_additions}) +target_link_libraries (qpid-txtest2 qpidmessaging qpidtypes qpidcommon) +remember_location(qpid-txtest2) + install (TARGETS qpid-perftest qpid-latency-test qpid-client-test qpid-ping qpid-topic-listener qpid-topic-publisher receiver sender - qpid-txtest + qpid-txtest qpid-txtest2 RUNTIME DESTINATION ${QPID_INSTALL_TESTDIR}) # Only build test code if testing is turned on diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp new file mode 100644 index 0000000000..a5e3f34a58 --- /dev/null +++ b/qpid/cpp/src/tests/qpid-txtest2.cpp @@ -0,0 +1,314 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <algorithm> +#include <iomanip> +#include <iostream> +#include <memory> +#include <sstream> +#include <vector> + +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Duration.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Session.h" +#include <qpid/Options.h> +#include <qpid/log/Logger.h> +#include <qpid/log/Options.h> +#include "qpid/sys/Runnable.h" +#include "qpid/sys/Thread.h" + +using namespace qpid::messaging; +using namespace qpid::sys; + +namespace qpid { +namespace tests { + +typedef std::vector<std::string> StringSet; + +struct Options : public qpid::Options { + bool help; + bool init, transfer, check;//actions + uint size; + bool durable; + uint queues; + std::string base; + uint msgsPerTx; + uint txCount; + uint totalMsgCount; + bool dtx; + uint capacity; + std::string url; + std::string connectionOptions; + qpid::log::Options log; + bool quiet; + + Options() : help(false), init(true), transfer(true), check(true), + size(256), durable(true), queues(2), + base("tx-test2"), msgsPerTx(1), txCount(5), totalMsgCount(10), + capacity(1000), url("localhost"), quiet(false) + { + addOptions() + ("init", qpid::optValue(init, "yes|no"), "Declare queues and populate one with the initial set of messages.") + ("transfer", qpid::optValue(transfer, "yes|no"), "'Move' messages from one queue to another using transactions to ensure no message loss.") + ("check", qpid::optValue(check, "yes|no"), "Check that the initial messages are all still available.") + ("size", qpid::optValue(size, "N"), "message size") + ("durable", qpid::optValue(durable, "yes|no"), "use durable messages") + ("queues", qpid::optValue(queues, "N"), "number of queues") + ("queue-base-name", qpid::optValue(base, "<name>"), "base name for queues") + ("messages-per-tx", qpid::optValue(msgsPerTx, "N"), "number of messages transferred per transaction") + ("tx-count", qpid::optValue(txCount, "N"), "number of transactions per 'agent'") + ("total-messages", qpid::optValue(totalMsgCount, "N"), "total number of messages in 'circulation'") + ("capacity", qpid::optValue(capacity, "N"), "Pre-fetch window (0 implies no pre-fetch)") + ("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to") + ("connection-options", qpid::optValue(connectionOptions, "OPTIONS"), "options for the connection") + ("quiet", qpid::optValue(quiet), "reduce output from test") + ("help", qpid::optValue(help), "print this usage statement"); + add(log); + } + bool parse(int argc, char** argv) + { + try { + qpid::Options::parse(argc, argv); + qpid::log::Logger::instance().configure(log); + if (help) { + std::cout << *this << std::endl << std::endl + << "Transactionally moves messages between queues" << std::endl; + return false; + } else { + return true; + } + } catch (const std::exception& e) { + std::cerr << *this << std::endl << std::endl << e.what() << std::endl; + return false; + } + } +}; + +const std::string chars("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"); + +std::string generateData(uint size) +{ + if (size < chars.length()) { + return chars.substr(0, size); + } + std::string data; + for (uint i = 0; i < (size / chars.length()); i++) { + data += chars; + } + data += chars.substr(0, size % chars.length()); + return data; +} + +void generateSet(const std::string& base, uint count, StringSet& collection) +{ + for (uint i = 0; i < count; i++) { + std::ostringstream out; + out << base << "-" << (i+1); + collection.push_back(out.str()); + } +} + +struct Client +{ + const Options& opts; + Connection connection; + Session session; + + Client(const Options& o, bool transactional=false) : opts(o), connection(opts.url, opts.connectionOptions) + { + connection.open(); + session = transactional ? connection.createTransactionalSession() : connection.createSession(); + } + + virtual ~Client() + { + try { + session.close(); + connection.close(); + } catch(const std::exception& e) { + std::cout << e.what() << std::endl; + } + } +}; + +struct TransactionalClient : Client +{ + TransactionalClient(const Options& o) : Client(o, true) {} + virtual ~TransactionalClient() {} +}; + +struct Transfer : public TransactionalClient, public Runnable +{ + const std::string target; + const std::string source; + Thread thread; + + Transfer(const std::string& to, const std::string& from, const Options& opts) : TransactionalClient(opts), target(to), source(from) {} + + void run() + { + try { + Sender sender(session.createSender(target)); + Receiver receiver(session.createReceiver(source)); + receiver.setCapacity(opts.capacity); + for (uint t = 0; t < opts.txCount; t++) { + for (uint m = 0; m < opts.msgsPerTx; m++) { + Message msg = receiver.fetch(Duration::SECOND*30); + if (msg.getContentSize() != opts.size) { + std::ostringstream oss; + oss << "Message size incorrect: size=" << msg.getContentSize() << "; expected " << opts.size; + throw std::runtime_error(oss.str()); + } + sender.send(msg); + } + QPID_LOG(info, "Moved " << opts.msgsPerTx << " from " << source << " to " << target); + session.commit(); + } + sender.close(); + receiver.close(); + } catch(const std::exception& e) { + std::cout << "Transfer interrupted: " << e.what() << std::endl; + } + } +}; + +namespace { +const std::string CREATE_DURABLE("; {create:always, node:{durable:True}}"); +const std::string CREATE_NON_DURABLE("; {create:always}"); +} + +struct Controller : public Client +{ + StringSet ids; + StringSet queues; + + Controller(const Options& opts) : Client(opts) + { + generateSet(opts.base, opts.queues, queues); + generateSet("msg", opts.totalMsgCount, ids); + } + + void init() + { + Message msg(generateData(opts.size)); + msg.setDurable(opts.durable); + + for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { + std::string address = *i + (opts.durable ? CREATE_DURABLE : CREATE_NON_DURABLE); + Sender sender = session.createSender(address); + if (i == queues.begin()) { + for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) { + msg.setCorrelationId(*i); + sender.send(msg); + } + } + sender.close(); + } + } + + void transfer() + { + boost::ptr_vector<Transfer> agents(opts.queues); + //launch transfer agents + for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { + StringSet::iterator next = i + 1; + if (next == queues.end()) next = queues.begin(); + + if (!opts.quiet) std::cout << "Transfering from " << *i << " to " << *next << std::endl; + agents.push_back(new Transfer(*i, *next, opts)); + agents.back().thread = Thread(agents.back()); + } + + for (boost::ptr_vector<Transfer>::iterator i = agents.begin(); i != agents.end(); i++) { + i->thread.join(); + } + } + + int check() + { + StringSet drained; + //drain each queue and verify the correct set of messages are available + for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { + Receiver receiver = session.createReceiver(*i); + uint count(0); + Message msg; + while (receiver.fetch(msg, Duration::IMMEDIATE)) { + //add correlation ids of received messages to drained + drained.push_back(msg.getCorrelationId()); + ++count; + } + receiver.close(); + if (!opts.quiet) std::cout << "Drained " << count << " messages from " << *i << std::endl; + } + + sort(ids.begin(), ids.end()); + sort(drained.begin(), drained.end()); + + //check that drained == ids + StringSet missing; + set_difference(ids.begin(), ids.end(), drained.begin(), drained.end(), back_inserter(missing)); + + StringSet extra; + set_difference(drained.begin(), drained.end(), ids.begin(), ids.end(), back_inserter(extra)); + + if (missing.empty() && extra.empty()) { + std::cout << "All expected messages were retrieved." << std::endl; + return 0; + } else { + if (!missing.empty()) { + std::cout << "The following ids were missing:" << std::endl; + for (StringSet::iterator i = missing.begin(); i != missing.end(); i++) { + std::cout << " '" << *i << "'" << std::endl; + } + } + if (!extra.empty()) { + std::cout << "The following extra ids were encountered:" << std::endl; + for (StringSet::iterator i = extra.begin(); i != extra.end(); i++) { + std::cout << " '" << *i << "'" << std::endl; + } + } + return 1; + } + } +}; +}} // namespace qpid::tests + +using namespace qpid::tests; + +int main(int argc, char** argv) +{ + try { + Options opts; + if (opts.parse(argc, argv)) { + Controller controller(opts); + if (opts.init) controller.init(); + if (opts.transfer) controller.transfer(); + if (opts.check) return controller.check(); + } + return 0; + } catch(const std::exception& e) { + std::cout << e.what() << std::endl; + } + return 2; +} diff --git a/qpid/cpp/src/tests/quick_txtest b/qpid/cpp/src/tests/quick_txtest index 77e8556f1d..95cd4bfd18 100755 --- a/qpid/cpp/src/tests/quick_txtest +++ b/qpid/cpp/src/tests/quick_txtest @@ -20,3 +20,4 @@ # exec `dirname $0`/run_test ./qpid-txtest --queues 4 --tx-count 10 --quiet +exec `dirname $0`/run_test ./qpid-txtest2 --queues 4 --tx-count 10 --quiet |
