summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2011-11-25 21:51:40 +0000
committerAlan Conway <aconway@apache.org>2011-11-25 21:51:40 +0000
commita3101943e1facd947c4a9e9a3ac3c07d9e78606e (patch)
tree01a60bfb265a72a3f7e96aa46fecca3cdbe0e847
parent4884b20b88b740bcc66dfaec246dabcdd3dfa21d (diff)
downloadqpid-python-a3101943e1facd947c4a9e9a3ac3c07d9e78606e.tar.gz
QPID-3603: Move wiring-replicator creation out of SemanticState::route.
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3603@1206349 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/Makefile.am2
-rw-r--r--qpid/cpp/src/ha.mk10
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp1
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.cpp110
-rw-r--r--qpid/cpp/src/qpid/ha/Backup.h3
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.cpp178
-rw-r--r--qpid/cpp/src/qpid/ha/WiringReplicator.h21
7 files changed, 161 insertions, 164 deletions
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index 758130e2a0..58be6dfe70 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -595,8 +595,6 @@ libqpidbroker_la_SOURCES = \
qpid/broker/PriorityQueue.cpp \
qpid/broker/NameGenerator.cpp \
qpid/broker/NameGenerator.h \
- qpid/ha/WiringReplicator.h \
- qpid/ha/WiringReplicator.cpp \
qpid/broker/NullMessageStore.cpp \
qpid/broker/NullMessageStore.h \
qpid/broker/OwnershipToken.h \
diff --git a/qpid/cpp/src/ha.mk b/qpid/cpp/src/ha.mk
index 66dbc950ea..ca6415d8dd 100644
--- a/qpid/cpp/src/ha.mk
+++ b/qpid/cpp/src/ha.mk
@@ -23,12 +23,14 @@
dmoduleexec_LTLIBRARIES += ha.la
ha_la_SOURCES = \
- qpid/ha/HaPlugin.cpp \
- qpid/ha/HaBroker.cpp \
- qpid/ha/HaBroker.h \
qpid/ha/Backup.cpp \
qpid/ha/Backup.h \
- qpid/ha/Settings.h
+ qpid/ha/HaBroker.cpp \
+ qpid/ha/HaBroker.h \
+ qpid/ha/HaPlugin.cpp \
+ qpid/ha/Settings.h \
+ qpid/ha/WiringReplicator.cpp \
+ qpid/ha/WiringReplicator.h
ha_la_LIBADD = libqpidbroker.la
ha_la_LDFLAGS = $(PLUGINLDFLAGS)
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index aac7e34cbc..adfe645f41 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -499,7 +499,6 @@ void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName || cacheExchange->isDestroyed()) {
cacheExchange = QueueReplicator::create(exchangeName, getSession().getBroker().getQueues());
- if (!cacheExchange) cacheExchange = ha::WiringReplicator::create(exchangeName, getSession().getBroker());
if (!cacheExchange) cacheExchange = session.getBroker().getExchanges().get(exchangeName);
}
cacheExchange->setProperties(msg);
diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp
index 43c348b3ce..ceea6ccb68 100644
--- a/qpid/cpp/src/qpid/ha/Backup.cpp
+++ b/qpid/cpp/src/qpid/ha/Backup.cpp
@@ -20,11 +20,13 @@
*/
#include "Backup.h"
#include "Settings.h"
+#include "WiringReplicator.h"
#include "qpid/Url.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/broker/Bridge.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/Link.h"
#include "qpid/framing/AMQP_ServerProxy.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/FieldTable.h"
@@ -39,114 +41,24 @@ using namespace broker;
using types::Variant;
using std::string;
-namespace {
-const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
-const string _WHAT("_what");
-const string _CLASS_NAME("_class_name");
-const string _PACKAGE_NAME("_package_name");
-const string _SCHEMA_ID("_schema_id");
-const string OBJECT("OBJECT");
-const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
-const string QMF_DEFAULT_DIRECT("qmf.default.direct");
-const string QMF2("qmf2");
-const string QMF_OPCODE("qmf.opcode");
-const string _QUERY_REQUEST("_query_request");
-const string BROKER("broker");
-}
-
-void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) {
- framing::AMQP_ServerProxy peer(sessionHandler.out);
- Variant::Map request;
- request[_WHAT] = OBJECT;
- Variant::Map schema;
- schema[_CLASS_NAME] = className;
- schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
- request[_SCHEMA_ID] = schema;
-
- AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
- method.setBof(true);
- method.setEof(false);
- method.setBos(true);
- method.setEos(true);
- AMQHeaderBody headerBody;
- MessageProperties* props = headerBody.get<MessageProperties>(true);
- props->setReplyTo(qpid::framing::ReplyTo("", queueName));
- props->setAppId(QMF2);
- props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
- headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
- AMQFrame header(headerBody);
- header.setBof(false);
- header.setEof(false);
- header.setBos(true);
- header.setEos(true);
- AMQContentBody data;
- qpid::amqp_0_10::MapCodec::encode(request, data.getData());
- AMQFrame content(data);
- content.setBof(false);
- content.setEof(true);
- content.setBos(true);
- content.setEos(true);
- sessionHandler.out->handle(method);
- sessionHandler.out->handle(header);
- sessionHandler.out->handle(content);
-}
-
-namespace {
-const string QMF_DEFAULT_TOPIC("qmf.default.topic");
-const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
-const string QUEUE("queue");
-const string EXCHANGE("exchange");
-const string BINDING("binding");
-}
-
-// Initialize a bridge as a wiring replicator.
-void bridgeInitWiringReplicator(Bridge& bridge, SessionHandler& sessionHandler) {
- framing::AMQP_ServerProxy peer(sessionHandler.out);
- string queueName = bridge.getQueueName();
- const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
-
- //declare and bind an event queue
- peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable());
- peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
- //subscribe to the queue
- peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
- peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
- peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
-
- //issue a query request for queues and another for exchanges using event queue as the reply-to address
- sendQuery(QUEUE, queueName, sessionHandler);
- sendQuery(EXCHANGE, queueName, sessionHandler);
- sendQuery(BINDING, queueName, sessionHandler);
-}
-
Backup::Backup(broker::Broker& b, const Settings& s) : broker(b), settings(s) {
- // Create a link to replicate wiring
if (s.brokerUrl != "dummy") { // FIXME aconway 2011-11-22: temporary hack to identify primary.
Url url(s.brokerUrl);
QPID_LOG(info, "HA backup broker connecting to: " << url);
-
string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
- broker.getLinks().declare( // Declare the link
+
+ // FIXME aconway 2011-11-17: TBD: link management, discovery, fail-over.
+ // Declare the link
+ std::pair<Link::shared_ptr, bool> result = broker.getLinks().declare(
url[0].host, url[0].port, protocol,
false, // durable
s.mechanism, s.username, s.password);
-
- broker.getLinks().declare( // Declare the bridge
- url[0].host, url[0].port,
- false, // durable
- QPID_WIRING_REPLICATOR, // src
- QPID_WIRING_REPLICATOR, // dest
- "", // key
- false, // isQueue
- false, // isLocal
- "", // id/tag
- "", // excludes
- false, // dynamic
- 0, // sync?
- bridgeInitWiringReplicator
- );
+ assert(result.second); // FIXME aconway 2011-11-23: error handling
+ link = result.first;
+ boost::shared_ptr<WiringReplicator> wr(new WiringReplicator(link));
+ broker.getExchanges().registerExchange(wr);
+ wr->initialize(); // Must be called after registering exchange.
}
- // FIXME aconway 2011-11-17: handle discovery of the primary broker and fail-over correctly.
}
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/Backup.h b/qpid/cpp/src/qpid/ha/Backup.h
index 49997f9f5e..cee626379b 100644
--- a/qpid/cpp/src/qpid/ha/Backup.h
+++ b/qpid/cpp/src/qpid/ha/Backup.h
@@ -24,10 +24,12 @@
#include "Settings.h"
#include "qpid/Url.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
class Broker;
+class Link;
}
namespace ha {
@@ -44,6 +46,7 @@ class Backup
private:
broker::Broker& broker;
Settings settings;
+ boost::shared_ptr<broker::Link> link;
};
}} // namespace qpid::ha
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
index 5a1baaae55..2c37fa37d3 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.cpp
@@ -21,9 +21,13 @@
#include "WiringReplicator.h"
#include "qpid/broker/Broker.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/Link.h"
+#include "qpid/framing/FieldTable.h"
#include "qpid/log/Statement.h"
#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/broker/SessionHandler.h"
#include "qpid/framing/reply_exceptions.h"
+#include "qpid/framing/MessageTransferBody.h"
#include "qmf/org/apache/qpid/broker/EventBind.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h"
#include "qmf/org/apache/qpid/broker/EventExchangeDelete.h"
@@ -40,62 +44,69 @@ using qmf::org::apache::qpid::broker::EventExchangeDelete;
using qmf::org::apache::qpid::broker::EventQueueDeclare;
using qmf::org::apache::qpid::broker::EventQueueDelete;
using qmf::org::apache::qpid::broker::EventSubscribe;
-
+using namespace framing;
using std::string;
using types::Variant;
using namespace broker;
-namespace{
+namespace {
+const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
const string QPID_REPLICATE("qpid.replicate");
-const string ALL("all");
-const string WIRING("wiring");
const string CLASS_NAME("_class_name");
+const string EVENT("_event");
const string OBJECT_NAME("_object_name");
const string PACKAGE_NAME("_package_name");
-const string VALUES("_values");
-const string EVENT("_event");
-const string SCHEMA_ID("_schema_id");
const string QUERY_RESPONSE("_query_response");
+const string SCHEMA_ID("_schema_id");
+const string VALUES("_values");
-const string ARGUMENTS("arguments");
+const string ALL("all");
+const string ALTEX("altEx");
const string ARGS("args");
-const string QUEUE("queue");
-const string EXCHANGE("exchange");
+const string ARGUMENTS("arguments");
+const string AUTODEL("autoDel");
+const string AUTODELETE("autoDelete");
const string BIND("bind");
const string BINDING("binding");
+const string CREATED("created");
+const string DISP("disp");
const string DURABLE("durable");
-const string QNAME("qName");
-const string AUTODEL("autoDel");
-const string ALTEX("altEx");
-const string USER("user");
-const string RHOST("rhost");
-const string EXTYPE("exType");
+const string EXCHANGE("exchange");
const string EXNAME("exName");
-const string AUTODELETE("autoDelete");
+const string EXTYPE("exType");
+const string KEY("key");
const string NAME("name");
+const string QNAME("qName");
+const string QUEUE("queue");
+const string RHOST("rhost");
const string TYPE("type");
-const string DISP("disp");
-const string CREATED("created");
-const string KEY("key");
-
+const string USER("user");
+const string WIRING("wiring");
-const string QMF_OPCODE("qmf.opcode");
-const string QMF_CONTENT("qmf.content");
+const string AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER("agent.ind.event.org_apache_qpid_broker.#");
const string QMF2("qmf2");
+const string QMF_CONTENT("qmf.content");
+const string QMF_DEFAULT_TOPIC("qmf.default.topic");
+const string QMF_OPCODE("qmf.opcode");
-const string QPID_WIRING_REPLICATOR("qpid.wiring-replicator");
-
-
-bool isQMFv2(const Message& message)
-{
+const string _WHAT("_what");
+const string _CLASS_NAME("_class_name");
+const string _PACKAGE_NAME("_package_name");
+const string _SCHEMA_ID("_schema_id");
+const string OBJECT("OBJECT");
+const string ORG_APACHE_QPID_BROKER("org.apache.qpid.broker");
+const string QMF_DEFAULT_DIRECT("qmf.default.direct");
+const string _QUERY_REQUEST("_query_request");
+const string BROKER("broker");
+
+bool isQMFv2(const Message& message) {
const framing::MessageProperties* props = message.getProperties<framing::MessageProperties>();
return props && props->getAppId() == QMF2;
}
-template <class T> bool match(Variant::Map& schema)
-{
+template <class T> bool match(Variant::Map& schema) {
return T::match(schema[CLASS_NAME], schema[PACKAGE_NAME]);
}
@@ -110,12 +121,89 @@ bool isReplicated(const Variant::Map& m) {
return i != m.end() && isReplicated(i->second.asString());
}
+void sendQuery(const string className, const string& queueName, SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ Variant::Map request;
+ request[_WHAT] = OBJECT;
+ Variant::Map schema;
+ schema[_CLASS_NAME] = className;
+ schema[_PACKAGE_NAME] = ORG_APACHE_QPID_BROKER;
+ request[_SCHEMA_ID] = schema;
+
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), QMF_DEFAULT_DIRECT, 0, 0)));
+ method.setBof(true);
+ method.setEof(false);
+ method.setBos(true);
+ method.setEos(true);
+ AMQHeaderBody headerBody;
+ MessageProperties* props = headerBody.get<MessageProperties>(true);
+ props->setReplyTo(qpid::framing::ReplyTo("", queueName));
+ props->setAppId(QMF2);
+ props->getApplicationHeaders().setString(QMF_OPCODE, _QUERY_REQUEST);
+ headerBody.get<qpid::framing::DeliveryProperties>(true)->setRoutingKey(BROKER);
+ AMQFrame header(headerBody);
+ header.setBof(false);
+ header.setEof(false);
+ header.setBos(true);
+ header.setEos(true);
+ AMQContentBody data;
+ qpid::amqp_0_10::MapCodec::encode(request, data.getData());
+ AMQFrame content(data);
+ content.setBof(false);
+ content.setEof(true);
+ content.setBos(true);
+ content.setEos(true);
+ sessionHandler.out->handle(method);
+ sessionHandler.out->handle(header);
+ sessionHandler.out->handle(content);
+}
} // namespace
+WiringReplicator::~WiringReplicator() {}
-WiringReplicator::WiringReplicator(const string& name, Broker& b) : Exchange(name), broker(b) {}
+WiringReplicator::WiringReplicator(const boost::shared_ptr<Link>& l)
+ : Exchange(QPID_WIRING_REPLICATOR), broker(*l->getBroker()), link(l)
+{}
+
+// We need to split out the initialization so that the WiringReplicator
+// can be registered as an exchange before starting the bridge.
+void WiringReplicator::initialize() {
+ assert(link->getBroker());
+ broker.getLinks().declare(
+ link->getHost(), link->getPort(),
+ false, // durable
+ QPID_WIRING_REPLICATOR, // src
+ QPID_WIRING_REPLICATOR, // dest
+ "", // key
+ false, // isQueue
+ false, // isLocal
+ "", // id/tag
+ "", // excludes
+ false, // dynamic
+ 0, // sync?
+ boost::bind(&WiringReplicator::initializeBridge, this, _1, _2)
+ );
+}
-WiringReplicator::~WiringReplicator() {}
+// This is called in the connection IO thread when the bridge is started.
+void WiringReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
+ framing::AMQP_ServerProxy peer(sessionHandler.out);
+ string queueName = bridge.getQueueName();
+ const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
+
+ //declare and bind an event queue
+ peer.getQueue().declare(queueName, "", false, false, true, true, FieldTable());
+ peer.getExchange().bind(queueName, QMF_DEFAULT_TOPIC, AGENT_IND_EVENT_ORG_APACHE_QPID_BROKER, FieldTable());
+ //subscribe to the queue
+ peer.getMessage().subscribe(queueName, args.i_dest, 1, 0, false, "", 0, FieldTable());
+ peer.getMessage().flow(args.i_dest, 0, 0xFFFFFFFF);
+ peer.getMessage().flow(args.i_dest, 1, 0xFFFFFFFF);
+
+ //issue a query request for queues and another for exchanges using event queue as the reply-to address
+ sendQuery(QUEUE, queueName, sessionHandler);
+ sendQuery(EXCHANGE, queueName, sessionHandler);
+ sendQuery(BINDING, queueName, sessionHandler);
+}
void WiringReplicator::route(Deliverable& msg, const string& /*key*/, const framing::FieldTable* headers) {
Variant::List list;
@@ -176,7 +264,10 @@ void WiringReplicator::doEventQueueDeclare(Variant::Map& values) {
args,
values[USER].asString(),
values[RHOST].asString()).second) {
- // FIXME aconway 2011-11-22: should delete old queue and re-create from exchanges.
+ // FIXME aconway 2011-11-22: should delete old queue and
+ // re-create from event.
+ // Events are always up to date, whereas responses may be
+ // out of date.
QPID_LOG(warning, "Replicated queue " << name << " already exists");
}
}
@@ -209,7 +300,7 @@ void WiringReplicator::doEventExchangeDeclare(Variant::Map& values) {
values[USER].asString(),
values[RHOST].asString()).second) {
// FIXME aconway 2011-11-22: should delete pre-exisitng exchange
- // and re-create from event. Likewise for queues.
+ // and re-create from event. See comment in doEventQueueDeclare.
QPID_LOG(warning, "Replicated exchange " << name << " already exists");
}
}
@@ -286,8 +377,6 @@ void WiringReplicator::doResponseExchange(Variant::Map& values) {
}
}
-// FIXME aconway 2011-11-21: refactor to remove redundancy between do* functions.
-
namespace {
const std::string QUEUE_REF_PREFIX("org.apache.qpid.broker:queue:");
const std::string EXCHANGE_REF_PREFIX("org.apache.qpid.broker:exchange:");
@@ -299,7 +388,7 @@ std::string getRefName(const std::string& prefix, const Variant& ref) {
throw Exception(QPID_MSG("Replicator: invalid object reference: " << ref));
const std::string name = i->second.asString();
if (name.compare(0, prefix.size(), prefix) != 0)
- throw Exception(QPID_MSG("Replicator unexpected reference prefix: " << name));
+ throw Exception(QPID_MSG("Replicator: unexpected reference prefix: " << name));
std::string ret = name.substr(prefix.size());
return ret;
}
@@ -336,21 +425,6 @@ void WiringReplicator::doResponseBind(Variant::Map& values) {
} catch (const framing::NotFoundException& e) {} // Ignore unreplicated queue or exchange.
}
-boost::shared_ptr<Exchange> WiringReplicator::create(const string& target, Broker& broker)
-{
- boost::shared_ptr<Exchange> exchange;
- if (isWiringReplicatorDestination(target)) {
- //TODO: need to cache the exchange
- exchange.reset(new WiringReplicator(target, broker));
- }
- return exchange;
-}
-
-bool WiringReplicator::isWiringReplicatorDestination(const string& target)
-{
- return target == QPID_WIRING_REPLICATOR;
-}
-
bool WiringReplicator::bind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool WiringReplicator::unbind(boost::shared_ptr<Queue>, const string&, const framing::FieldTable*) { return false; }
bool WiringReplicator::isBound(boost::shared_ptr<Queue>, const string* const, const framing::FieldTable* const) { return false; }
diff --git a/qpid/cpp/src/qpid/ha/WiringReplicator.h b/qpid/cpp/src/qpid/ha/WiringReplicator.h
index 66e5454ea7..fa4cf33b6e 100644
--- a/qpid/cpp/src/qpid/ha/WiringReplicator.h
+++ b/qpid/cpp/src/qpid/ha/WiringReplicator.h
@@ -24,13 +24,15 @@
#include "qpid/broker/Exchange.h"
#include "qpid/types/Variant.h"
-
-// FIXME aconway 2011-11-17: relocate to ../ha
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
class Broker;
+class Link;
+class Bridge;
+class SessionHandler;
}
namespace ha {
@@ -42,19 +44,23 @@ namespace ha {
class WiringReplicator : public broker::Exchange
{
public:
- WiringReplicator(const std::string&, broker::Broker&);
+ WiringReplicator(const boost::shared_ptr<broker::Link>&);
~WiringReplicator();
std::string getType() const;
+
+ // Call this after the WiringReplicator has been registered as an exchange.
+ void initialize();
+
+ // Exchange methods
bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
void route(broker::Deliverable&, const std::string&, const framing::FieldTable*);
bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const);
- static bool isWiringReplicatorDestination(const std::string&);
- static boost::shared_ptr<broker::Exchange> create(const std::string&, broker::Broker&);
static const std::string typeName;
- private:
+ private:
+ void initializeBridge(broker::Bridge&, broker::SessionHandler&);
void doEventQueueDeclare(types::Variant::Map& values);
void doEventQueueDelete(types::Variant::Map& values);
void doEventExchangeDeclare(types::Variant::Map& values);
@@ -65,7 +71,10 @@ class WiringReplicator : public broker::Exchange
void doResponseBind(types::Variant::Map& values);
private:
+ void startQueueReplicator(const std::string& name);
+
broker::Broker& broker;
+ boost::shared_ptr<broker::Link> link;
};
}} // namespace qpid::broker