diff options
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.cpp | 24 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/BrokerReplicator.h | 8 |
3 files changed, 24 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index db877043e9..f133ecd5c6 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -92,7 +92,6 @@ void Backup::stop(Mutex::ScopedLock&) { QPID_LOG(debug, logPrefix << "Leaving backup role."); if (link) link->close(); if (replicator.get()) { - broker.getExchanges().destroy(replicator->getName()); replicator->shutdown(); replicator.reset(); } diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 37c2a2d6b4..9ab6628e17 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -287,8 +287,8 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& alternates(hb.getBroker().getExchanges()), connection(0) { - broker.getConnectionObservers().add( - boost::shared_ptr<broker::ConnectionObserver>(new ConnectionObserver(*this))); + connectionObserver.reset(new ConnectionObserver(*this)); + broker.getConnectionObservers().add(connectionObserver); framing::FieldTable args = getArgs(); args.setString(QPID_REPLICATE, printable(NONE).str()); setArgs(args); @@ -320,10 +320,11 @@ void BrokerReplicator::initialize() { "", // excludes false, // dynamic 0, // sync? - // shared_ptr keeps this in memory until outstanding initializeBridge + // shared_ptr keeps this in memory until outstanding connected // calls are run. - boost::bind(&BrokerReplicator::initializeBridge, shared_from_this(), _1, _2) + boost::bind(&BrokerReplicator::connected, shared_from_this(), _1, _2) ); + assert(result.second); result.first->setErrorListener( boost::shared_ptr<ErrorListener>(new ErrorListener(logPrefix, *this))); } @@ -339,10 +340,21 @@ void collectQueueReplicators( } } // namespace -void BrokerReplicator::shutdown() {} +void BrokerReplicator::shutdown() { + // NOTE: this is called in a QMF dispatch thread, not the Link's connection + // thread. It's OK to be unlocked because it doesn't use any mutable state, + // it only calls thread safe functions objects belonging to the Broker. + + // Unregister with broker objects: + if (connectionObserver) { + broker.getConnectionObservers().remove(connectionObserver); + connectionObserver.reset(); + } + broker.getExchanges().destroy(getName()); +} // This is called in the connection IO thread when the bridge is started. -void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { +void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) { // Use the credentials of the outgoing Link connection for creating queues, // exchanges etc. We know link->getConnection() is non-zero because we are // being called in the connections thread context. diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index 9161227c0f..2274f49294 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -61,7 +61,9 @@ class QueueReplicator; * exchanges and bindings to replicate the primary. * It also creates QueueReplicators for newly replicated queues. * - * THREAD UNSAFE: Only called in Link connection thread, no need for locking. + * THREAD UNSAFE: + * All members except shutdown are only called in the Link's connection thread context. + * shutdown() does not use any mutable state. * */ class BrokerReplicator : public broker::Exchange, @@ -96,7 +98,7 @@ class BrokerReplicator : public broker::Exchange, class ErrorListener; class ConnectionObserver; - void initializeBridge(broker::Bridge&, broker::SessionHandler&); + void connected(broker::Bridge&, broker::SessionHandler&); void doEventQueueDeclare(types::Variant::Map& values); void doEventQueueDelete(types::Variant::Map& values); @@ -134,6 +136,7 @@ class BrokerReplicator : public broker::Exchange, void deleteExchange(const std::string& name); void autoDeleteCheck(boost::shared_ptr<broker::Exchange>); + void disconnected(); void setMembership(const types::Variant::List&); // Set membership from list. @@ -155,6 +158,7 @@ class BrokerReplicator : public broker::Exchange, EventDispatchMap dispatch; std::auto_ptr<UpdateTracker> queueTracker; std::auto_ptr<UpdateTracker> exchangeTracker; + boost::shared_ptr<ConnectionObserver> connectionObserver; }; }} // namespace qpid::broker |
