summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-05-29 14:55:37 +0000
committerAlan Conway <aconway@apache.org>2012-05-29 14:55:37 +0000
commit3f64f140c95f54adfd5d698765f01d04670a0af0 (patch)
treed9ab7286491dff1e5228a5e707fadab888ee89fe /qpid/cpp/src
parentab7c34a2e2b21faebf0d6d50f94223cde86601d0 (diff)
downloadqpid-python-3f64f140c95f54adfd5d698765f01d04670a0af0.tar.gz
QPID-3603: Fix deadlock in ReplicatingSubscription causing primary to hang sporadically
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1343762 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp36
1 files changed, 27 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 9d7df51e3b..4490e309aa 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -146,6 +146,8 @@ struct QueueRange {
SequenceNumber front;
SequenceNumber back;
+ QueueRange() { }
+
// FIXME aconway 2012-05-26: fix front calculation
QueueRange(broker::Queue& q) {
back = q.getPosition();
@@ -210,7 +212,6 @@ ReplicatingSubscription::ReplicatingSubscription(
QueueRange primary(*queue);
QueueRange backup(arguments);
backupPosition = backup.back;
-
// We can re-use some backup messages if backup and primary queues
// overlap and the backup is not missing messages at the front of the queue.
if (!primary.empty && // Primary not empty
@@ -252,15 +253,22 @@ ReplicatingSubscription::~ReplicatingSubscription() {
// Called in subscription's connection thread when the subscription is created.
void ReplicatingSubscription::initialize() {
sys::Mutex::ScopedLock l(lock); // QueueObserver methods can be called concurrently
+
// Send initial dequeues and position to the backup.
// There most be a shared_ptr(this) when sending.
sendDequeueEvent(l);
sendPositionEvent(position, l);
backupPosition = position;
- // Set the ready position, all messages after this has been set as an observer.
- // All messages after this position have been seen by us as QueueObserver.
- QueueRange range(*getQueue());
+ // Set the ready position. All messages after this position have
+ // been seen by us as QueueObserver.
+ QueueRange range;
+ {
+ // Drop the lock, QueueRange will lock the queues message lock
+ // which is also locked around calls to enqueued() and dequeued()
+ sys::Mutex::ScopedUnlock u(lock);
+ range = QueueRange(*getQueue());
+ }
readyPosition = range.back;
if (range.empty || position >= readyPosition) {
setReady(l);
@@ -291,6 +299,7 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
// Send the position before qm was enqueued.
sendPositionEvent(qm.position-1, l);
}
+ // Backup will automaticall advance by 1 on delivery of message.
backupPosition = qm.position;
}
// Deliver the message
@@ -314,9 +323,12 @@ bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
void ReplicatingSubscription::setReady(const sys::Mutex::ScopedLock&) {
if (ready) return;
ready = true;
- QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
// Notify Primary that a subscription is ready.
- if (Primary::get()) Primary::get()->readyReplica(getQueue()->getName());
+ {
+ sys::Mutex::ScopedUnlock u(lock);
+ QPID_LOG(info, logPrefix << "Caught up at " << getPosition());
+ if (Primary::get()) Primary::get()->readyReplica(getQueue()->getName());
+ }
}
// INVARIANT: delayed contains msg <=> we have outstanding startCompletion on msg
@@ -400,7 +412,10 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&)
dequeues.encode(buffer);
dequeues.clear();
buffer.reset();
- sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer);
+ {
+ sys::Mutex::ScopedUnlock u(lock);
+ sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer);
+ }
}
// QueueObserver override. Called after the message has been removed
@@ -408,8 +423,8 @@ void ReplicatingSubscription::sendDequeueEvent(const sys::Mutex::ScopedLock&)
// arbitrary connection threads.
void ReplicatingSubscription::dequeued(const QueuedMessage& qm)
{
+ QPID_LOG(trace, logPrefix << "Dequeued " << qm);
{
- QPID_LOG(trace, logPrefix << "Dequeued " << qm);
sys::Mutex::ScopedLock l(lock);
dequeues.add(qm.position);
// If we have not yet sent this message to the backup, then
@@ -429,7 +444,10 @@ void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, const sys::M
framing::Buffer buffer(&buf[0], buf.size());
pos.encode(buffer);
buffer.reset();
- sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer);
+ {
+ sys::Mutex::ScopedUnlock u(lock);
+ sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer);
+ }
}
void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer)