summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/qpid-txtest2.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2014-08-08 09:24:15 +0000
committerAlan Conway <aconway@apache.org>2014-08-08 09:24:15 +0000
commit2602ecaf16a3ddf424383214da2ea846634c083f (patch)
treecbe7e6a423e2d521c2ebce63a479f2a4e3074ae9 /qpid/cpp/src/tests/qpid-txtest2.cpp
parenta833f714a4de983bce8fb1c2f6b87070bd3b4309 (diff)
downloadqpid-python-2602ecaf16a3ddf424383214da2ea846634c083f.tar.gz
QPID-5966: HA mixing tx enqueue and non-tx dequeue leaves extra messages on backup.
There were several problems: 1. Positions of transactionally enqueued messages not known to QueueReplicator, so not dequeued on backup if dequeued outside a TX on primary. 2. Race condition if tx created immediately after queue could cause duplication of TX message. 3. Replication IDs were not being set during recovery from store (regression, store change?) Fix: 1. Update positions QueueReplicator positions via QueueObserver::enqueued to see all enqueues. 2. Check for duplicate replication-ids on backup in QueueReplicator::route. 3. Set replication-id in publish() if not already set in record(). git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616704 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/qpid-txtest2.cpp')
-rw-r--r--qpid/cpp/src/tests/qpid-txtest2.cpp12
1 files changed, 11 insertions, 1 deletions
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp
index a744d07a12..3d9941baee 100644
--- a/qpid/cpp/src/tests/qpid-txtest2.cpp
+++ b/qpid/cpp/src/tests/qpid-txtest2.cpp
@@ -205,7 +205,7 @@ struct Transfer : public TransactionalClient, public Runnable
}
session.commit();
t++;
- if (!opts.quiet && t % 10 == 0) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl;
+ if (!opts.quiet) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl;
} catch (const TransactionAborted&) {
std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl;
session = connection.createTransactionalSession();
@@ -246,6 +246,16 @@ struct Controller : public Client
for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) {
std::string address = *i + (opts.durable ? CREATE_DURABLE : CREATE_NON_DURABLE);
+
+ // Clear out any garbage on queues.
+ Receiver receiver = session.createReceiver(address);
+ Message rmsg;
+ uint count(0);
+ while (receiver.fetch(rmsg, Duration::IMMEDIATE)) ++count;
+ session.acknowledge();
+ receiver.close();
+ if (!opts.quiet) std::cout << "Cleaned up " << count << " messages from " << *i << std::endl;
+
Sender sender = session.createSender(address);
if (i == queues.begin()) {
for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) {