diff options
| author | Alan Conway <aconway@apache.org> | 2012-05-09 15:38:32 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2012-05-09 15:38:32 +0000 |
| commit | 488c89479e258f9a551e0c357a4214f3e5b75892 (patch) | |
| tree | 73a46f42cd710d107f536f92e7d88844eed1aacf /qpid/cpp/src | |
| parent | eac63c52d9b8cd6f722bade2ab92861bfdd5f30d (diff) | |
| download | qpid-python-488c89479e258f9a551e0c357a4214f3e5b75892.tar.gz | |
QPID-3603: Make failover subscrption optional for Links.
Used by ha::Backup because HA brokers use a different reconnect list from clients.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1336246 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 111 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Link.h | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.cpp | 12 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/LinkRegistry.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 5 |
5 files changed, 77 insertions, 62 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 467d422721..ab28c5b01f 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -136,7 +136,8 @@ Link::Link(const string& _name, const string& _username, const string& _password, Broker* _broker, - Manageable* parent) + Manageable* parent, + bool failover_) : name(_name), links(_links), configuredTransport(_transport), configuredHost(_host), configuredPort(_port), host(_host), port(_port), transport(_transport), @@ -152,6 +153,7 @@ Link::Link(const string& _name, agent(0), listener(l), timerTask(new LinkTimerTask(*this, broker->getTimer())), + failover(failover_), failoverChannel(0) { if (parent != 0 && broker != 0) @@ -174,13 +176,15 @@ Link::Link(const string& _name, } broker->getTimer().add(timerTask); - stringstream exchangeName; - exchangeName << "qpid.link." << name; - std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(exchangeName.str(), - exchangeTypeName); - failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first); - assert(failoverExchange); - failoverExchange->setLink(this); + if (failover) { + stringstream exchangeName; + exchangeName << "qpid.link." << name; + std::pair<Exchange::shared_ptr, bool> rc = + broker->getExchanges().declare(exchangeName.str(), exchangeTypeName); + failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first); + assert(failoverExchange); + failoverExchange->setLink(this); + } } Link::~Link () @@ -192,7 +196,8 @@ Link::~Link () if (mgmtObject != 0) mgmtObject->resourceDestroy (); - broker->getExchanges().destroy(failoverExchange->getName()); + if (failover) + broker->getExchanges().destroy(failoverExchange->getName()); } void Link::setStateLH (int newState) @@ -273,9 +278,7 @@ class DetachedCallback : public SessionHandler::ErrorListener { void connectionException(framing::connection::CloseCode, const std::string&) {} void channelException(framing::session::DetachCode, const std::string&) {} void executionException(framing::execution::ErrorCode, const std::string&) {} - void detach() { - QPID_LOG(notice, "detached from 'amq.failover' for link: " << name); - } + void detach() {} private: const std::string name; }; @@ -300,42 +303,44 @@ void Link::opened() { QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url); } - // - // attempt to subscribe to failover exchange for updates from remote - // - - const std::string queueName = "qpid.link." + framing::Uuid(true).str(); - failoverChannel = nextChannel(); - - SessionHandler& sessionHandler = connection->getChannel(failoverChannel); - sessionHandler.setErrorListener( - boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this))); - failoverSession = queueName; - sessionHandler.attachAs(failoverSession); - - framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); - - remoteBroker.getQueue().declare(queueName, - "", // alt-exchange - false, // passive - false, // durable - true, // exclusive - true, // auto-delete - FieldTable()); - remoteBroker.getExchange().bind(queueName, - FAILOVER_EXCHANGE, - "", // no key - FieldTable()); - remoteBroker.getMessage().subscribe(queueName, - failoverExchange->getName(), - 1, // implied-accept mode - 0, // pre-acquire mode - false, // exclusive - "", // resume-id - 0, // resume-ttl + if (failover) { + // + // attempt to subscribe to failover exchange for updates from remote + // + + const std::string queueName = "qpid.link." + framing::Uuid(true).str(); + failoverChannel = nextChannel(); + + SessionHandler& sessionHandler = connection->getChannel(failoverChannel); + sessionHandler.setErrorListener( + boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this))); + failoverSession = queueName; + sessionHandler.attachAs(failoverSession); + + framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); + + remoteBroker.getQueue().declare(queueName, + "", // alt-exchange + false, // passive + false, // durable + true, // exclusive + true, // auto-delete FieldTable()); - remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF); - remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF); + remoteBroker.getExchange().bind(queueName, + FAILOVER_EXCHANGE, + "", // no key + FieldTable()); + remoteBroker.getMessage().subscribe(queueName, + failoverExchange->getName(), + 1, // implied-accept mode + 0, // pre-acquire mode + false, // exclusive + "", // resume-id + 0, // resume-ttl + FieldTable()); + remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF); + remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF); + } } void Link::closed(int, std::string text) @@ -711,11 +716,13 @@ void Link::closeConnection( const std::string& reason) { if (connection != 0) { // cancel our subscription to the failover exchange - SessionHandler& sessionHandler = connection->getChannel(failoverChannel); - if (sessionHandler.getSession()) { - framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); - remoteBroker.getMessage().cancel(failoverExchange->getName()); - remoteBroker.getSession().detach(failoverSession); + if (failover) { + SessionHandler& sessionHandler = connection->getChannel(failoverChannel); + if (sessionHandler.getSession()) { + framing::AMQP_ServerProxy remoteBroker(sessionHandler.out); + remoteBroker.getMessage().cancel(failoverExchange->getName()); + remoteBroker.getSession().detach(failoverSession); + } } connection->close(CLOSE_CODE_CONNECTION_FORCED, reason); connection = 0; diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index 5b788bb947..8e4b6212d8 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -63,7 +63,8 @@ class Link : public PersistableConfig, public management::Manageable { uint16_t port; std::string transport; - bool durable; + bool durable; + std::string authMechanism; std::string username; std::string password; @@ -87,6 +88,7 @@ class Link : public PersistableConfig, public management::Manageable { boost::function<void(Link*)> listener; boost::intrusive_ptr<sys::TimerTask> timerTask; boost::shared_ptr<broker::LinkExchange> failoverExchange; // subscribed to remote's amq.failover exchange + bool failover; // Do we subscribe to a failover exchange? uint failoverChannel; std::string failoverSession; @@ -132,7 +134,8 @@ class Link : public PersistableConfig, public management::Manageable { const std::string& username, const std::string& password, Broker* broker, - management::Manageable* parent = 0); + management::Manageable* parent = 0, + bool failover=true); virtual ~Link(); /** these return the *configured* transport/host/port, which does not change over the diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index 3cad2c40c9..060eea412b 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -101,7 +101,8 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name, bool durable, const string& authMechanism, const string& username, - const string& password) + const string& password, + bool failover) { Mutex::ScopedLock locker(lock); @@ -111,10 +112,11 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name, { Link::shared_ptr link; - link = Link::shared_ptr (new Link (name, this, host, port, transport, - boost::bind(&LinkRegistry::linkDestroyed, this, _1), - durable, authMechanism, username, password, broker, - parent)); + link = Link::shared_ptr ( + new Link (name, this, host, port, transport, + boost::bind(&LinkRegistry::linkDestroyed, this, _1), + durable, authMechanism, username, password, broker, + parent, failover)); if (durable && store) store->create(*link); links[name] = link; QPID_LOG(debug, "Creating new link; name=" << name ); diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h index 5f79d9bb52..c81ee751c8 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.h +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h @@ -82,7 +82,9 @@ namespace broker { bool durable, const std::string& authMechanism, const std::string& username, - const std::string& password); + const std::string& password, + bool failover=true); + /** determine if Link exists */ QPID_BROKER_EXTERN boost::shared_ptr<Link> getLink(const std::string& name); diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 42cb2dbbce..5f053e0974 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -62,8 +62,9 @@ void Backup::initialize(const Url& url) { std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare( broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(), url[0].host, url[0].port, protocol, - false, // durable - settings.mechanism, settings.username, settings.password); + false, // durable + settings.mechanism, settings.username, settings.password, + false); // amq.failover link = result.first; link->setUrl(url); replicator.reset(new BrokerReplicator(haBroker, link)); |
