diff options
| author | Gordon Sim <gsim@apache.org> | 2014-04-16 17:58:58 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2014-04-16 17:58:58 +0000 |
| commit | 6c3683c899dae331ae6185f63aba5ed0e16a9d9b (patch) | |
| tree | 29e169406d39c2accff2db8e16a152870d8a20aa /qpid/cpp/src | |
| parent | 8f71eec10acc866b680b81a3ec24fd3b82469f1d (diff) | |
| download | qpid-python-6c3683c899dae331ae6185f63aba5ed0e16a9d9b.tar.gz | |
QPID-5706: add optional domain to incoming and outgoing link objects
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588000 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Connection.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Connection.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Domain.cpp | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp | 52 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Interconnect.h | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp | 13 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Sasl.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/management-schema.xml | 2 |
12 files changed, 82 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index c95a2d3537..7f40683aa1 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -66,8 +66,8 @@ void Connection::trace(const char* message) const QPID_LOG_CAT(trace, protocol, "[" << id << "]: " << message); } -Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, BrokerContext& b, bool saslInUse) - : BrokerContext(b), ManagedConnection(getBroker(), i), +Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, BrokerContext& b, bool saslInUse, bool brokerInitiated) + : BrokerContext(b), ManagedConnection(getBroker(), i, brokerInitiated), connection(pn_connection()), transport(pn_transport()), out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false) diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h index 961c21012e..90478ae181 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h @@ -46,7 +46,7 @@ class Session; class Connection : public BrokerContext, public sys::ConnectionCodec, public ManagedConnection { public: - Connection(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& context, bool saslInUse); + Connection(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& context, bool saslInUse, bool brokerInitiated); virtual ~Connection(); size_t decode(const char* buffer, size_t size); virtual size_t encode(char* buffer, size_t size); diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp index dc6ad052b7..0ae13eb17b 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp @@ -31,6 +31,7 @@ #include "qpid/management/ManagementAgent.h" #include <boost/shared_ptr.hpp> #include <boost/lexical_cast.hpp> +#include <sstream> namespace _qmf = qmf::org::apache::qpid::broker; @@ -176,7 +177,7 @@ qpid::sys::ConnectionCodec* InterconnectFactory::create(qpid::framing::ProtocolV qpid::sys::ConnectionCodec* InterconnectFactory::create(qpid::sys::OutputControl& out, const std::string& id, const qpid::sys::SecuritySettings& t) { bool useSasl = domain->getMechanisms() != NONE; - boost::shared_ptr<Interconnect> connection(new Interconnect(out, id, *this, useSasl, incoming, name, source, target)); + boost::shared_ptr<Interconnect> connection(new Interconnect(out, id, *this, useSasl, incoming, name, source, target, domain->getName())); if (!relay) getInterconnects().add(name, connection); else connection->setRelay(relay); @@ -199,7 +200,9 @@ bool InterconnectFactory::connect() next++; hostname = address.host; QPID_LOG (info, "Inter-broker connection initiated (" << address << ")"); - getBroker().connect(name, address.host, boost::lexical_cast<std::string>(address.port), address.protocol, this, boost::bind(&InterconnectFactory::failed, this, _1, _2)); + std::stringstream identifier; + identifier << name << "@" << domain->getName(); + getBroker().connect(identifier.str(), address.host, boost::lexical_cast<std::string>(address.port), address.protocol, this, boost::bind(&InterconnectFactory::failed, this, _1, _2)); return true; } diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp index 649a21775a..41d22d39cc 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp @@ -28,6 +28,7 @@ #include "qpid/SaslFactory.h" #include "qpid/sys/ConnectionCodec.h" #include "qpid/sys/OutputControl.h" +#include "qpid/sys/SystemInfo.h" #include "qpid/log/Statement.h" #include <boost/shared_ptr.hpp> extern "C" { @@ -40,9 +41,9 @@ namespace broker { namespace amqp { Interconnect::Interconnect(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& broker, bool saslInUse, - bool i, const std::string& n, const std::string& s, const std::string& t) - : Connection(out, id, broker, true), incoming(i), name(n), source(s), target(t), headerDiscarded(saslInUse), - closeRequested(false), isTransportDeleted(false) + bool i, const std::string& n, const std::string& s, const std::string& t, const std::string& d) + : Connection(out, id, broker, true, true), incoming(i), name(n), source(s), target(t), domain(d), headerDiscarded(saslInUse), + isOpened(false), closeRequested(false), isTransportDeleted(false) {} Interconnect::~Interconnect() @@ -74,6 +75,32 @@ size_t Interconnect::encode(char* buffer, size_t size) } } +namespace { +const std::string CLIENT_PROCESS_NAME("qpid.client_process"); +const std::string CLIENT_PID("qpid.client_pid"); +pn_bytes_t convert(const std::string& s) +{ + pn_bytes_t result; + result.start = const_cast<char*>(s.data()); + result.size = s.size(); + return result; +} +void setProperties(pn_connection_t* connection) +{ + pn_data_t* data = pn_connection_properties(connection); + pn_data_put_map(data); + pn_data_enter(data); + + pn_data_put_symbol(data, convert(CLIENT_PROCESS_NAME)); + std::string processName = sys::SystemInfo::getProcessName(); + pn_data_put_string(data, convert(processName)); + + pn_data_put_symbol(data, convert(CLIENT_PID)); + pn_data_put_int(data, sys::SystemInfo::getProcessId()); + pn_data_exit(data); +} +} + void Interconnect::process() { QPID_LOG(trace, id << " processing interconnect"); @@ -81,8 +108,23 @@ void Interconnect::process() close(); } else { if ((pn_connection_state(connection) & UNINIT) == UNINIT) { - QPID_LOG_CAT(debug, model, id << " interconnect opened"); - open(); + QPID_LOG_CAT(debug, model, id << " interconnect open initiated"); + pn_connection_set_container(connection, getBroker().getFederationTag().c_str()); + setProperties(connection); + pn_connection_open(connection); + out.connectionEstablished(); + setInterconnectDomain(domain); + } + if (!isOpened && (pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { + QPID_LOG_CAT(debug, model, id << " interconnect open completed, attaching link"); + isOpened = true; + readPeerProperties(); + const char* containerid(pn_connection_remote_container(connection)); + if (containerid) { + setContainerId(std::string(containerid)); + } + opened(); + getBroker().getConnectionObservers().opened(*this); pn_session_t* s = pn_session(connection); pn_session_open(s); boost::shared_ptr<Session> ssn(new Session(s, *this, out)); diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.h b/qpid/cpp/src/qpid/broker/amqp/Interconnect.h index cd35110873..d8eb457be2 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Interconnect.h +++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.h @@ -38,7 +38,8 @@ class Interconnect : public Connection { public: Interconnect(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& broker, bool saslInUse, - bool incoming, const std::string& name, const std::string& source, const std::string& target); + bool incoming, const std::string& name, + const std::string& source, const std::string& target, const std::string& domain); void setRelay(boost::shared_ptr<Relay>); ~Interconnect(); size_t encode(char* buffer, size_t size); @@ -50,8 +51,10 @@ class Interconnect : public Connection std::string name; std::string source; std::string target; + std::string domain; bool headerDiscarded; boost::shared_ptr<Relay> relay; + bool isOpened; bool closeRequested; bool isTransportDeleted; diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp index 6a2f8c5698..8f29833c8a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp @@ -47,14 +47,14 @@ template <typename T> T getProperty(const std::string& key, const qpid::types::V } } } -ManagedConnection::ManagedConnection(Broker& broker, const std::string i) : id(i), agent(0) +ManagedConnection::ManagedConnection(Broker& broker, const std::string i, bool brokerInitiated) : id(i), agent(0) { //management integration: agent = broker.getManagementAgent(); if (agent != 0) { qpid::management::Manageable* parent = broker.GetVhostObject(); // TODO set last bool true if system connection - connection = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, id, true, false, "AMQP 1.0")); + connection = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, id, !brokerInitiated, brokerInitiated, "AMQP 1.0")); connection->set_shadow(false); agent->addObject(connection); } @@ -132,6 +132,15 @@ const std::string& ManagedConnection::getContainerId() const return containerid; } +void ManagedConnection::setInterconnectDomain(const std::string& d) +{ + domain = d; +} +const std::string& ManagedConnection::getInterconnectDomain() const +{ + return domain; +} + qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObject() const { return connection; diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h index dc97b55862..225aab9eed 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h @@ -39,7 +39,7 @@ namespace amqp { class ManagedConnection : public qpid::management::Manageable, public OwnershipToken, public qpid::broker::Connection { public: - ManagedConnection(Broker& broker, const std::string id); + ManagedConnection(Broker& broker, const std::string id, bool brokerInitiated); virtual ~ManagedConnection(); virtual void setUserId(const std::string&); std::string getId() const; @@ -47,6 +47,8 @@ class ManagedConnection : public qpid::management::Manageable, public OwnershipT void setSaslSsf(int); void setContainerId(const std::string&); const std::string& getContainerId() const; + void setInterconnectDomain(const std::string&); + const std::string& getInterconnectDomain() const; void setPeerProperties(std::map<std::string, types::Variant>&); qpid::management::ManagementObject::shared_ptr GetManagementObject() const; bool isLocal(const OwnershipToken* t) const; @@ -70,6 +72,7 @@ class ManagedConnection : public qpid::management::Manageable, public OwnershipT const std::string id; std::string userid; std::string containerid; + std::string domain; qmf::org::apache::qpid::broker::Connection::shared_ptr connection; qpid::management::ManagementAgent* agent; std::map<std::string, types::Variant> peerProperties; diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp index d6ba18077c..a30349d50e 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp @@ -36,7 +36,8 @@ ManagedIncomingLink::ManagedIncomingLink(Broker& broker, ManagedSession& p, cons { qpid::management::ManagementAgent* agent = broker.getManagementAgent(); if (agent) { - incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target)); + incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target, + parent.getParent().getInterconnectDomain())); agent->addObject(incoming); } } diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp index d60fbf4b59..9c538f864c 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp @@ -36,7 +36,8 @@ ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, ManagedSession& p, cons { qpid::management::ManagementAgent* agent = broker.getManagementAgent(); if (agent) { - outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target)); + outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target, + parent.getParent().getInterconnectDomain())); agent->addObject(outgoing); } } diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp index cd31ef7788..32fbff1e96 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp @@ -126,7 +126,7 @@ qpid::sys::ConnectionCodec* ProtocolImpl::create(const qpid::framing::ProtocolVe throw qpid::Exception("SASL layer required!"); } else { QPID_LOG(info, "Using AMQP 1.0 (no SASL layer)"); - return new qpid::broker::amqp::Connection(out, id, *this, false); + return new qpid::broker::amqp::Connection(out, id, *this, false, false); } } } diff --git a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp index 44ac33c896..907e04f5ed 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp @@ -32,7 +32,7 @@ namespace broker { namespace amqp { Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, BrokerContext& context, std::auto_ptr<qpid::SaslServer> auth) - : qpid::amqp::SaslServer(id), out(o), connection(out, id, context, true), + : qpid::amqp::SaslServer(id), out(o), connection(out, id, context, true, false), authenticator(auth), state(INCOMPLETE), writeHeader(true), haveOutput(true) { diff --git a/qpid/cpp/src/qpid/broker/management-schema.xml b/qpid/cpp/src/qpid/broker/management-schema.xml index 2db8df4e85..7fbc762caa 100644 --- a/qpid/cpp/src/qpid/broker/management-schema.xml +++ b/qpid/cpp/src/qpid/broker/management-schema.xml @@ -399,6 +399,7 @@ <property name="name" type="sstr" access="RC" index="y"/> <property name="source" type="sstr" access="RC"/> <property name="target" type="sstr" access="RC"/> + <property name="domain" type="sstr" access="RC"/> <statistic name="transfers" type="count64" unit="message" desc="Messages transfered"/> </class> <!-- @@ -412,6 +413,7 @@ <property name="name" type="sstr" access="RC" index="y"/> <property name="source" type="sstr" access="RC"/> <property name="target" type="sstr" access="RC"/> + <property name="domain" type="sstr" access="RC"/> <statistic name="transfers" type="count64" unit="message" desc="Messages transfered"/> </class> <!-- |
