summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp')
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp18
1 files changed, 11 insertions, 7 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 1ede47ed60..2001ec5332 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -137,7 +137,9 @@ ReplicatingSubscription::ReplicatingSubscription(
}
// If there's already a guard (we are in failover) use it, else create one.
- if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
+ boost::shared_ptr<Primary> primary =
+ boost::dynamic_pointer_cast<Primary>(haBroker.getRole());
+ if (primary) guard = primary->getGuard(queue, info);
if (!guard) guard.reset(new QueueGuard(*queue, info));
// NOTE: Once the observer is attached we can have concurrent
@@ -148,19 +150,19 @@ ReplicatingSubscription::ReplicatingSubscription(
// between the snapshot and attaching the observer.
observer.reset(new QueueObserver(*this));
queue->addObserver(observer);
- ReplicationIdSet primary = haBroker.getQueueSnapshots()->get(queue)->snapshot();
+ ReplicationIdSet primaryIds = haBroker.getQueueSnapshots()->get(queue)->snapshot();
std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET);
- ReplicationIdSet backup;
- if (!backupStr.empty()) backup = decodeStr<ReplicationIdSet>(backupStr);
+ ReplicationIdSet backupIds;
+ if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);
// Initial dequeues are messages on backup but not on primary.
- ReplicationIdSet initDequeues = backup - primary;
+ ReplicationIdSet initDequeues = backupIds - primaryIds;
QueuePosition front,back;
queue->getRange(front, back, broker::REPLICATOR); // Outside lock, getRange locks queue
{
sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
dequeues += initDequeues; // Messages on backup that are not on primary.
- skip = backup - initDequeues; // Messages already on the backup.
+ skip = backupIds - initDequeues; // Messages already on the backup.
// Queue front is moving but we know this subscriptions will start at a
// position >= front so if front is safe then position must be.
@@ -247,7 +249,9 @@ void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
sys::Mutex::ScopedUnlock u(lock);
// Notify Primary that a subscription is ready.
QPID_LOG(debug, logPrefix << "Caught up");
- if (Primary::get()) Primary::get()->readyReplica(*this);
+ boost::shared_ptr<Primary> primary =
+ boost::dynamic_pointer_cast<Primary>(haBroker.getRole());
+ if (primary) primary->readyReplica(*this);
}
}