summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
author: Alan Conway <aconway@apache.org> 2012-05-15 21:05:34 +0000
committer: Alan Conway <aconway@apache.org> 2012-05-15 21:05:34 +0000
commit: 55305747e6e7f931756bfa21460c37e350f5ea0f (patch)
tree: c6b826cac45092f95c428671a13024aad62b3400 /qpid/cpp
parent: 80a0832a2fe775ff217e6353f003226eb3f18d89 (diff)
download: qpid-python-55305747e6e7f931756bfa21460c37e350f5ea0f.tar.gz
QPID-3603: HA broker backup/primary ready checks.
- Introduce HA broker state machine. - Inform backup queues when ready. - Incomplete implementation of backup ready check. - Does not count correctly after a failover, see countUnready. - Existing replicator bridges updated out of sync with BrokerReplicator initialize. - Does not handle multi-message responses. - Newly promoted HA primary waits for backups to be ready before accepting clients. - Uniform log prefixes for HA messages. - qpid-ha tests, call qpid-ha python code directly. - Move excluder from Backup to HaBroker; it is also used in PROMOTING. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1338889 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/CMakeLists.txt23
-rw-r--r--qpid/cpp/src/ha.mk11
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h48
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp2
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp10
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h4
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp227
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.h18
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp25
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionExcluder.h12
-rw-r--r--qpid/cpp/src/qpid/ha/Counter.h57
-rw-r--r--qpid/cpp/src/qpid/ha/Enum.cpp72
-rw-r--r--qpid/cpp/src/qpid/ha/Enum.h97
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.cpp196
-rw-r--r--qpid/cpp/src/qpid/ha/HaBroker.h49
-rw-r--r--qpid/cpp/src/qpid/ha/HaPlugin.cpp3
-rw-r--r--qpid/cpp/src/qpid/ha/LogPrefix.cpp40
-rw-r--r--qpid/cpp/src/qpid/ha/LogPrefix.h (renamed from qpid/cpp/src/qpid/ha/ReplicateLevel.h)39
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/Primary.h71
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.cpp42
-rw-r--r--qpid/cpp/src/qpid/ha/QueueReplicator.h19
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicateLevel.cpp72
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp83
-rw-r--r--qpid/cpp/src/qpid/ha/ReplicatingSubscription.h19
-rw-r--r--qpid/cpp/src/qpid/ha/Settings.h8
-rw-r--r--qpid/cpp/src/tests/brokertest.py18
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py164
29 files changed, 1058 insertions, 386 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index cc882f80ca..8e4e9dae34 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -628,20 +628,25 @@ if (BUILD_HA)
set (ha_SOURCES
qpid/ha/Backup.cpp
qpid/ha/Backup.h
+ qpid/ha/BrokerReplicator.cpp
+ qpid/ha/BrokerReplicator.h
+ qpid/ha/ConnectionExcluder.cpp
+ qpid/ha/ConnectionExcluder.h
+ qpid/ha/Counter.h
+ qpid/ha/Enum.cpp
+ qpid/ha/Enum.h
qpid/ha/HaBroker.cpp
qpid/ha/HaBroker.h
qpid/ha/HaPlugin.cpp
- qpid/ha/Settings.h
- qpid/ha/QueueReplicator.h
+ qpid/ha/LogPrefix.cpp
+ qpid/ha/LogPrefix.h
+ qpid/ha/Primary.cpp
+ qpid/ha/Primary.h
qpid/ha/QueueReplicator.cpp
- qpid/ha/ReplicateLevel.h
- qpid/ha/ReplicateLevel.cpp
- qpid/ha/ReplicatingSubscription.h
+ qpid/ha/QueueReplicator.h
qpid/ha/ReplicatingSubscription.cpp
- qpid/ha/BrokerReplicator.cpp
- qpid/ha/BrokerReplicator.h
- qpid/ha/ConnectionExcluder.cpp
- qpid/ha/ConnectionExcluder.h
+ qpid/ha/ReplicatingSubscription.h
+ qpid/ha/Settings.h
)
add_library (ha MODULE ${ha_SOURCES})
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index be1fb73e89..31f7bcc494 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -26,16 +26,21 @@ ha_la_SOURCES = \
qpid/ha/Backup.cpp \
qpid/ha/Backup.h \
qpid/ha/BrokerReplicator.cpp \
- qpid/ha/BrokerReplicator.h \
+ qpid/ha/BrokerReplicator.h \
qpid/ha/ConnectionExcluder.cpp \
qpid/ha/ConnectionExcluder.h \
+ qpid/ha/Counter.h \
+ qpid/ha/Enum.cpp \
+ qpid/ha/Enum.h \
qpid/ha/HaBroker.cpp \
qpid/ha/HaBroker.h \
qpid/ha/HaPlugin.cpp \
+ qpid/ha/LogPrefix.cpp \
+ qpid/ha/LogPrefix.h \
+ qpid/ha/Primary.cpp \
+ qpid/ha/Primary.h \
qpid/ha/QueueReplicator.cpp \
qpid/ha/QueueReplicator.h \
- qpid/ha/ReplicateLevel.cpp \
- qpid/ha/ReplicateLevel.h \
qpid/ha/ReplicatingSubscription.cpp \
qpid/ha/ReplicatingSubscription.h \
qpid/ha/Settings.h
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index c13ac19454..cb9d61a40f 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -1257,5 +1257,19 @@ void Broker::unbind(const std::string& queueName,
}
}
+// FIXME aconway 2012-04-27: access to linkClientProperties is
+// not properly thread safe, you could lose fields if 2 threads
+// attempt to add a field concurrently.
+
+framing::FieldTable Broker::getLinkClientProperties() const {
+ sys::Mutex::ScopedLock l(linkClientPropertiesLock);
+ return linkClientProperties;
+}
+
+void Broker::setLinkClientProperties(const framing::FieldTable& ft) {
+ sys::Mutex::ScopedLock l(linkClientPropertiesLock);
+ linkClientProperties = ft;
+}
+
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 543d42e002..089619ec44 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -63,8 +63,8 @@
namespace qpid {
namespace sys {
- class ProtocolFactory;
- class Poller;
+class ProtocolFactory;
+class Poller;
}
struct Url;
@@ -90,7 +90,7 @@ class Broker : public sys::Runnable, public Plugin::Target,
public management::Manageable,
public RefCounted
{
-public:
+ public:
struct Options : public qpid::Options {
static const std::string DEFAULT_DATA_DIR_LOCATION;
@@ -132,23 +132,23 @@ public:
};
class ConnectionCounter {
- int maxConnections;
- int connectionCount;
- sys::Mutex connectionCountLock;
- public:
- ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {};
- void inc_connectionCount() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
- connectionCount++;
- }
- void dec_connectionCount() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
- connectionCount--;
- }
- bool allowConnection() {
- sys::ScopedLock<sys::Mutex> l(connectionCountLock);
- return (maxConnections <= connectionCount);
- }
+ int maxConnections;
+ int connectionCount;
+ sys::Mutex connectionCountLock;
+ public:
+ ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {};
+ void inc_connectionCount() {
+ sys::ScopedLock<sys::Mutex> l(connectionCountLock);
+ connectionCount++;
+ }
+ void dec_connectionCount() {
+ sys::ScopedLock<sys::Mutex> l(connectionCountLock);
+ connectionCount--;
+ }
+ bool allowConnection() {
+ sys::ScopedLock<sys::Mutex> l(connectionCountLock);
+ return (maxConnections <= connectionCount);
+ }
};
private:
@@ -205,6 +205,10 @@ public:
ConnectionCounter connectionCounter;
ConsumerFactories consumerFactories;
+ mutable sys::Mutex linkClientPropertiesLock;
+ framing::FieldTable linkClientProperties;
+
+
public:
QPID_BROKER_EXTERN virtual ~Broker();
@@ -375,6 +379,10 @@ public:
ConsumerFactories& getConsumerFactories() { return consumerFactories; }
ConnectionObservers& getConnectionObservers() { return connectionObservers; }
+
+ /** Properties to be set on outgoing link connections */
+ framing::FieldTable getLinkClientProperties() const;
+ void setLinkClientProperties(const framing::FieldTable&);
};
}}
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 6894324117..5d24e115c4 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -318,7 +318,7 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties,
connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG));
}
- FieldTable ft;
+ FieldTable ft = connection.getBroker().getLinkClientProperties();
ft.setInt(QPID_FED_LINK,1);
ft.setString(QPID_FED_TAG, connection.getBroker().getFederationTag());
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 5f053e0974..1ff5578ff4 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -20,7 +20,6 @@
*/
#include "Backup.h"
#include "BrokerReplicator.h"
-#include "ConnectionExcluder.h"
#include "HaBroker.h"
#include "ReplicatingSubscription.h"
#include "Settings.h"
@@ -45,17 +44,15 @@ using types::Variant;
using std::string;
Backup::Backup(HaBroker& hb, const Settings& s) :
- haBroker(hb), broker(hb.getBroker()), settings(s), excluder(new ConnectionExcluder())
+ logPrefix(hb), haBroker(hb), broker(hb.getBroker()), settings(s)
{
- // Exclude client connections before starting the link to avoid self-connection.
- broker.getConnectionObservers().add(excluder);
// Empty brokerUrl means delay initialization until setUrl() is called.
if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl));
}
void Backup::initialize(const Url& url) {
if (url.empty()) throw Url::Invalid("HA broker URL is empty");
- QPID_LOG(notice, "HA: Backup initialized: " << url);
+ QPID_LOG(info, logPrefix << "initialized for: " << url);
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
framing::Uuid uuid(true);
// Declare the link
@@ -75,7 +72,6 @@ Backup::~Backup() {
if (link) link->close();
if (replicator.get()) broker.getExchanges().destroy(replicator->getName());
replicator.reset();
- broker.getConnectionObservers().remove(excluder); // This allows client connections.
}
@@ -84,7 +80,7 @@ void Backup::setBrokerUrl(const Url& url) {
if (url.empty()) return;
sys::Mutex::ScopedLock l(lock);
if (link) { // URL changed after we initialized.
- QPID_LOG(info, "HA: Backup broker URL set to " << url);
+ QPID_LOG(info, logPrefix << "broker URL set to " << url);
link->setUrl(url);
}
else {
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index 6c36996914..f794b11a60 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -22,6 +22,7 @@
*
*/
+#include "LogPrefix.h"
#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
@@ -36,7 +37,6 @@ class Link;
namespace ha {
class Settings;
-class ConnectionExcluder;
class BrokerReplicator;
class HaBroker;
@@ -55,13 +55,13 @@ class Backup
private:
void initialize(const Url&);
+ LogPrefix logPrefix;
sys::Mutex lock;
HaBroker& haBroker;
broker::Broker& broker;
Settings settings;
boost::shared_ptr<broker::Link> link;
boost::shared_ptr<BrokerReplicator> replicator;
- boost::shared_ptr<ConnectionExcluder> excluder;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index 690337831c..ea5f4a5fa8 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -39,6 +39,7 @@
#include "qmf/org/apache/qpid/broker/EventSubscribe.h"
#include <algorithm>
#include <sstream>
+#include <assert.h>
namespace qpid {
namespace ha {
@@ -58,7 +59,6 @@ using namespace broker;
namespace {
const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator");
-const string QPID_REPLICATE("qpid.replicate");
const string CLASS_NAME("_class_name");
const string EVENT("_event");
@@ -163,32 +163,13 @@ Variant::Map asMapVoid(const Variant& value) {
} // namespace
-ReplicateLevel BrokerReplicator::replicateLevel(const std::string& str) {
- ReplicateLevel rl;
- if (qpid::ha::replicateLevel(str, rl)) return rl;
- else return haBroker.getSettings().replicateDefault;
-}
-
-ReplicateLevel BrokerReplicator::replicateLevel(const framing::FieldTable& f) {
- if (f.isSet(QPID_REPLICATE))
- return replicateLevel(f.getAsString(QPID_REPLICATE));
- else
- return haBroker.getSettings().replicateDefault;
-}
-
-ReplicateLevel BrokerReplicator::replicateLevel(const Variant::Map& m) {
- Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
- if (i != m.end())
- return replicateLevel(i->second.asString());
- else
- return haBroker.getSettings().replicateDefault;
-}
-
BrokerReplicator::~BrokerReplicator() {}
BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
: Exchange(QPID_CONFIGURATION_REPLICATOR),
- haBroker(hb), broker(hb.getBroker()), link(l)
+ logPrefix(hb),
+ haBroker(hb), broker(hb.getBroker()), link(l),
+ unreadyCount(boost::bind(&BrokerReplicator::ready, this))
{
framing::Uuid uuid(true);
const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str());
@@ -211,13 +192,33 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>&
// This is called in the connection IO thread when the bridge is started.
void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+
+ switch (haBroker.getStatus()) {
+ case JOINING:
+ haBroker.setStatus(CATCHUP);
+ case CATCHUP:
+ // FIXME aconway 2012-04-27: distinguish catchup case, below.
+ break;
+ case READY:
+ // FIXME aconway 2012-04-27: distinguish ready case, reconnect to other backup.
+ break;
+ case PROMOTING:
+ case ACTIVE:
+ // FIXME aconway 2012-04-27: link is connected to self!
+ // Promotion should close the link before allowing connections.
+ return;
+ break;
+ case STANDALONE:
+ return;
+ }
+
framing::AMQP_ServerProxy peer(sessionHandler.out);
string queueName = bridge.getQueueName();
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
//declare and bind an event queue
FieldTable declareArgs;
- declareArgs.setString(QPID_REPLICATE, str(RL_NONE));
+ declareArgs.setString(QPID_REPLICATE, printable(NONE).str());
peer.getQueue().declare(queueName, "", false, false, true, true, declareArgs);
peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
//subscribe to the queue
@@ -231,7 +232,9 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH
sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler);
sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler);
sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler);
- QPID_LOG(debug, "HA: Backup configuration bridge: " << queueName);
+ // Queue ready count - count one for the query in progress.
+ ++unreadyCount;
+ QPID_LOG(debug, logPrefix << "opened configuration bridge: " << queueName);
}
void BrokerReplicator::route(Deliverable& msg) {
@@ -244,10 +247,10 @@ void BrokerReplicator::route(Deliverable& msg) {
string content = msg.getMessage().getFrames().getContent();
amqp_0_10::ListCodec::decode(content, list);
+ string type; // FIXME aconway 2012-04-26: quick hack for end-of query, need to handle multi-message responses
if (headers->getAsString(QMF_CONTENT) == EVENT) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
- QPID_LOG(trace, "HA: Backup received event: " << map);
Variant::Map& schema = map[SCHEMA_ID].asMap();
Variant::Map& values = map[VALUES].asMap();
if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values);
@@ -260,8 +263,7 @@ void BrokerReplicator::route(Deliverable& msg) {
} else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) {
for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) {
Variant::Map& map = i->asMap();
- QPID_LOG(trace, "HA: Backup received event: " << map);
- string type = map[SCHEMA_ID].asMap()[CLASS_NAME];
+ type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString();
Variant::Map& values = map[VALUES].asMap();
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
@@ -271,8 +273,13 @@ void BrokerReplicator::route(Deliverable& msg) {
else if (type == HA_BROKER) doResponseHaBroker(values);
}
}
+ // FIXME aconway 2012-04-26: when the queue query is complete
+ if (type == QUEUE) {
+ // Count 1 for the query, which is now complete.
+ --unreadyCount;
+ }
} catch (const std::exception& e) {
- QPID_LOG(critical, "HA: Backup configuration failed: " << e.what()
+ QPID_LOG(critical, logPrefix << "configuration failed: " << e.what()
<< ": while handling: " << list);
throw;
}
@@ -281,9 +288,16 @@ void BrokerReplicator::route(Deliverable& msg) {
void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
string name = values[QNAME].asString();
Variant::Map argsMap = asMapVoid(values[ARGS]);
- if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+ if (!haBroker.replicateLevel(argsMap)) return; // Not a replicated queue.
+ if (values[DISP] == CREATED && haBroker.replicateLevel(argsMap)) {
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
+ // If we already have a queue with this name, replace it.
+ // The queue was definitely created on the primary.
+ if (broker.getQueues().find(name)) {
+ broker.getQueues().destroy(name);
+ QPID_LOG(warning, logPrefix << "queue declare event, replaced exsiting: " << name);
+ }
std::pair<boost::shared_ptr<Queue>, bool> result =
broker.createQueue(
name,
@@ -294,61 +308,68 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
args,
values[USER].asString(),
values[RHOST].asString());
- if (result.second) {
- QPID_LOG(debug, "HA: Backup queue declare event: " << name);
- startQueueReplicator(result.first);
- } else {
- // FIXME aconway 2011-12-02: what's the right way to handle this?
- // Should we delete the old & re-create form the event? Responses
- // may be old but events are always up-to-date.
- QPID_LOG(warning, "HA: Backup queue declare event, already exists: " << name);
- }
+ assert(result.second);
+ QPID_LOG(debug, logPrefix << "queue declare event: " << name);
+ startQueueReplicator(result.first, 0); // No unreadyCount for declare events.
+ // FIXME aconway 2012-04-26: but we will need to count them after a failover.
}
}
+boost::shared_ptr<QueueReplicator> BrokerReplicator::findQueueReplicator(
+ const std::string& qname)
+{
+ string rname = QueueReplicator::replicatorName(qname);
+ boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
+ return boost::dynamic_pointer_cast<QueueReplicator>(ex);
+}
+
void BrokerReplicator::doEventQueueDelete(Variant::Map& values) {
// The remote queue has already been deleted so replicator
// sessions may be closed by a "queue deleted" exception.
string name = values[QNAME].asString();
boost::shared_ptr<Queue> queue = broker.getQueues().find(name);
if (!queue) {
- QPID_LOG(warning, "HA: Backup queue delete event, does not exist: " << name);
- } else if (!replicateLevel(queue->getSettings())) {
- QPID_LOG(warning, "HA: Backup queue delete event, not replicated: " << name);
+ QPID_LOG(warning, logPrefix << "queue delete event, does not exist: " << name);
+ } else if (!haBroker.replicateLevel(queue->getSettings())) {
+ QPID_LOG(warning, logPrefix << "queue delete event, not replicated: " << name);
} else {
- string rname = QueueReplicator::replicatorName(name);
- boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname);
- boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex);
- if (qr) qr->deactivate();
- // QueueReplicator's bridge is now queued for destruction but may not
- // actually be destroyed, deleting the exhange
- broker.getExchanges().destroy(rname);
+ boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name);
+ if (qr) {
+ qr->deactivate();
+ haBroker.deactivatedBackup(name);
+ // QueueReplicator's bridge is now queued for destruction but may not
+ // actually be destroyed.
+ broker.getExchanges().destroy(qr->getName());
+ }
broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString());
- QPID_LOG(debug, "HA: Backup queue delete event: " << name);
+ QPID_LOG(debug, logPrefix << "queue delete event: " << name);
}
}
void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGS]));
- if (values[DISP] == CREATED && replicateLevel(argsMap)) {
+ if (!haBroker.replicateLevel(argsMap)) return; // Not a replicated exchange.
+ if (values[DISP] == CREATED && haBroker.replicateLevel(argsMap)) {
string name = values[EXNAME].asString();
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
- if (broker.createExchange(
+ // If we already have a exchange with this name, replace it.
+ // The exchange was definitely created on the primary.
+ if (broker.getExchanges().find(name)) {
+ broker.getExchanges().destroy(name);
+ QPID_LOG(warning, logPrefix << "exchange declare event, replaced exsiting: " << name)
+ }
+ std::pair<boost::shared_ptr<Exchange>, bool> result =
+ broker.createExchange(
name,
values[EXTYPE].asString(),
values[DURABLE].asBool(),
values[ALTEX].asString(),
args,
values[USER].asString(),
- values[RHOST].asString()).second)
- {
- QPID_LOG(debug, "HA: Backup exchange declare event: " << name);
- } else {
- // FIXME aconway 2011-11-22: should delete pre-existing exchange
- // and re-create from event. See comment in doEventQueueDeclare.
- QPID_LOG(debug, "HA: Backup exchange declare event, already exists: " << name);
- }
+ values[RHOST].asString());
+ assert(result.second);
+ QPID_LOG(debug, logPrefix << "exchange declare event: " << name);
}
}
@@ -356,11 +377,11 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) {
string name = values[EXNAME].asString();
boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name);
if (!exchange) {
- QPID_LOG(warning, "HA: Backup exchange delete event, does not exist: " << name);
- } else if (!replicateLevel(exchange->getArgs())) {
- QPID_LOG(warning, "HA: Backup exchange delete event, not replicated: " << name);
+ QPID_LOG(warning, logPrefix << "exchange delete event, does not exist: " << name);
+ } else if (!haBroker.replicateLevel(exchange->getArgs())) {
+ QPID_LOG(warning, logPrefix << "exchange delete event, not replicated: " << name);
} else {
- QPID_LOG(debug, "HA: Backup exchange delete event:" << name);
+ QPID_LOG(debug, logPrefix << "exchange delete event:" << name);
broker.deleteExchange(
name,
values[USER].asString(),
@@ -375,14 +396,14 @@ void BrokerReplicator::doEventBind(Variant::Map& values) {
broker.getQueues().find(values[QNAME].asString());
// We only replicate binds for a replicated queue to replicated
// exchange that both exist locally.
- if (exchange && replicateLevel(exchange->getArgs()) &&
- queue && replicateLevel(queue->getSettings()))
+ if (exchange && haBroker.replicateLevel(exchange->getArgs()) &&
+ queue && haBroker.replicateLevel(queue->getSettings()))
{
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
exchange->bind(queue, key, &args);
- QPID_LOG(debug, "HA: Backup bind event: exchange=" << exchange->getName()
+ QPID_LOG(debug, logPrefix << "bind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
}
@@ -395,14 +416,14 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
broker.getQueues().find(values[QNAME].asString());
// We only replicate unbinds for a replicated queue to replicated
// exchange that both exist locally.
- if (exchange && replicateLevel(exchange->getArgs()) &&
- queue && replicateLevel(queue->getSettings()))
+ if (exchange && haBroker.replicateLevel(exchange->getArgs()) &&
+ queue && haBroker.replicateLevel(queue->getSettings()))
{
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGS]), args);
string key = values[KEY].asString();
exchange->unbind(queue, key, &args);
- QPID_LOG(debug, "HA: Backup unbind event: exchange=" << exchange->getName()
+ QPID_LOG(debug, logPrefix << "unbind event: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
}
@@ -410,7 +431,7 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) {
void BrokerReplicator::doResponseQueue(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!replicateLevel(argsMap)) return;
+ if (!haBroker.replicateLevel(argsMap)) return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
string name(values[NAME].asString());
@@ -424,19 +445,19 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) {
args,
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/);
- if (result.second) {
- QPID_LOG(debug, "HA: Backup queue response: " << name);
- startQueueReplicator(result.first);
- } else {
- // FIXME aconway 2011-11-22: Normal to find queue already
- // exists if we're failing over.
- QPID_LOG(warning, "HA: Backup queue response, already exists: " << name);
- }
+ QueueReplicatorPtr qr;
+ // It is normal for the queue to already exist if we are failing over.
+ // FIXME aconway 2012-04-26: not correct, unreadyCount
+ if (result.second) qr = startQueueReplicator(result.first, &unreadyCount);
+ else qr = findQueueReplicator(name);
+ if (qr) ++unreadyCount;
+ // existing QR may refcount down before I've gone thru the responses.
+ QPID_LOG(debug, logPrefix << "queue response: " << name);
}
void BrokerReplicator::doResponseExchange(Variant::Map& values) {
Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
- if (!replicateLevel(argsMap)) return;
+ if (!haBroker.replicateLevel(argsMap)) return;
framing::FieldTable args;
amqp_0_10::translate(argsMap, args);
if (broker.createExchange(
@@ -448,10 +469,10 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) {
""/*TODO: who is the user?*/,
""/*TODO: what should we use as connection id?*/).second)
{
- QPID_LOG(debug, "HA: Backup exchange response: " << values[NAME].asString());
+ QPID_LOG(debug, logPrefix << "exchange response: " << values[NAME].asString());
} else {
- QPID_LOG(warning, "HA: Backup exchange query, already exists: " <<
- values[QNAME].asString());
+ QPID_LOG(warning, logPrefix << "exchange response, already exists: " <<
+ values[NAME].asString());
}
}
@@ -483,14 +504,14 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) {
boost::shared_ptr<Queue> queue = broker.getQueues().find(qName);
// Automatically replicate binding if queue and exchange exist and are replicated
- if (exchange && replicateLevel(exchange->getArgs()) &&
- queue && replicateLevel(queue->getSettings()))
+ if (exchange && haBroker.replicateLevel(exchange->getArgs()) &&
+ queue && haBroker.replicateLevel(queue->getSettings()))
{
framing::FieldTable args;
amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
string key = values[KEY].asString();
exchange->bind(queue, key, &args);
- QPID_LOG(debug, "HA: Backup bind response: exchange=" << exchange->getName()
+ QPID_LOG(debug, logPrefix << "bind response: exchange=" << exchange->getName()
<< " queue=" << queue->getName()
<< " key=" << key);
}
@@ -503,28 +524,33 @@ const string REPLICATE_DEFAULT="replicateDefault";
// Received the ha-broker configuration object for the primary broker.
void BrokerReplicator::doResponseHaBroker(Variant::Map& values) {
try {
- ReplicateLevel mine = haBroker.getSettings().replicateDefault;
- ReplicateLevel primary = replicateLevel(values[REPLICATE_DEFAULT].asString());
+ ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
+ ReplicateLevel primary = haBroker.replicateLevel(values[REPLICATE_DEFAULT].asString());
if (mine != primary) {
- std::ostringstream os;
- os << "Replicate default on backup (" << mine
- << ") does not match primary (" << primary << ")";
- haBroker.shutdown(os.str());
+ QPID_LOG(critical, logPrefix << "Replicate default on backup (" << mine
+ << ") does not match primary (" << primary << ")");
+ haBroker.shutdown();
}
} catch (const std::exception& e) {
- std::ostringstream os;
- os << "Received invalid replicate default from primary: " << e.what();
- haBroker.shutdown(os.str());
+ QPID_LOG(critical, logPrefix << "Invalid replicate default from primary: "
+ << e.what());
+ haBroker.shutdown();
}
}
-void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) {
- if (replicateLevel(queue->getSettings()) == RL_ALL) {
- boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+BrokerReplicator::QueueReplicatorPtr BrokerReplicator::startQueueReplicator(
+ const boost::shared_ptr<Queue>& queue, Counter* unready)
+{
+ boost::shared_ptr<QueueReplicator> qr;
+ if (haBroker.replicateLevel(queue->getSettings()) == ALL) {
+ qr.reset(new QueueReplicator(
+ LogPrefix(haBroker, queue->getName()), queue, link, unready));
if (!broker.getExchanges().registerExchange(qr))
throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName()));
qr->activate();
+ haBroker.activatedBackup(queue->getName());
}
+ return qr;
}
bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
@@ -533,4 +559,9 @@ bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, co
string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; }
+void BrokerReplicator::ready() {
+ assert(haBroker.getStatus() == CATCHUP);
+ haBroker.setStatus(READY);
+}
+
}} // namespace broker
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
index c9d7b9f74c..8b7987a89d 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h
@@ -22,7 +22,9 @@
*
*/
-#include "ReplicateLevel.h"
+#include "Counter.h"
+#include "Enum.h"
+#include "LogPrefix.h"
#include "qpid/broker/Exchange.h"
#include "qpid/types/Variant.h"
#include <boost/shared_ptr.hpp>
@@ -42,6 +44,7 @@ class FieldTable;
namespace ha {
class HaBroker;
+class QueueReplicator;
/**
* Replicate configuration on a backup broker.
@@ -68,11 +71,9 @@ class BrokerReplicator : public broker::Exchange
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
private:
- void initializeBridge(broker::Bridge&, broker::SessionHandler&);
+ typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr;
- ReplicateLevel replicateLevel(const std::string&);
- ReplicateLevel replicateLevel(const framing::FieldTable& args);
- ReplicateLevel replicateLevel(const types::Variant::Map& args);
+ void initializeBridge(broker::Bridge&, broker::SessionHandler&);
void doEventQueueDeclare(types::Variant::Map& values);
void doEventQueueDelete(types::Variant::Map& values);
@@ -86,11 +87,16 @@ class BrokerReplicator : public broker::Exchange
void doResponseBind(types::Variant::Map& values);
void doResponseHaBroker(types::Variant::Map& values);
- void startQueueReplicator(const boost::shared_ptr<broker::Queue>&);
+ QueueReplicatorPtr findQueueReplicator(const std::string& qname);
+ QueueReplicatorPtr startQueueReplicator(
+ const boost::shared_ptr<broker::Queue>&, Counter*);
+ void ready();
+ LogPrefix logPrefix;
HaBroker& haBroker;
broker::Broker& broker;
boost::shared_ptr<broker::Link> link;
+ Counter unreadyCount;
};
}} // namespace qpid::broker
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
index 67ad7202d6..fef4c67174 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
@@ -27,14 +27,31 @@
namespace qpid {
namespace ha {
-ConnectionExcluder::ConnectionExcluder() {}
+ConnectionExcluder::ConnectionExcluder(const LogPrefix& lp)
+ : logPrefix(lp), backupAllowed(false) {}
void ConnectionExcluder::opened(broker::Connection& connection) {
- if (!connection.isLink() && !connection.getClientProperties().isSet(ADMIN_TAG))
- throw Exception(
- QPID_MSG("HA: Backup broker rejected connection " << connection.getMgmtId()));
+ if (connection.isLink()) return; // Allow all outgoing links
+ if (connection.getClientProperties().isSet(ADMIN_TAG)) {
+ QPID_LOG(debug, logPrefix << "Allowing admin connection: "
+ << connection.getMgmtId());
+ return;
+ }
+ if (connection.getClientProperties().isSet(BACKUP_TAG)) {
+ if (backupAllowed) {
+ QPID_LOG(debug, logPrefix << "Allowing backup connection: "
+ << connection.getMgmtId());
+ return;
+ }
+ else QPID_LOG(debug, logPrefix << "Rejected backup connection: "
+ << connection.getMgmtId());
+ }
+
+ throw Exception(
+ QPID_MSG(logPrefix << "Rejected client connection " << connection.getMgmtId()));
}
const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin";
+const std::string ConnectionExcluder::BACKUP_TAG="qpid.ha-backup";
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
index f8f2843a0c..4a2ebcc127 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
+++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h
@@ -22,6 +22,7 @@
*
*/
+#include "LogPrefix.h"
#include "qpid/broker/ConnectionObserver.h"
#include <boost/function.hpp>
@@ -41,12 +42,19 @@ namespace ha {
class ConnectionExcluder : public broker::ConnectionObserver
{
public:
- ConnectionExcluder();
+ static const std::string ADMIN_TAG;
+ static const std::string BACKUP_TAG;
+
+ ConnectionExcluder(const LogPrefix&);
void opened(broker::Connection& connection);
+ void setBackupAllowed(bool set) { backupAllowed = set; }
+ bool isBackupAllowed() const { return backupAllowed; }
+
private:
- static const std::string ADMIN_TAG;
+ LogPrefix logPrefix;
+ bool backupAllowed;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Counter.h b/qpid/cpp/src/qpid/ha/Counter.h
new file mode 100644
index 0000000000..04dd672126
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Counter.h
@@ -0,0 +1,57 @@
+#ifndef QPID_HA_COUNTER_H
+#define QPID_HA_COUNTER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/AtomicValue.h"
+#include <boost/function.hpp>
+
+namespace qpid {
+namespace ha {
+
+/**
+ * Keep a count, call a callback when it reaches 0.
+ */
+class Counter
+{
+ public:
+ Counter(boost::function<void()> f) : callback(f) {}
+
+ void operator++() { ++count; }
+
+ void operator--() {
+ size_t n = --count;
+ assert(n != size_t(-1)); // No underflow
+ if (n == 0) callback();
+ }
+
+ size_t get() { return count.get(); }
+
+ Counter& operator=(size_t n) { count = n; return *this; }
+
+ private:
+ boost::function<void()> callback;
+ sys::AtomicValue<size_t> count;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_COUNTER_H*/
diff --git a/qpid/cpp/src/qpid/ha/Enum.cpp b/qpid/cpp/src/qpid/ha/Enum.cpp
new file mode 100644
index 0000000000..fda2976ad3
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Enum.cpp
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Enum.h"
+#include "qpid/Msg.h"
+#include "qpid/Exception.h"
+#include <algorithm>
+#include <iostream>
+#include <assert.h>
+
+namespace qpid {
+namespace ha {
+
+const std::string QPID_REPLICATE("qpid.replicate");
+
+std::string EnumBase::str() const {
+ assert(value < count);
+ return names[value];
+}
+
+void EnumBase::parse(const std::string& s) {
+ if (!parseNoThrow(s))
+ throw Exception(QPID_MSG("Invalid " << names[count] << " value: " << s));
+}
+
+bool EnumBase::parseNoThrow(const std::string& s) {
+ const char** i = std::find(names, names+count, s);
+ value = i - names;
+ return value < count;
+}
+
+template <> const char* Enum<ReplicateLevel>::NAMES[] = {
+ "none", "configuration", "all", "replication"
+};
+template <> const size_t Enum<ReplicateLevel>::N = 3;
+
+template <> const char* Enum<BrokerStatus>::NAMES[] = {
+ "joining", "catchup", "ready", "promoting", "active",
+ "standalone", "broker status" // Last entry is the type name used in parse errors, not a value.
+};
+template <> const size_t Enum<BrokerStatus>::N = 6; // Number of values; NAMES[N] is the type name.
+
+std::ostream& operator<<(std::ostream& o, EnumBase e) {
+ return o << e.str();
+}
+
+std::istream& operator>>(std::istream& i, EnumBase& e) {
+ std::string s;
+ i >> s;
+ e.parse(s);
+ return i;
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Enum.h b/qpid/cpp/src/qpid/ha/Enum.h
new file mode 100644
index 0000000000..82d087b768
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Enum.h
@@ -0,0 +1,97 @@
+#ifndef QPID_HA_ENUM_H
+#define QPID_HA_ENUM_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/types/Variant.h"
+#include <string>
+#include <iosfwd>
+
+namespace qpid {
+
+namespace framing {
+class FieldTable;
+}
+
+namespace ha {
+
+/** Base class for enums with string conversion */
+class EnumBase {
+ public:
+ EnumBase(const char* names_[], size_t count_, unsigned value)
+ : value(value), names(names_), count(count_) {}
+
+ /** Convert to string */
+ std::string str() const;
+ /** Parse from string, throw if unsuccessful */
+ void parse(const std::string&);
+ /** Parse from string, return false if unsuccessful. */
+ bool parseNoThrow(const std::string&);
+
+ protected:
+ unsigned value;
+ const char** names;
+ size_t count;
+};
+
+std::ostream& operator<<(std::ostream&, EnumBase);
+std::istream& operator>>(std::istream&, EnumBase&);
+
+/** Wrapper template for enums with string conversion */
+template <class T> class Enum : public EnumBase {
+ public:
+ Enum(T x=T()) : EnumBase(NAMES, N, x) {}
+ T get() const { return T(value); }
+ void operator=(T x) { value = x; }
+ private:
+ static const size_t N;
+ static const char* NAMES[];
+};
+
+/** To print an enum x: o << printable(x) */
+template <class T> Enum<T> printable(T x) { return Enum<T>(x); }
+
+enum ReplicateLevel {
+ NONE, ///< Nothing is replicated
+ CONFIGURATION, ///< Wiring is replicated but not messages
+ ALL ///< Everything is replicated
+};
+
+/** State of a broker: see HaBroker::setStatus for state diagram */
+enum BrokerStatus {
+ JOINING, ///< New broker, looking for primary
+ CATCHUP, ///< Backup: Connected to primary, catching up on state.
+ READY, ///< Backup: Caught up, ready to take over.
+ PROMOTING, ///< Primary: waiting for backups to connect & sync
+ ACTIVE, ///< Primary: actively serving clients.
+ STANDALONE ///< Not part of a cluster.
+};
+
+inline bool isPrimary(BrokerStatus s) {
+ return s == PROMOTING || s == ACTIVE || s == STANDALONE;
+}
+
+inline bool isBackup(BrokerStatus s) { return !isPrimary(s); }
+
+extern const std::string QPID_REPLICATE;
+}} // qpid::ha
+#endif /*!QPID_HA_ENUM_H*/
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp
index 589d7ee6aa..b1c7bf98a5 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.cpp
+++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp
@@ -21,6 +21,7 @@
#include "Backup.h"
#include "ConnectionExcluder.h"
#include "HaBroker.h"
+#include "Primary.h"
#include "Settings.h"
#include "ReplicatingSubscription.h"
#include "qpid/Exception.h"
@@ -28,6 +29,7 @@
#include "qpid/broker/Link.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SignalHandler.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/management/ManagementAgent.h"
#include "qmf/org/apache/qpid/ha/Package.h"
#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
@@ -43,59 +45,101 @@ namespace _qmf = ::qmf::org::apache::qpid::ha;
using namespace management;
using namespace std;
-namespace {
-
-const std::string STANDALONE="standalone";
-const std::string CATCH_UP="catch-up";
-const std::string BACKUP="backup";
-const std::string PRIMARY="primary";
-
-} // namespace
-
-
HaBroker::HaBroker(broker::Broker& b, const Settings& s)
- : broker(b),
+ : logPrefix(status),
+ broker(b),
settings(s),
- mgmtObject(0)
+ mgmtObject(0),
+ status(STANDALONE),
+ excluder(new ConnectionExcluder(logPrefix))
{
- // Register a factory for replicating subscriptions.
- broker.getConsumerFactories().add(
- boost::shared_ptr<ReplicatingSubscription::Factory>(
- new ReplicatingSubscription::Factory()));
-
- broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
-
+ // Set up the management object.
ManagementAgent* ma = broker.getManagementAgent();
- if (!ma)
+ if (settings.cluster && !ma)
throw Exception("Cannot start HA: management is disabled");
_qmf::Package packageInit(ma);
mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker");
- mgmtObject->set_status(settings.cluster ? BACKUP : STANDALONE);
- mgmtObject->set_replicateDefault(str(settings.replicateDefault));
+ mgmtObject->set_replicateDefault(settings.replicateDefault.str());
ma->addObject(mgmtObject);
- // NOTE: lock is not needed in a constructor but we created it just to pass
- // to the set functions.
+ // Register a factory for replicating subscriptions.
+ broker.getConsumerFactories().add(
+ boost::shared_ptr<ReplicatingSubscription::Factory>(
+ new ReplicatingSubscription::Factory(*this)));
+
+ // If we are in a cluster, start as backup in joining state.
+ if (settings.cluster) {
+ status = JOINING;
+ backup.reset(new Backup(*this, s));
+ broker.getConnectionObservers().add(excluder);
+ broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this);
+ }
+
+ // NOTE: lock is not needed in a constructor, but create one
+ // to pass to functions that have a ScopedLock parameter.
sys::Mutex::ScopedLock l(lock);
if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
-
- // If we are in a cluster, we start in backup mode.
- if (settings.cluster) backup.reset(new Backup(*this, s));
+ statusChanged(l);
+ QPID_LOG(notice, logPrefix << "broker initialized");
}
HaBroker::~HaBroker() {}
+void HaBroker::promoting(sys::Mutex::ScopedLock& l) {
+ setStatus(PROMOTING, l); // Caller already holds lock: use the locked overload, do not re-lock.
+ backup.reset(); // No longer replicating, close link.
+ primary.reset(new Primary(*this)); // Starts primary-ready check.
+}
+
+// Called back from Primary ready check.
+void HaBroker::activate() {
+ sys::Mutex::ScopedLock l(lock);
+ activate(l);
+}
+
+void HaBroker::activate(sys::Mutex::ScopedLock& l) {
+ setStatus(ACTIVE, l); // Caller already holds lock: use the locked overload, do not re-lock.
+ broker.getConnectionObservers().remove(excluder); // This allows client connections.
+}
+
+ReplicateLevel HaBroker::replicateLevel(const std::string& str) {
+ Enum<ReplicateLevel> rl;
+ if (rl.parseNoThrow(str)) return ReplicateLevel(rl.get());
+ else return getSettings().replicateDefault.get();
+}
+
+ReplicateLevel HaBroker::replicateLevel(const framing::FieldTable& f) {
+ if (f.isSet(QPID_REPLICATE))
+ return replicateLevel(f.getAsString(QPID_REPLICATE));
+ else
+ return getSettings().replicateDefault.get();
+}
+
+ReplicateLevel HaBroker::replicateLevel(const types::Variant::Map& m) {
+ types::Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
+ if (i != m.end())
+ return replicateLevel(i->second.asString());
+ else
+ return getSettings().replicateDefault.get();
+}
+
Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) {
sys::Mutex::ScopedLock l(lock);
switch (methodId) {
case _qmf::HaBroker::METHOD_PROMOTE: {
- if (backup.get()) { // I am a backup
- // NOTE: resetting backup allows client connections, so any
- // primary state should be set up here before backup.reset()
- backup.reset();
- QPID_LOG(notice, "HA: Promoted to primary");
- mgmtObject->set_status(PRIMARY);
+ switch (status) {
+ case JOINING: activate(l); break;
+ case CATCHUP:
+ // FIXME aconway 2012-04-27: don't allow promotion in catch-up
+ // QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted.");
+ // throw Exception("Still catching up, cannot be promoted.");
+ promoting(l); // FIXME aconway 2012-04-27: disallow
+ break;
+ case READY: promoting(l); break;
+ case PROMOTING: break;
+ case ACTIVE: break;
+ case STANDALONE: break;
}
break;
}
@@ -109,12 +153,13 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
}
case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: {
setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l);
- break;
+ break;
}
case _qmf::HaBroker::METHOD_REPLICATE: {
_qmf::ArgsHaBrokerReplicate& bq_args =
dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
- QPID_LOG(debug, "HA replicating individual queue "<< bq_args.i_queue << " from " << bq_args.i_broker);
+ QPID_LOG(debug, logPrefix << "replicate individual queue "
+ << bq_args.i_queue << " from " << bq_args.i_broker);
boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
Url url(bq_args.i_broker);
@@ -128,7 +173,8 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args,
boost::shared_ptr<broker::Link> link = result.first;
link->setUrl(url);
// Create a queue replicator
- boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+ boost::shared_ptr<QueueReplicator> qr(
+ new QueueReplicator(LogPrefix(*this, queue->getName()), queue, link));
qr->activate();
broker.getExchanges().registerExchange(qr);
break;
@@ -152,12 +198,12 @@ void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) {
mgmtObject->set_publicBrokers(url.str());
knownBrokers.clear();
knownBrokers.push_back(url);
- QPID_LOG(debug, "HA: Setting client URL to: " << url);
+ QPID_LOG(debug, logPrefix << "Setting client URL to: " << url);
}
void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) {
if (url.empty()) throw Url::Invalid("HA broker URL is empty");
- QPID_LOG(debug, "HA: Setting broker URL to: " << url);
+ QPID_LOG(debug, logPrefix << "Setting broker URL to: " << url);
brokerUrl = url;
mgmtObject->set_brokers(brokerUrl.str());
if (backup.get()) backup->setBrokerUrl(brokerUrl);
@@ -174,9 +220,79 @@ std::vector<Url> HaBroker::getKnownBrokers() const {
return knownBrokers;
}
-void HaBroker::shutdown(const std::string& message) {
- QPID_LOG(critical, "Shutting down: " << message);
+void HaBroker::shutdown() {
+ QPID_LOG(critical, logPrefix << "Critical error, shutting down.");
broker.shutdown();
}
+BrokerStatus HaBroker::getStatus() const {
+ sys::Mutex::ScopedLock l(lock);
+ return status;
+}
+
+void HaBroker::setStatus(BrokerStatus newStatus) {
+ sys::Mutex::ScopedLock l(lock);
+ setStatus(newStatus, l);
+}
+
+namespace {
+bool checkTransition(BrokerStatus from, BrokerStatus to) {
+ // Legal state transitions. Initial state is JOINING, ACTIVE is terminal.
+ static const BrokerStatus TRANSITIONS[][2] = {
+ { CATCHUP, PROMOTING }, // FIXME aconway 2012-04-27: illegal transition, allow while fixing behavior
+ { JOINING, CATCHUP }, // Connected to primary
+ { JOINING, ACTIVE }, // Chosen as initial primary.
+ { CATCHUP, READY }, // Caught up all queues, ready to take over.
+ { READY, PROMOTING }, // Chosen as new primary
+ { PROMOTING, ACTIVE }
+ };
+ static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]);
+ for (size_t i = 0; i < N; ++i) {
+ if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to)
+ return true;
+ }
+ return false;
+}
+} // namespace
+
+void HaBroker::setStatus(BrokerStatus newStatus, sys::Mutex::ScopedLock& l) {
+ QPID_LOG(notice, logPrefix << "Status change: "
+ << printable(status) << " -> " << printable(newStatus));
+ bool legal = checkTransition(status, newStatus);
+ assert(legal);
+ if (!legal) {
+ QPID_LOG(critical, logPrefix << "Illegal state transition: "
+ << printable(status) << " -> " << printable(newStatus));
+ shutdown();
+ }
+ status = newStatus;
+ statusChanged(l);
+}
+
+void HaBroker::statusChanged(sys::Mutex::ScopedLock&) {
+ mgmtObject->set_status(printable(status).str());
+ // Set the backup-related properties for newly created links.
+ framing::FieldTable ft = broker.getLinkClientProperties();
+ if (isBackup(status))
+ ft.setInt(ConnectionExcluder::BACKUP_TAG, 1);
+ else
+ ft.erase(ConnectionExcluder::BACKUP_TAG);
+ broker.setLinkClientProperties(ft);
+}
+
+void HaBroker::activatedBackup(const std::string& queue) {
+ sys::Mutex::ScopedLock l(lock);
+ activeBackups.insert(queue);
+}
+
+void HaBroker::deactivatedBackup(const std::string& queue) {
+ sys::Mutex::ScopedLock l(lock);
+ activeBackups.erase(queue);
+}
+
+std::set<std::string> HaBroker::getActiveBackups() const {
+ sys::Mutex::ScopedLock l(lock);
+ return activeBackups;
+}
+
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h
index 99b30fd36b..b3f2c1a941 100644
--- a/qpid/cpp/src/qpid/ha/HaBroker.h
+++ b/qpid/cpp/src/qpid/ha/HaBroker.h
@@ -22,19 +22,30 @@
*
*/
+#include "Enum.h"
+#include "LogPrefix.h"
#include "Settings.h"
#include "qpid/Url.h"
#include "qpid/sys/Mutex.h"
#include "qmf/org/apache/qpid/ha/HaBroker.h"
#include "qpid/management/Manageable.h"
+#include "qpid/types/Variant.h"
#include <memory>
+#include <set>
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
class Broker;
}
+namespace framing {
+class FieldTable;
+}
+
namespace ha {
class Backup;
+class ConnectionExcluder;
+class Primary;
/**
* HA state and actions associated with a broker.
@@ -55,26 +66,58 @@ class HaBroker : public management::Manageable
broker::Broker& getBroker() { return broker; }
const Settings& getSettings() const { return settings; }
- // Log a critical error message and shut down the broker.
- void shutdown(const std::string& message);
+ /** Shut down the broker. Caller should log a critical error message. */
+ void shutdown();
+
+ BrokerStatus getStatus() const;
+ void setStatus(BrokerStatus);
+ void activate();
+
+ Backup* getBackup() { return backup.get(); }
+
+ // Translate replicate levels.
+ ReplicateLevel replicateLevel(const std::string& str);
+ ReplicateLevel replicateLevel(const framing::FieldTable& f);
+ ReplicateLevel replicateLevel(const types::Variant::Map& m);
+
+ // Keep track of the set of actively replicated queues on a backup
+ // so that it can be transferred to the Primary on promotion.
+ typedef std::set<std::string> QueueNames;
+ void activatedBackup(const std::string& queue);
+ void deactivatedBackup(const std::string& queue);
+ QueueNames getActiveBackups() const;
+
+ boost::shared_ptr<ConnectionExcluder> getExcluder() { return excluder; }
private:
void setClientUrl(const Url&, const sys::Mutex::ScopedLock&);
void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&);
void setExpectedBackups(size_t, const sys::Mutex::ScopedLock&);
void updateClientUrl(const sys::Mutex::ScopedLock&);
+
bool isPrimary(const sys::Mutex::ScopedLock&) { return !backup.get(); }
+
+ void setStatus(BrokerStatus, sys::Mutex::ScopedLock&);
+ void promoting(sys::Mutex::ScopedLock&);
+ void activate(sys::Mutex::ScopedLock&);
+ void statusChanged(sys::Mutex::ScopedLock&);
+
std::vector<Url> getKnownBrokers() const;
+ LogPrefix logPrefix;
broker::Broker& broker;
const Settings settings;
- sys::Mutex lock;
+ mutable sys::Mutex lock;
std::auto_ptr<Backup> backup;
+ std::auto_ptr<Primary> primary;
qmf::org::apache::qpid::ha::HaBroker* mgmtObject;
Url clientUrl, brokerUrl;
std::vector<Url> knownBrokers;
size_t expectedBackups;
+ BrokerStatus status;
+ QueueNames activeBackups;
+ boost::shared_ptr<ConnectionExcluder> excluder;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
index 24977775bb..b6504c03b2 100644
--- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp
+++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp
@@ -40,6 +40,7 @@ struct Options : public qpid::Options {
("ha-replicate",
optValue(settings.replicateDefault, "LEVEL"),
"Replication level for creating queues and exchanges if there is no qpid.replicate argument supplied. LEVEL is 'none', 'configuration' or 'all'")
+ // FIXME aconway 2012-04-30: required-backups? Also need timeout.
("ha-expected-backups", optValue(settings.expectedBackups, "N"),
"Number of backups expected to be active in the HA cluster.")
("ha-username", optValue(settings.username, "USER"),
@@ -77,6 +78,6 @@ struct HaPlugin : public Plugin {
}
};
-static HaPlugin instance; // Static initialization.
+HaPlugin instance; // Static initialization.
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.cpp b/qpid/cpp/src/qpid/ha/LogPrefix.cpp
new file mode 100644
index 0000000000..685e623287
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/LogPrefix.cpp
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "LogPrefix.h"
+#include "HaBroker.h"
+#include <iostream>
+
+namespace qpid {
+namespace ha {
+
+LogPrefix::LogPrefix(HaBroker& hb, const std::string& queue) : haBroker(&hb), status(0) {
+ if (queue.size()) tail = " queue " + queue;
+}
+
+LogPrefix::LogPrefix(BrokerStatus& s) : haBroker(0), status(&s) {}
+
+std::ostream& operator<<(std::ostream& o, const LogPrefix& l) {
+ return o << "HA("
+ << printable(l.status ? *l.status : l.haBroker->getStatus())
+ << ")" << l.tail << ": ";
+}
+
+}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicateLevel.h b/qpid/cpp/src/qpid/ha/LogPrefix.h
index c11e03f0ce..b45145fbb7 100644
--- a/qpid/cpp/src/qpid/ha/ReplicateLevel.h
+++ b/qpid/cpp/src/qpid/ha/LogPrefix.h
@@ -1,5 +1,5 @@
-#ifndef QPID_HA_REPLICATELEVEL_H
-#define QPID_HA_REPLICATELEVEL_H
+#ifndef QPID_HA_LOGPREFIX_H
+#define QPID_HA_LOGPREFIX_H
/*
*
@@ -22,31 +22,36 @@
*
*/
-#include <string>
+#include "Enum.h"
#include <iosfwd>
+#include <string>
namespace qpid {
namespace ha {
-enum ReplicateLevel { RL_NONE, RL_CONFIGURATION, RL_ALL };
+class HaBroker;
/**
- * If str is a valid replicate level, set out and return true.
+ * Standard information to prefix log messages.
*/
-bool replicateLevel(const std::string& str, ReplicateLevel& out);
+class LogPrefix
+{
+ public:
+ /** For use by all classes other than HaBroker */
+ LogPrefix(HaBroker& hb, const std::string& queue=std::string());
-/**
- *@return enum corresponding to string level.
- *@throw qpid::Exception if level is not a valid replication level.
- */
-ReplicateLevel replicateLevel(const std::string& level);
+ /** For use by the HaBroker itself. */
+ LogPrefix(BrokerStatus&);
-/**@return string form of replicate level */
-std::string str(ReplicateLevel l);
+ private:
+ HaBroker* haBroker;
+ BrokerStatus* status;
+ std::string tail;
+ friend std::ostream& operator<<(std::ostream& o, const LogPrefix& l);
+};
-std::ostream& operator<<(std::ostream&, ReplicateLevel);
-std::istream& operator>>(std::istream&, ReplicateLevel&);
+std::ostream& operator<<(std::ostream& o, const LogPrefix& l);
-}} // namespaces qpid::ha
+}} // namespace qpid::ha
-#endif /*!QPID_HA_REPLICATELEVEL_H*/
+#endif /*!QPID_HA_LOGPREFIX_H*/
diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp
index bf17d27ca3..27cd4d7111 100644
--- a/qpid/cpp/src/qpid/ha/Primary.cpp
+++ b/qpid/cpp/src/qpid/ha/Primary.cpp
@@ -52,7 +52,6 @@ Primary::Primary(HaBroker& b) :
QPID_LOG(debug, logPrefix << "Need backup of " << *i
<< ", " << unready << " unready queues");
}
- QPID_LOG(critical, "FIXME Primary " << queues.size() << " queues");
if (queues.empty())
activate(*(sys::Mutex::ScopedLock*)0); // fake lock, ok in ctor.
else {
diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h
new file mode 100644
index 0000000000..5a1a61ae75
--- /dev/null
+++ b/qpid/cpp/src/qpid/ha/Primary.h
@@ -0,0 +1,71 @@
+#ifndef QPID_HA_PRIMARY_H
+#define QPID_HA_PRIMARY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "LogPrefix.h"
+#include "qpid/sys/Mutex.h"
+#include <boost/shared_ptr.hpp>
+#include <map>
+#include <string>
+
+namespace qpid {
+
+namespace broker {
+class Queue;
+}
+
+namespace ha {
+class HaBroker;
+
+/**
+ * State associated with a primary broker. Tracks replicating
+ * subscriptions to determine when primary is ready.
+ *
+ * THREAD SAFE: addReplica is called in arbitrary threads.
+ */
+class Primary
+{
+ public:
+ static Primary* get() { return instance; }
+ Primary(HaBroker& b);
+
+ void addReplica(const std::string& q);
+ void removeReplica(const std::string& q);
+
+ private:
+ typedef std::map<std::string, size_t> QueueCounts;
+
+ void activate(sys::Mutex::ScopedLock&);
+
+ sys::Mutex lock;
+ HaBroker& haBroker;
+ LogPrefix logPrefix;
+ QueueCounts queues;
+ size_t expected, unready;
+ bool activated;
+
+ static Primary* instance;
+};
+}} // namespace qpid::ha
+
+#endif /*!QPID_HA_PRIMARY_H*/
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
index c6af388d9d..8eb7e441a2 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
@@ -19,6 +19,7 @@
*
*/
+#include "Counter.h"
#include "QueueReplicator.h"
#include "ReplicatingSubscription.h"
#include "qpid/broker/Bridge.h"
@@ -44,19 +45,31 @@ namespace ha {
using namespace broker;
using namespace framing;
-const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event");
-const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event");
+const std::string QPID_HA_EVENT_PREFIX("qpid.ha-event:");
+const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue");
+const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position");
+const std::string QueueReplicator::READY_EVENT_KEY(QPID_HA_EVENT_PREFIX+"ready");
std::string QueueReplicator::replicatorName(const std::string& queueName) {
return QPID_REPLICATOR_ + queueName;
}
-QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l)
- : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l)
+bool QueueReplicator::isEventKey(const std::string key) {
+ const std::string& prefix = QPID_HA_EVENT_PREFIX;
+ bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0;
+ return ret;
+}
+
+QueueReplicator::QueueReplicator(const LogPrefix& lp,
+ boost::shared_ptr<Queue> q,
+ boost::shared_ptr<Link> l,
+ Counter* counter)
+ : Exchange(replicatorName(q->getName()), 0, q->getBroker()),
+ logPrefix(lp), queue(q), link(l),
+ unreadyCount(counter)
{
framing::Uuid uuid(true);
bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
- logPrefix = "HA: Backup of " + queue->getName() + ": ";
QPID_LOG(info, logPrefix << "Created");
}
@@ -103,6 +116,8 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
framing::FieldTable settings;
+ if (unreadyCount) ++(*unreadyCount); // We are unready.
+
// FIXME aconway 2011-12-09: Failover optimization removed.
// There was code here to re-use messages already on the backup
// during fail-over. This optimization was removed to simplify
@@ -149,13 +164,18 @@ void QueueReplicator::route(Deliverable& msg)
try {
const std::string& key = msg.getMessage().getRoutingKey();
sys::Mutex::ScopedLock l(lock);
- if (key == DEQUEUE_EVENT_KEY) {
+ if (!isEventKey(key)) {
+ msg.deliverTo(queue);
+ QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
+ }
+ else if (key == DEQUEUE_EVENT_KEY) {
SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage());
QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues);
//TODO: should be able to optimise the following
for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++)
dequeue(*i, l);
- } else if (key == POSITION_EVENT_KEY) {
+ }
+ else if (key == POSITION_EVENT_KEY) {
SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage());
QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition()
<< " to " << position);
@@ -165,10 +185,12 @@ void QueueReplicator::route(Deliverable& msg)
<< queue->getPosition() << " to " << position));
}
queue->setPosition(position);
- } else {
- msg.deliverTo(queue);
- QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition());
}
+ else if (key == READY_EVENT_KEY) {
+ QPID_LOG(info, logPrefix << "caught up at " << queue->getPosition());
+ if (unreadyCount) --(*unreadyCount); // We are now ready.
+ }
+ // Ignore unknown event keys, may be introduced in later versions.
}
catch (const std::exception& e) {
QPID_LOG(critical, logPrefix << "Replication failed: " << e.what());
diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h
index 26fb9456d1..db4a901274 100644
--- a/qpid/cpp/src/qpid/ha/QueueReplicator.h
+++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h
@@ -21,6 +21,8 @@
* under the License.
*
*/
+
+#include "LogPrefix.h"
#include "qpid/broker/Exchange.h"
#include "qpid/framing/SequenceSet.h"
#include <boost/enable_shared_from_this.hpp>
@@ -39,6 +41,8 @@ class Deliverable;
namespace ha {
+class Counter;
+
/**
* Exchange created on a backup broker to replicate a queue on the primary.
*
@@ -54,9 +58,19 @@ class QueueReplicator : public broker::Exchange,
public:
static const std::string DEQUEUE_EVENT_KEY;
static const std::string POSITION_EVENT_KEY;
+ static const std::string READY_EVENT_KEY;
static std::string replicatorName(const std::string& queueName);
+ /** Test if a string is an event key */
+ static bool isEventKey(const std::string key);
+
+ /**
+ * @param unreadyCount can be 0 if we don't need a ready count from this queue.
+ */
+ QueueReplicator(const LogPrefix&,
+ boost::shared_ptr<broker::Queue> q,
+ boost::shared_ptr<broker::Link> l,
+ Counter* unreadyCount=0);
- QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l);
~QueueReplicator();
void activate(); // Call after ctor
@@ -73,12 +87,13 @@ class QueueReplicator : public broker::Exchange,
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
- std::string logPrefix;
+ LogPrefix logPrefix;
std::string bridgeName;
sys::Mutex lock;
boost::shared_ptr<broker::Queue> queue;
boost::shared_ptr<broker::Link> link;
boost::shared_ptr<broker::Bridge> bridge;
+ Counter* unreadyCount;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp b/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp
deleted file mode 100644
index 4981577225..0000000000
--- a/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "ReplicateLevel.h"
-#include "qpid/Exception.h"
-#include "qpid/Msg.h"
-#include <iostream>
-#include <assert.h>
-
-namespace qpid {
-namespace ha {
-
-using namespace std;
-
-// Note replicateLevel is called during plugin-initialization which
-// happens in the static construction phase so these constants need
-// to be POD, they can't be class objects
-//
-namespace {
-const char* S_NONE="none";
-const char* S_CONFIGURATION="configuration";
-const char* S_ALL="all";
-}
-
-bool replicateLevel(const string& level, ReplicateLevel& out) {
- if (level == S_NONE) { out = RL_NONE; return true; }
- if (level == S_CONFIGURATION) { out = RL_CONFIGURATION; return true; }
- if (level == S_ALL) { out = RL_ALL; return true; }
- return false;
-}
-
-ReplicateLevel replicateLevel(const string& level) {
- ReplicateLevel rl;
- if (!replicateLevel(level, rl))
- throw Exception("Invalid value for replication level: "+level);
- return rl;
-}
-
-string str(ReplicateLevel l) {
- const char* names[] = { S_NONE, S_CONFIGURATION, S_ALL };
- if (l > RL_ALL)
- throw Exception(QPID_MSG("Invalid value for replication level: " << l));
- return names[l];
-}
-
-ostream& operator<<(ostream& o, ReplicateLevel rl) { return o << str(rl); }
-
-istream& operator>>(istream& i, ReplicateLevel& rl) {
- string str;
- i >> str;
- rl = replicateLevel(str);
- return i;
-}
-
-}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
index 91a4538bc4..9067063fcf 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
@@ -20,6 +20,7 @@
*/
#include "ReplicatingSubscription.h"
+#include "Primary.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/SessionContext.h"
#include "qpid/broker/ConnectionState.h"
@@ -64,14 +65,25 @@ ReplicatingSubscription::Factory::create(
boost::shared_ptr<ReplicatingSubscription> rs;
if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
rs.reset(new ReplicatingSubscription(
+ haBroker,
parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments));
queue->addObserver(rs);
+ // NOTE: readyPosition must be set _after_ addObserver, so
+ // messages can't be enqueued after setting readyPosition
+ // but before registering the observer.
+ rs->readyPosition = queue->getPosition();
+ QPID_LOG(debug, rs->logPrefix << "created backup subscription, catching up to "
+ << QueuedMessage(rs->getQueue().get(), 0, rs->readyPosition)
+ << rs->logSuffix);
+
+
}
return rs;
}
ReplicatingSubscription::ReplicatingSubscription(
+ HaBroker& haBroker,
SemanticState* parent,
const string& name,
Queue::shared_ptr queue,
@@ -84,15 +96,15 @@ ReplicatingSubscription::ReplicatingSubscription(
const framing::FieldTable& arguments
) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag,
resumeId, resumeTtl, arguments),
+ logPrefix(haBroker, queue->getName()),
events(new Queue(mask(name))),
- consumer(new DelegatingConsumer(*this))
+ consumer(new DelegatingConsumer(*this)),
+ sentReady(false)
{
- // Separate the remote part from a "local-remote" address.
+ // Separate the remote part from a "local-remote" address for logging.
string address = parent->getSession().getConnection().getUrl();
size_t i = address.find('-');
if (i != string::npos) address = address.substr(i+1);
- logPrefix = "HA: Primary ";
- stringstream ss;
logSuffix = " (" + address + ")";
// FIXME aconway 2011-12-09: Failover optimization removed.
@@ -102,8 +114,6 @@ ReplicatingSubscription::ReplicatingSubscription(
// can be re-introduced later. Last revision with the optimization:
// r1213258 | QPID-3603: Fix QueueReplicator subscription parameters.
- QPID_LOG(debug, logPrefix << "created backup subscription " << getName() << logSuffix);
-
// FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
// so we will start consuming from the lowest numbered message.
// This is incorrect if the sequence number wraps around, but
@@ -111,34 +121,50 @@ ReplicatingSubscription::ReplicatingSubscription(
}
// Message is delivered in the subscription's connection thread.
-bool ReplicatingSubscription::deliver(QueuedMessage& m) {
+bool ReplicatingSubscription::deliver(QueuedMessage& qm) {
try {
// Add position events for the subscribed queue, not for the internal event queue.
- if (m.queue && m.queue == getQueue().get()) {
+ if (qm.queue == getQueue().get()) {
+ QPID_LOG(trace, logPrefix << "replicating " << qm << logSuffix);
sys::Mutex::ScopedLock l(lock);
- if (position != m.position)
+ if (position != qm.position)
throw Exception(
QPID_MSG("Expected position " << position
- << " but got " << m.position));
- // m.position is the position of the newly enqueued m on the local queue.
- // backupPosition is latest position on the backup queue (before enqueueing m.)
- if (m.position <= backupPosition)
+ << " but got " << qm.position));
+ // qm.position is the position of the newly enqueued qm on the local queue.
+ // backupPosition is latest position on backup queue before enqueueing qm.
+ if (qm.position <= backupPosition)
throw Exception(
QPID_MSG("Expected position > " << backupPosition
- << " but got " << m.position));
+ << " but got " << qm.position));
- if (m.position - backupPosition > 1) {
+ if (qm.position - backupPosition > 1) {
// Position has advanced because of messages dequeued ahead of us.
- SequenceNumber send(m.position);
- --send; // Send the position before m was enqueued.
+ SequenceNumber send(qm.position);
+ --send; // Send the position before qm was enqueued.
sendPositionEvent(send, l);
}
- backupPosition = m.position;
- QPID_LOG(trace, logPrefix << "replicating " << m << logSuffix);
+ backupPosition = qm.position;
+ // Deliver the message
+ bool delivered = ConsumerImpl::deliver(qm);
+
+ // We have advanced to the initial position, backup is ready.
+ if (!sentReady && qm.position >= readyPosition) {
+ sendReadyEvent(l);
+ sentReady = true;
+ QPID_LOG(info, logPrefix << "Caught up at " << qm
+ << logSuffix);
+ // If we are in a primary broker, notify that a subscription is ready.
+ // FIXME aconway 2012-04-30: rename addReplica->readyReplica
+ if (Primary::get())
+ Primary::get()->addReplica(qm.queue->getName());
+ }
+ return delivered;
}
- return ConsumerImpl::deliver(m);
+ else
+ return ConsumerImpl::deliver(qm); // Message is for internal event queue.
} catch (const std::exception& e) {
- QPID_LOG(critical, logPrefix << "error replicating " << getQueue()->getName()
+ QPID_LOG(critical, logPrefix << "error replicating " << qm
<< logSuffix << ": " << e.what());
throw;
}
@@ -154,7 +180,7 @@ void ReplicatingSubscription::complete(
const QueuedMessage& qm, const sys::Mutex::ScopedLock&)
{
// Handle completions for the subscribed queue, not the internal event queue.
- if (qm.queue && qm.queue == getQueue().get()) {
+ if (qm.queue == getQueue().get()) {
QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix);
Delayed::iterator i= delayed.find(qm.position);
// The same message can be completed twice, by acknowledged and
@@ -179,7 +205,6 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& qm) {
delayed[qm.position] = qm;
}
-
// Function to complete a delayed message, called by cancel()
void ReplicatingSubscription::cancelComplete(
const Delayed::value_type& v, const sys::Mutex::ScopedLock&)
@@ -195,7 +220,8 @@ void ReplicatingSubscription::cancel()
boost::dynamic_pointer_cast<QueueObserver>(shared_from_this()));
{
sys::Mutex::ScopedLock l(lock);
- QPID_LOG(debug, logPrefix << "cancel backup subscription " << getName() << logSuffix);
+ QPID_LOG(debug, logPrefix << "cancel backup subscription to "
+ << getQueue()->getName() << logSuffix);
for_each(delayed.begin(), delayed.end(),
boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l)));
delayed.clear();
@@ -255,6 +281,13 @@ void ReplicatingSubscription::sendPositionEvent(
sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
}
+// Called with lock held. Called in subscription's connection thread.
+void ReplicatingSubscription::sendReadyEvent(const sys::Mutex::ScopedLock&l )
+{
+ framing::Buffer buffer;
+ sendEvent(QueueReplicator::READY_EVENT_KEY, buffer, l);
+}
+
void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer,
const sys::Mutex::ScopedLock&)
{
@@ -300,7 +333,7 @@ bool ReplicatingSubscription::doDispatch()
ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {}
ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {}
-bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); }
+bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& qm) { return delegate.deliver(qm); }
void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); }
bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); }
bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); }
diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
index f9176915f6..952c970f41 100644
--- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
+++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
@@ -51,12 +51,17 @@ namespace ha {
*
* THREAD SAFE: Used as a consumer in subscription's connection
* thread, and as a QueueObserver in arbitrary connection threads.
+ *
+ * Lifecycle: broker::Queue holds shared_ptrs to this as a consumer.
+ *
*/
class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
public broker::QueueObserver
{
public:
struct Factory : public broker::ConsumerFactory {
+ HaBroker& haBroker;
+ Factory(HaBroker& hb) : haBroker(hb) {}
boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
@@ -68,7 +73,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
// Argument names for consume command.
static const std::string QPID_REPLICATING_SUBSCRIPTION;
- ReplicatingSubscription(broker::SemanticState* parent,
+ ReplicatingSubscription(HaBroker&,
+ broker::SemanticState* parent,
const std::string& name, boost::shared_ptr<broker::Queue> ,
bool ack, bool acquire, bool exclusive, const std::string& tag,
const std::string& resumeId, uint64_t resumeTtl,
@@ -77,13 +83,13 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
~ReplicatingSubscription();
// QueueObserver overrides.
- bool deliver(broker::QueuedMessage& msg);
void enqueued(const broker::QueuedMessage&);
void dequeued(const broker::QueuedMessage&);
void acquired(const broker::QueuedMessage&) {}
void requeued(const broker::QueuedMessage&) {}
// Consumer overrides.
+ bool deliver(broker::QueuedMessage& msg);
void cancel();
void acknowledged(const broker::QueuedMessage&);
bool browseAcquired() const { return true; }
@@ -94,17 +100,22 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
bool doDispatch();
private:
typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed;
- std::string logPrefix, logSuffix;
+
+ LogPrefix logPrefix;
+ std::string logSuffix;
boost::shared_ptr<broker::Queue> events;
boost::shared_ptr<broker::Consumer> consumer;
Delayed delayed;
framing::SequenceSet dequeues;
framing::SequenceNumber backupPosition;
+ framing::SequenceNumber readyPosition;
+ bool sentReady;
void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&);
void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&);
void sendDequeueEvent(const sys::Mutex::ScopedLock&);
void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&);
+ void sendReadyEvent(const sys::Mutex::ScopedLock&);
void sendEvent(const std::string& key, framing::Buffer&,
const sys::Mutex::ScopedLock&);
@@ -126,6 +137,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl,
private:
ReplicatingSubscription& delegate;
};
+
+ friend class Factory;
};
diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h
index bf70c3f3f7..08d42471b8 100644
--- a/qpid/cpp/src/qpid/ha/Settings.h
+++ b/qpid/cpp/src/qpid/ha/Settings.h
@@ -22,7 +22,7 @@
*
*/
-#include "ReplicateLevel.h"
+#include "Enum.h"
#include <string>
namespace qpid {
@@ -34,12 +34,14 @@ namespace ha {
class Settings
{
public:
- Settings() : cluster(false), expectedBackups(0), replicateDefault(RL_NONE) {}
+ Settings() : cluster(false), expectedBackups(0), replicateDefault(NONE)
+ {}
+
bool cluster; // True if we are a cluster member.
std::string clientUrl;
std::string brokerUrl;
size_t expectedBackups;
- ReplicateLevel replicateDefault;
+ Enum<ReplicateLevel> replicateDefault;
std::string username, password, mechanism;
private:
};
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 8255fbe9ac..257ac68b74 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -77,17 +77,19 @@ def error_line(filename, n=1):
return ":\n" + "".join(result)
def retry(function, timeout=10, delay=.01):
- """Call function until it returns True or timeout expires.
- Double the delay for each retry. Return True if function
- returns true, False if timeout expires."""
+ """Call function until it returns a true value or timeout expires.
+ Double the delay for each retry. Returns what function returns if
+ true, False if timeout expires."""
deadline = time.time() + timeout
- while not function():
+ ret = None
+ while not ret:
+ ret = function()
remaining = deadline - time.time()
if remaining <= 0: return False
delay = min(delay, remaining)
time.sleep(delay)
delay *= 2
- return True
+ return ret
class AtomicCounter:
def __init__(self):
@@ -298,9 +300,9 @@ class Broker(Popen):
# Read port from broker process stdout if not already read.
if (self._port == 0):
try: self._port = int(self.stdout.readline())
- except ValueError:
- raise Exception("Can't get port for broker %s (%s)%s" %
- (self.name, self.pname, error_line(self.log,5)))
+ except ValueError, e:
+ raise Exception("Can't get port for broker %s (%s)%s: %s" %
+ (self.name, self.pname, error_line(self.log,5), e))
return self._port
def unexpected(self,msg):
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 827cb7dca9..15137a0c5f 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -28,13 +28,23 @@ from qpidtoollibs import BrokerAgent
log = getLogger(__name__)
+class QmfHaBroker(object):
+ def __init__(self, address):
+ self.connection = Connection.establish(
+ address, client_properties={"qpid.ha-admin":1})
+ self.qmf = BrokerAgent(self.connection)
+ self.ha_broker = self.qmf.getHaBroker()
+ if not self.ha_broker:
+ raise Exception("HA module is not loaded on broker at %s"%address)
+
class HaBroker(Broker):
def __init__(self, test, args=[], broker_url=None, ha_cluster=True,
ha_replicate="all", **kwargs):
assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
args = copy(args)
args += ["--load-module", BrokerTest.ha_lib,
- "--log-enable=info+", "--log-enable=debug+:ha::",
+ "--log-enable=info+",
+ "--log-enable=debug+:ha::",
# FIXME aconway 2012-02-13: workaround slow link failover.
"--link-maintenace-interval=0.1",
"--ha-cluster=%s"%ha_cluster]
@@ -42,32 +52,31 @@ class HaBroker(Broker):
args += [ "--ha-replicate=%s"%ha_replicate ]
if broker_url: args.extend([ "--ha-brokers", broker_url ])
Broker.__init__(self, test, args, **kwargs)
- self.commands=os.getenv("PYTHON_COMMANDS")
- assert os.path.isdir(self.commands)
+ self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
+ assert os.path.exists(self.qpid_ha_path)
+ self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
+ assert os.path.exists(self.qpid_config_path)
getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+ self.qpid_ha_script=import_script(self.qpid_ha_path)
- def promote(self):
- assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0
-
- def set_client_url(self, url):
- assert os.system(
- "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0
+ def qpid_ha(self, args): self.qpid_ha_script.main(["", "-b", self.host_port()]+args)
- def set_broker_url(self, url):
- assert os.system(
- "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0
+ def promote(self): self.qpid_ha(["promote"])
+ def set_client_url(self, url): self.qpid_ha(["set", "--public-brokers", url])
+ def set_broker_url(self, url): self.qpid_ha(["set", "--brokers", url])
+ def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
+ def ha_status(self): QmfHaBroker(self.host_port()).ha_broker.status
- def replicate(self, from_broker, queue):
- assert os.system(
- "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+ # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
+ def qpid_config(self, args):
+ assert subprocess.call(
+ [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
def config_replicate(self, from_broker, queue):
- assert os.system(
- "%s/qpid-config --broker=%s add queue --start-replica %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+ self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
def config_declare(self, queue, replication):
- assert os.system(
- "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0
+ self.qpid_config(["add", "queue", queue, "--replicate", replication])
def connect_admin(self, **kwargs):
return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
@@ -86,17 +95,47 @@ class HaBroker(Broker):
assert_browse_retry(bs, queue, expected, **kwargs)
finally: bs.connection.close()
+ def assert_connect_fail(self):
+ try:
+ self.connect()
+ self.test.fail("Expected ConnectionError")
+ except ConnectionError: pass
+
+ def connect_retry(self):
+ def try_connect():
+ try: return self.connect()
+ except ConnectionError: return None
+ c = retry(try_connect)
+ if c: return c
+ else: self.test.fail("Failed to connect")
+
class HaCluster(object):
_cluster_count = 0
def __init__(self, test, n, **kwargs):
"""Start a cluster of n brokers"""
self.test = test
- self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)]
+ self.kwargs = kwargs
+ self._brokers = []
+ self.id = HaCluster._cluster_count
HaCluster._cluster_count += 1
+ for i in xrange(n): self.start(False)
+ self.update_urls()
+ self[0].promote()
+
+ def start(self, update_urls=True):
+ """Start a new broker in the cluster"""
+ b = HaBroker(
+ self.test,
+ name="broker%s-%s"%(self.id, len(self._brokers)),
+ **self.kwargs)
+ self._brokers.append(b)
+ if update_urls: self.update_urls()
+ return b
+
+ def update_urls(self):
self.url = ",".join([b.host_port() for b in self])
for b in self: b.set_broker_url(self.url)
- self[0].promote()
def connect(self, i):
"""Connect with reconnect_urls"""
@@ -108,11 +147,15 @@ class HaCluster(object):
self[i].expect = EXPECT_EXIT_FAIL
self[(i+1) % len(self)].promote()
+ def restart(self, i):
+ b = self._brokers[i]
+ self._brokers[i] = HaBroker(
+ self.test, name=b.name, port=b.port(), broker_url=self.url, **self.kwargs)
+
def bounce(self, i):
"""Stop and restart a broker in a cluster."""
self.kill(i)
- b = self[i]
- self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url)
+ self.restart(i)
# Behave like a list of brokers.
def __len__(self): return len(self._brokers)
@@ -344,6 +387,7 @@ class ReplicationTests(BrokerTest):
def test_standalone_queue_replica(self):
"""Test replication of individual queues outside of cluster mode"""
+ getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
primary = HaBroker(self, name="primary", ha_cluster=False)
pc = primary.connect()
ps = pc.session().sender("q;{create:always}")
@@ -559,6 +603,26 @@ class ReplicationTests(BrokerTest):
test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")
+ def test_promoting(self):
+ """Verify that the primary broker does not go active until expected
+ backups have connected or timeout expires."""
+ cluster = HaCluster(self, 3, args=["--ha-expected-backups=2"])
+ c = cluster[0].connect()
+ for i in xrange(10):
+ s = c.session().sender("q%s;{create:always}"%i)
+ for j in xrange(100): s.send(str(j))
+ cluster.kill(0) # Fail over to 1
+ cluster[1].assert_connect_fail() # Waiting for backups, won't accept clients.
+ cluster.restart(0)
+ c = cluster[1].connect_retry()
+ cluster[1].assert_browse_backup("q0", [str(i) for i in xrange(100)]);
+
+ # Verify in logs that all queue catch-up happened before the transition to active.
+ log = open(cluster[1].log).read()
+ i = log.find("Status change: promoting -> active")
+ self.failIf(i < 0)
+ self.assertEqual(log.find("caught up", i), -1)
+
def fairshare(msgs, limit, levels):
"""
Generator to return prioritised messages in expected order for a given fairshare limit
@@ -602,14 +666,15 @@ class LongTests(BrokerTest):
else: return 3 # Default is to be quick
- def disable_test_failover(self):
+ def disable_test_failover_send_receive(self):
"""Test failover with continuous send-receive"""
# FIXME aconway 2012-02-03: fails due to dropped messages,
# known issue: sending messages to new primary before
# backups are ready. Enable when fixed.
# Start a cluster, all members will be killed during the test.
- brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
+ brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL,
+ args=["--ha-expected-backups=2"])
for name in ["ha0","ha1","ha2"] ]
url = ",".join([b.host_port() for b in brokers])
for b in brokers: b.set_broker_url(url)
@@ -620,30 +685,31 @@ class LongTests(BrokerTest):
receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
receiver.start()
sender.start()
- # Wait for sender & receiver to get up and running
- assert retry(lambda: receiver.received > 100)
- # Kill and restart brokers in a cycle:
- endtime = time.time() + self.duration()
- i = 0
- while time.time() < endtime or i < 3: # At least 3 iterations
- sender.sender.assert_running()
- receiver.receiver.assert_running()
- port = brokers[i].port()
- brokers[i].kill()
- brokers.append(
- HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port,
- expect=EXPECT_EXIT_FAIL))
- i += 1
- brokers[i].promote()
- n = receiver.received # Verify we're still running
- def enough():
- receiver.check() # Verify no exceptions
- return receiver.received > n + 100
- assert retry(enough, timeout=5)
-
- sender.stop()
- receiver.stop()
- for b in brokers[i:]: b.kill()
+ try:
+ # Wait for sender & receiver to get up and running
+ assert retry(lambda: receiver.received > 100)
+ # Kill and restart brokers in a cycle:
+ endtime = time.time() + self.duration()
+ i = 0
+ while time.time() < endtime or i < 3: # At least 3 iterations
+ sender.sender.assert_running()
+ receiver.receiver.assert_running()
+ port = brokers[i].port()
+ brokers[i].kill()
+ brokers.append(
+ HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port,
+ expect=EXPECT_EXIT_FAIL))
+ i += 1
+ brokers[i].promote()
+ n = receiver.received # Verify we're still running
+ def enough():
+ receiver.check() # Verify no exceptions
+ return receiver.received > n + 100
+ assert retry(enough, timeout=5)
+ finally:
+ sender.stop()
+ receiver.stop()
+ for b in brokers[i:]: b.kill()
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)