diff options
| author | Alan Conway <aconway@apache.org> | 2014-08-08 09:24:15 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2014-08-08 09:24:15 +0000 |
| commit | 2602ecaf16a3ddf424383214da2ea846634c083f (patch) | |
| tree | cbe7e6a423e2d521c2ebce63a479f2a4e3074ae9 /qpid/cpp/src/tests/qpid-txtest2.cpp | |
| parent | a833f714a4de983bce8fb1c2f6b87070bd3b4309 (diff) | |
| download | qpid-python-2602ecaf16a3ddf424383214da2ea846634c083f.tar.gz | |
QPID-5966: HA mixing tx enqueue and non-tx dequeue leaves extra messages on backup.
There were several problems:
1. Positions of transactionally enqueued messages not known to QueueReplicator, so not dequeued
on backup if dequeued outside a TX on primary.
2. Race condition if tx created immediately after queue could cause duplication of TX message.
3. Replication IDs were not being set during recovery from store (regression, store change?)
Fix:
1. Update positions QueueReplicator positions via QueueObserver::enqueued to see all enqueues.
2. Check for duplicate replication-ids on backup in QueueReplicator::route.
3. Set replication-id in publish() if not already set in record().
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1616704 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests/qpid-txtest2.cpp')
| -rw-r--r-- | qpid/cpp/src/tests/qpid-txtest2.cpp | 12 |
1 files changed, 11 insertions, 1 deletions
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp index a744d07a12..3d9941baee 100644 --- a/qpid/cpp/src/tests/qpid-txtest2.cpp +++ b/qpid/cpp/src/tests/qpid-txtest2.cpp @@ -205,7 +205,7 @@ struct Transfer : public TransactionalClient, public Runnable } session.commit(); t++; - if (!opts.quiet && t % 10 == 0) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl; + if (!opts.quiet) std::cout << "Transaction " << t << " of " << opts.txCount << " committed successfully" << std::endl; } catch (const TransactionAborted&) { std::cout << "Transaction " << (t+1) << " of " << opts.txCount << " was aborted and will be retried" << std::endl; session = connection.createTransactionalSession(); @@ -246,6 +246,16 @@ struct Controller : public Client for (StringSet::iterator i = queues.begin(); i != queues.end(); i++) { std::string address = *i + (opts.durable ? CREATE_DURABLE : CREATE_NON_DURABLE); + + // Clear out any garbage on queues. + Receiver receiver = session.createReceiver(address); + Message rmsg; + uint count(0); + while (receiver.fetch(rmsg, Duration::IMMEDIATE)) ++count; + session.acknowledge(); + receiver.close(); + if (!opts.quiet) std::cout << "Cleaned up " << count << " messages from " << *i << std::endl; + Sender sender = session.createSender(address); if (i == queues.begin()) { for (StringSet::iterator i = ids.begin(); i != ids.end(); i++) { |
