Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--  qpid/cpp/src/qpid/ha/Backup.cpp                      2
-rw-r--r--  qpid/cpp/src/qpid/ha/BrokerReplicator.cpp           23
-rw-r--r--  qpid/cpp/src/qpid/ha/BrokerReplicator.h              2
-rw-r--r--  qpid/cpp/src/qpid/ha/Primary.cpp                     9
-rw-r--r--  qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp           6
-rw-r--r--  qpid/cpp/src/qpid/ha/QueueReplicator.cpp            24
-rw-r--r--  qpid/cpp/src/qpid/ha/QueueReplicator.h               7
-rw-r--r--  qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp     1
-rwxr-xr-x  qpid/cpp/src/tests/ha_tests.py                     112
9 files changed, 89 insertions, 97 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 503de3e351..93ad5ec381 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -100,8 +100,8 @@ Role* Backup::recover(Mutex::ScopedLock&) {
// Reset membership before allowing backups to connect.
backups = membership.otherBackups();
membership.clear();
- return new Primary(haBroker, backups);
}
+ return new Primary(haBroker, backups);
}
Role* Backup::promote() {
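For orientation, here is a hedged sketch of the shape this hunk leaves Backup::recover() in (the inner braces are assumed to delimit a lock scope; names come from the hunk, not the full file): the Primary role is now constructed after the inner scope closes rather than inside it.

    // Sketch only; the inner scope is an assumption, not verbatim Qpid source.
    Role* Backup::recover(Mutex::ScopedLock&) {
        BrokerInfo::Set backups;
        {
            // Reset membership before allowing backups to connect.
            backups = membership.otherBackups();
            membership.clear();
        }
        return new Primary(haBroker, backups); // Now constructed outside the inner scope.
    }
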
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 5e8da17a1b..1587b5b33f 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -865,27 +865,14 @@ bool BrokerReplicator::hasBindings() { return false; }
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
-void BrokerReplicator::disconnectedExchange(boost::shared_ptr<Exchange> ex) {
+void BrokerReplicator::disconnectedQueueReplicator(boost::shared_ptr<Exchange> ex) {
boost::shared_ptr<QueueReplicator> qr(boost::dynamic_pointer_cast<QueueReplicator>(ex));
- // FIXME aconway 2013-11-01: move logic with releaseFromUse to QueueReplicator
if (qr) {
qr->disconnect();
if (TxReplicator::isTxQueue(qr->getQueue()->getName())) {
// Transactions are aborted on failover so clean up tx-queues
deleteQueue(qr->getQueue()->getName());
}
- else if (qr->getQueue()->isAutoDelete() && qr->isSubscribed()) {
- if (qr->getQueue()->getSettings().autoDeleteDelay) {
- // Start the auto-delete timer
- qr->getQueue()->releaseFromUse();
- qr->getQueue()->scheduleAutoDelete();
- }
- else {
- // Delete immediately. Don't purge, the primary is gone so we need
- // to reroute the deleted messages.
- deleteQueue(qr->getQueue()->getName(), false);
- }
- }
}
}
@@ -893,9 +880,9 @@ typedef vector<boost::shared_ptr<Exchange> > ExchangeVector;
// Callback function for accumulating exchange candidates
namespace {
- void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) {
- ev.push_back(i);
- }
+void exchangeAccumulatorCallback(ExchangeVector& ev, const Exchange::shared_ptr& i) {
+ ev.push_back(i);
+}
}
// Called by ConnectionObserver::disconnected, disconnected from the network side.
@@ -907,7 +894,7 @@ void BrokerReplicator::disconnected() {
ExchangeVector exs;
exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1));
for_each(exs.begin(), exs.end(),
- boost::bind(&BrokerReplicator::disconnectedExchange, this, _1));
+ boost::bind(&BrokerReplicator::disconnectedQueueReplicator, this, _1));
}
void BrokerReplicator::setMembership(const Variant::List& brokers) {
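The disconnected() path above uses a collect-then-apply idiom: exchanges are first copied out of the registry into a local vector, and only then is each one processed, so the processing cannot interfere with the registry traversal that triggered it. A self-contained illustration of the same boost::bind idiom (the Exchange type below is a stand-in, not the broker's class):

    // Standalone sketch of the collect-then-apply idiom; the types here are hypothetical.
    #include <algorithm>
    #include <vector>
    #include <boost/bind.hpp>
    #include <boost/shared_ptr.hpp>

    struct Exchange { void onDisconnect() {} };
    typedef std::vector<boost::shared_ptr<Exchange> > ExchangeVector;

    void accumulator(ExchangeVector& ev, const boost::shared_ptr<Exchange>& ex) {
        ev.push_back(ex); // First pass: only collect.
    }

    int main() {
        ExchangeVector registry(3, boost::shared_ptr<Exchange>(new Exchange));
        ExchangeVector snapshot;
        std::for_each(registry.begin(), registry.end(),
                      boost::bind(&accumulator, boost::ref(snapshot), _1));
        std::for_each(snapshot.begin(), snapshot.end(),   // Second pass: act on the copy.
                      boost::bind(&Exchange::onDisconnect, _1));
        return 0;
    }
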
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index e319ab1219..b3e3fe3223 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -148,7 +148,7 @@ class BrokerReplicator : public broker::Exchange,
void deleteQueue(const std::string& name, bool purge=true);
void deleteExchange(const std::string& name);
- void disconnectedExchange(boost::shared_ptr<broker::Exchange>);
+ void disconnectedQueueReplicator(boost::shared_ptr<broker::Exchange>);
void disconnected();
void setMembership(const types::Variant::List&); // Set membership from list.
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index 0c1858ceb1..0c0fe983bb 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -94,7 +94,16 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
logPrefix("Primary: "), active(false),
replicationTest(hb.getSettings().replicateDefault.get())
{
+ // Note that at this point, we are still rejecting client connections.
+ // So we are safe from client interference while we set up the primary.
+
hb.getMembership().setStatus(RECOVERING);
+
+ // Process all QueueReplicators; this handles auto-delete queues.
+ QueueReplicator::Vector qrs;
+ QueueReplicator::copy(hb.getBroker().getExchanges(), qrs);
+ std::for_each(qrs.begin(), qrs.end(), boost::bind(&QueueReplicator::promoted, _1));
+
broker::QueueRegistry& queues = hb.getBroker().getQueues();
queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1));
if (expect.empty()) {
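The QueueReplicator lines added above are the for_each/boost::bind spelling of a plain loop over every replicator; spelled out (a sketch using only names visible in this diff), they amount to:

    // Equivalent explicit form of the for_each call added above (sketch only).
    QueueReplicator::Vector qrs;
    QueueReplicator::copy(hb.getBroker().getExchanges(), qrs);
    for (QueueReplicator::Vector::const_iterator i = qrs.begin(); i != qrs.end(); ++i)
        (*i)->promoted(); // Each replicator finishes its deferred auto-delete handling.
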
diff --git a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
index a32334bcf9..eeb3312aec 100644
--- a/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
@@ -26,7 +26,6 @@
#include "QueueGuard.h"
#include "RemoteBackup.h"
#include "ReplicatingSubscription.h"
-#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
@@ -121,7 +120,7 @@ void PrimaryTxObserver::initialize() {
throw InvalidArgumentException(
QPID_MSG(logPrefix << "TX replication queue already exists."));
txQueue = result.first;
- txQueue->markInUse(true); // Prevent auto-delete till we are done.
+ txQueue->markInUse(); // Prevent auto-delete till we are done.
txQueue->deliver(TxBackupsEvent(backups).message());
}
@@ -228,7 +227,8 @@ void PrimaryTxObserver::end(Mutex::ScopedLock&) {
// If there are no outstanding completions, break pointer cycle here.
// Otherwise break it in cancel() when the remaining completions are done.
if (incomplete.empty()) txBuffer = 0;
- txQueue->releaseFromUse(true); // txQueue will auto-delete
+ txQueue->releaseFromUse(); // txQueue will auto-delete
+ txQueue->scheduleAutoDelete();
txQueue.reset();
try {
broker.getExchanges().destroy(getExchangeName());
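Combined with the markInUse() change in initialize() above, the tx-queue now follows a simple pin/unpin/schedule protocol around each transaction. A hedged sketch of just those calls, using only the Queue methods that appear in this diff:

    // Sketch of the tx-queue auto-delete protocol implied by the hunks above.
    txQueue->markInUse();           // 1. Pin the queue so auto-delete cannot fire mid-transaction.
    // ... transaction events are delivered and replicated to the backups ...
    txQueue->releaseFromUse();      // 2. Unpin once the transaction has ended.
    txQueue->scheduleAutoDelete();  // 3. Explicitly hand the queue to the auto-delete machinery.
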
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index cc6c8a3f30..50f2ececdb 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -50,6 +50,7 @@ using namespace framing::execution;
using namespace std;
using std::exception;
using sys::Mutex;
+using boost::shared_ptr;
const std::string QueueReplicator::QPID_SYNC_FREQUENCY("qpid.sync_frequency");
@@ -61,6 +62,17 @@ bool QueueReplicator::isReplicatorName(const std::string& name) {
return startsWith(name, QUEUE_REPLICATOR_PREFIX);
}
+namespace {
+void pushIfQr(QueueReplicator::Vector& v, const shared_ptr<Exchange>& ex) {
+ shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
+ if (qr) v.push_back(qr);
+}
+}
+
+void QueueReplicator::copy(ExchangeRegistry& registry, Vector& result) {
+ registry.eachExchange(boost::bind(&pushIfQr, boost::ref(result), _1));
+}
+
class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
public:
ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
@@ -116,6 +128,7 @@ QueueReplicator::QueueReplicator(HaBroker& hb,
framing::FieldTable args = getArgs();
args.setString(QPID_REPLICATE, printable(NONE).str());
setArgs(args);
+ // Don't allow backup queues to auto-delete, primary decides when to delete.
if (q->isAutoDelete()) q->markInUse();
dispatch[DequeueEvent::KEY] =
@@ -306,5 +319,16 @@ bool QueueReplicator::isBound(boost::shared_ptr<Queue>, const std::string* const
bool QueueReplicator::hasBindings() { return false; }
std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; }
+void QueueReplicator::promoted() {
+ // Promoted to primary, deal with auto-delete now.
+ if (queue && queue->isAutoDelete() && subscribed) {
+ // Make a temporary shared_ptr to prevent premature deletion of queue.
+ // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
+ // which could delete the queue while it is still running its destroy logic.
+ boost::shared_ptr<Queue> q(queue);
+ q->releaseFromUse();
+ q->scheduleAutoDelete();
+ }
+}
}} // namespace qpid::broker
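The comment in promoted() describes a standard shared_ptr keep-alive pattern: copying the member into a local pins the reference count, so even if a callback resets the member mid-call, the object outlives the call. A minimal standalone illustration (hypothetical types, not Qpid code):

    // Keep-alive idiom illustrated in isolation; Holder and Resource are hypothetical.
    #include <boost/shared_ptr.hpp>

    struct Resource { void work() {} };

    struct Holder {
        boost::shared_ptr<Resource> member;
        void resetMember() { member.reset(); }         // Could be invoked as a side effect of work().
        void useSafely() {
            boost::shared_ptr<Resource> local(member); // Local copy pins the refcount.
            resetMember();                             // Even if the member is reset...
            local->work();                             // ...the object is still alive here.
        }
    };

    int main() {
        Holder h;
        h.member.reset(new Resource);
        h.useSafely();
        return 0;
    }
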
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 6fd140fde3..8938285fe3 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -38,6 +38,7 @@ class Queue;
class QueueRegistry;
class SessionHandler;
class Deliverable;
+class ExchangeRegistry;
}
namespace ha {
@@ -59,9 +60,12 @@ class QueueReplicator : public broker::Exchange,
public:
static const std::string QPID_SYNC_FREQUENCY;
static const std::string REPLICATOR_PREFIX;
+ typedef std::vector<boost::shared_ptr<QueueReplicator> > Vector;
static std::string replicatorName(const std::string& queueName);
static bool isReplicatorName(const std::string&);
+ /*** Copy QueueReplicators from the registry */
+ static void copy(broker::ExchangeRegistry&, Vector& result);
QueueReplicator(HaBroker&,
boost::shared_ptr<broker::Queue> q,
@@ -78,7 +82,6 @@ class QueueReplicator : public broker::Exchange,
// Set if the queue has ever been subscribed to, used for auto-delete cleanup.
void setSubscribed() { subscribed = true; }
- bool isSubscribed() { return subscribed; }
boost::shared_ptr<broker::Queue> getQueue() const { return queue; }
@@ -90,6 +93,8 @@ class QueueReplicator : public broker::Exchange,
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
bool hasBindings();
+ void promoted();
+
protected:
typedef boost::function<void(const std::string&, sys::Mutex::ScopedLock&)> DispatchFn;
typedef qpid::sys::unordered_map<std::string, DispatchFn> DispatchMap;
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index d0b93da85f..95215e1e59 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -22,7 +22,6 @@
#include "Event.h"
#include "IdSetter.h"
#include "QueueGuard.h"
-#include "QueueReplicator.h"
#include "QueueSnapshots.h"
#include "ReplicatingSubscription.h"
#include "TxReplicatingSubscription.h"
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 138868f64e..1a5d6ddff8 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -34,6 +34,14 @@ log = getLogger(__name__)
class HaBrokerTest(BrokerTest):
"""Base class for HA broker tests"""
+def alt_setup(session, suffix):
+ # Create exchange to use as alternate and a queue bound to it.
+ # altex exchange: acts as alternate exchange
+ session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix))
+ # altq queue bound to altex, collect re-routed messages.
+ session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix))
+
+
class ReplicationTests(HaBrokerTest):
"""Correctness tests for HA replication."""
@@ -718,19 +726,44 @@ acl deny all all
except NotFound: pass
assert not cluster[1].agent().getQueue("q") # Should not be in QMF
- def alt_setup(self, session, suffix):
- # Create exchange to use as alternate and a queue bound to it.
- # altex exchange: acts as alternate exchange
- session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix))
- # altq queue bound to altex, collect re-routed messages.
- session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix))
+ def test_auto_delete_failover(self):
+ """Test auto-delete queues. Verify that:
+ - queues auto-deleted on the primary are deleted on the backup.
+ - auto-delete queues with/without timeout are deleted after a failover.
+ - messages are correctly routed to the alternate exchange.
+ """
+ cluster = HaCluster(self, 3)
+ s = cluster[0].connect().session()
+ def setup(q, timeout=""):
+ if timeout: timeout = ",arguments:{'qpid.auto_delete_timeout':%s}"%timeout
+ # Create alternate exchange, auto-delete queue and queue bound to alt. ex.
+ s.sender("%s-altex;{create:always,node:{type:topic,x-declare:{type:fanout}}}"%q)
+ qs = s.sender("%s;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:%s-altex%s}}}"%(q,q,timeout))
+ s.sender("%s-altq;{create:always,node:{x-bindings:[{exchange:%s-altex,queue:%s-altq}]}}"%(q,q,q))
+ qs.send(q) # Send a message to the auto-delete queue
+ return s
+
+ for args in [("q1",""),("q2","0"),("q3","1"),("q4",""),("q5","")]: setup(*args)
+ receivers = [s.receiver("q%s"%i) for i in [1,2,3,4]] # Subscribe to queues
+ # Note q5 is never subscribed to, so should not be auto-deleted.
+ receivers[3].close() # Trigger auto-delete for q4
+ cluster[0].kill(final=False)
+ cluster[2].promote()
+ cluster.restart(0)
+ cluster[2].assert_browse("q3",["q3"]) # Not yet auto-deleted, 1 sec timeout.
+ for i in [2,1,0]:
+ for q in ["q1", "q2", "q3","q4"]:
+ cluster[i].wait_no_queue(q,timeout=2) # auto-deleted
+ cluster[i].assert_browse_backup("%s-altq"%q, [q]) # Routed to alternate
+ cluster[i].assert_browse_backup("q5", ["q5"]) # Never subscribed, not deleted.
+ cluster[i].assert_browse_backup("q5-altq", [])
def test_auto_delete_close(self):
"""Verify auto-delete queues are deleted on backup if auto-deleted
on primary"""
cluster=HaCluster(self, 2)
p = cluster[0].connect().session()
- self.alt_setup(p, "1")
+ alt_setup(p, "1")
r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
s = p.sender("adq1")
for m in ["aa","bb","cc"]: s.send(m)
@@ -742,71 +775,6 @@ acl deny all all
cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
cluster[1].wait_queue("adq2")
- def test_auto_delete_crash(self):
- """Verify auto-delete queues are deleted on backup if the primary crashes"""
- cluster=HaCluster(self, 2)
- p = cluster[0].connect().session()
- self.alt_setup(p,"1")
-
- # adq1 is subscribed so will be auto-deleted.
- r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
- s = p.sender("adq1")
- for m in ["aa","bb","cc"]: s.send(m)
- # adq2 is subscribed after cluster[2] starts.
- p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}")
- # adq3 is never subscribed.
- p.sender("adq3;{create:always,node:{x-declare:{auto-delete:True}}}")
-
- cluster.start()
- cluster[2].wait_status("ready")
-
- p.receiver("adq2") # Subscribed after cluster[2] joined
-
- for q in ["adq1","adq2","adq3","altq1"]: cluster[1].wait_queue(q)
- for q in ["adq1","adq2","adq3","altq1"]: cluster[2].wait_queue(q)
- cluster[0].kill()
-
- cluster[1].wait_no_queue("adq1")
- cluster[1].wait_no_queue("adq2")
- cluster[1].wait_queue("adq3")
-
- cluster[2].wait_no_queue("adq1")
- cluster[2].wait_no_queue("adq2")
- cluster[2].wait_queue("adq3")
-
- cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
- cluster[2].assert_browse_backup("altq1", ["aa","bb","cc"])
-
- def test_auto_delete_timeout(self):
- cluster = HaCluster(self, 2)
- # Test timeout
- r1 = cluster[0].connect().session().receiver("q1;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
- # Test special case of timeout = 0
- r0 = cluster[0].connect().session().receiver("q0;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':0}}}}")
- cluster[1].wait_queue("q0")
- cluster[1].wait_queue("q1")
- cluster[0].kill()
- cluster[1].wait_queue("q1") # Not timed out yet
- cluster[1].wait_no_queue("q1", timeout=5) # Wait for timeout
- cluster[1].wait_no_queue("q0", timeout=5) # Wait for timeout
-
- def test_alt_exchange_dup(self):
- """QPID-4349: if a queue has an alterante exchange and is deleted the
- messages appear twice on the alternate, they are rerouted once by the
- primary and again by the backup."""
- cluster = HaCluster(self,2)
-
- # Set up q with alternate exchange altex bound to altq.
- s = cluster[0].connect().session()
- s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
- s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
- snd = s.sender("q;{create:always,node:{x-declare:{alternate-exchange:'altex'}}}")
- messages = [ str(n) for n in xrange(10) ]
- for m in messages: snd.send(m)
- cluster[1].assert_browse_backup("q", messages)
- s.sender("q;{delete:always}").close()
- cluster[1].assert_browse_backup("altq", messages)
-
def test_expired(self):
"""Regression test for QPID-4379: HA does not properly handle expired messages"""
# Race between messages expiring and HA replicating consumer.