summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2010-11-18 19:40:53 +0000
committerAlan Conway <aconway@apache.org>2010-11-18 19:40:53 +0000
commit4869e431e27144d96c81448d7db0d20c68f2b6a7 (patch)
tree8e7d07af618e2a560a585545c6b7dd5056b53e04 /cpp
parent9891b7d43f702ea4374c62381eba24db21b1bfbf (diff)
downloadqpid-python-4869e431e27144d96c81448d7db0d20c68f2b6a7.tar.gz
QPID-2874 Clustered broker crashes in assertion in cluster/ExpiryPolicy.cpp
- Added missing lock to ExpiryPolicy - 1-N mapping for expiry ID to mapping when receiving an update. - Regression test. A fan-out message (sent to multiple queues e.g. by fanout or topic exchange) is a single message on multiple queues with a single expiry ID. During an update however each instance is sent as a separate message so we need to allow 1-N mapping of expiry ID to message during update. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1036589 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/cluster/ExpiryPolicy.cpp52
-rw-r--r--cpp/src/qpid/cluster/ExpiryPolicy.h10
-rwxr-xr-xcpp/src/tests/cluster_tests.py26
3 files changed, 75 insertions, 13 deletions
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.cpp b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
index e1ba420f2a..d91437c5ba 100644
--- a/cpp/src/qpid/cluster/ExpiryPolicy.cpp
+++ b/cpp/src/qpid/cluster/ExpiryPolicy.cpp
@@ -41,40 +41,76 @@ struct ExpiryTask : public sys::TimerTask {
const uint64_t expiryId;
};
+// Called while receiving an update
+void ExpiryPolicy::setId(uint64_t id) {
+ sys::Mutex::ScopedLock l(lock);
+ expiryId = id;
+}
+
+// Called while giving an update
+uint64_t ExpiryPolicy::getId() const {
+ sys::Mutex::ScopedLock l(lock);
+ return expiryId;
+}
+
+// Called in enqueuing connection thread
void ExpiryPolicy::willExpire(broker::Message& m) {
- uint64_t id = expiryId++;
- assert(unexpiredById.find(id) == unexpiredById.end());
- assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
- unexpiredById[id] = &m;
- unexpiredByMessage[&m] = id;
+ uint64_t id;
+ {
+ // When messages are fanned out to multiple queues, update sends
+ // them as independenty messages so we can have multiple messages
+ // with the same expiry ID.
+ //
+ // TODO: fix update to avoid duplicating messages.
+ sys::Mutex::ScopedLock l(lock);
+ id = expiryId++; // if this is an update, this expiryId may already exist
+ assert(unexpiredByMessage.find(&m) == unexpiredByMessage.end());
+ unexpiredById.insert(IdMessageMap::value_type(id, &m));
+ unexpiredByMessage[&m] = id;
+ }
timer.add(new ExpiryTask(this, id, m.getExpiration()));
}
+// Called in dequeueing connection thread
void ExpiryPolicy::forget(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
MessageIdMap::iterator i = unexpiredByMessage.find(&m);
assert(i != unexpiredByMessage.end());
unexpiredById.erase(i->second);
unexpiredByMessage.erase(i);
}
+// Called in dequeueing connection or cleanup thread.
bool ExpiryPolicy::hasExpired(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
return unexpiredByMessage.find(&m) == unexpiredByMessage.end();
}
+// Called in timer thread
void ExpiryPolicy::sendExpire(uint64_t id) {
+ {
+ sys::Mutex::ScopedLock l(lock);
+ // Don't multicast an expiry notice if message is already forgotten.
+ if (unexpiredById.find(id) == unexpiredById.end()) return;
+ }
mcast.mcastControl(framing::ClusterMessageExpiredBody(framing::ProtocolVersion(), id), memberId);
}
+// Called in CPG deliver thread.
void ExpiryPolicy::deliverExpire(uint64_t id) {
- IdMessageMap::iterator i = unexpiredById.find(id);
- if (i != unexpiredById.end()) {
+ sys::Mutex::ScopedLock l(lock);
+ std::pair<IdMessageMap::iterator, IdMessageMap::iterator> expired = unexpiredById.equal_range(id);
+ IdMessageMap::iterator i = expired.first;
+ while (i != expired.second) {
i->second->setExpiryPolicy(expiredPolicy); // hasExpired() == true;
unexpiredByMessage.erase(i->second);
- unexpiredById.erase(i);
+ unexpiredById.erase(i++);
}
}
+// Called in update thread on the updater.
boost::optional<uint64_t> ExpiryPolicy::getId(broker::Message& m) {
+ sys::Mutex::ScopedLock l(lock);
MessageIdMap::iterator i = unexpiredByMessage.find(&m);
return i == unexpiredByMessage.end() ? boost::optional<uint64_t>() : i->second;
}
diff --git a/cpp/src/qpid/cluster/ExpiryPolicy.h b/cpp/src/qpid/cluster/ExpiryPolicy.h
index bdbe3a61dc..77a656aa68 100644
--- a/cpp/src/qpid/cluster/ExpiryPolicy.h
+++ b/cpp/src/qpid/cluster/ExpiryPolicy.h
@@ -61,20 +61,24 @@ class ExpiryPolicy : public broker::ExpiryPolicy
// Cluster delivers expiry notice.
void deliverExpire(uint64_t);
- void setId(uint64_t id) { expiryId = id; }
- uint64_t getId() const { return expiryId; }
+ void setId(uint64_t id);
+ uint64_t getId() const;
boost::optional<uint64_t> getId(broker::Message&);
private:
typedef std::map<broker::Message*, uint64_t> MessageIdMap;
- typedef std::map<uint64_t, broker::Message*> IdMessageMap;
+ // When messages are fanned out to multiple queues, update sends
+ // them as independenty messages so we can have multiple messages
+ // with the same expiry ID.
+ typedef std::multimap<uint64_t, broker::Message*> IdMessageMap;
struct Expired : public broker::ExpiryPolicy {
bool hasExpired(broker::Message&);
void willExpire(broker::Message&);
};
+ mutable sys::Mutex lock;
MessageIdMap unexpiredByMessage;
IdMessageMap unexpiredById;
uint64_t expiryId;
diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py
index 7272675971..1df72bf1d2 100755
--- a/cpp/src/tests/cluster_tests.py
+++ b/cpp/src/tests/cluster_tests.py
@@ -22,7 +22,7 @@ import os, signal, sys, time, imp, re, subprocess
from qpid import datatypes, messaging
from qpid.brokertest import *
from qpid.harness import Skipped
-from qpid.messaging import Message
+from qpid.messaging import Message, Empty
from threading import Thread, Lock
from logging import getLogger
from itertools import chain
@@ -41,7 +41,6 @@ log = getLogger("qpid.cluster_tests")
# Import scripts as modules
qpid_cluster=import_script(checkenv("QPID_CLUSTER_EXEC"))
-
def readfile(filename):
"""Returns te content of file named filename as a string"""
f = file(filename)
@@ -223,6 +222,29 @@ acl allow all all
out = qs.communicate()[0]
assert out.find("amq.failover") > 0
+ def evaluate_address(self, session, address):
+ """Create a receiver just to evaluate an address for its side effects"""
+ r = session.receiver(address)
+ r.close()
+
+ def test_expire_fanout(self):
+ """Regression test for QPID-2874: Clustered broker crashes in assertion in
+ cluster/ExpiryPolicy.cpp.
+ Caused by a fan-out message being updated as separate messages"""
+ cluster = self.cluster(1)
+ session0 = cluster[0].connect().session()
+ # Create 2 queues bound to fanout exchange.
+ self.evaluate_address(session0, "q1;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q1}]}}")
+ self.evaluate_address(session0, "q2;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:q2}]}}")
+ queues = ["q1", "q2"]
+ # Send a fanout message with a long timeout
+ s = session0.sender("amq.fanout")
+ s.send(Message("foo", ttl=100), sync=False)
+ # Start a new member, check the messages
+ cluster.start()
+ session1 = cluster[1].connect().session()
+ for q in queues: self.assert_browse(session1, "q1", ["foo"])
+
class LongTests(BrokerTest):
"""Tests that can run for a long time if -DDURATION=<minutes> is set"""
def duration(self):