diff options
author | Alan Conway <aconway@apache.org> | 2011-11-25 21:51:40 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2011-11-25 21:51:40 +0000 |
commit | a3101943e1facd947c4a9e9a3ac3c07d9e78606e (patch) | |
tree | 01a60bfb265a72a3f7e96aa46fecca3cdbe0e847 | |
parent | 4884b20b88b740bcc66dfaec246dabcdd3dfa21d (diff) | |
download | qpid-python-a3101943e1facd947c4a9e9a3ac3c07d9e78606e.tar.gz |
QPID-3603: Move wiring-replicator creation out of SemanticState::route.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603@1206349 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
-rw-r--r-- | qpid/cpp/src/ha.mk | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 110 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/WiringReplicator.cpp | 178 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/ha/WiringReplicator.h | 21 |
7 files changed, 161 insertions, 164 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 758130e2a0..58be6dfe70 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -595,8 +595,6 @@ libqpidbroker_la_SOURCES = \ qpid/broker/PriorityQueue.cpp \ qpid/broker/NameGenerator.cpp \ qpid/broker/NameGenerator.h \ - qpid/ha/WiringReplicator.h \ - qpid/ha/WiringReplicator.cpp \ qpid/broker/NullMessageStore.cpp \ qpid/broker/NullMessageStore.h \ qpid/broker/OwnershipToken.h \ diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk index 66dbc950ea..ca6415d8dd 100644 --- a/qpid/cpp/src/ha.mk +++ b/qpid/cpp/src/ha.mk @@ -23,12 +23,14 @@ dmoduleexec_LTLIBRARIES += ha.la ha_la_SOURCES = \ - qpid/ha/HaPlugin.cpp \ - qpid/ha/HaBroker.cpp \ - qpid/ha/HaBroker.h \ qpid/ha/Backup.cpp \ qpid/ha/Backup.h \ - qpid/ha/Settings.h + qpid/ha/HaBroker.cpp \ + qpid/ha/HaBroker.h \ + qpid/ha/HaPlugin.cpp \ + qpid/ha/Settings.h \ + qpid/ha/WiringReplicator.cpp \ + qpid/ha/WiringReplicator.h ha_la_LIBADD = libqpidbroker.la ha_la_LDFLAGS = $(PLUGINLDFLAGS) diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index aac7e34cbc..adfe645f41 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -499,7 +499,6 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) { std::string exchangeName = msg->getExchangeName(); if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) { cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues()); - if (!cacheExchange) cacheExchange = ha::WiringReplicator::create(exchangeName, getSession().getBroker()); if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName); } cacheExchange->setProperties(msg); diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 43c348b3ce..ceea6ccb68 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -20,11 +20,13 @@ */ #include "Backup.h" #include "Settings.h" +#include "WiringReplicator.h" #include "qpid/Url.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/broker/Bridge.h" #include "qpid/broker/Broker.h" #include "qpid/broker/SessionHandler.h" +#include "qpid/broker/Link.h" #include "qpid/framing/AMQP_ServerProxy.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/FieldTable.h" @@ -39,114 +41,24 @@ using namespace broker; using types::Variant; using std::string; -namespace { -const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator"); -const string _WHAT("_what"); -const string _CLASS_NAME("_class_name"); -const string _PACKAGE_NAME("_package_name"); -const string _SCHEMA_ID("_schema_id"); -const string OBJECT("OBJECT"); -const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); -const string QMF_DEFAULT_DIRECT("qmf.default.direct"); -const string QMF2("qmf2"); -const string QMF_OPCODE("qmf.opcode"); -const string _QUERY_REQUEST("_query_request"); -const string BROKER("broker"); -} - -void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) { - framing::AMQP_ServerProxy peer(sessionHandler.out); - Variant::Map request; - request[_WHAT] = OBJECT; - Variant::Map schema; - schema[_CLASS_NAME] = className; - schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER; - request[_SCHEMA_ID] = schema; - - AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); - method.setBof(true); - method.setEof(false); - method.setBos(true); - method.setEos(true); - AMQHeaderBody headerBody; - MessageProperties* props = headerBody.get<MessageProperties>(true); - props->setReplyTo(qpid::framing::ReplyTo("", queueName)); - props->setAppId(QMF2); - props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST); - headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER); - AMQFrame header(headerBody); - header.setBof(false); - header.setEof(false); - header.setBos(true); - header.setEos(true); - AMQContentBody data; - qpid::amqp_0_10::MapCodec::encode(request, data.getData()); - AMQFrame content(data); - content.setBof(false); - content.setEof(true); - content.setBos(true); - content.setEos(true); - sessionHandler.out->handle(method); - sessionHandler.out->handle(header); - sessionHandler.out->handle(content); -} - -namespace { -const string QMF_DEFAULT_TOPIC("qmf.default.topic"); -const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); -const string QUEUE("queue"); -const string EXCHANGE("exchange"); -const string BINDING("binding"); -} - -// Initialize a bridge as a wiring replicator. -void bridgeInitWiringReplicator(Bridge& bridge, SessionHandler& sessionHandler) { - framing::AMQP_ServerProxy peer(sessionHandler.out); - string queueName = bridge.getQueueName(); - const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); - - //declare and bind an event queue - peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable()); - peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); - //subscribe to the queue - peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); - peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); - peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); - - //issue a query request for queues and another for exchanges using event queue as the reply-to address - sendQuery(QUEUE, queueName, sessionHandler); - sendQuery(EXCHANGE, queueName, sessionHandler); - sendQuery(BINDING, queueName, sessionHandler); -} - Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) { - // Create a link to replicate wiring if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack to identify primary. Url url(s.brokerUrl); QPID_LOG(info, "HA backup broker connecting to: " << url); - string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol; - broker.getLinks().declare( // Declare the link + + // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over. + // Declare the link + std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( url[0].host, url[0].port, protocol, false, // durable s.mechanism, s.username, s.password); - - broker.getLinks().declare( // Declare the bridge - url[0].host, url[0].port, - false, // durable - QPID_WIRING_REPLICATOR, // src - QPID_WIRING_REPLICATOR, // dest - "", // key - false, // isQueue - false, // isLocal - "", // id/tag - "", // excludes - false, // dynamic - 0, // sync? - bridgeInitWiringReplicator - ); + assert(result.second); // FIXME aconway 2011-11-23: error handling + link = result.first; + boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link)); + broker.getExchanges().registerExchange(wr); + wr->initialize(); // Must be called after registering exchange. } - // FIXME aconway 2011-11-17: handle discovery of the primary broker and fail-over correctly. } }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h index 49997f9f5e..cee626379b 100644 --- a/qpid/cpp/src/qpid/ha/Backup.h +++ b/qpid/cpp/src/qpid/ha/Backup.h @@ -24,10 +24,12 @@ #include "Settings.h" #include "qpid/Url.h" +#include <boost/shared_ptr.hpp> namespace qpid { namespace broker { class Broker; +class Link; } namespace ha { @@ -44,6 +46,7 @@ class Backup private: broker::Broker& broker; Settings settings; + boost::shared_ptr<broker::Link> link; }; }} // namespace qpid::ha diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp index 5a1baaae55..2c37fa37d3 100644 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp @@ -21,9 +21,13 @@ #include "WiringReplicator.h" #include "qpid/broker/Broker.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/Link.h" +#include "qpid/framing/FieldTable.h" #include "qpid/log/Statement.h" #include "qpid/amqp_0_10/Codecs.h" +#include "qpid/broker/SessionHandler.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/framing/MessageTransferBody.h" #include "qmf/org/apache/qpid/broker/EventBind.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" @@ -40,62 +44,69 @@ using qmf::org::apache::qpid::broker::EventExchangeDelete; using qmf::org::apache::qpid::broker::EventQueueDeclare; using qmf::org::apache::qpid::broker::EventQueueDelete; using qmf::org::apache::qpid::broker::EventSubscribe; - +using namespace framing; using std::string; using types::Variant; using namespace broker; -namespace{ +namespace { +const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator"); const string QPID_REPLICATE("qpid.replicate"); -const string ALL("all"); -const string WIRING("wiring"); const string CLASS_NAME("_class_name"); +const string EVENT("_event"); const string OBJECT_NAME("_object_name"); const string PACKAGE_NAME("_package_name"); -const string VALUES("_values"); -const string EVENT("_event"); -const string SCHEMA_ID("_schema_id"); const string QUERY_RESPONSE("_query_response"); +const string SCHEMA_ID("_schema_id"); +const string VALUES("_values"); -const string ARGUMENTS("arguments"); +const string ALL("all"); +const string ALTEX("altEx"); const string ARGS("args"); -const string QUEUE("queue"); -const string EXCHANGE("exchange"); +const string ARGUMENTS("arguments"); +const string AUTODEL("autoDel"); +const string AUTODELETE("autoDelete"); const string BIND("bind"); const string BINDING("binding"); +const string CREATED("created"); +const string DISP("disp"); const string DURABLE("durable"); -const string QNAME("qName"); -const string AUTODEL("autoDel"); -const string ALTEX("altEx"); -const string USER("user"); -const string RHOST("rhost"); -const string EXTYPE("exType"); +const string EXCHANGE("exchange"); const string EXNAME("exName"); -const string AUTODELETE("autoDelete"); +const string EXTYPE("exType"); +const string KEY("key"); const string NAME("name"); +const string QNAME("qName"); +const string QUEUE("queue"); +const string RHOST("rhost"); const string TYPE("type"); -const string DISP("disp"); -const string CREATED("created"); -const string KEY("key"); - +const string USER("user"); +const string WIRING("wiring"); -const string QMF_OPCODE("qmf.opcode"); -const string QMF_CONTENT("qmf.content"); +const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#"); const string QMF2("qmf2"); +const string QMF_CONTENT("qmf.content"); +const string QMF_DEFAULT_TOPIC("qmf.default.topic"); +const string QMF_OPCODE("qmf.opcode"); -const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator"); - - -bool isQMFv2(const Message& message) -{ +const string _WHAT("_what"); +const string _CLASS_NAME("_class_name"); +const string _PACKAGE_NAME("_package_name"); +const string _SCHEMA_ID("_schema_id"); +const string OBJECT("OBJECT"); +const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker"); +const string QMF_DEFAULT_DIRECT("qmf.default.direct"); +const string _QUERY_REQUEST("_query_request"); +const string BROKER("broker"); + +bool isQMFv2(const Message& message) { const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>(); return props && props->getAppId() == QMF2; } -template <class T> bool match(Variant::Map& schema) -{ +template <class T> bool match(Variant::Map& schema) { return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]); } @@ -110,12 +121,89 @@ bool isReplicated(const Variant::Map& m) { return i != m.end() && isReplicated(i->second.asString()); } +void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) { + framing::AMQP_ServerProxy peer(sessionHandler.out); + Variant::Map request; + request[_WHAT] = OBJECT; + Variant::Map schema; + schema[_CLASS_NAME] = className; + schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER; + request[_SCHEMA_ID] = schema; + + AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0))); + method.setBof(true); + method.setEof(false); + method.setBos(true); + method.setEos(true); + AMQHeaderBody headerBody; + MessageProperties* props = headerBody.get<MessageProperties>(true); + props->setReplyTo(qpid::framing::ReplyTo("", queueName)); + props->setAppId(QMF2); + props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST); + headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER); + AMQFrame header(headerBody); + header.setBof(false); + header.setEof(false); + header.setBos(true); + header.setEos(true); + AMQContentBody data; + qpid::amqp_0_10::MapCodec::encode(request, data.getData()); + AMQFrame content(data); + content.setBof(false); + content.setEof(true); + content.setBos(true); + content.setEos(true); + sessionHandler.out->handle(method); + sessionHandler.out->handle(header); + sessionHandler.out->handle(content); +} } // namespace +WiringReplicator::~WiringReplicator() {} -WiringReplicator::WiringReplicator(const string& name, Broker& b) : Exchange(name), broker(b) {} +WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l) + : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l) +{} + +// We need to split out the initialization so that the WiringReplicator +// can be registered as an exchange before starting the bridge. +void WiringReplicator::initialize() { + assert(link->getBroker()); + broker.getLinks().declare( + link->getHost(), link->getPort(), + false, // durable + QPID_WIRING_REPLICATOR, // src + QPID_WIRING_REPLICATOR, // dest + "", // key + false, // isQueue + false, // isLocal + "", // id/tag + "", // excludes + false, // dynamic + 0, // sync? + boost::bind(&WiringReplicator::initializeBridge, this, _1, _2) + ); +} -WiringReplicator::~WiringReplicator() {} +// This is called in the connection IO thread when the bridge is started. +void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { + framing::AMQP_ServerProxy peer(sessionHandler.out); + string queueName = bridge.getQueueName(); + const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); + + //declare and bind an event queue + peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable()); + peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable()); + //subscribe to the queue + peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable()); + peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF); + peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF); + + //issue a query request for queues and another for exchanges using event queue as the reply-to address + sendQuery(QUEUE, queueName, sessionHandler); + sendQuery(EXCHANGE, queueName, sessionHandler); + sendQuery(BINDING, queueName, sessionHandler); +} void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) { Variant::List list; @@ -176,7 +264,10 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) { args, values[USER].asString(), values[RHOST].asString()).second) { - // FIXME aconway 2011-11-22: should delete old queue and re-create from exchanges. + // FIXME aconway 2011-11-22: should delete old queue and + // re-create from event. + // Events are always up to date, whereas responses may be + // out of date. QPID_LOG(warning, "Replicated queue " << name << " already exists"); } } @@ -209,7 +300,7 @@ void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) { values[USER].asString(), values[RHOST].asString()).second) { // FIXME aconway 2011-11-22: should delete pre-exisitng exchange - // and re-create from event. Likewise for queues. + // and re-create from event. See comment in doEventQueueDeclare. QPID_LOG(warning, "Replicated exchange " << name << " already exists"); } } @@ -286,8 +377,6 @@ void WiringReplicator::doResponseExchange(Variant::Map& values) { } } -// FIXME aconway 2011-11-21: refactor to remove redundancy between do* functions. - namespace { const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:"); const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:"); @@ -299,7 +388,7 @@ std::string getRefName(const std::string& prefix, const Variant& ref) { throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref)); const std::string name = i->second.asString(); if (name.compare(0, prefix.size(), prefix) != 0) - throw Exception(QPID_MSG("Replicator unexpected reference prefix: " << name)); + throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name)); std::string ret = name.substr(prefix.size()); return ret; } @@ -336,21 +425,6 @@ void WiringReplicator::doResponseBind(Variant::Map& values) { } catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or exchange. } -boost::shared_ptr<Exchange> WiringReplicator::create(const string& target, Broker& broker) -{ - boost::shared_ptr<Exchange> exchange; - if (isWiringReplicatorDestination(target)) { - //TODO: need to cache the exchange - exchange.reset(new WiringReplicator(target, broker)); - } - return exchange; -} - -bool WiringReplicator::isWiringReplicatorDestination(const string& target) -{ - return target == QPID_WIRING_REPLICATOR; -} - bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; } bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; } diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.h b/qpid/cpp/src/qpid/ha/WiringReplicator.h index 66e5454ea7..fa4cf33b6e 100644 --- a/qpid/cpp/src/qpid/ha/WiringReplicator.h +++ b/qpid/cpp/src/qpid/ha/WiringReplicator.h @@ -24,13 +24,15 @@ #include "qpid/broker/Exchange.h" #include "qpid/types/Variant.h" - -// FIXME aconway 2011-11-17: relocate to ../ha +#include <boost/shared_ptr.hpp> namespace qpid { namespace broker { class Broker; +class Link; +class Bridge; +class SessionHandler; } namespace ha { @@ -42,19 +44,23 @@ namespace ha { class WiringReplicator : public broker::Exchange { public: - WiringReplicator(const std::string&, broker::Broker&); + WiringReplicator(const boost::shared_ptr<broker::Link>&); ~WiringReplicator(); std::string getType() const; + + // Call this after the WiringReplicator has been registered as an exchange. + void initialize(); + + // Exchange methods bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); - static bool isWiringReplicatorDestination(const std::string&); - static boost::shared_ptr<broker::Exchange> create(const std::string&, broker::Broker&); static const std::string typeName; - private: + private: + void initializeBridge(broker::Bridge&, broker::SessionHandler&); void doEventQueueDeclare(types::Variant::Map& values); void doEventQueueDelete(types::Variant::Map& values); void doEventExchangeDeclare(types::Variant::Map& values); @@ -65,7 +71,10 @@ class WiringReplicator : public broker::Exchange void doResponseBind(types::Variant::Map& values); private: + void startQueueReplicator(const std::string& name); + broker::Broker& broker; + boost::shared_ptr<broker::Link> link; }; }} // namespace qpid::broker |