summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2012-05-09 15:38:32 +0000
committerAlan Conway <aconway@apache.org>2012-05-09 15:38:32 +0000
commit488c89479e258f9a551e0c357a4214f3e5b75892 (patch)
tree73a46f42cd710d107f536f92e7d88844eed1aacf /qpid/cpp/src
parenteac63c52d9b8cd6f722bade2ab92861bfdd5f30d (diff)
downloadqpid-python-488c89479e258f9a551e0c357a4214f3e5b75892.tar.gz
QPID-3603: Make failover subscrption optional for Links.
Used by ha::Backup because HA brokers use a different reconnect list from clients. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1336246 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp111
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h7
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.h4
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp5
5 files changed, 77 insertions, 62 deletions
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index 467d422721..ab28c5b01f 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -136,7 +136,8 @@ Link::Link(const string& _name,
const string& _username,
const string& _password,
Broker* _broker,
- Manageable* parent)
+ Manageable* parent,
+ bool failover_)
: name(_name), links(_links),
configuredTransport(_transport), configuredHost(_host), configuredPort(_port),
host(_host), port(_port), transport(_transport),
@@ -152,6 +153,7 @@ Link::Link(const string& _name,
agent(0),
listener(l),
timerTask(new LinkTimerTask(*this, broker->getTimer())),
+ failover(failover_),
failoverChannel(0)
{
if (parent != 0 && broker != 0)
@@ -174,13 +176,15 @@ Link::Link(const string& _name,
}
broker->getTimer().add(timerTask);
- stringstream exchangeName;
- exchangeName << "qpid.link." << name;
- std::pair<Exchange::shared_ptr, bool> rc = broker->getExchanges().declare(exchangeName.str(),
- exchangeTypeName);
- failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
- assert(failoverExchange);
- failoverExchange->setLink(this);
+ if (failover) {
+ stringstream exchangeName;
+ exchangeName << "qpid.link." << name;
+ std::pair<Exchange::shared_ptr, bool> rc =
+ broker->getExchanges().declare(exchangeName.str(), exchangeTypeName);
+ failoverExchange = boost::static_pointer_cast<LinkExchange>(rc.first);
+ assert(failoverExchange);
+ failoverExchange->setLink(this);
+ }
}
Link::~Link ()
@@ -192,7 +196,8 @@ Link::~Link ()
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
- broker->getExchanges().destroy(failoverExchange->getName());
+ if (failover)
+ broker->getExchanges().destroy(failoverExchange->getName());
}
void Link::setStateLH (int newState)
@@ -273,9 +278,7 @@ class DetachedCallback : public SessionHandler::ErrorListener {
void connectionException(framing::connection::CloseCode, const std::string&) {}
void channelException(framing::session::DetachCode, const std::string&) {}
void executionException(framing::execution::ErrorCode, const std::string&) {}
- void detach() {
- QPID_LOG(notice, "detached from 'amq.failover' for link: " << name);
- }
+ void detach() {}
private:
const std::string name;
};
@@ -300,42 +303,44 @@ void Link::opened() {
QPID_LOG(debug, "Known hosts for peer of inter-broker link: " << url);
}
- //
- // attempt to subscribe to failover exchange for updates from remote
- //
-
- const std::string queueName = "qpid.link." + framing::Uuid(true).str();
- failoverChannel = nextChannel();
-
- SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
- sessionHandler.setErrorListener(
- boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this)));
- failoverSession = queueName;
- sessionHandler.attachAs(failoverSession);
-
- framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
-
- remoteBroker.getQueue().declare(queueName,
- "", // alt-exchange
- false, // passive
- false, // durable
- true, // exclusive
- true, // auto-delete
- FieldTable());
- remoteBroker.getExchange().bind(queueName,
- FAILOVER_EXCHANGE,
- "", // no key
- FieldTable());
- remoteBroker.getMessage().subscribe(queueName,
- failoverExchange->getName(),
- 1, // implied-accept mode
- 0, // pre-acquire mode
- false, // exclusive
- "", // resume-id
- 0, // resume-ttl
+ if (failover) {
+ //
+ // attempt to subscribe to failover exchange for updates from remote
+ //
+
+ const std::string queueName = "qpid.link." + framing::Uuid(true).str();
+ failoverChannel = nextChannel();
+
+ SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+ sessionHandler.setErrorListener(
+ boost::shared_ptr<SessionHandler::ErrorListener>(new DetachedCallback(*this)));
+ failoverSession = queueName;
+ sessionHandler.attachAs(failoverSession);
+
+ framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+
+ remoteBroker.getQueue().declare(queueName,
+ "", // alt-exchange
+ false, // passive
+ false, // durable
+ true, // exclusive
+ true, // auto-delete
FieldTable());
- remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF);
- remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF);
+ remoteBroker.getExchange().bind(queueName,
+ FAILOVER_EXCHANGE,
+ "", // no key
+ FieldTable());
+ remoteBroker.getMessage().subscribe(queueName,
+ failoverExchange->getName(),
+ 1, // implied-accept mode
+ 0, // pre-acquire mode
+ false, // exclusive
+ "", // resume-id
+ 0, // resume-ttl
+ FieldTable());
+ remoteBroker.getMessage().flow(failoverExchange->getName(), 0, 0xFFFFFFFF);
+ remoteBroker.getMessage().flow(failoverExchange->getName(), 1, 0xFFFFFFFF);
+ }
}
void Link::closed(int, std::string text)
@@ -711,11 +716,13 @@ void Link::closeConnection( const std::string& reason)
{
if (connection != 0) {
// cancel our subscription to the failover exchange
- SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
- if (sessionHandler.getSession()) {
- framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
- remoteBroker.getMessage().cancel(failoverExchange->getName());
- remoteBroker.getSession().detach(failoverSession);
+ if (failover) {
+ SessionHandler& sessionHandler = connection->getChannel(failoverChannel);
+ if (sessionHandler.getSession()) {
+ framing::AMQP_ServerProxy remoteBroker(sessionHandler.out);
+ remoteBroker.getMessage().cancel(failoverExchange->getName());
+ remoteBroker.getSession().detach(failoverSession);
+ }
}
connection->close(CLOSE_CODE_CONNECTION_FORCED, reason);
connection = 0;
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index 5b788bb947..8e4b6212d8 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -63,7 +63,8 @@ class Link : public PersistableConfig, public management::Manageable {
uint16_t port;
std::string transport;
- bool durable;
+ bool durable;
+
std::string authMechanism;
std::string username;
std::string password;
@@ -87,6 +88,7 @@ class Link : public PersistableConfig, public management::Manageable {
boost::function<void(Link*)> listener;
boost::intrusive_ptr<sys::TimerTask> timerTask;
boost::shared_ptr<broker::LinkExchange> failoverExchange; // subscribed to remote's amq.failover exchange
+ bool failover; // Do we subscribe to a failover exchange?
uint failoverChannel;
std::string failoverSession;
@@ -132,7 +134,8 @@ class Link : public PersistableConfig, public management::Manageable {
const std::string& username,
const std::string& password,
Broker* broker,
- management::Manageable* parent = 0);
+ management::Manageable* parent = 0,
+ bool failover=true);
virtual ~Link();
/** these return the *configured* transport/host/port, which does not change over the
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index 3cad2c40c9..060eea412b 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -101,7 +101,8 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name,
bool durable,
const string& authMechanism,
const string& username,
- const string& password)
+ const string& password,
+ bool failover)
{
Mutex::ScopedLock locker(lock);
@@ -111,10 +112,11 @@ pair<Link::shared_ptr, bool> LinkRegistry::declare(const string& name,
{
Link::shared_ptr link;
- link = Link::shared_ptr (new Link (name, this, host, port, transport,
- boost::bind(&LinkRegistry::linkDestroyed, this, _1),
- durable, authMechanism, username, password, broker,
- parent));
+ link = Link::shared_ptr (
+ new Link (name, this, host, port, transport,
+ boost::bind(&LinkRegistry::linkDestroyed, this, _1),
+ durable, authMechanism, username, password, broker,
+ parent, failover));
if (durable && store) store->create(*link);
links[name] = link;
QPID_LOG(debug, "Creating new link; name=" << name );
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index 5f79d9bb52..c81ee751c8 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -82,7 +82,9 @@ namespace broker {
bool durable,
const std::string& authMechanism,
const std::string& username,
- const std::string& password);
+ const std::string& password,
+ bool failover=true);
+
/** determine if Link exists */
QPID_BROKER_EXTERN boost::shared_ptr<Link>
getLink(const std::string& name);
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 42cb2dbbce..5f053e0974 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -62,8 +62,9 @@ void Backup::initialize(const Url& url) {
std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
broker::QPID_NAME_PREFIX + string("ha.link.") + uuid.str(),
url[0].host, url[0].port, protocol,
- false, // durable
- settings.mechanism, settings.username, settings.password);
+ false, // durable
+ settings.mechanism, settings.username, settings.password,
+ false); // amq.failover
link = result.first;
link->setUrl(url);
replicator.reset(new BrokerReplicator(haBroker, link));