diff options
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp | 18 |
1 files changed, 11 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 1ede47ed60..2001ec5332 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -137,7 +137,9 @@ ReplicatingSubscription::ReplicatingSubscription( } // If there's already a guard (we are in failover) use it, else create one. - if (Primary::get()) guard = Primary::get()->getGuard(queue, info); + boost::shared_ptr<Primary> primary = + boost::dynamic_pointer_cast<Primary>(haBroker.getRole()); + if (primary) guard = primary->getGuard(queue, info); if (!guard) guard.reset(new QueueGuard(*queue, info)); // NOTE: Once the observer is attached we can have concurrent @@ -148,19 +150,19 @@ ReplicatingSubscription::ReplicatingSubscription( // between the snapshot and attaching the observer. observer.reset(new QueueObserver(*this)); queue->addObserver(observer); - ReplicationIdSet primary = haBroker.getQueueSnapshots()->get(queue)->snapshot(); + ReplicationIdSet primaryIds = haBroker.getQueueSnapshots()->get(queue)->snapshot(); std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET); - ReplicationIdSet backup; - if (!backupStr.empty()) backup = decodeStr<ReplicationIdSet>(backupStr); + ReplicationIdSet backupIds; + if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr); // Initial dequeues are messages on backup but not on primary. - ReplicationIdSet initDequeues = backup - primary; + ReplicationIdSet initDequeues = backupIds - primaryIds; QueuePosition front,back; queue->getRange(front, back, broker::REPLICATOR); // Outside lock, getRange locks queue { sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued() dequeues += initDequeues; // Messages on backup that are not on primary. - skip = backup - initDequeues; // Messages already on the backup. + skip = backupIds - initDequeues; // Messages already on the backup. // Queue front is moving but we know this subscriptions will start at a // position >= front so if front is safe then position must be. @@ -247,7 +249,9 @@ void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) { sys::Mutex::ScopedUnlock u(lock); // Notify Primary that a subscription is ready. QPID_LOG(debug, logPrefix << "Caught up"); - if (Primary::get()) Primary::get()->readyReplica(*this); + boost::shared_ptr<Primary> primary = + boost::dynamic_pointer_cast<Primary>(haBroker.getRole()); + if (primary) primary->readyReplica(*this); } } |
