diff options
| author | Alan Conway <aconway@apache.org> | 2012-05-15 21:05:34 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-05-15 21:05:34 +0000 |
| commit | 55305747e6e7f931756bfa21460c37e350f5ea0f (patch) | |
| tree | c6b826cac45092f95c428671a13024aad62b3400 /qpid/cpp | |
| parent | 80a0832a2fe775ff217e6353f003226eb3f18d89 (diff) | |
| download | qpid-python-55305747e6e7f931756bfa21460c37e350f5ea0f.tar.gz | |
QPID-3603: HA broker backup/primary ready checks.
- Introduce HA broker state machine
- Inform backup queues when ready.
- Incomplete implementation of backup ready check.
- does not count correctly after a failover, see countUnready.
- Existing replicator bridges updated out of sync with BrokerReplicator initialize.
- Does not handle multi-message responses.
- Newly promoted HA primary waits for backups to be ready before accepting clients.
- Uniform log prefixes for HA messages.
- qpid-ha tests, call qpid-ha python code directly.
- Move excluder from Backup to HaBroker, it is also used in PROMOTING.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1338889 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
29 files changed, 1058 insertions, 386 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index cc882f80ca..8e4e9dae34 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -628,20 +628,25 @@ if (BUILD_HA) set (ha_SOURCES qpid/ha/Backup.cpp qpid/ha/Backup.h + qpid/ha/BrokerReplicator.cpp + qpid/ha/BrokerReplicator.h + qpid/ha/ConnectionExcluder.cpp + qpid/ha/ConnectionExcluder.h + qpid/ha/Counter.h + qpid/ha/Enum.cpp + qpid/ha/Enum.h qpid/ha/HaBroker.cpp qpid/ha/HaBroker.h qpid/ha/HaPlugin.cpp - qpid/ha/Settings.h - qpid/ha/QueueReplicator.h + qpid/ha/LogPrefix.cpp + qpid/ha/LogPrefix.h + qpid/ha/Primary.cpp + qpid/ha/Primary.h qpid/ha/QueueReplicator.cpp - qpid/ha/ReplicateLevel.h - qpid/ha/ReplicateLevel.cpp - qpid/ha/ReplicatingSubscription.h + qpid/ha/QueueReplicator.h qpid/ha/ReplicatingSubscription.cpp - qpid/ha/BrokerReplicator.cpp - qpid/ha/BrokerReplicator.h - qpid/ha/ConnectionExcluder.cpp - qpid/ha/ConnectionExcluder.h + qpid/ha/ReplicatingSubscription.h + qpid/ha/Settings.h ) add_library (ha MODULE ${ha_SOURCES}) diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index be1fb73e89..31f7bcc494 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -26,16 +26,21 @@ ha_la_SOURCES = \ qpid/ha/Backup.cpp \ qpid/ha/Backup.h \ qpid/ha/BrokerReplicator.cpp \ - qpid/ha/BrokerReplicator.h \ + qpid/ha/BrokerReplicator.h \ qpid/ha/ConnectionExcluder.cpp \ qpid/ha/ConnectionExcluder.h \ + qpid/ha/Counter.h \ + qpid/ha/Enum.cpp \ + qpid/ha/Enum.h \ qpid/ha/HaBroker.cpp \ qpid/ha/HaBroker.h \ qpid/ha/HaPlugin.cpp \ + qpid/ha/LogPrefix.cpp \ + qpid/ha/LogPrefix.h \ + qpid/ha/Primary.cpp \ + qpid/ha/Primary.h \ qpid/ha/QueueReplicator.cpp \ qpid/ha/QueueReplicator.h \ - qpid/ha/ReplicateLevel.cpp \ - qpid/ha/ReplicateLevel.h \ qpid/ha/ReplicatingSubscription.cpp \ qpid/ha/ReplicatingSubscription.h \ qpid/ha/Settings.h diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index c13ac19454..cb9d61a40f 100644 --- 
a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -1257,5 +1257,19 @@ void Broker::unbind(const std::string& queueName, } } +// FIXME aconway 2012-04-27: access to linkClientProperties is +// not properly thread safe, you could lose fields if 2 threads +// attempt to add a field concurrently. + +framing::FieldTable Broker::getLinkClientProperties() const { + sys::Mutex::ScopedLock l(linkClientPropertiesLock); + return linkClientProperties; +} + +void Broker::setLinkClientProperties(const framing::FieldTable& ft) { + sys::Mutex::ScopedLock l(linkClientPropertiesLock); + linkClientProperties = ft; +} + }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 543d42e002..089619ec44 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -63,8 +63,8 @@ namespace qpid { namespace sys { - class ProtocolFactory; - class Poller; +class ProtocolFactory; +class Poller; } struct Url; @@ -90,7 +90,7 @@ class Broker : public sys::Runnable, public Plugin::Target, public management::Manageable, public RefCounted { -public: + public: struct Options : public qpid::Options { static const std::string DEFAULT_DATA_DIR_LOCATION; @@ -132,23 +132,23 @@ public: }; class ConnectionCounter { - int maxConnections; - int connectionCount; - sys::Mutex connectionCountLock; - public: - ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {}; - void inc_connectionCount() { - sys::ScopedLock<sys::Mutex> l(connectionCountLock); - connectionCount++; - } - void dec_connectionCount() { - sys::ScopedLock<sys::Mutex> l(connectionCountLock); - connectionCount--; - } - bool allowConnection() { - sys::ScopedLock<sys::Mutex> l(connectionCountLock); - return (maxConnections <= connectionCount); - } + int maxConnections; + int connectionCount; + sys::Mutex connectionCountLock; + public: + ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {}; + void 
inc_connectionCount() { + sys::ScopedLock<sys::Mutex> l(connectionCountLock); + connectionCount++; + } + void dec_connectionCount() { + sys::ScopedLock<sys::Mutex> l(connectionCountLock); + connectionCount--; + } + bool allowConnection() { + sys::ScopedLock<sys::Mutex> l(connectionCountLock); + return (maxConnections <= connectionCount); + } }; private: @@ -205,6 +205,10 @@ public: ConnectionCounter connectionCounter; ConsumerFactories consumerFactories; + mutable sys::Mutex linkClientPropertiesLock; + framing::FieldTable linkClientProperties; + + public: QPID_BROKER_EXTERN virtual ~Broker(); @@ -375,6 +379,10 @@ public: ConsumerFactories& getConsumerFactories() { return consumerFactories; } ConnectionObservers& getConnectionObservers() { return connectionObservers; } + + /** Properties to be set on outgoing link connections */ + framing::FieldTable getLinkClientProperties() const; + void setLinkClientProperties(const framing::FieldTable&); }; }} diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 6894324117..5d24e115c4 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -318,7 +318,7 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties, connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG)); } - FieldTable ft; + FieldTable ft = connection.getBroker().getLinkClientProperties(); ft.setInt(QPID_FED_LINK,1); ft.setString(QPID_FED_TAG, connection.getBroker().getFederationTag()); diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 5f053e0974..1ff5578ff4 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -20,7 +20,6 @@ */ #include "Backup.h" #include "BrokerReplicator.h" -#include "ConnectionExcluder.h" #include "HaBroker.h" #include "ReplicatingSubscription.h" #include "Settings.h" @@ -45,17 +44,15 @@ using types::Variant; using 
std::string; Backup::Backup(HaBroker& hb, const Settings& s) : - haBroker(hb), broker(hb.getBroker()), settings(s), excluder(new ConnectionExcluder()) + logPrefix(hb), haBroker(hb), broker(hb.getBroker()), settings(s) { - // Exclude client connections before starting the link to avoid self-connection. - broker.getConnectionObservers().add(excluder); // Empty brokerUrl means delay initialization until setUrl() is called. if (!s.brokerUrl.empty()) initialize(Url(s.brokerUrl)); } void Backup::initialize(const Url& url) { if (url.empty()) throw Url::Invalid("HA broker URL is empty"); - QPID_LOG(notice, "HA: Backup initialized: " << url); + QPID_LOG(info, logPrefix << "initialized for: " << url); string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; framing::Uuid uuid(true); // Declare the link @@ -75,7 +72,6 @@ Backup::~Backup() { if (link) link->close(); if (replicator.get()) broker.getExchanges().destroy(replicator->getName()); replicator.reset(); - broker.getConnectionObservers().remove(excluder); // This allows client connections. } @@ -84,7 +80,7 @@ void Backup::setBrokerUrl(const Url& url) { if (url.empty()) return; sys::Mutex::ScopedLock l(lock); if (link) { // URL changed after we initialized. 
- QPID_LOG(info, "HA: Backup broker URL set to " << url); + QPID_LOG(info, logPrefix << "broker URL set to " << url); link->setUrl(url); } else { diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h index 6c36996914..f794b11a60 100644 --- a/qpid/cpp/src/qpid/ha/Backup.h +++ b/qpid/cpp/src/qpid/ha/Backup.h @@ -22,6 +22,7 @@ * */ +#include "LogPrefix.h" #include "Settings.h" #include "qpid/Url.h" #include "qpid/sys/Mutex.h" @@ -36,7 +37,6 @@ class Link; namespace ha { class Settings; -class ConnectionExcluder; class BrokerReplicator; class HaBroker; @@ -55,13 +55,13 @@ class Backup private: void initialize(const Url&); + LogPrefix logPrefix; sys::Mutex lock; HaBroker& haBroker; broker::Broker& broker; Settings settings; boost::shared_ptr<broker::Link> link; boost::shared_ptr<BrokerReplicator> replicator; - boost::shared_ptr<ConnectionExcluder> excluder; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index 690337831c..ea5f4a5fa8 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -39,6 +39,7 @@ #include "qmf/org/apache/qpid/broker/EventSubscribe.h" #include <algorithm> #include <sstream> +#include <assert.h> namespace qpid { namespace ha { @@ -58,7 +59,6 @@ using namespace broker; namespace { const string QPID_CONFIGURATION_REPLICATOR("qpid.configuration-replicator"); -const string QPID_REPLICATE("qpid.replicate"); const string CLASS_NAME("_class_name"); const string EVENT("_event"); @@ -163,32 +163,13 @@ Variant::Map asMapVoid(const Variant& value) { } // namespace -ReplicateLevel BrokerReplicator::replicateLevel(const std::string& str) { - ReplicateLevel rl; - if (qpid::ha::replicateLevel(str, rl)) return rl; - else return haBroker.getSettings().replicateDefault; -} - -ReplicateLevel BrokerReplicator::replicateLevel(const framing::FieldTable& f) { - if (f.isSet(QPID_REPLICATE)) - return 
replicateLevel(f.getAsString(QPID_REPLICATE)); - else - return haBroker.getSettings().replicateDefault; -} - -ReplicateLevel BrokerReplicator::replicateLevel(const Variant::Map& m) { - Variant::Map::const_iterator i = m.find(QPID_REPLICATE); - if (i != m.end()) - return replicateLevel(i->second.asString()); - else - return haBroker.getSettings().replicateDefault; -} - BrokerReplicator::~BrokerReplicator() {} BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l) : Exchange(QPID_CONFIGURATION_REPLICATOR), - haBroker(hb), broker(hb.getBroker()), link(l) + logPrefix(hb), + haBroker(hb), broker(hb.getBroker()), link(l), + unreadyCount(boost::bind(&BrokerReplicator::ready, this)) { framing::Uuid uuid(true); const std::string name(QPID_CONFIGURATION_REPLICATOR + ".bridge." + uuid.str()); @@ -211,13 +192,33 @@ BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& // This is called in the connection IO thread when the bridge is started. void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { + + switch (haBroker.getStatus()) { + case JOINING: + haBroker.setStatus(CATCHUP); + case CATCHUP: + // FIXME aconway 2012-04-27: distinguish catchup case, below. + break; + case READY: + // FIXME aconway 2012-04-27: distinguish ready case, reconnect to other backup. + break; + case PROMOTING: + case ACTIVE: + // FIXME aconway 2012-04-27: link is connected to self! + // Promotion should close the link before allowing connections. 
+ return; + break; + case STANDALONE: + return; + } + framing::AMQP_ServerProxy peer(sessionHandler.out); string queueName = bridge.getQueueName(); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); //declare and bind an event queue FieldTable declareArgs; - declareArgs.setString(QPID_REPLICATE, str(RL_NONE)); + declareArgs.setString(QPID_REPLICATE, printable(NONE).str()); peer.getQueue().declare(queueName, "", false, false, true, true, declareArgs); peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); //subscribe to the queue @@ -231,7 +232,9 @@ void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionH sendQuery(ORG_APACHE_QPID_BROKER, QUEUE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, EXCHANGE, queueName, sessionHandler); sendQuery(ORG_APACHE_QPID_BROKER, BINDING, queueName, sessionHandler); - QPID_LOG(debug, "HA: Backup configuration bridge: " << queueName); + // Queue ready count - count one for the query in progress. 
+ ++unreadyCount; + QPID_LOG(debug, logPrefix << "opened configuration bridge: " << queueName); } void BrokerReplicator::route(Deliverable& msg) { @@ -244,10 +247,10 @@ void BrokerReplicator::route(Deliverable& msg) { string content = msg.getMessage().getFrames().getContent(); amqp_0_10::ListCodec::decode(content, list); + string type; // FIXME aconway 2012-04-26: quick hack for end-of query, need to handle multi-message responses if (headers->getAsString(QMF_CONTENT) == EVENT) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); - QPID_LOG(trace, "HA: Backup received event: " << map); Variant::Map& schema = map[SCHEMA_ID].asMap(); Variant::Map& values = map[VALUES].asMap(); if (match<EventQueueDeclare>(schema)) doEventQueueDeclare(values); @@ -260,8 +263,7 @@ void BrokerReplicator::route(Deliverable& msg) { } else if (headers->getAsString(QMF_OPCODE) == QUERY_RESPONSE) { for (Variant::List::iterator i = list.begin(); i != list.end(); ++i) { Variant::Map& map = i->asMap(); - QPID_LOG(trace, "HA: Backup received event: " << map); - string type = map[SCHEMA_ID].asMap()[CLASS_NAME]; + type = map[SCHEMA_ID].asMap()[CLASS_NAME].asString(); Variant::Map& values = map[VALUES].asMap(); framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); @@ -271,8 +273,13 @@ void BrokerReplicator::route(Deliverable& msg) { else if (type == HA_BROKER) doResponseHaBroker(values); } } + // FIXME aconway 2012-04-26: when the queue query is complete + if (type == QUEUE) { + // Count 1 for the query, which is now complete. 
+ --unreadyCount; + } } catch (const std::exception& e) { - QPID_LOG(critical, "HA: Backup configuration failed: " << e.what() + QPID_LOG(critical, logPrefix << "configuration failed: " << e.what() << ": while handling: " << list); throw; } @@ -281,9 +288,16 @@ void BrokerReplicator::route(Deliverable& msg) { void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { string name = values[QNAME].asString(); Variant::Map argsMap = asMapVoid(values[ARGS]); - if (values[DISP] == CREATED && replicateLevel(argsMap)) { + if (!haBroker.replicateLevel(argsMap)) return; // Not a replicated queue. + if (values[DISP] == CREATED && haBroker.replicateLevel(argsMap)) { framing::FieldTable args; amqp_0_10::translate(argsMap, args); + // If we already have a queue with this name, replace it. + // The queue was definitely created on the primary. + if (broker.getQueues().find(name)) { + broker.getQueues().destroy(name); + QPID_LOG(warning, logPrefix << "queue declare event, replaced exsiting: " << name); + } std::pair<boost::shared_ptr<Queue>, bool> result = broker.createQueue( name, @@ -294,61 +308,68 @@ void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) { args, values[USER].asString(), values[RHOST].asString()); - if (result.second) { - QPID_LOG(debug, "HA: Backup queue declare event: " << name); - startQueueReplicator(result.first); - } else { - // FIXME aconway 2011-12-02: what's the right way to handle this? - // Should we delete the old & re-create form the event? Responses - // may be old but events are always up-to-date. - QPID_LOG(warning, "HA: Backup queue declare event, already exists: " << name); - } + assert(result.second); + QPID_LOG(debug, logPrefix << "queue declare event: " << name); + startQueueReplicator(result.first, 0); // No unreadyCount for declare events. + // FIXME aconway 2012-04-26: but we will need to count them after a failover. 
} } +boost::shared_ptr<QueueReplicator> BrokerReplicator::findQueueReplicator( + const std::string& qname) +{ + string rname = QueueReplicator::replicatorName(qname); + boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname); + return boost::dynamic_pointer_cast<QueueReplicator>(ex); +} + void BrokerReplicator::doEventQueueDelete(Variant::Map& values) { // The remote queue has already been deleted so replicator // sessions may be closed by a "queue deleted" exception. string name = values[QNAME].asString(); boost::shared_ptr<Queue> queue = broker.getQueues().find(name); if (!queue) { - QPID_LOG(warning, "HA: Backup queue delete event, does not exist: " << name); - } else if (!replicateLevel(queue->getSettings())) { - QPID_LOG(warning, "HA: Backup queue delete event, not replicated: " << name); + QPID_LOG(warning, logPrefix << "queue delete event, does not exist: " << name); + } else if (!haBroker.replicateLevel(queue->getSettings())) { + QPID_LOG(warning, logPrefix << "queue delete event, not replicated: " << name); } else { - string rname = QueueReplicator::replicatorName(name); - boost::shared_ptr<broker::Exchange> ex = broker.getExchanges().find(rname); - boost::shared_ptr<QueueReplicator> qr = boost::dynamic_pointer_cast<QueueReplicator>(ex); - if (qr) qr->deactivate(); - // QueueReplicator's bridge is now queued for destruction but may not - // actually be destroyed, deleting the exhange - broker.getExchanges().destroy(rname); + boost::shared_ptr<QueueReplicator> qr = findQueueReplicator(name); + if (qr) { + qr->deactivate(); + haBroker.deactivatedBackup(name); + // QueueReplicator's bridge is now queued for destruction but may not + // actually be destroyed. 
+ broker.getExchanges().destroy(qr->getName()); + } broker.deleteQueue(name, values[USER].asString(), values[RHOST].asString()); - QPID_LOG(debug, "HA: Backup queue delete event: " << name); + QPID_LOG(debug, logPrefix << "queue delete event: " << name); } } void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGS])); - if (values[DISP] == CREATED && replicateLevel(argsMap)) { + if (!haBroker.replicateLevel(argsMap)) return; // Not a replicated exchange. + if (values[DISP] == CREATED && haBroker.replicateLevel(argsMap)) { string name = values[EXNAME].asString(); framing::FieldTable args; amqp_0_10::translate(argsMap, args); - if (broker.createExchange( + // If we already have a exchange with this name, replace it. + // The exchange was definitely created on the primary. + if (broker.getExchanges().find(name)) { + broker.getExchanges().destroy(name); + QPID_LOG(warning, logPrefix << "exchange declare event, replaced exsiting: " << name) + } + std::pair<boost::shared_ptr<Exchange>, bool> result = + broker.createExchange( name, values[EXTYPE].asString(), values[DURABLE].asBool(), values[ALTEX].asString(), args, values[USER].asString(), - values[RHOST].asString()).second) - { - QPID_LOG(debug, "HA: Backup exchange declare event: " << name); - } else { - // FIXME aconway 2011-11-22: should delete pre-existing exchange - // and re-create from event. See comment in doEventQueueDeclare. 
- QPID_LOG(debug, "HA: Backup exchange declare event, already exists: " << name); - } + values[RHOST].asString()); + assert(result.second); + QPID_LOG(debug, logPrefix << "exchange declare event: " << name); } } @@ -356,11 +377,11 @@ void BrokerReplicator::doEventExchangeDelete(Variant::Map& values) { string name = values[EXNAME].asString(); boost::shared_ptr<Exchange> exchange = broker.getExchanges().find(name); if (!exchange) { - QPID_LOG(warning, "HA: Backup exchange delete event, does not exist: " << name); - } else if (!replicateLevel(exchange->getArgs())) { - QPID_LOG(warning, "HA: Backup exchange delete event, not replicated: " << name); + QPID_LOG(warning, logPrefix << "exchange delete event, does not exist: " << name); + } else if (!haBroker.replicateLevel(exchange->getArgs())) { + QPID_LOG(warning, logPrefix << "exchange delete event, not replicated: " << name); } else { - QPID_LOG(debug, "HA: Backup exchange delete event:" << name); + QPID_LOG(debug, logPrefix << "exchange delete event:" << name); broker.deleteExchange( name, values[USER].asString(), @@ -375,14 +396,14 @@ void BrokerReplicator::doEventBind(Variant::Map& values) { broker.getQueues().find(values[QNAME].asString()); // We only replicate binds for a replicated queue to replicated // exchange that both exist locally. 
- if (exchange && replicateLevel(exchange->getArgs()) && - queue && replicateLevel(queue->getSettings())) + if (exchange && haBroker.replicateLevel(exchange->getArgs()) && + queue && haBroker.replicateLevel(queue->getSettings())) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); exchange->bind(queue, key, &args); - QPID_LOG(debug, "HA: Backup bind event: exchange=" << exchange->getName() + QPID_LOG(debug, logPrefix << "bind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); } @@ -395,14 +416,14 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { broker.getQueues().find(values[QNAME].asString()); // We only replicate unbinds for a replicated queue to replicated // exchange that both exist locally. - if (exchange && replicateLevel(exchange->getArgs()) && - queue && replicateLevel(queue->getSettings())) + if (exchange && haBroker.replicateLevel(exchange->getArgs()) && + queue && haBroker.replicateLevel(queue->getSettings())) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGS]), args); string key = values[KEY].asString(); exchange->unbind(queue, key, &args); - QPID_LOG(debug, "HA: Backup unbind event: exchange=" << exchange->getName() + QPID_LOG(debug, logPrefix << "unbind event: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); } @@ -410,7 +431,7 @@ void BrokerReplicator::doEventUnbind(Variant::Map& values) { void BrokerReplicator::doResponseQueue(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); - if (!replicateLevel(argsMap)) return; + if (!haBroker.replicateLevel(argsMap)) return; framing::FieldTable args; amqp_0_10::translate(argsMap, args); string name(values[NAME].asString()); @@ -424,19 +445,19 @@ void BrokerReplicator::doResponseQueue(Variant::Map& values) { args, ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/); - if 
(result.second) { - QPID_LOG(debug, "HA: Backup queue response: " << name); - startQueueReplicator(result.first); - } else { - // FIXME aconway 2011-11-22: Normal to find queue already - // exists if we're failing over. - QPID_LOG(warning, "HA: Backup queue response, already exists: " << name); - } + QueueReplicatorPtr qr; + // It is normal for the queue to already exist if we are failing over. + // FIXME aconway 2012-04-26: not correct, unreadyCount + if (result.second) qr = startQueueReplicator(result.first, &unreadyCount); + else qr = findQueueReplicator(name); + if (qr) ++unreadyCount; + // existing QR may refcount down before I've gone thru the responses. + QPID_LOG(debug, logPrefix << "queue response: " << name); } void BrokerReplicator::doResponseExchange(Variant::Map& values) { Variant::Map argsMap(asMapVoid(values[ARGUMENTS])); - if (!replicateLevel(argsMap)) return; + if (!haBroker.replicateLevel(argsMap)) return; framing::FieldTable args; amqp_0_10::translate(argsMap, args); if (broker.createExchange( @@ -448,10 +469,10 @@ void BrokerReplicator::doResponseExchange(Variant::Map& values) { ""/*TODO: who is the user?*/, ""/*TODO: what should we use as connection id?*/).second) { - QPID_LOG(debug, "HA: Backup exchange response: " << values[NAME].asString()); + QPID_LOG(debug, logPrefix << "exchange response: " << values[NAME].asString()); } else { - QPID_LOG(warning, "HA: Backup exchange query, already exists: " << - values[QNAME].asString()); + QPID_LOG(warning, logPrefix << "exchange response, already exists: " << + values[NAME].asString()); } } @@ -483,14 +504,14 @@ void BrokerReplicator::doResponseBind(Variant::Map& values) { boost::shared_ptr<Queue> queue = broker.getQueues().find(qName); // Automatically replicate binding if queue and exchange exist and are replicated - if (exchange && replicateLevel(exchange->getArgs()) && - queue && replicateLevel(queue->getSettings())) + if (exchange && haBroker.replicateLevel(exchange->getArgs()) && + queue && 
haBroker.replicateLevel(queue->getSettings())) { framing::FieldTable args; amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args); string key = values[KEY].asString(); exchange->bind(queue, key, &args); - QPID_LOG(debug, "HA: Backup bind response: exchange=" << exchange->getName() + QPID_LOG(debug, logPrefix << "bind response: exchange=" << exchange->getName() << " queue=" << queue->getName() << " key=" << key); } @@ -503,28 +524,33 @@ const string REPLICATE_DEFAULT="replicateDefault"; // Received the ha-broker configuration object for the primary broker. void BrokerReplicator::doResponseHaBroker(Variant::Map& values) { try { - ReplicateLevel mine = haBroker.getSettings().replicateDefault; - ReplicateLevel primary = replicateLevel(values[REPLICATE_DEFAULT].asString()); + ReplicateLevel mine = haBroker.getSettings().replicateDefault.get(); + ReplicateLevel primary = haBroker.replicateLevel(values[REPLICATE_DEFAULT].asString()); if (mine != primary) { - std::ostringstream os; - os << "Replicate default on backup (" << mine - << ") does not match primary (" << primary << ")"; - haBroker.shutdown(os.str()); + QPID_LOG(critical, logPrefix << "Replicate default on backup (" << mine + << ") does not match primary (" << primary << ")"); + haBroker.shutdown(); } } catch (const std::exception& e) { - std::ostringstream os; - os << "Received invalid replicate default from primary: " << e.what(); - haBroker.shutdown(os.str()); + QPID_LOG(critical, logPrefix << "Invalid replicate default from primary: " + << e.what()); + haBroker.shutdown(); } } -void BrokerReplicator::startQueueReplicator(const boost::shared_ptr<Queue>& queue) { - if (replicateLevel(queue->getSettings()) == RL_ALL) { - boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); +BrokerReplicator::QueueReplicatorPtr BrokerReplicator::startQueueReplicator( + const boost::shared_ptr<Queue>& queue, Counter* unready) +{ + boost::shared_ptr<QueueReplicator> qr; + if 
(haBroker.replicateLevel(queue->getSettings()) == ALL) { + qr.reset(new QueueReplicator( + LogPrefix(haBroker, queue->getName()), queue, link, unready)); if (!broker.getExchanges().registerExchange(qr)) throw Exception(QPID_MSG("Duplicate queue replicator " << qr->getName())); qr->activate(); + haBroker.activatedBackup(queue->getName()); } + return qr; } bool BrokerReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } @@ -533,4 +559,9 @@ bool BrokerReplicator::isBound(boost::shared_ptr<Queue>, const string* const, co string BrokerReplicator::getType() const { return QPID_CONFIGURATION_REPLICATOR; } +void BrokerReplicator::ready() { + assert(haBroker.getStatus() == CATCHUP); + haBroker.setStatus(READY); +} + }} // namespace broker diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.h b/qpid/cpp/src/qpid/ha/BrokerReplicator.h index c9d7b9f74c..8b7987a89d 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.h +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.h @@ -22,7 +22,9 @@ * */ -#include "ReplicateLevel.h" +#include "Counter.h" +#include "Enum.h" +#include "LogPrefix.h" #include "qpid/broker/Exchange.h" #include "qpid/types/Variant.h" #include <boost/shared_ptr.hpp> @@ -42,6 +44,7 @@ class FieldTable; namespace ha { class HaBroker; +class QueueReplicator; /** * Replicate configuration on a backup broker. 
@@ -68,11 +71,9 @@ class BrokerReplicator : public broker::Exchange bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); private: - void initializeBridge(broker::Bridge&, broker::SessionHandler&); + typedef boost::shared_ptr<QueueReplicator> QueueReplicatorPtr; - ReplicateLevel replicateLevel(const std::string&); - ReplicateLevel replicateLevel(const framing::FieldTable& args); - ReplicateLevel replicateLevel(const types::Variant::Map& args); + void initializeBridge(broker::Bridge&, broker::SessionHandler&); void doEventQueueDeclare(types::Variant::Map& values); void doEventQueueDelete(types::Variant::Map& values); @@ -86,11 +87,16 @@ class BrokerReplicator : public broker::Exchange void doResponseBind(types::Variant::Map& values); void doResponseHaBroker(types::Variant::Map& values); - void startQueueReplicator(const boost::shared_ptr<broker::Queue>&); + QueueReplicatorPtr findQueueReplicator(const std::string& qname); + QueueReplicatorPtr startQueueReplicator( + const boost::shared_ptr<broker::Queue>&, Counter*); + void ready(); + LogPrefix logPrefix; HaBroker& haBroker; broker::Broker& broker; boost::shared_ptr<broker::Link> link; + Counter unreadyCount; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp index 67ad7202d6..fef4c67174 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp +++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp @@ -27,14 +27,31 @@ namespace qpid { namespace ha { -ConnectionExcluder::ConnectionExcluder() {} +ConnectionExcluder::ConnectionExcluder(const LogPrefix& lp) + : logPrefix(lp), backupAllowed(false) {} void ConnectionExcluder::opened(broker::Connection& connection) { - if (!connection.isLink() && !connection.getClientProperties().isSet(ADMIN_TAG)) - throw Exception( - QPID_MSG("HA: Backup broker rejected connection " << connection.getMgmtId())); + if (connection.isLink()) return; // Allow all 
outgoing links + if (connection.getClientProperties().isSet(ADMIN_TAG)) { + QPID_LOG(debug, logPrefix << "Allowing admin connection: " + << connection.getMgmtId()); + return; + } + if (connection.getClientProperties().isSet(BACKUP_TAG)) { + if (backupAllowed) { + QPID_LOG(debug, logPrefix << "Allowing backup connection: " + << connection.getMgmtId()); + return; + } + else QPID_LOG(debug, logPrefix << "Rejected backup connection: " + << connection.getMgmtId()); + } + + throw Exception( + QPID_MSG(logPrefix << "Rejected client connection " << connection.getMgmtId())); } const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin"; +const std::string ConnectionExcluder::BACKUP_TAG="qpid.ha-backup"; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h index f8f2843a0c..4a2ebcc127 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionExcluder.h +++ b/qpid/cpp/src/qpid/ha/ConnectionExcluder.h @@ -22,6 +22,7 @@ * */ +#include "LogPrefix.h" #include "qpid/broker/ConnectionObserver.h" #include <boost/function.hpp> @@ -41,12 +42,19 @@ namespace ha { class ConnectionExcluder : public broker::ConnectionObserver { public: - ConnectionExcluder(); + static const std::string ADMIN_TAG; + static const std::string BACKUP_TAG; + + ConnectionExcluder(const LogPrefix&); void opened(broker::Connection& connection); + void setBackupAllowed(bool set) { backupAllowed = set; } + bool isBackupAllowed() const { return backupAllowed; } + private: - static const std::string ADMIN_TAG; + LogPrefix logPrefix; + bool backupAllowed; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Counter.h b/qpid/cpp/src/qpid/ha/Counter.h new file mode 100644 index 0000000000..04dd672126 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Counter.h @@ -0,0 +1,57 @@ +#ifndef QPID_HA_COUNTER_H +#define QPID_HA_COUNTER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/AtomicValue.h" +#include <boost/function.hpp> + +namespace qpid { +namespace ha { + +/** + * Keep a count, call a callback when it reaches 0. + */ +class Counter +{ + public: + Counter(boost::function<void()> f) : callback(f) {} + + void operator++() { ++count; } + + void operator--() { + size_t n = --count; + assert(n != size_t(-1)); // No underflow + if (n == 0) callback(); + } + + size_t get() { return count.get(); } + + Counter& operator=(size_t n) { count = n; return *this; } + + private: + boost::function<void()> callback; + sys::AtomicValue<size_t> count; +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_COUNTER_H*/ diff --git a/qpid/cpp/src/qpid/ha/Enum.cpp b/qpid/cpp/src/qpid/ha/Enum.cpp new file mode 100644 index 0000000000..fda2976ad3 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Enum.cpp @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Enum.h" +#include "qpid/Msg.h" +#include "qpid/Exception.h" +#include <algorithm> +#include <iostream> +#include <assert.h> + +namespace qpid { +namespace ha { + +const std::string QPID_REPLICATE("qpid.replicate"); + +std::string EnumBase::str() const { + assert(value < count); + return names[value]; +} + +void EnumBase::parse(const std::string& s) { + if (!parseNoThrow(s)) + throw Exception(QPID_MSG("Invalid " << names[count] << " value: " << s)); +} + +bool EnumBase::parseNoThrow(const std::string& s) { + const char** i = std::find(names, names+count, s); + value = i - names; + return value < count; +} + +template <> const char* Enum<ReplicateLevel>::NAMES[] = { + "none", "configuration", "all", "replication" +}; +template <> const size_t Enum<ReplicateLevel>::N = 3; + +template <> const char* Enum<BrokerStatus>::NAMES[] = { + "joining", "catchup", "ready", "promoting", "active", + "standalone", "broker status" +}; +template <> const size_t Enum<BrokerStatus>::N = 7; + +std::ostream& operator<<(std::ostream& o, EnumBase e) { + return o << e.str(); +} + +std::istream& operator>>(std::istream& i, EnumBase& e) { + std::string s; + i >> s; + e.parse(s); + return i; +} + +}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Enum.h b/qpid/cpp/src/qpid/ha/Enum.h new file mode 100644 index 0000000000..82d087b768 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Enum.h @@ -0,0 +1,97 @@ +#ifndef QPID_HA_ENUM_H +#define QPID_HA_ENUM_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/types/Variant.h" +#include <string> +#include <iosfwd> + +namespace qpid { + +namespace framing { +class FieldTable; +} + +namespace ha { + +/** Base class for enums with string conversion */ +class EnumBase { + public: + EnumBase(const char* names_[], size_t count_, unsigned value) + : value(value), names(names_), count(count_) {} + + /** Convert to string */ + std::string str() const; + /** Parse from string, throw if unsuccessful */ + void parse(const std::string&); + /** Parse from string, return false if unsuccessful. 
*/ + bool parseNoThrow(const std::string&); + + protected: + unsigned value; + const char** names; + size_t count; +}; + +std::ostream& operator<<(std::ostream&, EnumBase); +std::istream& operator>>(std::istream&, EnumBase&); + +/** Wrapper template for enums with string conversion */ +template <class T> class Enum : public EnumBase { + public: + Enum(T x=T()) : EnumBase(NAMES, N, x) {} + T get() const { return T(value); } + void operator=(T x) { value = x; } + private: + static const size_t N; + static const char* NAMES[]; +}; + +/** To print an enum x: o << printable(x) */ +template <class T> Enum<T> printable(T x) { return Enum<T>(x); } + +enum ReplicateLevel { + NONE, ///< Nothing is replicated + CONFIGURATION, ///< Wiring is replicated but not messages + ALL ///< Everything is replicated +}; + +/** State of a broker: see HaBroker::setStatus for state diagram */ +enum BrokerStatus { + JOINING, ///< New broker, looking for primary + CATCHUP, ///< Backup: Connected to primary, catching up on state. + READY, ///< Backup: Caught up, ready to take over. + PROMOTING, ///< Primary: waiting for backups to connect & sync + ACTIVE, ///< Primary: actively serving clients. + STANDALONE ///< Not part of a cluster. 
+}; + +inline bool isPrimary(BrokerStatus s) { + return s == PROMOTING || s == ACTIVE || s == STANDALONE; +} + +inline bool isBackup(BrokerStatus s) { return !isPrimary(s); } + +extern const std::string QPID_REPLICATE; +}} // qpid::ha +#endif /*!QPID_HA_ENUM_H*/ diff --git a/qpid/cpp/src/qpid/ha/HaBroker.cpp b/qpid/cpp/src/qpid/ha/HaBroker.cpp index 589d7ee6aa..b1c7bf98a5 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.cpp +++ b/qpid/cpp/src/qpid/ha/HaBroker.cpp @@ -21,6 +21,7 @@ #include "Backup.h" #include "ConnectionExcluder.h" #include "HaBroker.h" +#include "Primary.h" #include "Settings.h" #include "ReplicatingSubscription.h" #include "qpid/Exception.h" @@ -28,6 +29,7 @@ #include "qpid/broker/Link.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SignalHandler.h" +#include "qpid/framing/FieldTable.h" #include "qpid/management/ManagementAgent.h" #include "qmf/org/apache/qpid/ha/Package.h" #include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h" @@ -43,59 +45,101 @@ namespace _qmf = ::qmf::org::apache::qpid::ha; using namespace management; using namespace std; -namespace { - -const std::string STANDALONE="standalone"; -const std::string CATCH_UP="catch-up"; -const std::string BACKUP="backup"; -const std::string PRIMARY="primary"; - -} // namespace - - HaBroker::HaBroker(broker::Broker& b, const Settings& s) - : broker(b), + : logPrefix(status), + broker(b), settings(s), - mgmtObject(0) + mgmtObject(0), + status(STANDALONE), + excluder(new ConnectionExcluder(logPrefix)) { - // Register a factory for replicating subscriptions. - broker.getConsumerFactories().add( - boost::shared_ptr<ReplicatingSubscription::Factory>( - new ReplicatingSubscription::Factory())); - - broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this); - + // Set up the management object. 
ManagementAgent* ma = broker.getManagementAgent(); - if (!ma) + if (settings.cluster && !ma) throw Exception("Cannot start HA: management is disabled"); _qmf::Package packageInit(ma); mgmtObject = new _qmf::HaBroker(ma, this, "ha-broker"); - mgmtObject->set_status(settings.cluster ? BACKUP : STANDALONE); - mgmtObject->set_replicateDefault(str(settings.replicateDefault)); + mgmtObject->set_replicateDefault(settings.replicateDefault.str()); ma->addObject(mgmtObject); - // NOTE: lock is not needed in a constructor but we created it just to pass - // to the set functions. + // Register a factory for replicating subscriptions. + broker.getConsumerFactories().add( + boost::shared_ptr<ReplicatingSubscription::Factory>( + new ReplicatingSubscription::Factory(*this))); + + // If we are in a cluster, start as backup in joining state. + if (settings.cluster) { + status = JOINING; + backup.reset(new Backup(*this, s)); + broker.getConnectionObservers().add(excluder); + broker.getKnownBrokers = boost::bind(&HaBroker::getKnownBrokers, this); + } + + // NOTE: lock is not needed in a constructor, but create one + // to pass to functions that have a ScopedLock parameter. sys::Mutex::ScopedLock l(lock); if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l); if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l); - - // If we are in a cluster, we start in backup mode. - if (settings.cluster) backup.reset(new Backup(*this, s)); + statusChanged(l); + QPID_LOG(notice, logPrefix << "broker initialized"); } HaBroker::~HaBroker() {} +void HaBroker::promoting(sys::Mutex::ScopedLock&) { + setStatus(PROMOTING); + backup.reset(); // No longer replicating, close link. + primary.reset(new Primary(*this)); // Starts primary-ready check. +} + +// Called back from Primary ready check. 
+void HaBroker::activate() { + sys::Mutex::ScopedLock l(lock); + activate(l); +} + +void HaBroker::activate(sys::Mutex::ScopedLock&) { + setStatus(ACTIVE); + broker.getConnectionObservers().remove(excluder); // This allows client connections. +} + +ReplicateLevel HaBroker::replicateLevel(const std::string& str) { + Enum<ReplicateLevel> rl; + if (rl.parseNoThrow(str)) return ReplicateLevel(rl.get()); + else return getSettings().replicateDefault.get(); +} + +ReplicateLevel HaBroker::replicateLevel(const framing::FieldTable& f) { + if (f.isSet(QPID_REPLICATE)) + return replicateLevel(f.getAsString(QPID_REPLICATE)); + else + return getSettings().replicateDefault.get(); +} + +ReplicateLevel HaBroker::replicateLevel(const types::Variant::Map& m) { + types::Variant::Map::const_iterator i = m.find(QPID_REPLICATE); + if (i != m.end()) + return replicateLevel(i->second.asString()); + else + return getSettings().replicateDefault.get(); +} + Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, string&) { sys::Mutex::ScopedLock l(lock); switch (methodId) { case _qmf::HaBroker::METHOD_PROMOTE: { - if (backup.get()) { // I am a backup - // NOTE: resetting backup allows client connections, so any - // primary state should be set up here before backup.reset() - backup.reset(); - QPID_LOG(notice, "HA: Promoted to primary"); - mgmtObject->set_status(PRIMARY); + switch (status) { + case JOINING: activate(l); break; + case CATCHUP: + // FIXME aconway 2012-04-27: don't allow promotion in catch-up + // QPID_LOG(error, logPrefix << "Still catching up, cannot be promoted."); + // throw Exception("Still catching up, cannot be promoted."); + promoting(l); // FIXME aconway 2012-04-27: disallow + break; + case READY: promoting(l); break; + case PROMOTING: break; + case ACTIVE: break; + case STANDALONE: break; } break; } @@ -109,12 +153,13 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, } case 
_qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: { setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l); - break; + break; } case _qmf::HaBroker::METHOD_REPLICATE: { _qmf::ArgsHaBrokerReplicate& bq_args = dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args); - QPID_LOG(debug, "HA replicating individual queue "<< bq_args.i_queue << " from " << bq_args.i_broker); + QPID_LOG(debug, logPrefix << "replicate individual queue " + << bq_args.i_queue << " from " << bq_args.i_broker); boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue); Url url(bq_args.i_broker); @@ -128,7 +173,8 @@ Manageable::status_t HaBroker::ManagementMethod (uint32_t methodId, Args& args, boost::shared_ptr<broker::Link> link = result.first; link->setUrl(url); // Create a queue replicator - boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link)); + boost::shared_ptr<QueueReplicator> qr( + new QueueReplicator(LogPrefix(*this, queue->getName()), queue, link)); qr->activate(); broker.getExchanges().registerExchange(qr); break; @@ -152,12 +198,12 @@ void HaBroker::updateClientUrl(const sys::Mutex::ScopedLock&) { mgmtObject->set_publicBrokers(url.str()); knownBrokers.clear(); knownBrokers.push_back(url); - QPID_LOG(debug, "HA: Setting client URL to: " << url); + QPID_LOG(debug, logPrefix << "Setting client URL to: " << url); } void HaBroker::setBrokerUrl(const Url& url, const sys::Mutex::ScopedLock& l) { if (url.empty()) throw Url::Invalid("HA broker URL is empty"); - QPID_LOG(debug, "HA: Setting broker URL to: " << url); + QPID_LOG(debug, logPrefix << "Setting broker URL to: " << url); brokerUrl = url; mgmtObject->set_brokers(brokerUrl.str()); if (backup.get()) backup->setBrokerUrl(brokerUrl); @@ -174,9 +220,79 @@ std::vector<Url> HaBroker::getKnownBrokers() const { return knownBrokers; } -void HaBroker::shutdown(const std::string& message) { - QPID_LOG(critical, "Shutting down: " << message); +void HaBroker::shutdown() { + 
QPID_LOG(critical, logPrefix << "Critical error, shutting down."); broker.shutdown(); } +BrokerStatus HaBroker::getStatus() const { + sys::Mutex::ScopedLock l(lock); + return status; +} + +void HaBroker::setStatus(BrokerStatus newStatus) { + sys::Mutex::ScopedLock l(lock); + setStatus(newStatus, l); +} + +namespace { +bool checkTransition(BrokerStatus from, BrokerStatus to) { + // Legal state transitions. Initial state is JOINING, ACTIVE is terminal. + static const BrokerStatus TRANSITIONS[][2] = { + { CATCHUP, PROMOTING }, // FIXME aconway 2012-04-27: illegal transition, allow while fixing behavior + { JOINING, CATCHUP }, // Connected to primary + { JOINING, ACTIVE }, // Chosen as initial primary. + { CATCHUP, READY }, // Caught up all queues, ready to take over. + { READY, PROMOTING }, // Chosen as new primary + { PROMOTING, ACTIVE } + }; + static const size_t N = sizeof(TRANSITIONS)/sizeof(TRANSITIONS[0]); + for (size_t i = 0; i < N; ++i) { + if (TRANSITIONS[i][0] == from && TRANSITIONS[i][1] == to) + return true; + } + return false; +} +} // namespace + +void HaBroker::setStatus(BrokerStatus newStatus, sys::Mutex::ScopedLock& l) { + QPID_LOG(notice, logPrefix << "Status change: " + << printable(status) << " -> " << printable(newStatus)); + bool legal = checkTransition(status, newStatus); + assert(legal); + if (!legal) { + QPID_LOG(critical, logPrefix << "Illegal state transition: " + << printable(status) << " -> " << printable(newStatus)); + shutdown(); + } + status = newStatus; + statusChanged(l); +} + +void HaBroker::statusChanged(sys::Mutex::ScopedLock&) { + mgmtObject->set_status(printable(status).str()); + // Set the backup-related properties for newly created links. 
+ framing::FieldTable ft = broker.getLinkClientProperties(); + if (isBackup(status)) + ft.setInt(ConnectionExcluder::BACKUP_TAG, 1); + else + ft.erase(ConnectionExcluder::BACKUP_TAG); + broker.setLinkClientProperties(ft); +} + +void HaBroker::activatedBackup(const std::string& queue) { + sys::Mutex::ScopedLock l(lock); + activeBackups.insert(queue); +} + +void HaBroker::deactivatedBackup(const std::string& queue) { + sys::Mutex::ScopedLock l(lock); + activeBackups.erase(queue); +} + +std::set<std::string> HaBroker::getActiveBackups() const { + sys::Mutex::ScopedLock l(lock); + return activeBackups; +} + }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/HaBroker.h b/qpid/cpp/src/qpid/ha/HaBroker.h index 99b30fd36b..b3f2c1a941 100644 --- a/qpid/cpp/src/qpid/ha/HaBroker.h +++ b/qpid/cpp/src/qpid/ha/HaBroker.h @@ -22,19 +22,30 @@ * */ +#include "Enum.h" +#include "LogPrefix.h" #include "Settings.h" #include "qpid/Url.h" #include "qpid/sys/Mutex.h" #include "qmf/org/apache/qpid/ha/HaBroker.h" #include "qpid/management/Manageable.h" +#include "qpid/types/Variant.h" #include <memory> +#include <set> +#include <boost/shared_ptr.hpp> namespace qpid { namespace broker { class Broker; } +namespace framing { +class FieldTable; +} + namespace ha { class Backup; +class ConnectionExcluder; +class Primary; /** * HA state and actions associated with a broker. @@ -55,26 +66,58 @@ class HaBroker : public management::Manageable broker::Broker& getBroker() { return broker; } const Settings& getSettings() const { return settings; } - // Log a critical error message and shut down the broker. - void shutdown(const std::string& message); + /** Shut down the broker. Caller should log a critical error message. */ + void shutdown(); + + BrokerStatus getStatus() const; + void setStatus(BrokerStatus); + void activate(); + + Backup* getBackup() { return backup.get(); } + + // Translate replicate levels. 
+ ReplicateLevel replicateLevel(const std::string& str); + ReplicateLevel replicateLevel(const framing::FieldTable& f); + ReplicateLevel replicateLevel(const types::Variant::Map& m); + + // Keep track of the set of actively replicated queues on a backup + // so that it can be transferred to the Primary on promotion. + typedef std::set<std::string> QueueNames; + void activatedBackup(const std::string& queue); + void deactivatedBackup(const std::string& queue); + QueueNames getActiveBackups() const; + + boost::shared_ptr<ConnectionExcluder> getExcluder() { return excluder; } private: void setClientUrl(const Url&, const sys::Mutex::ScopedLock&); void setBrokerUrl(const Url&, const sys::Mutex::ScopedLock&); void setExpectedBackups(size_t, const sys::Mutex::ScopedLock&); void updateClientUrl(const sys::Mutex::ScopedLock&); + bool isPrimary(const sys::Mutex::ScopedLock&) { return !backup.get(); } + + void setStatus(BrokerStatus, sys::Mutex::ScopedLock&); + void promoting(sys::Mutex::ScopedLock&); + void activate(sys::Mutex::ScopedLock&); + void statusChanged(sys::Mutex::ScopedLock&); + std::vector<Url> getKnownBrokers() const; + LogPrefix logPrefix; broker::Broker& broker; const Settings settings; - sys::Mutex lock; + mutable sys::Mutex lock; std::auto_ptr<Backup> backup; + std::auto_ptr<Primary> primary; qmf::org::apache::qpid::ha::HaBroker* mgmtObject; Url clientUrl, brokerUrl; std::vector<Url> knownBrokers; size_t expectedBackups; + BrokerStatus status; + QueueNames activeBackups; + boost::shared_ptr<ConnectionExcluder> excluder; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/HaPlugin.cpp b/qpid/cpp/src/qpid/ha/HaPlugin.cpp index 24977775bb..b6504c03b2 100644 --- a/qpid/cpp/src/qpid/ha/HaPlugin.cpp +++ b/qpid/cpp/src/qpid/ha/HaPlugin.cpp @@ -40,6 +40,7 @@ struct Options : public qpid::Options { ("ha-replicate", optValue(settings.replicateDefault, "LEVEL"), "Replication level for creating queues and exchanges if there is no qpid.replicate argument 
supplied. LEVEL is 'none', 'configuration' or 'all'") + // FIXME aconway 2012-04-30: required-backups? Also need timeout. ("ha-expected-backups", optValue(settings.expectedBackups, "N"), "Number of backups expected to be active in the HA cluster.") ("ha-username", optValue(settings.username, "USER"), @@ -77,6 +78,6 @@ struct HaPlugin : public Plugin { } }; -static HaPlugin instance; // Static initialization. +HaPlugin instance; // Static initialization. }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/LogPrefix.cpp b/qpid/cpp/src/qpid/ha/LogPrefix.cpp new file mode 100644 index 0000000000..685e623287 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/LogPrefix.cpp @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "LogPrefix.h" +#include "HaBroker.h" +#include <iostream> + +namespace qpid { +namespace ha { + +LogPrefix::LogPrefix(HaBroker& hb, const std::string& queue) : haBroker(&hb), status(0) { + if (queue.size()) tail = " queue " + queue; +} + +LogPrefix::LogPrefix(BrokerStatus& s) : haBroker(0), status(&s) {} + +std::ostream& operator<<(std::ostream& o, const LogPrefix& l) { + return o << "HA(" + << printable(l.status ? 
*l.status : l.haBroker->getStatus()) + << ")" << l.tail << ": "; +} + +}} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicateLevel.h b/qpid/cpp/src/qpid/ha/LogPrefix.h index c11e03f0ce..b45145fbb7 100644 --- a/qpid/cpp/src/qpid/ha/ReplicateLevel.h +++ b/qpid/cpp/src/qpid/ha/LogPrefix.h @@ -1,5 +1,5 @@ -#ifndef QPID_HA_REPLICATELEVEL_H -#define QPID_HA_REPLICATELEVEL_H +#ifndef QPID_HA_LOGPREFIX_H +#define QPID_HA_LOGPREFIX_H /* * @@ -22,31 +22,36 @@ * */ -#include <string> +#include "Enum.h" #include <iosfwd> +#include <string> namespace qpid { namespace ha { -enum ReplicateLevel { RL_NONE, RL_CONFIGURATION, RL_ALL }; +class HaBroker; /** - * If str is a valid replicate level, set out and return true. + * Standard information to prefix log messages. */ -bool replicateLevel(const std::string& str, ReplicateLevel& out); +class LogPrefix +{ + public: + /** For use by all classes other than HaBroker */ + LogPrefix(HaBroker& hb, const std::string& queue=std::string()); -/** - *@return enum corresponding to string level. - *@throw qpid::Exception if level is not a valid replication level. - */ -ReplicateLevel replicateLevel(const std::string& level); + /** For use by the HaBroker itself. 
*/ + LogPrefix(BrokerStatus&); -/**@return string form of replicate level */ -std::string str(ReplicateLevel l); + private: + HaBroker* haBroker; + BrokerStatus* status; + std::string tail; + friend std::ostream& operator<<(std::ostream& o, const LogPrefix& l); +}; -std::ostream& operator<<(std::ostream&, ReplicateLevel); -std::istream& operator>>(std::istream&, ReplicateLevel&); +std::ostream& operator<<(std::ostream& o, const LogPrefix& l); -}} // namespaces qpid::ha +}} // namespace qpid::ha -#endif /*!QPID_HA_REPLICATELEVEL_H*/ +#endif /*!QPID_HA_LOGPREFIX_H*/ diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index bf17d27ca3..27cd4d7111 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -52,7 +52,6 @@ Primary::Primary(HaBroker& b) : QPID_LOG(debug, logPrefix << "Need backup of " << *i << ", " << unready << " unready queues"); } - QPID_LOG(critical, "FIXME Primary " << queues.size() << " queues"); if (queues.empty()) activate(*(sys::Mutex::ScopedLock*)0); // fake lock, ok in ctor. else { diff --git a/qpid/cpp/src/qpid/ha/Primary.h b/qpid/cpp/src/qpid/ha/Primary.h new file mode 100644 index 0000000000..5a1a61ae75 --- /dev/null +++ b/qpid/cpp/src/qpid/ha/Primary.h @@ -0,0 +1,71 @@ +#ifndef QPID_HA_PRIMARY_H +#define QPID_HA_PRIMARY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "LogPrefix.h" +#include "qpid/sys/Mutex.h" +#include <boost/shared_ptr.hpp> +#include <map> +#include <string> + +namespace qpid { + +namespace broker { +class Queue; +} + +namespace ha { +class HaBroker; + +/** + * State associated with a primary broker. Tracks replicating + * subscriptions to determine when primary is ready. + * + * THREAD SAFE: addReplica is called in arbitrary threads. + */ +class Primary +{ + public: + static Primary* get() { return instance; } + Primary(HaBroker& b); + + void addReplica(const std::string& q); + void removeReplica(const std::string& q); + + private: + typedef std::map<std::string, size_t> QueueCounts; + + void activate(sys::Mutex::ScopedLock&); + + sys::Mutex lock; + HaBroker& haBroker; + LogPrefix logPrefix; + QueueCounts queues; + size_t expected, unready; + bool activated; + + static Primary* instance; +}; +}} // namespace qpid::ha + +#endif /*!QPID_HA_PRIMARY_H*/ diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp index c6af388d9d..8eb7e441a2 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.cpp @@ -19,6 +19,7 @@ * */ +#include "Counter.h" #include "QueueReplicator.h" #include "ReplicatingSubscription.h" #include "qpid/broker/Bridge.h" @@ -44,19 +45,31 @@ namespace ha { using namespace broker; using namespace framing; -const std::string QueueReplicator::DEQUEUE_EVENT_KEY("qpid.dequeue-event"); -const std::string QueueReplicator::POSITION_EVENT_KEY("qpid.position-event"); +const std::string
QPID_HA_EVENT_PREFIX("qpid.ha-event:"); +const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue"); +const std::string QueueReplicator::POSITION_EVENT_KEY(QPID_HA_EVENT_PREFIX+"position"); +const std::string QueueReplicator::READY_EVENT_KEY(QPID_HA_EVENT_PREFIX+"ready"); std::string QueueReplicator::replicatorName(const std::string& queueName) { return QPID_REPLICATOR_ + queueName; } -QueueReplicator::QueueReplicator(boost::shared_ptr<Queue> q, boost::shared_ptr<Link> l) - : Exchange(replicatorName(q->getName()), 0, q->getBroker()), queue(q), link(l) +bool QueueReplicator::isEventKey(const std::string key) { + const std::string& prefix = QPID_HA_EVENT_PREFIX; + bool ret = key.size() > prefix.size() && key.compare(0, prefix.size(), prefix) == 0; + return ret; +} + +QueueReplicator::QueueReplicator(const LogPrefix& lp, + boost::shared_ptr<Queue> q, + boost::shared_ptr<Link> l, + Counter* counter) + : Exchange(replicatorName(q->getName()), 0, q->getBroker()), + logPrefix(lp), queue(q), link(l), + unreadyCount(counter) { framing::Uuid uuid(true); bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str(); - logPrefix = "HA: Backup of " + queue->getName() + ": "; QPID_LOG(info, logPrefix << "Created"); } @@ -103,6 +116,8 @@ void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHa const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; + if (unreadyCount) ++(*unreadyCount); // We are unready. + // FIXME aconway 2011-12-09: Failover optimization removed. // There was code here to re-use messages already on the backup // during fail-over. 
This optimization was removed to simplify @@ -149,13 +164,18 @@ void QueueReplicator::route(Deliverable& msg) try { const std::string& key = msg.getMessage().getRoutingKey(); sys::Mutex::ScopedLock l(lock); - if (key == DEQUEUE_EVENT_KEY) { + if (!isEventKey(key)) { + msg.deliverTo(queue); + QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); + } + else if (key == DEQUEUE_EVENT_KEY) { SequenceSet dequeues = decodeContent<SequenceSet>(msg.getMessage()); QPID_LOG(trace, logPrefix << "Dequeue: " << dequeues); //TODO: should be able to optimise the following for (SequenceSet::iterator i = dequeues.begin(); i != dequeues.end(); i++) dequeue(*i, l); - } else if (key == POSITION_EVENT_KEY) { + } + else if (key == POSITION_EVENT_KEY) { SequenceNumber position = decodeContent<SequenceNumber>(msg.getMessage()); QPID_LOG(trace, logPrefix << "Position moved from " << queue->getPosition() << " to " << position); @@ -165,10 +185,12 @@ void QueueReplicator::route(Deliverable& msg) << queue->getPosition() << " to " << position)); } queue->setPosition(position); - } else { - msg.deliverTo(queue); - QPID_LOG(trace, logPrefix << "Enqueued message " << queue->getPosition()); } + else if (key == READY_EVENT_KEY) { + QPID_LOG(info, logPrefix << "caught up at " << queue->getPosition()); + if (unreadyCount) --(*unreadyCount); // We are now ready. + } + // Ignore unknown event keys, may be introduced in later versions. } catch (const std::exception& e) { QPID_LOG(critical, logPrefix << "Replication failed: " << e.what()); diff --git a/qpid/cpp/src/qpid/ha/QueueReplicator.h b/qpid/cpp/src/qpid/ha/QueueReplicator.h index 26fb9456d1..db4a901274 100644 --- a/qpid/cpp/src/qpid/ha/QueueReplicator.h +++ b/qpid/cpp/src/qpid/ha/QueueReplicator.h @@ -21,6 +21,8 @@ * under the License. 
* */ + +#include "LogPrefix.h" #include "qpid/broker/Exchange.h" #include "qpid/framing/SequenceSet.h" #include <boost/enable_shared_from_this.hpp> @@ -39,6 +41,8 @@ class Deliverable; namespace ha { +class Counter; + /** * Exchange created on a backup broker to replicate a queue on the primary. * @@ -54,9 +58,19 @@ class QueueReplicator : public broker::Exchange, public: static const std::string DEQUEUE_EVENT_KEY; static const std::string POSITION_EVENT_KEY; + static const std::string READY_EVENT_KEY; static std::string replicatorName(const std::string& queueName); + /** Test if a string is an event key */ + static bool isEventKey(const std::string key); + + /** + * @param unreadyCount can be 0 if we don't need a ready count from this queue. + */ + QueueReplicator(const LogPrefix&, + boost::shared_ptr<broker::Queue> q, + boost::shared_ptr<broker::Link> l, + Counter* unreadyCount=0); - QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l); ~QueueReplicator(); void activate(); // Call after ctor @@ -73,12 +87,13 @@ class QueueReplicator : public broker::Exchange, void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler); void dequeue(framing::SequenceNumber, const sys::Mutex::ScopedLock&); - std::string logPrefix; + LogPrefix logPrefix; std::string bridgeName; sys::Mutex lock; boost::shared_ptr<broker::Queue> queue; boost::shared_ptr<broker::Link> link; boost::shared_ptr<broker::Bridge> bridge; + Counter* unreadyCount; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp b/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp deleted file mode 100644 index 4981577225..0000000000 --- a/qpid/cpp/src/qpid/ha/ReplicateLevel.cpp +++ /dev/null @@ -1,72 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "ReplicateLevel.h" -#include "qpid/Exception.h" -#include "qpid/Msg.h" -#include <iostream> -#include <assert.h> - -namespace qpid { -namespace ha { - -using namespace std; - -// Note replicateLevel is called during plugin-initialization which -// happens in the static construction phase so these constants need -// to be POD, they can't be class objects -// -namespace { -const char* S_NONE="none"; -const char* S_CONFIGURATION="configuration"; -const char* S_ALL="all"; -} - -bool replicateLevel(const string& level, ReplicateLevel& out) { - if (level == S_NONE) { out = RL_NONE; return true; } - if (level == S_CONFIGURATION) { out = RL_CONFIGURATION; return true; } - if (level == S_ALL) { out = RL_ALL; return true; } - return false; -} - -ReplicateLevel replicateLevel(const string& level) { - ReplicateLevel rl; - if (!replicateLevel(level, rl)) - throw Exception("Invalid value for replication level: "+level); - return rl; -} - -string str(ReplicateLevel l) { - const char* names[] = { S_NONE, S_CONFIGURATION, S_ALL }; - if (l > RL_ALL) - throw Exception(QPID_MSG("Invalid value for replication level: " << l)); - return names[l]; -} - -ostream& operator<<(ostream& o, ReplicateLevel rl) { return o << str(rl); } - -istream& operator>>(istream& i, ReplicateLevel& rl) { - string str; - i >> str; - rl = replicateLevel(str); - return i; -} - -}} // namespace qpid::ha diff --git 
a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp index 91a4538bc4..9067063fcf 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp @@ -20,6 +20,7 @@ */ #include "ReplicatingSubscription.h" +#include "Primary.h" #include "qpid/broker/Queue.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/ConnectionState.h" @@ -64,14 +65,25 @@ ReplicatingSubscription::Factory::create( boost::shared_ptr<ReplicatingSubscription> rs; if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) { rs.reset(new ReplicatingSubscription( + haBroker, parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments)); queue->addObserver(rs); + // NOTE: readyPosition must be set _after_ addObserver, so + // messages can't be enqueued after setting readyPosition + // but before registering the observer. + rs->readyPosition = queue->getPosition(); + QPID_LOG(debug, rs->logPrefix << "created backup subscription, catching up to " + << QueuedMessage(rs->getQueue().get(), 0, rs->readyPosition) + << rs->logSuffix); + + } return rs; } ReplicatingSubscription::ReplicatingSubscription( + HaBroker& haBroker, SemanticState* parent, const string& name, Queue::shared_ptr queue, @@ -84,15 +96,15 @@ ReplicatingSubscription::ReplicatingSubscription( const framing::FieldTable& arguments ) : ConsumerImpl(parent, name, queue, ack, acquire, exclusive, tag, resumeId, resumeTtl, arguments), + logPrefix(haBroker, queue->getName()), events(new Queue(mask(name))), - consumer(new DelegatingConsumer(*this)) + consumer(new DelegatingConsumer(*this)), + sentReady(false) { - // Separate the remote part from a "local-remote" address. + // Separate the remote part from a "local-remote" address for logging. 
string address = parent->getSession().getConnection().getUrl(); size_t i = address.find('-'); if (i != string::npos) address = address.substr(i+1); - logPrefix = "HA: Primary "; - stringstream ss; logSuffix = " (" + address + ")"; // FIXME aconway 2011-12-09: Failover optimization removed. @@ -102,8 +114,6 @@ ReplicatingSubscription::ReplicatingSubscription( // can be re-introduced later. Last revision with the optimization: // r1213258 | QPID-3603: Fix QueueReplicator subscription parameters. - QPID_LOG(debug, logPrefix << "created backup subscription " << getName() << logSuffix); - // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0 // so we will start consuming from the lowest numbered message. // This is incorrect if the sequence number wraps around, but @@ -111,34 +121,50 @@ ReplicatingSubscription::ReplicatingSubscription( } // Message is delivered in the subscription's connection thread. -bool ReplicatingSubscription::deliver(QueuedMessage& m) { +bool ReplicatingSubscription::deliver(QueuedMessage& qm) { try { // Add position events for the subscribed queue, not for the internal event queue. - if (m.queue && m.queue == getQueue().get()) { + if (qm.queue == getQueue().get()) { + QPID_LOG(trace, logPrefix << "replicating " << qm << logSuffix); sys::Mutex::ScopedLock l(lock); - if (position != m.position) + if (position != qm.position) throw Exception( QPID_MSG("Expected position " << position - << " but got " << m.position)); - // m.position is the position of the newly enqueued m on the local queue. - // backupPosition is latest position on the backup queue (before enqueueing m.) - if (m.position <= backupPosition) + << " but got " << qm.position)); + // qm.position is the position of the newly enqueued qm on the local queue. + // backupPosition is latest position on backup queue before enqueueing qm. 
+ if (qm.position <= backupPosition) throw Exception( QPID_MSG("Expected position > " << backupPosition - << " but got " << m.position)); + << " but got " << qm.position)); - if (m.position - backupPosition > 1) { + if (qm.position - backupPosition > 1) { // Position has advanced because of messages dequeued ahead of us. - SequenceNumber send(m.position); - --send; // Send the position before m was enqueued. + SequenceNumber send(qm.position); + --send; // Send the position before qm was enqueued. sendPositionEvent(send, l); } - backupPosition = m.position; - QPID_LOG(trace, logPrefix << "replicating " << m << logSuffix); + backupPosition = qm.position; + // Deliver the message + bool delivered = ConsumerImpl::deliver(qm); + + // We have advanced to the initial position, backup is ready. + if (!sentReady && qm.position >= readyPosition) { + sendReadyEvent(l); + sentReady = true; + QPID_LOG(info, logPrefix << "Caught up at " << qm + << logSuffix); + // If we are in a primary broker, notify that a subscription is ready. + // FIXME aconway 2012-04-30: rename addReplica->readyReplica + if (Primary::get()) + Primary::get()->addReplica(qm.queue->getName()); + } + return delivered; } - return ConsumerImpl::deliver(m); + else + return ConsumerImpl::deliver(qm); // Message is for internal event queue. } catch (const std::exception& e) { - QPID_LOG(critical, logPrefix << "error replicating " << getQueue()->getName() + QPID_LOG(critical, logPrefix << "error replicating " << qm << logSuffix << ": " << e.what()); throw; } @@ -154,7 +180,7 @@ void ReplicatingSubscription::complete( const QueuedMessage& qm, const sys::Mutex::ScopedLock&) { // Handle completions for the subscribed queue, not the internal event queue. 
- if (qm.queue && qm.queue == getQueue().get()) { + if (qm.queue == getQueue().get()) { QPID_LOG(trace, logPrefix << "completed " << qm << logSuffix); Delayed::iterator i= delayed.find(qm.position); // The same message can be completed twice, by acknowledged and @@ -179,7 +205,6 @@ void ReplicatingSubscription::enqueued(const QueuedMessage& qm) { delayed[qm.position] = qm; } - // Function to complete a delayed message, called by cancel() void ReplicatingSubscription::cancelComplete( const Delayed::value_type& v, const sys::Mutex::ScopedLock&) @@ -195,7 +220,8 @@ void ReplicatingSubscription::cancel() boost::dynamic_pointer_cast<QueueObserver>(shared_from_this())); { sys::Mutex::ScopedLock l(lock); - QPID_LOG(debug, logPrefix << "cancel backup subscription " << getName() << logSuffix); + QPID_LOG(debug, logPrefix << "cancel backup subscription to " + << getQueue()->getName() << logSuffix); for_each(delayed.begin(), delayed.end(), boost::bind(&ReplicatingSubscription::cancelComplete, this, _1, boost::ref(l))); delayed.clear(); @@ -255,6 +281,13 @@ void ReplicatingSubscription::sendPositionEvent( sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l); } +// Called with lock held. Called in subscription's connection thread. 
+void ReplicatingSubscription::sendReadyEvent(const sys::Mutex::ScopedLock&l ) +{ + framing::Buffer buffer; + sendEvent(QueueReplicator::READY_EVENT_KEY, buffer, l); +} + void ReplicatingSubscription::sendEvent(const std::string& key, framing::Buffer& buffer, const sys::Mutex::ScopedLock&) { @@ -300,7 +333,7 @@ bool ReplicatingSubscription::doDispatch() ReplicatingSubscription::DelegatingConsumer::DelegatingConsumer(ReplicatingSubscription& c) : Consumer(c.getName(), true), delegate(c) {} ReplicatingSubscription::DelegatingConsumer::~DelegatingConsumer() {} -bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& m) { return delegate.deliver(m); } +bool ReplicatingSubscription::DelegatingConsumer::deliver(QueuedMessage& qm) { return delegate.deliver(qm); } void ReplicatingSubscription::DelegatingConsumer::notify() { delegate.notify(); } bool ReplicatingSubscription::DelegatingConsumer::filter(boost::intrusive_ptr<Message> msg) { return delegate.filter(msg); } bool ReplicatingSubscription::DelegatingConsumer::accept(boost::intrusive_ptr<Message> msg) { return delegate.accept(msg); } diff --git a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h index f9176915f6..952c970f41 100644 --- a/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h +++ b/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h @@ -51,12 +51,17 @@ namespace ha { * * THREAD SAFE: Used as a consumer in subscription's connection * thread, and as a QueueObserver in arbitrary connection threads. + * + * Lifecycle: broker::Queue holds shared_ptrs to this as a consumer. 
+ * */ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, public broker::QueueObserver { public: struct Factory : public broker::ConsumerFactory { + HaBroker& haBroker; + Factory(HaBroker& hb) : haBroker(hb) {} boost::shared_ptr<broker::SemanticState::ConsumerImpl> create( broker::SemanticState* parent, const std::string& name, boost::shared_ptr<broker::Queue> , @@ -68,7 +73,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, // Argument names for consume command. static const std::string QPID_REPLICATING_SUBSCRIPTION; - ReplicatingSubscription(broker::SemanticState* parent, + ReplicatingSubscription(HaBroker&, + broker::SemanticState* parent, const std::string& name, boost::shared_ptr<broker::Queue> , bool ack, bool acquire, bool exclusive, const std::string& tag, const std::string& resumeId, uint64_t resumeTtl, @@ -77,13 +83,13 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, ~ReplicatingSubscription(); // QueueObserver overrides. - bool deliver(broker::QueuedMessage& msg); void enqueued(const broker::QueuedMessage&); void dequeued(const broker::QueuedMessage&); void acquired(const broker::QueuedMessage&) {} void requeued(const broker::QueuedMessage&) {} // Consumer overrides. 
+ bool deliver(broker::QueuedMessage& msg); void cancel(); void acknowledged(const broker::QueuedMessage&); bool browseAcquired() const { return true; } @@ -94,17 +100,22 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, bool doDispatch(); private: typedef std::map<framing::SequenceNumber, broker::QueuedMessage> Delayed; - std::string logPrefix, logSuffix; + + LogPrefix logPrefix; + std::string logSuffix; boost::shared_ptr<broker::Queue> events; boost::shared_ptr<broker::Consumer> consumer; Delayed delayed; framing::SequenceSet dequeues; framing::SequenceNumber backupPosition; + framing::SequenceNumber readyPosition; + bool sentReady; void complete(const broker::QueuedMessage&, const sys::Mutex::ScopedLock&); void cancelComplete(const Delayed::value_type& v, const sys::Mutex::ScopedLock&); void sendDequeueEvent(const sys::Mutex::ScopedLock&); void sendPositionEvent(framing::SequenceNumber, const sys::Mutex::ScopedLock&); + void sendReadyEvent(const sys::Mutex::ScopedLock&); void sendEvent(const std::string& key, framing::Buffer&, const sys::Mutex::ScopedLock&); @@ -126,6 +137,8 @@ class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl, private: ReplicatingSubscription& delegate; }; + + friend class Factory; }; diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h index bf70c3f3f7..08d42471b8 100644 --- a/qpid/cpp/src/qpid/ha/Settings.h +++ b/qpid/cpp/src/qpid/ha/Settings.h @@ -22,7 +22,7 @@ * */ -#include "ReplicateLevel.h" +#include "Enum.h" #include <string> namespace qpid { @@ -34,12 +34,14 @@ namespace ha { class Settings { public: - Settings() : cluster(false), expectedBackups(0), replicateDefault(RL_NONE) {} + Settings() : cluster(false), expectedBackups(0), replicateDefault(NONE) + {} + bool cluster; // True if we are a cluster member. 
std::string clientUrl; std::string brokerUrl; size_t expectedBackups; - ReplicateLevel replicateDefault; + Enum<ReplicateLevel> replicateDefault; std::string username, password, mechanism; private: }; diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py index 8255fbe9ac..257ac68b74 100644 --- a/qpid/cpp/src/tests/brokertest.py +++ b/qpid/cpp/src/tests/brokertest.py @@ -77,17 +77,19 @@ def error_line(filename, n=1): return ":\n" + "".join(result) def retry(function, timeout=10, delay=.01): - """Call function until it returns True or timeout expires. - Double the delay for each retry. Return True if function - returns true, False if timeout expires.""" + """Call function until it returns a true value or timeout expires. + Double the delay for each retry. Returns what function returns if + true, False if timeout expires.""" deadline = time.time() + timeout - while not function(): + ret = None + while not ret: + ret = function() remaining = deadline - time.time() if remaining <= 0: return False delay = min(delay, remaining) time.sleep(delay) delay *= 2 - return True + return ret class AtomicCounter: def __init__(self): @@ -298,9 +300,9 @@ class Broker(Popen): # Read port from broker process stdout if not already read. 
if (self._port == 0): try: self._port = int(self.stdout.readline()) - except ValueError: - raise Exception("Can't get port for broker %s (%s)%s" % - (self.name, self.pname, error_line(self.log,5))) + except ValueError, e: + raise Exception("Can't get port for broker %s (%s)%s: %s" % + (self.name, self.pname, error_line(self.log,5), e)) return self._port def unexpected(self,msg): diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 827cb7dca9..15137a0c5f 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -28,13 +28,23 @@ from qpidtoollibs import BrokerAgent log = getLogger(__name__) +class QmfHaBroker(object): + def __init__(self, address): + self.connection = Connection.establish( + address, client_properties={"qpid.ha-admin":1}) + self.qmf = BrokerAgent(self.connection) + self.ha_broker = self.qmf.getHaBroker() + if not self.ha_broker: + raise Exception("HA module is not loaded on broker at %s"%address) + class HaBroker(Broker): def __init__(self, test, args=[], broker_url=None, ha_cluster=True, ha_replicate="all", **kwargs): assert BrokerTest.ha_lib, "Cannot locate HA plug-in" args = copy(args) args += ["--load-module", BrokerTest.ha_lib, - "--log-enable=info+", "--log-enable=debug+:ha::", + "--log-enable=info+", + "--log-enable=debug+:ha::", # FIXME aconway 2012-02-13: workaround slow link failover. 
"--link-maintenace-interval=0.1", "--ha-cluster=%s"%ha_cluster] @@ -42,32 +52,31 @@ class HaBroker(Broker): args += [ "--ha-replicate=%s"%ha_replicate ] if broker_url: args.extend([ "--ha-brokers", broker_url ]) Broker.__init__(self, test, args, **kwargs) - self.commands=os.getenv("PYTHON_COMMANDS") - assert os.path.isdir(self.commands) + self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha") + assert os.path.exists(self.qpid_ha_path) + self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config") + assert os.path.exists(self.qpid_config_path) getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. + self.qpid_ha_script=import_script(self.qpid_ha_path) - def promote(self): - assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0 - - def set_client_url(self, url): - assert os.system( - "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0 + def qpid_ha(self, args): self.qpid_ha_script.main(["", "-b", self.host_port()]+args) - def set_broker_url(self, url): - assert os.system( - "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0 + def promote(self): self.qpid_ha(["promote"]) + def set_client_url(self, url): self.qpid_ha(["set", "--public-brokers", url]) + def set_broker_url(self, url): self.qpid_ha(["set", "--brokers", url]) + def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue]) + def ha_status(self): QmfHaBroker(self.host_port()).ha_broker.status - def replicate(self, from_broker, queue): - assert os.system( - "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 + # FIXME aconway 2012-05-01: do direct python call to qpid-config code. 
+ def qpid_config(self, args): + assert subprocess.call( + [self.qpid_config_path, "--broker", self.host_port()]+args) == 0 def config_replicate(self, from_broker, queue): - assert os.system( - "%s/qpid-config --broker=%s add queue --start-replica %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0 + self.qpid_config(["add", "queue", "--start-replica", from_broker, queue]) def config_declare(self, queue, replication): - assert os.system( - "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0 + self.qpid_config(["add", "queue", queue, "--replicate", replication]) def connect_admin(self, **kwargs): return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs) @@ -86,17 +95,47 @@ class HaBroker(Broker): assert_browse_retry(bs, queue, expected, **kwargs) finally: bs.connection.close() + def assert_connect_fail(self): + try: + self.connect() + self.test.fail("Expected ConnectionError") + except ConnectionError: pass + + def connect_retry(self): + def try_connect(): + try: return self.connect() + except ConnectionError: return None + c = retry(try_connect) + if c: return c + else: self.test.fail("Failed to connect") + class HaCluster(object): _cluster_count = 0 def __init__(self, test, n, **kwargs): """Start a cluster of n brokers""" self.test = test - self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)] + self.kwargs = kwargs + self._brokers = [] + self.id = HaCluster._cluster_count HaCluster._cluster_count += 1 + for i in xrange(n): self.start(False) + self.update_urls() + self[0].promote() + + def start(self, update_urls=True): + """Start a new broker in the cluster""" + b = HaBroker( + self.test, + name="broker%s-%s"%(self.id, len(self._brokers)), + **self.kwargs) + self._brokers.append(b) + if update_urls: self.update_urls() + return b + + def update_urls(self): self.url = ",".join([b.host_port() for b in self]) 
for b in self: b.set_broker_url(self.url) - self[0].promote() def connect(self, i): """Connect with reconnect_urls""" @@ -108,11 +147,15 @@ class HaCluster(object): self[i].expect = EXPECT_EXIT_FAIL self[(i+1) % len(self)].promote() + def restart(self, i): + b = self._brokers[i] + self._brokers[i] = HaBroker( + self.test, name=b.name, port=b.port(), broker_url=self.url, **self.kwargs) + def bounce(self, i): """Stop and restart a broker in a cluster.""" self.kill(i) - b = self[i] - self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url) + self.restart(i) # Behave like a list of brokers. def __len__(self): return len(self._brokers) @@ -344,6 +387,7 @@ class ReplicationTests(BrokerTest): def test_standalone_queue_replica(self): """Test replication of individual queues outside of cluster mode""" + getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover. primary = HaBroker(self, name="primary", ha_cluster=False) pc = primary.connect() ps = pc.session().sender("q;{create:always}") @@ -559,6 +603,26 @@ class ReplicationTests(BrokerTest): test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}"); test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}") + def test_promoting(self): + """Verify that the primary broker does not go active until expected + backups have connected or timeout expires.""" + cluster = HaCluster(self, 3, args=["--ha-expected-backups=2"]) + c = cluster[0].connect() + for i in xrange(10): + s = c.session().sender("q%s;{create:always}"%i) + for j in xrange(100): s.send(str(j)) + cluster.kill(0) # Fail over to 1 + cluster[1].assert_connect_fail() # Waiting for backups, won't accept clients. + cluster.restart(0) + c = cluster[1].connect_retry() + cluster[1].assert_browse_backup("q0", [str(i) for i in xrange(100)]); + + # Verify in logs that all queue catch-up happened before the transition to active. 
+ log = open(cluster[1].log).read() + i = log.find("Status change: promoting -> active") + self.failIf(i < 0) + self.assertEqual(log.find("caught up", i), -1) + def fairshare(msgs, limit, levels): """ Generator to return prioritised messages in expected order for a given fairshare limit @@ -602,14 +666,15 @@ class LongTests(BrokerTest): else: return 3 # Default is to be quick - def disable_test_failover(self): + def disable_test_failover_send_receive(self): """Test failover with continuous send-receive""" # FIXME aconway 2012-02-03: fails due to dropped messages, # known issue: sending messages to new primary before # backups are ready. Enable when fixed. # Start a cluster, all members will be killed during the test. - brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL) + brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL, + args=["--ha-expected-backups=2"]) for name in ["ha0","ha1","ha2"] ] url = ",".join([b.host_port() for b in brokers]) for b in brokers: b.set_broker_url(url) @@ -620,30 +685,31 @@ class LongTests(BrokerTest): receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False) receiver.start() sender.start() - # Wait for sender & receiver to get up and running - assert retry(lambda: receiver.received > 100) - # Kill and restart brokers in a cycle: - endtime = time.time() + self.duration() - i = 0 - while time.time() < endtime or i < 3: # At least 3 iterations - sender.sender.assert_running() - receiver.receiver.assert_running() - port = brokers[i].port() - brokers[i].kill() - brokers.append( - HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port, - expect=EXPECT_EXIT_FAIL)) - i += 1 - brokers[i].promote() - n = receiver.received # Verify we're still running - def enough(): - receiver.check() # Verify no exceptions - return receiver.received > n + 100 - assert retry(enough, timeout=5) - - sender.stop() - receiver.stop() - for b in brokers[i:]: b.kill() + try: + # Wait for sender & receiver to get up and 
running + assert retry(lambda: receiver.received > 100) + # Kill and restart brokers in a cycle: + endtime = time.time() + self.duration() + i = 0 + while time.time() < endtime or i < 3: # At least 3 iterations + sender.sender.assert_running() + receiver.receiver.assert_running() + port = brokers[i].port() + brokers[i].kill() + brokers.append( + HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port, + expect=EXPECT_EXIT_FAIL)) + i += 1 + brokers[i].promote() + n = receiver.received # Verify we're still running + def enough(): + receiver.check() # Verify no exceptions + return receiver.received > n + 100 + assert retry(enough, timeout=5) + finally: + sender.stop() + receiver.stop() + for b in brokers[i:]: b.kill() if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) |
