summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-04-16 17:58:58 +0000
committerGordon Sim <gsim@apache.org>2014-04-16 17:58:58 +0000
commit6c3683c899dae331ae6185f63aba5ed0e16a9d9b (patch)
tree29e169406d39c2accff2db8e16a152870d8a20aa /qpid/cpp/src
parent8f71eec10acc866b680b81a3ec24fd3b82469f1d (diff)
downloadqpid-python-6c3683c899dae331ae6185f63aba5ed0e16a9d9b.tar.gz
QPID-5706: add optional domain to incoming and outgoing link objects
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588000 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Domain.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp52
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Interconnect.h5
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp13
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h5
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp3
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Sasl.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/management-schema.xml2
12 files changed, 82 insertions, 18 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
index c95a2d3537..7f40683aa1 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -66,8 +66,8 @@ void Connection::trace(const char* message) const
QPID_LOG_CAT(trace, protocol, "[" << id << "]: " << message);
}
-Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, BrokerContext& b, bool saslInUse)
- : BrokerContext(b), ManagedConnection(getBroker(), i),
+Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, BrokerContext& b, bool saslInUse, bool brokerInitiated)
+ : BrokerContext(b), ManagedConnection(getBroker(), i, brokerInitiated),
connection(pn_connection()),
transport(pn_transport()),
out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false)
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h
index 961c21012e..90478ae181 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h
@@ -46,7 +46,7 @@ class Session;
class Connection : public BrokerContext, public sys::ConnectionCodec, public ManagedConnection
{
public:
- Connection(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& context, bool saslInUse);
+ Connection(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& context, bool saslInUse, bool brokerInitiated);
virtual ~Connection();
size_t decode(const char* buffer, size_t size);
virtual size_t encode(char* buffer, size_t size);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
index dc6ad052b7..0ae13eb17b 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp
@@ -31,6 +31,7 @@
#include "qpid/management/ManagementAgent.h"
#include <boost/shared_ptr.hpp>
#include <boost/lexical_cast.hpp>
+#include <sstream>
namespace _qmf = qmf::org::apache::qpid::broker;
@@ -176,7 +177,7 @@ qpid::sys::ConnectionCodec* InterconnectFactory::create(qpid::framing::ProtocolV
qpid::sys::ConnectionCodec* InterconnectFactory::create(qpid::sys::OutputControl& out, const std::string& id, const qpid::sys::SecuritySettings& t)
{
bool useSasl = domain->getMechanisms() != NONE;
- boost::shared_ptr<Interconnect> connection(new Interconnect(out, id, *this, useSasl, incoming, name, source, target));
+ boost::shared_ptr<Interconnect> connection(new Interconnect(out, id, *this, useSasl, incoming, name, source, target, domain->getName()));
if (!relay) getInterconnects().add(name, connection);
else connection->setRelay(relay);
@@ -199,7 +200,9 @@ bool InterconnectFactory::connect()
next++;
hostname = address.host;
QPID_LOG (info, "Inter-broker connection initiated (" << address << ")");
- getBroker().connect(name, address.host, boost::lexical_cast<std::string>(address.port), address.protocol, this, boost::bind(&InterconnectFactory::failed, this, _1, _2));
+ std::stringstream identifier;
+ identifier << name << "@" << domain->getName();
+ getBroker().connect(identifier.str(), address.host, boost::lexical_cast<std::string>(address.port), address.protocol, this, boost::bind(&InterconnectFactory::failed, this, _1, _2));
return true;
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
index 649a21775a..41d22d39cc 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
@@ -28,6 +28,7 @@
#include "qpid/SaslFactory.h"
#include "qpid/sys/ConnectionCodec.h"
#include "qpid/sys/OutputControl.h"
+#include "qpid/sys/SystemInfo.h"
#include "qpid/log/Statement.h"
#include <boost/shared_ptr.hpp>
extern "C" {
@@ -40,9 +41,9 @@ namespace broker {
namespace amqp {
Interconnect::Interconnect(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& broker, bool saslInUse,
- bool i, const std::string& n, const std::string& s, const std::string& t)
- : Connection(out, id, broker, true), incoming(i), name(n), source(s), target(t), headerDiscarded(saslInUse),
- closeRequested(false), isTransportDeleted(false)
+ bool i, const std::string& n, const std::string& s, const std::string& t, const std::string& d)
+ : Connection(out, id, broker, true, true), incoming(i), name(n), source(s), target(t), domain(d), headerDiscarded(saslInUse),
+ isOpened(false), closeRequested(false), isTransportDeleted(false)
{}
Interconnect::~Interconnect()
@@ -74,6 +75,32 @@ size_t Interconnect::encode(char* buffer, size_t size)
}
}
+namespace {
+const std::string CLIENT_PROCESS_NAME("qpid.client_process");
+const std::string CLIENT_PID("qpid.client_pid");
+pn_bytes_t convert(const std::string& s)
+{
+ pn_bytes_t result;
+ result.start = const_cast<char*>(s.data());
+ result.size = s.size();
+ return result;
+}
+void setProperties(pn_connection_t* connection)
+{
+ pn_data_t* data = pn_connection_properties(connection);
+ pn_data_put_map(data);
+ pn_data_enter(data);
+
+ pn_data_put_symbol(data, convert(CLIENT_PROCESS_NAME));
+ std::string processName = sys::SystemInfo::getProcessName();
+ pn_data_put_string(data, convert(processName));
+
+ pn_data_put_symbol(data, convert(CLIENT_PID));
+ pn_data_put_int(data, sys::SystemInfo::getProcessId());
+ pn_data_exit(data);
+}
+}
+
void Interconnect::process()
{
QPID_LOG(trace, id << " processing interconnect");
@@ -81,8 +108,23 @@ void Interconnect::process()
close();
} else {
if ((pn_connection_state(connection) & UNINIT) == UNINIT) {
- QPID_LOG_CAT(debug, model, id << " interconnect opened");
- open();
+ QPID_LOG_CAT(debug, model, id << " interconnect open initiated");
+ pn_connection_set_container(connection, getBroker().getFederationTag().c_str());
+ setProperties(connection);
+ pn_connection_open(connection);
+ out.connectionEstablished();
+ setInterconnectDomain(domain);
+ }
+ if (!isOpened && (pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
+ QPID_LOG_CAT(debug, model, id << " interconnect open completed, attaching link");
+ isOpened = true;
+ readPeerProperties();
+ const char* containerid(pn_connection_remote_container(connection));
+ if (containerid) {
+ setContainerId(std::string(containerid));
+ }
+ opened();
+ getBroker().getConnectionObservers().opened(*this);
pn_session_t* s = pn_session(connection);
pn_session_open(s);
boost::shared_ptr<Session> ssn(new Session(s, *this, out));
diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.h b/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
index cd35110873..d8eb457be2 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
@@ -38,7 +38,8 @@ class Interconnect : public Connection
{
public:
Interconnect(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& broker, bool saslInUse,
- bool incoming, const std::string& name, const std::string& source, const std::string& target);
+ bool incoming, const std::string& name,
+ const std::string& source, const std::string& target, const std::string& domain);
void setRelay(boost::shared_ptr<Relay>);
~Interconnect();
size_t encode(char* buffer, size_t size);
@@ -50,8 +51,10 @@ class Interconnect : public Connection
std::string name;
std::string source;
std::string target;
+ std::string domain;
bool headerDiscarded;
boost::shared_ptr<Relay> relay;
+ bool isOpened;
bool closeRequested;
bool isTransportDeleted;
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
index 6a2f8c5698..8f29833c8a 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
@@ -47,14 +47,14 @@ template <typename T> T getProperty(const std::string& key, const qpid::types::V
}
}
}
-ManagedConnection::ManagedConnection(Broker& broker, const std::string i) : id(i), agent(0)
+ManagedConnection::ManagedConnection(Broker& broker, const std::string i, bool brokerInitiated) : id(i), agent(0)
{
//management integration:
agent = broker.getManagementAgent();
if (agent != 0) {
qpid::management::Manageable* parent = broker.GetVhostObject();
// TODO set last bool true if system connection
- connection = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, id, true, false, "AMQP 1.0"));
+ connection = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, id, !brokerInitiated, brokerInitiated, "AMQP 1.0"));
connection->set_shadow(false);
agent->addObject(connection);
}
@@ -132,6 +132,15 @@ const std::string& ManagedConnection::getContainerId() const
return containerid;
}
+void ManagedConnection::setInterconnectDomain(const std::string& d)
+{
+ domain = d;
+}
+const std::string& ManagedConnection::getInterconnectDomain() const
+{
+ return domain;
+}
+
qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementObject() const
{
return connection;
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
index dc97b55862..225aab9eed 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
@@ -39,7 +39,7 @@ namespace amqp {
class ManagedConnection : public qpid::management::Manageable, public OwnershipToken, public qpid::broker::Connection
{
public:
- ManagedConnection(Broker& broker, const std::string id);
+ ManagedConnection(Broker& broker, const std::string id, bool brokerInitiated);
virtual ~ManagedConnection();
virtual void setUserId(const std::string&);
std::string getId() const;
@@ -47,6 +47,8 @@ class ManagedConnection : public qpid::management::Manageable, public OwnershipT
void setSaslSsf(int);
void setContainerId(const std::string&);
const std::string& getContainerId() const;
+ void setInterconnectDomain(const std::string&);
+ const std::string& getInterconnectDomain() const;
void setPeerProperties(std::map<std::string, types::Variant>&);
qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
bool isLocal(const OwnershipToken* t) const;
@@ -70,6 +72,7 @@ class ManagedConnection : public qpid::management::Manageable, public OwnershipT
const std::string id;
std::string userid;
std::string containerid;
+ std::string domain;
qmf::org::apache::qpid::broker::Connection::shared_ptr connection;
qpid::management::ManagementAgent* agent;
std::map<std::string, types::Variant> peerProperties;
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp
index d6ba18077c..a30349d50e 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedIncomingLink.cpp
@@ -36,7 +36,8 @@ ManagedIncomingLink::ManagedIncomingLink(Broker& broker, ManagedSession& p, cons
{
qpid::management::ManagementAgent* agent = broker.getManagementAgent();
if (agent) {
- incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target));
+ incoming = _qmf::Incoming::shared_ptr(new _qmf::Incoming(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target,
+ parent.getParent().getInterconnectDomain()));
agent->addObject(incoming);
}
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
index d60fbf4b59..9c538f864c 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedOutgoingLink.cpp
@@ -36,7 +36,8 @@ ManagedOutgoingLink::ManagedOutgoingLink(Broker& broker, ManagedSession& p, cons
{
qpid::management::ManagementAgent* agent = broker.getManagementAgent();
if (agent) {
- outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target));
+ outgoing = _qmf::Outgoing::shared_ptr(new _qmf::Outgoing(agent, this, &parent, parent.getParent().getContainerId(), _name, source, target,
+ parent.getParent().getInterconnectDomain()));
agent->addObject(outgoing);
}
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
index cd31ef7788..32fbff1e96 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp
@@ -126,7 +126,7 @@ qpid::sys::ConnectionCodec* ProtocolImpl::create(const qpid::framing::ProtocolVe
throw qpid::Exception("SASL layer required!");
} else {
QPID_LOG(info, "Using AMQP 1.0 (no SASL layer)");
- return new qpid::broker::amqp::Connection(out, id, *this, false);
+ return new qpid::broker::amqp::Connection(out, id, *this, false, false);
}
}
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
index 44ac33c896..907e04f5ed 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
@@ -32,7 +32,7 @@ namespace broker {
namespace amqp {
Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, BrokerContext& context, std::auto_ptr<qpid::SaslServer> auth)
- : qpid::amqp::SaslServer(id), out(o), connection(out, id, context, true),
+ : qpid::amqp::SaslServer(id), out(o), connection(out, id, context, true, false),
authenticator(auth),
state(INCOMPLETE), writeHeader(true), haveOutput(true)
{
diff --git a/qpid/cpp/src/qpid/broker/management-schema.xml b/qpid/cpp/src/qpid/broker/management-schema.xml
index 2db8df4e85..7fbc762caa 100644
--- a/qpid/cpp/src/qpid/broker/management-schema.xml
+++ b/qpid/cpp/src/qpid/broker/management-schema.xml
@@ -399,6 +399,7 @@
<property name="name" type="sstr" access="RC" index="y"/>
<property name="source" type="sstr" access="RC"/>
<property name="target" type="sstr" access="RC"/>
+ <property name="domain" type="sstr" access="RC"/>
<statistic name="transfers" type="count64" unit="message" desc="Messages transfered"/>
</class>
<!--
@@ -412,6 +413,7 @@
<property name="name" type="sstr" access="RC" index="y"/>
<property name="source" type="sstr" access="RC"/>
<property name="target" type="sstr" access="RC"/>
+ <property name="domain" type="sstr" access="RC"/>
<statistic name="transfers" type="count64" unit="message" desc="Messages transfered"/>
</class>
<!--