summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-06-25 13:28:15 +0000
committerGordon Sim <gsim@apache.org>2013-06-25 13:28:15 +0000
commit3615070a058ee43b3305d6b4464ee3a6e39e7b99 (patch)
tree6bcdc2593132f88e02f7c3ecbc35c6e827322531 /qpid/cpp/src
parent59b8d464a2a3b36f0985c10c057e14b284e3bc7c (diff)
downloadqpid-python-3615070a058ee43b3305d6b4464ee3a6e39e7b99.tar.gz
QPID-4712: authorisation for AMQP 1.0 connections
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1496466 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/CMakeLists.txt4
-rw-r--r--qpid/cpp/src/Makefile.am12
-rw-r--r--qpid/cpp/src/amqp.cmake4
-rw-r--r--qpid/cpp/src/qpid/amqp/MapBuilder.cpp130
-rw-r--r--qpid/cpp/src/qpid/amqp/MapBuilder.h63
-rw-r--r--qpid/cpp/src/qpid/amqp/descriptors.h9
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/Bridge.h13
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp18
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h10
-rw-r--r--qpid/cpp/src/qpid/broker/Connection.h226
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.cpp12
-rw-r--r--qpid/cpp/src/qpid/broker/ConnectionHandler.h8
-rw-r--r--qpid/cpp/src/qpid/broker/HandlerImpl.h4
-rw-r--r--qpid/cpp/src/qpid/broker/Link.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/Link.h10
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.cpp28
-rw-r--r--qpid/cpp/src/qpid/broker/LinkRegistry.h7
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp11
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h13
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp7
-rw-r--r--qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp16
-rw-r--r--qpid/cpp/src/qpid/broker/SaslAuthenticator.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp6
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionAdapter.cpp28
-rw-r--r--qpid/cpp/src/qpid/broker/SessionContext.h4
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/SessionHandler.h11
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/SessionState.h2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Authorise.cpp131
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Authorise.h57
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.cpp95
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.h5
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/DataReader.cpp14
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/DataReader.h6
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Exception.cpp30
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Exception.h (renamed from qpid/cpp/src/qpid/broker/ConnectionIdentity.h)42
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Filter.cpp17
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Filter.h6
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.cpp28
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Incoming.h16
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp9
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Interconnect.h1
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp49
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h18
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Sasl.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp34
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.h5
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp (renamed from qpid/cpp/src/qpid/broker/Connection.cpp)6
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h235
-rw-r--r--qpid/cpp/src/qpid/ha/BrokerReplicator.cpp4
-rw-r--r--qpid/cpp/src/qpid/ha/ConnectionObserver.cpp24
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp33
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h5
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp51
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp16
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp7
61 files changed, 1222 insertions, 410 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index 48279a2aed..3ecbac5cc2 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -1034,6 +1034,8 @@ set (qpidcommon_SOURCES
qpid/amqp/MapEncoder.cpp
qpid/amqp/MapSizeCalculator.h
qpid/amqp/MapSizeCalculator.cpp
+ qpid/amqp/MapBuilder.h
+ qpid/amqp/MapBuilder.cpp
qpid/amqp/MapReader.h
qpid/amqp/MapReader.cpp
qpid/amqp/MessageEncoder.h
@@ -1261,7 +1263,7 @@ set (qpidbroker_SOURCES
qpid/broker/MessageGroupManager.cpp
qpid/broker/PersistableMessage.cpp
qpid/broker/Bridge.cpp
- qpid/broker/Connection.cpp
+ qpid/broker/amqp_0_10/Connection.cpp
qpid/broker/ConnectionHandler.cpp
qpid/broker/DeliverableMessage.cpp
qpid/broker/DeliveryRecord.cpp
diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am
index a3825f7508..942575ec3d 100644
--- a/qpid/cpp/src/Makefile.am
+++ b/qpid/cpp/src/Makefile.am
@@ -536,6 +536,8 @@ libqpidcommon_la_SOURCES += \
qpid/amqp/MapEncoder.cpp \
qpid/amqp/MapSizeCalculator.h \
qpid/amqp/MapSizeCalculator.cpp \
+ qpid/amqp/MapBuilder.h \
+ qpid/amqp/MapBuilder.cpp \
qpid/amqp/MapReader.h \
qpid/amqp/MapReader.cpp \
qpid/amqp/MessageEncoder.h \
@@ -598,15 +600,15 @@ libqpidbroker_la_SOURCES = \
qpid/broker/Broker.cpp \
qpid/broker/Broker.h \
qpid/broker/BrokerImportExport.h \
- qpid/broker/Connection.cpp \
- qpid/broker/Connection.h \
+ qpid/broker/amqp_0_10/Connection.cpp \
+ qpid/broker/amqp_0_10/Connection.h \
qpid/broker/ConnectionHandler.cpp \
qpid/broker/ConnectionHandler.h \
qpid/broker/Consumer.h \
qpid/broker/Credit.h \
qpid/broker/Credit.cpp \
qpid/broker/ConsumerFactory.h \
- qpid/broker/ConnectionIdentity.h \
+ qpid/broker/Connection.h \
qpid/broker/ConnectionObserver.h \
qpid/broker/ConnectionObservers.h \
qpid/broker/ConfigurationObserver.h \
@@ -801,12 +803,16 @@ if HAVE_PROTON
dmoduleexec_LTLIBRARIES += amqp.la
amqp_la_LIBADD = libqpidcommon.la
amqp_la_SOURCES = \
+ qpid/broker/amqp/Authorise.h \
+ qpid/broker/amqp/Authorise.cpp \
qpid/broker/amqp/Connection.h \
qpid/broker/amqp/Connection.cpp \
qpid/broker/amqp/DataReader.h \
qpid/broker/amqp/DataReader.cpp \
qpid/broker/amqp/Domain.h \
qpid/broker/amqp/Domain.cpp \
+ qpid/broker/amqp/Exception.h \
+ qpid/broker/amqp/Exception.cpp \
qpid/broker/amqp/Filter.h \
qpid/broker/amqp/Filter.cpp \
qpid/broker/amqp/Header.h \
diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake
index a2ee1d1cb6..5875dd9f9d 100644
--- a/qpid/cpp/src/amqp.cmake
+++ b/qpid/cpp/src/amqp.cmake
@@ -53,12 +53,16 @@ if (BUILD_AMQP)
set (amqp_SOURCES
+ qpid/broker/amqp/Authorise.h
+ qpid/broker/amqp/Authorise.cpp
qpid/broker/amqp/Connection.h
qpid/broker/amqp/Connection.cpp
qpid/broker/amqp/DataReader.h
qpid/broker/amqp/DataReader.cpp
qpid/broker/amqp/Domain.h
qpid/broker/amqp/Domain.cpp
+ qpid/broker/amqp/Exception.h
+ qpid/broker/amqp/Exception.cpp
qpid/broker/amqp/Filter.h
qpid/broker/amqp/Filter.cpp
qpid/broker/amqp/Header.h
diff --git a/qpid/cpp/src/qpid/amqp/MapBuilder.cpp b/qpid/cpp/src/qpid/amqp/MapBuilder.cpp
new file mode 100644
index 0000000000..a554497791
--- /dev/null
+++ b/qpid/cpp/src/qpid/amqp/MapBuilder.cpp
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MapBuilder.h"
+#include <assert.h>
+
+namespace qpid {
+namespace amqp {
+namespace {
+const std::string BINARY("binary");
+const std::string UTF8("utf8");
+const std::string ASCII("ascii");
+}
+
+qpid::types::Variant::Map MapBuilder::getMap()
+{
+ return map;
+}
+const qpid::types::Variant::Map MapBuilder::getMap() const
+{
+ return map;
+}
+
+void MapBuilder::onNullValue(const CharSequence& key, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = qpid::types::Variant();
+}
+void MapBuilder::onBooleanValue(const CharSequence& key, bool value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+void MapBuilder::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onULongValue(const CharSequence& key, uint64_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onByteValue(const CharSequence& key, int8_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onShortValue(const CharSequence& key, int16_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onIntValue(const CharSequence& key, int32_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onLongValue(const CharSequence& key, int64_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onFloatValue(const CharSequence& key, float value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onDoubleValue(const CharSequence& key, double value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+ assert(value.size == 16);
+ map[std::string(key.data, key.size)] = qpid::types::Uuid(value.data);
+}
+
+void MapBuilder::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor*)
+{
+ map[std::string(key.data, key.size)] = value;
+}
+
+void MapBuilder::onBinaryValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+ qpid::types::Variant& v = map[std::string(key.data, key.size)];
+ v = std::string(value.data, value.size);
+ v.setEncoding(BINARY);
+}
+
+void MapBuilder::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+ qpid::types::Variant& v = map[std::string(key.data, key.size)];
+ v = std::string(value.data, value.size);
+ v.setEncoding(UTF8);
+}
+
+void MapBuilder::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor*)
+{
+ qpid::types::Variant& v = map[std::string(key.data, key.size)];
+ v = std::string(value.data, value.size);
+ v.setEncoding(ASCII);
+}
+}} // namespace qpid::amqp
diff --git a/qpid/cpp/src/qpid/amqp/MapBuilder.h b/qpid/cpp/src/qpid/amqp/MapBuilder.h
new file mode 100644
index 0000000000..0e3b95f633
--- /dev/null
+++ b/qpid/cpp/src/qpid/amqp/MapBuilder.h
@@ -0,0 +1,63 @@
+#ifndef QPID_AMQP_MAPBUILDER_H
+#define QPID_AMQP_MAPBUILDER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MapReader.h"
+#include "qpid/types/Variant.h"
+
+namespace qpid {
+namespace amqp {
+
+/**
+ * Utility to build a Variant::Map from a data stream (doesn't handle
+ * nested maps or lists yet)
+ */
+class MapBuilder : public MapReader
+{
+ public:
+ void onNullValue(const CharSequence& /*key*/, const Descriptor*);
+ void onBooleanValue(const CharSequence& /*key*/, bool, const Descriptor*);
+ void onUByteValue(const CharSequence& /*key*/, uint8_t, const Descriptor*);
+ void onUShortValue(const CharSequence& /*key*/, uint16_t, const Descriptor*);
+ void onUIntValue(const CharSequence& /*key*/, uint32_t, const Descriptor*);
+ void onULongValue(const CharSequence& /*key*/, uint64_t, const Descriptor*);
+ void onByteValue(const CharSequence& /*key*/, int8_t, const Descriptor*);
+ void onShortValue(const CharSequence& /*key*/, int16_t, const Descriptor*);
+ void onIntValue(const CharSequence& /*key*/, int32_t, const Descriptor*);
+ void onLongValue(const CharSequence& /*key*/, int64_t, const Descriptor*);
+ void onFloatValue(const CharSequence& /*key*/, float, const Descriptor*);
+ void onDoubleValue(const CharSequence& /*key*/, double, const Descriptor*);
+ void onUuidValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
+ void onTimestampValue(const CharSequence& /*key*/, int64_t, const Descriptor*);
+
+ void onBinaryValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
+ void onStringValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
+ void onSymbolValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*);
+
+ qpid::types::Variant::Map getMap();
+ const qpid::types::Variant::Map getMap() const;
+ private:
+ qpid::types::Variant::Map map;
+};
+}} // namespace qpid::amqp
+
+#endif /*!QPID_AMQP_MAPBUILDER_H*/
diff --git a/qpid/cpp/src/qpid/amqp/descriptors.h b/qpid/cpp/src/qpid/amqp/descriptors.h
index 6545433947..2a5691beaf 100644
--- a/qpid/cpp/src/qpid/amqp/descriptors.h
+++ b/qpid/cpp/src/qpid/amqp/descriptors.h
@@ -89,6 +89,15 @@ const uint64_t SELECTOR_FILTER_CODE(0x0000468C00000004ULL);
const uint64_t XQUERY_FILTER_CODE(0x0000468C00000005ULL);
}
+namespace error_conditions {
+//note these are not actually descriptors
+const std::string INTERNAL_ERROR("amqp:internal-error");
+const std::string NOT_FOUND("amqp:not-found");
+const std::string UNAUTHORIZED_ACCESS("amqp:unauthorized-access");
+const std::string DECODE_ERROR("amqp:decode-error");
+const std::string NOT_ALLOWED("amqp:not-allowed");
+const std::string RESOURCE_LIMIT_EXCEEDED("amqp:resource-limit-exceeded");
+}
}} // namespace qpid::amqp
#endif /*!QPID_AMQP_DESCRIPTORS_H*/
diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp
index 0c2655f507..6b34898158 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.cpp
+++ b/qpid/cpp/src/qpid/broker/Bridge.cpp
@@ -22,7 +22,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/FedOps.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/SessionState.h"
@@ -100,7 +100,7 @@ Bridge::~Bridge()
mgmtObject->resourceDestroy();
}
-void Bridge::create(Connection& c)
+void Bridge::create(amqp_0_10::Connection& c)
{
detached = false; // Reset detached in case we are recovering.
conn = &c;
@@ -200,7 +200,7 @@ void Bridge::create(Connection& c)
if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking();
}
-void Bridge::cancel(Connection&)
+void Bridge::cancel(amqp_0_10::Connection&)
{
if (resetProxy()) {
peer->getMessage().cancel(args.i_dest);
diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h
index 54a5f1600a..604a8473f3 100644
--- a/qpid/cpp/src/qpid/broker/Bridge.h
+++ b/qpid/cpp/src/qpid/broker/Bridge.h
@@ -39,8 +39,9 @@
namespace qpid {
namespace broker {
-
+namespace amqp_0_10 {
class Connection;
+}
class Link;
class LinkRegistry;
@@ -115,9 +116,9 @@ class Bridge : public PersistableConfig,
void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; }
private:
struct PushHandler : framing::FrameHandler {
- PushHandler(Connection* c) { conn = c; }
+ PushHandler(amqp_0_10::Connection* c) { conn = c; }
void handle(framing::AMQFrame& frame);
- Connection* conn;
+ amqp_0_10::Connection* conn;
};
std::auto_ptr<PushHandler> pushHandler;
@@ -134,14 +135,14 @@ class Bridge : public PersistableConfig,
std::string queueName;
std::string altEx;
mutable uint64_t persistenceId;
- Connection* conn;
+ amqp_0_10::Connection* conn;
InitializeCallback initialize;
bool detached; // Set when session is detached.
bool resetProxy();
// connection Management (called by owning Link)
- void create(Connection& c);
- void cancel(Connection& c);
+ void create(amqp_0_10::Connection& c);
+ void cancel(amqp_0_10::Connection& c);
void closed();
friend class Link; // to call create, cancel, closed()
boost::shared_ptr<ErrorListener> errorListener;
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 8cdfd42f02..bbcd7de017 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -22,7 +22,7 @@
#include "qpid/broker/Broker.h"
#include "qpid/broker/AclModule.h"
-#include "qpid/broker/ConnectionIdentity.h"
+#include "qpid/broker/Connection.h"
#include "qpid/broker/DirectExchange.h"
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/HeadersExchange.h"
@@ -707,13 +707,13 @@ struct InvalidParameter : public qpid::Exception
};
void Broker::createObject(const std::string& type, const std::string& name,
- const Variant::Map& properties, bool /*strict*/, const ConnectionIdentity* context)
+ const Variant::Map& properties, bool /*strict*/, const Connection* context)
{
std::string userId;
std::string connectionId;
if (context) {
userId = context->getUserId();
- connectionId = context->getUrl();
+ connectionId = context->getMgmtId();
}
//TODO: implement 'strict' option (check there are no unrecognised properties)
QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")");
@@ -898,13 +898,13 @@ void Broker::createObject(const std::string& type, const std::string& name,
}
void Broker::deleteObject(const std::string& type, const std::string& name,
- const Variant::Map& options, const ConnectionIdentity* context)
+ const Variant::Map& options, const Connection* context)
{
std::string userId;
std::string connectionId;
if (context) {
userId = context->getUserId();
- connectionId = context->getUrl();
+ connectionId = context->getMgmtId();
}
QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")");
if (objectFactory.deleteObject(*this, type, name, options, userId, connectionId)) {
@@ -952,13 +952,13 @@ void Broker::checkDeleteQueue(Queue::shared_ptr queue, bool ifUnused, bool ifEmp
Manageable::status_t Broker::queryObject(const std::string& type,
const std::string& name,
Variant::Map& results,
- const ConnectionIdentity* context)
+ const Connection* context)
{
std::string userId;
std::string connectionId;
if (context) {
userId = context->getUserId();
- connectionId = context->getUrl();
+ connectionId = context->getMgmtId();
}
QPID_LOG (debug, "Broker::query(" << type << ", " << name << ")");
@@ -994,7 +994,7 @@ Manageable::status_t Broker::queryQueue( const std::string& name,
}
Manageable::status_t Broker::getTimestampConfig(bool& receive,
- const ConnectionIdentity* context)
+ const Connection* context)
{
std::string name; // none needed for broker
std::string userId = context->getUserId();
@@ -1006,7 +1006,7 @@ Manageable::status_t Broker::getTimestampConfig(bool& receive,
}
Manageable::status_t Broker::setTimestampConfig(const bool receive,
- const ConnectionIdentity* context)
+ const Connection* context)
{
std::string name; // none needed for broker
std::string userId = context->getUserId();
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index 20e0c16e70..52b79a0944 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -149,20 +149,20 @@ class Broker : public sys::Runnable, public Plugin::Target,
void setLogHiresTimestamp(bool enabled);
bool getLogHiresTimestamp();
void createObject(const std::string& type, const std::string& name,
- const qpid::types::Variant::Map& properties, bool strict, const ConnectionIdentity* context);
+ const qpid::types::Variant::Map& properties, bool strict, const Connection* context);
void deleteObject(const std::string& type, const std::string& name,
- const qpid::types::Variant::Map& options, const ConnectionIdentity* context);
+ const qpid::types::Variant::Map& options, const Connection* context);
void checkDeleteQueue(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty);
Manageable::status_t queryObject(const std::string& type, const std::string& name,
- qpid::types::Variant::Map& results, const ConnectionIdentity* context);
+ qpid::types::Variant::Map& results, const Connection* context);
Manageable::status_t queryQueue( const std::string& name,
const std::string& userId,
const std::string& connectionId,
qpid::types::Variant::Map& results);
Manageable::status_t getTimestampConfig(bool& receive,
- const ConnectionIdentity* context);
+ const Connection* context);
Manageable::status_t setTimestampConfig(const bool receive,
- const ConnectionIdentity* context);
+ const Connection* context);
Manageable::status_t queueRedirect(const std::string& srcQueue, const std::string& tgtQueue);
void queueRedirectDestroy(boost::shared_ptr<Queue> srcQ, boost::shared_ptr<Queue> tgtQ, bool moveMsgs);
boost::shared_ptr<sys::Poller> poller;
diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h
index 0f94a32fbf..ecc48123cf 100644
--- a/qpid/cpp/src/qpid/broker/Connection.h
+++ b/qpid/cpp/src/qpid/broker/Connection.h
@@ -21,215 +21,35 @@
* under the License.
*
*/
-
-#include <memory>
-#include <sstream>
-#include <vector>
-#include <queue>
-
-#include "qpid/broker/BrokerImportExport.h"
-
-#include "qpid/broker/ConnectionHandler.h"
-#include "qpid/broker/ConnectionIdentity.h"
-#include "qpid/broker/OwnershipToken.h"
-#include "qpid/management/Manageable.h"
-#include "qpid/sys/AggregateOutput.h"
-#include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/SecuritySettings.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/RefCounted.h"
-#include "qpid/Url.h"
-#include "qpid/ptr_map.h"
-
-#include "qmf/org/apache/qpid/broker/Connection.h"
-
-#include <boost/ptr_container/ptr_map.hpp>
-#include <boost/scoped_ptr.hpp>
-#include <boost/bind.hpp>
-
-#include <algorithm>
+#include <map>
+#include <string>
namespace qpid {
-namespace sys {
-class ConnectionOutputHandler;
-class Timer;
-class TimerTask;
+namespace management {
+class ObjectId;
+}
+namespace types {
+class Variant;
}
-namespace broker {
-
-class Broker;
-class LinkRegistry;
-class Queue;
-class SecureConnection;
-class SessionHandler;
-struct ConnectionTimeoutTask;
-
-class Connection : public sys::ConnectionInputHandler, public ConnectionIdentity,
- public OwnershipToken, public management::Manageable,
- public RefCounted
-{
- public:
- uint32_t getFrameMax() const { return framemax; }
- uint16_t getHeartbeat() const { return heartbeat; }
- uint16_t getHeartbeatMax() const { return heartbeatmax; }
-
- void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); }
- void setHeartbeat(uint16_t hb) { heartbeat = hb; }
- void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
-
- void setUrl(const std::string& _url) { url = _url; }
-
- const OwnershipToken* getOwnership() const { return this; };
- const management::ObjectId getObjectId() const { return GetManagementObject()->getObjectId(); };
- const std::string& getUserId() const { return userId; }
- const std::string& getUrl() const { return url; }
-
- void setUserProxyAuth(const bool b);
- bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids
- bool isFederationLink() const { return federationPeerTag.size() > 0; }
- void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); }
- const std::string& getFederationPeerTag() const { return federationPeerTag; }
- std::vector<Url>& getKnownHosts() { return knownHosts; }
-
- /**@return true if user is the authenticated user on this connection.
- * If id has the default realm will also compare plain username.
- */
- bool isAuthenticatedUser(const std::string& id) const {
- return (id == userId || (isDefaultRealm && id == userName));
- }
-
- Broker& getBroker() { return broker; }
-
- sys::ConnectionOutputHandler& getOutput() { return *out; }
- void activateOutput();
- void addOutputTask(OutputTask*);
- void removeOutputTask(OutputTask*);
- framing::ProtocolVersion getVersion() const { return version; }
-
- Connection(sys::ConnectionOutputHandler* out,
- Broker& broker,
- const std::string& mgmtId,
- const qpid::sys::SecuritySettings&,
- bool isLink = false,
- uint64_t objectId = 0);
-
- ~Connection ();
-
- /** Get the SessionHandler for channel. Create if it does not already exist */
- SessionHandler& getChannel(framing::ChannelId channel);
-
- /** Close the connection. Waits for the client to respond with close-ok
- * before actually destroying the connection.
- */
- QPID_BROKER_EXTERN void close(
- framing::connection::CloseCode code, const std::string& text);
-
- /** Abort the connection. Close abruptly and immediately. */
- QPID_BROKER_EXTERN void abort();
-
- // ConnectionInputHandler methods
- void received(framing::AMQFrame& frame);
- bool doOutput();
- void closed();
-
- void closeChannel(framing::ChannelId channel);
-
- // Manageable entry points
- management::ManagementObject::shared_ptr GetManagementObject(void) const;
- management::Manageable::status_t
- ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
-
- void requestIOProcessing (boost::function0<void>);
- void recordFromServer (const framing::AMQFrame& frame);
- void recordFromClient (const framing::AMQFrame& frame);
-
- // gets for configured federation links
- std::string getAuthMechanism();
- std::string getAuthCredentials();
- std::string getUsername();
- std::string getPassword();
- std::string getHost();
- uint16_t getPort();
-
- void notifyConnectionForced(const std::string& text);
- void setUserId(const std::string& uid);
-
- // credentials for connected client
- const std::string& getMgmtId() const { return mgmtId; }
- management::ManagementAgent* getAgent() const { return agent; }
-
- void setHeartbeatInterval(uint16_t heartbeat);
- void sendHeartbeat();
- void restartTimeout();
-
- void setSecureConnection(SecureConnection* secured);
-
- const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
- {
- return securitySettings;
- }
-
- /** @return true if the initial connection negotiation is complete. */
- bool isOpen();
-
- bool isLink() { return link; }
- void startLinkHeartbeatTimeoutTask();
-
- void setClientProperties(const framing::FieldTable& cp) { clientProperties = cp; }
- const framing::FieldTable& getClientProperties() const { return clientProperties; }
-
- private:
- // Management object is used in the constructor so must be early
- qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject;
-
- //contained output tasks
- sys::AggregateOutput outputTasks;
-
- boost::scoped_ptr<framing::FrameHandler> outboundTracker;
- boost::scoped_ptr<sys::ConnectionOutputHandler> out;
-
- Broker& broker;
-
- framing::ProtocolVersion version;
- uint32_t framemax;
- uint16_t heartbeat;
- uint16_t heartbeatmax;
- std::string userId;
- std::string url;
- bool userProxyAuth;
- std::string federationPeerTag;
- std::vector<Url> knownHosts;
- std::string userName;
- bool isDefaultRealm;
-
- typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
-
- ChannelMap channels;
- qpid::sys::SecuritySettings securitySettings;
- const bool link;
- ConnectionHandler adapter;
- bool mgmtClosing;
- const std::string mgmtId;
- sys::Mutex ioCallbackLock;
- std::queue<boost::function0<void> > ioCallbacks;
- LinkRegistry& links;
- management::ManagementAgent* agent;
- sys::Timer& timer;
- boost::intrusive_ptr<sys::TimerTask> heartbeatTimer, linkHeartbeatTimer;
- boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
- uint64_t objectId;
- framing::FieldTable clientProperties;
-
-friend class OutboundFrameTracker;
- void sent(const framing::AMQFrame& f);
- void doIoCallbacks();
+namespace broker {
- public:
+class OwnershipToken;
- qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; }
+/**
+ * Protocol independent connection abstraction.
+ */
+class Connection {
+public:
+ virtual ~Connection() {}
+ virtual const OwnershipToken* getOwnership() const = 0;
+ virtual const management::ObjectId getObjectId() const = 0;
+ virtual const std::string& getUserId() const = 0;
+ virtual const std::string& getMgmtId() const = 0;
+ virtual const std::map<std::string, types::Variant>& getClientProperties() const = 0;
+ virtual bool isLink() const = 0;
+ virtual void abort() = 0;
};
-
-}}
+}} // namespace qpid::broker
#endif /*!QPID_BROKER_CONNECTION_H*/
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
index 40393f1920..fd4af963ad 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -24,7 +24,7 @@
#include "qpid/SaslFactory.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/SecureConnection.h"
#include "qpid/Url.h"
#include "qpid/framing/AllInvoker.h"
@@ -109,10 +109,10 @@ void ConnectionHandler::setSecureConnection(SecureConnection* secured)
handler->secured = secured;
}
-ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient) :
+ConnectionHandler::ConnectionHandler(amqp_0_10::Connection& connection, bool isClient) :
handler(new Handler(connection, isClient)) {}
-ConnectionHandler::Handler::Handler(Connection& c, bool isClient) :
+ConnectionHandler::Handler::Handler(amqp_0_10::Connection& c, bool isClient) :
proxy(c.getOutput()),
connection(c), serverMode(!isClient), secured(0),
isOpen(false)
@@ -153,14 +153,14 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
{
const framing::FieldTable& clientProperties = body.getClientProperties();
qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject = connection.getMgmtObject();
+ types::Variant::Map properties;
+ qpid::amqp_0_10::translate(clientProperties, properties);
if (mgmtObject != 0) {
string procName = clientProperties.getAsString(CLIENT_PROCESS_NAME);
uint32_t pid = clientProperties.getAsInt(CLIENT_PID);
uint32_t ppid = clientProperties.getAsInt(CLIENT_PPID);
- types::Variant::Map properties;
- qpid::amqp_0_10::translate(clientProperties, properties);
mgmtObject->set_remoteProperties(properties);
if (!procName.empty())
mgmtObject->set_remoteProcessName(procName);
@@ -192,7 +192,7 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body)
throw;
}
- connection.setClientProperties(clientProperties);
+ connection.setClientProperties(properties);
if (clientProperties.isSet(QPID_FED_TAG)) {
connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
}
diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
index 9346e7b1ac..7af2fe3cb4 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h
@@ -47,7 +47,9 @@ struct SecuritySettings;
namespace broker {
+namespace amqp_0_10 {
class Connection;
+}
class SecureConnection;
class ConnectionHandler : public framing::FrameHandler
@@ -55,13 +57,13 @@ class ConnectionHandler : public framing::FrameHandler
struct Handler : public framing::AMQP_AllOperations::ConnectionHandler
{
framing::AMQP_AllProxy::Connection proxy;
- Connection& connection;
+ amqp_0_10::Connection& connection;
bool serverMode;
std::auto_ptr<SaslAuthenticator> authenticator;
SecureConnection* secured;
bool isOpen;
- Handler(Connection& connection, bool isClient);
+ Handler(amqp_0_10::Connection& connection, bool isClient);
~Handler();
void startOk(const qpid::framing::ConnectionStartOkBody& body);
void startOk(const qpid::framing::FieldTable& clientProperties,
@@ -99,7 +101,7 @@ class ConnectionHandler : public framing::FrameHandler
bool handle(const qpid::framing::AMQMethodBody& method);
public:
- ConnectionHandler(Connection& connection, bool isClient );
+ ConnectionHandler(amqp_0_10::Connection& connection, bool isClient );
void close(framing::connection::CloseCode code, const std::string& text);
void heartbeat();
void handle(framing::AMQFrame& frame);
diff --git a/qpid/cpp/src/qpid/broker/HandlerImpl.h b/qpid/cpp/src/qpid/broker/HandlerImpl.h
index 72bfb1c474..c41438fd90 100644
--- a/qpid/cpp/src/qpid/broker/HandlerImpl.h
+++ b/qpid/cpp/src/qpid/broker/HandlerImpl.h
@@ -21,7 +21,7 @@
#include "qpid/broker/SemanticState.h"
#include "qpid/broker/SessionContext.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
namespace qpid {
namespace broker {
@@ -40,7 +40,7 @@ class HandlerImpl {
HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {}
framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); }
- Connection& getConnection() { return session.getConnection(); }
+ amqp_0_10::Connection& getConnection() { return session.getConnection(); }
Broker& getBroker() { return session.getConnection().getBroker(); }
};
diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp
index c6ac6832c0..31685eb1de 100644
--- a/qpid/cpp/src/qpid/broker/Link.cpp
+++ b/qpid/cpp/src/qpid/broker/Link.cpp
@@ -22,7 +22,7 @@
#include "qpid/broker/Link.h"
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/sys/Timer.h"
#include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h"
#include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h"
@@ -233,7 +233,7 @@ void Link::startConnectionLH ()
}
}
-void Link::established(Connection* c)
+void Link::established(qpid::broker::amqp_0_10::Connection* c)
{
stringstream addr;
addr << host << ":" << port;
diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h
index 01ddc68d97..d9924feec3 100644
--- a/qpid/cpp/src/qpid/broker/Link.h
+++ b/qpid/cpp/src/qpid/broker/Link.h
@@ -45,8 +45,10 @@ namespace broker {
class LinkRegistry;
class Broker;
-class Connection;
class LinkExchange;
+namespace amqp_0_10 {
+class Connection;
+}
class Link : public PersistableConfig, public management::Manageable {
private:
@@ -83,7 +85,7 @@ class Link : public PersistableConfig, public management::Manageable {
Bridges cancellations; // Bridges pending cancellation
framing::ChannelId nextFreeChannel;
RangeSet<framing::ChannelId> freeChannels;
- Connection* connection;
+ amqp_0_10::Connection* connection;
management::ManagementAgent* agent;
boost::function<void(Link*)> listener;
boost::intrusive_ptr<sys::TimerTask> timerTask;
@@ -109,7 +111,7 @@ class Link : public PersistableConfig, public management::Manageable {
void reconnectLH(const Address&); //called by LinkRegistry
// connection management (called by LinkRegistry)
- void established(Connection*); // Called when connection is created
+ void established(amqp_0_10::Connection*); // Called when connection is created
void opened(); // Called when connection is open (after create)
void closed(int, std::string); // Called when connection goes away
void notifyConnectionForced(const std::string text);
@@ -194,7 +196,7 @@ class Link : public PersistableConfig, public management::Manageable {
/** The current connction for this link. Note returns 0 if the link is not
* presently connected.
*/
- Connection* getConnection() { return connection; }
+ amqp_0_10::Connection* getConnection() { return connection; }
};
}
}
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
index 8642294d06..ed1e314bda 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
@@ -21,7 +21,7 @@
#include "qpid/broker/LinkRegistry.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/Link.h"
#include "qpid/log/Statement.h"
#include <iostream>
@@ -53,10 +53,26 @@ class LinkRegistryConnectionObserver : public ConnectionObserver {
LinkRegistry& links;
public:
LinkRegistryConnectionObserver(LinkRegistry& l) : links(l) {}
- void connection(Connection& c) { links.notifyConnection(c.getMgmtId(), &c); }
- void opened(Connection& c) { links.notifyOpened(c.getMgmtId()); }
- void closed(Connection& c) { links.notifyClosed(c.getMgmtId()); }
- void forced(Connection& c, const string& text) { links.notifyConnectionForced(c.getMgmtId(), text); }
+ void connection(Connection& in)
+ {
+ amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in);
+ if (c) links.notifyConnection(c->getMgmtId(), c);
+ }
+ void opened(Connection& in)
+ {
+ amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in);
+ if (c) links.notifyOpened(c->getMgmtId());
+ }
+ void closed(Connection& in)
+ {
+ amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in);
+ if (c) links.notifyClosed(c->getMgmtId());
+ }
+ void forced(Connection& in, const string& text)
+ {
+ amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in);
+ if (c) links.notifyConnectionForced(c->getMgmtId(), text);
+ }
};
LinkRegistry::LinkRegistry (Broker* _broker) :
@@ -287,7 +303,7 @@ Link::shared_ptr LinkRegistry::findLink(const std::string& connId)
return Link::shared_ptr();
}
-void LinkRegistry::notifyConnection(const std::string& key, Connection* c)
+void LinkRegistry::notifyConnection(const std::string& key, amqp_0_10::Connection* c)
{
// find a link that is attempting to connect to the remote, and
// create a mapping from connection id to link
diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h
index e5b1c40781..a156a53624 100644
--- a/qpid/cpp/src/qpid/broker/LinkRegistry.h
+++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h
@@ -35,10 +35,11 @@
namespace qpid {
namespace broker {
-
+namespace amqp_0_10 {
+ class Connection;
+}
class Link;
class Broker;
- class Connection;
class LinkRegistry {
typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap;
typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
@@ -58,7 +59,7 @@ namespace broker {
boost::shared_ptr<Link> findLink(const std::string& key);
// Methods called by the connection observer, key is connection identifier
- void notifyConnection (const std::string& key, Connection* c);
+ void notifyConnection (const std::string& key, amqp_0_10::Connection* c);
void notifyOpened (const std::string& key);
void notifyClosed (const std::string& key);
void notifyConnectionForced (const std::string& key, const std::string& text);
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 1d901025b6..ec44404793 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -23,7 +23,8 @@
#include "qpid/amqp/CharSequence.h"
#include "qpid/amqp/MapHandler.h"
-#include "qpid/broker/ConnectionIdentity.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/broker/OwnershipToken.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/management/Manageable.h"
#include "qpid/StringUtils.h"
@@ -203,10 +204,10 @@ uint8_t Message::getPriority() const
bool Message::getIsManagementMessage() const { return isManagementMessage; }
void Message::setIsManagementMessage(bool b) { isManagementMessage = b; }
-const OwnershipToken* Message::getPublisherOwnership() const { return publisher->getOwnership(); }
-const management::ObjectId Message::getPublisherObjectId() const { return publisher->getObjectId(); }
-const std::string& Message::getPublisherUserId() const { return publisher->getUserId(); }
-const std::string& Message::getPublisherUrl() const { return publisher->getUrl(); }
+const Connection* Message::getPublisher() const { return publisher; }
+void Message::setPublisher(const Connection& p) { publisher = &p; }
+bool Message::isLocalTo(const OwnershipToken* token) const { return token && publisher && token->isLocal(publisher->getOwnership()); }
+
qpid::framing::SequenceNumber Message::getSequence() const
{
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index 41ce2ec1a2..8f12b06a9d 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -47,7 +47,7 @@ class Manageable;
namespace broker {
class OwnershipToken;
-class ConnectionIdentity;
+class Connection;
enum MessageState
{
@@ -85,12 +85,9 @@ public:
int getDeliveryCount() const { return deliveryCount; }
void resetDeliveryCount() { deliveryCount = -1; }
- void setPublisher(const ConnectionIdentity& p) { publisher = &p; }
- const ConnectionIdentity& getPublisher() const { return *publisher; }
- const OwnershipToken* getPublisherOwnership() const;
- const management::ObjectId getPublisherObjectId() const;
- const std::string& getPublisherUserId() const;
- const std::string& getPublisherUrl() const;
+ void setPublisher(const Connection& p);
+ const Connection* getPublisher() const;
+ bool isLocalTo(const OwnershipToken*) const;
QPID_BROKER_EXTERN std::string getRoutingKey() const;
QPID_BROKER_EXTERN bool isPersistent() const;
@@ -148,7 +145,7 @@ public:
boost::intrusive_ptr<Encoding> encoding;
boost::intrusive_ptr<PersistableMessage> persistentContext;
int deliveryCount;
- const ConnectionIdentity* publisher;
+ const Connection* publisher;
qpid::sys::AbsTime expiration;
boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
uint64_t timestamp;
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index e1782b01ce..c402e3e016 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -220,18 +220,13 @@ Queue::~Queue()
{
}
-bool isLocalTo(const OwnershipToken* token, const Message& msg)
-{
- return token && token->isLocal(msg.getPublisherOwnership());
-}
-
bool Queue::isLocal(const Message& msg)
{
//message is considered local if it was published on the same
//connection as that of the session which declared this queue
//exclusive (owner) or which has an exclusive subscription
//(exclusive)
- return settings.noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
+ return settings.noLocal && (msg.isLocalTo(owner) || msg.isLocalTo(exclusive));
}
bool Queue::isExcluded(const Message& msg)
diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
index a5ef8c560c..e5d6db1a04 100644
--- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
+++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp
@@ -21,7 +21,7 @@
#include "qpid/broker/AclModule.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/FieldValue.h"
#include "qpid/log/Statement.h"
@@ -54,12 +54,12 @@ namespace broker {
class NullAuthenticator : public SaslAuthenticator
{
- Connection& connection;
+ amqp_0_10::Connection& connection;
framing::AMQP_ClientProxy::Connection client;
std::string realm;
const bool encrypt;
public:
- NullAuthenticator(Connection& connection, bool encrypt);
+ NullAuthenticator(amqp_0_10::Connection& connection, bool encrypt);
~NullAuthenticator();
void getMechanisms(framing::Array& mechanisms);
void start(const std::string& mechanism, const std::string* response);
@@ -74,7 +74,7 @@ public:
class CyrusAuthenticator : public SaslAuthenticator
{
sasl_conn_t *sasl_conn;
- Connection& connection;
+ amqp_0_10::Connection& connection;
framing::AMQP_ClientProxy::Connection client;
const bool encrypt;
@@ -82,7 +82,7 @@ class CyrusAuthenticator : public SaslAuthenticator
bool getUsername(std::string& uid);
public:
- CyrusAuthenticator(Connection& connection, bool encrypt);
+ CyrusAuthenticator(amqp_0_10::Connection& connection, bool encrypt);
~CyrusAuthenticator();
void init();
void getMechanisms(framing::Array& mechanisms);
@@ -167,7 +167,7 @@ void SaslAuthenticator::fini(void)
#endif
-std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c )
+std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(amqp_0_10::Connection& c )
{
if (c.getBroker().getOptions().auth) {
return std::auto_ptr<SaslAuthenticator>(
@@ -179,7 +179,7 @@ std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connecti
}
-NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()),
+NullAuthenticator::NullAuthenticator(amqp_0_10::Connection& c, bool e) : connection(c), client(c.getOutput()),
realm(c.getBroker().getOptions().realm), encrypt(e) {}
NullAuthenticator::~NullAuthenticator() {}
@@ -246,7 +246,7 @@ std::auto_ptr<SecurityLayer> NullAuthenticator::getSecurityLayer(uint16_t)
#if HAVE_SASL
-CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) :
+CyrusAuthenticator::CyrusAuthenticator(amqp_0_10::Connection& c, bool _encrypt) :
sasl_conn(0), connection(c), client(c.getOutput()), encrypt(_encrypt)
{
init();
diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.h b/qpid/cpp/src/qpid/broker/SaslAuthenticator.h
index e5ecc9f6ec..97434d6ffe 100644
--- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.h
+++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.h
@@ -34,7 +34,9 @@
namespace qpid {
namespace broker {
+namespace amqp_0_10 {
class Connection;
+}
class SaslAuthenticator
{
@@ -54,7 +56,7 @@ public:
static void init(const std::string& saslName, std::string const & saslConfigPath );
static void fini(void);
- static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection);
+ static std::auto_ptr<SaslAuthenticator> createAuthenticator(amqp_0_10::Connection& connection);
virtual void callUserIdCallbacks() { }
};
diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
index 6b4f6b3025..0258350043 100644
--- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
+++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp
@@ -22,7 +22,7 @@
#include "qpid/amqp_0_10/Connection.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/SecureConnection.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/log/Statement.h"
@@ -35,7 +35,7 @@ using framing::ProtocolVersion;
using qpid::sys::SecuritySettings;
typedef std::auto_ptr<qpid::amqp_0_10::Connection> CodecPtr;
typedef std::auto_ptr<SecureConnection> SecureConnectionPtr;
-typedef std::auto_ptr<Connection> ConnectionPtr;
+typedef std::auto_ptr<qpid::broker::amqp_0_10::Connection> ConnectionPtr;
typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr;
SecureConnectionFactory::SecureConnectionFactory(Broker& b) : broker(b) {}
@@ -64,7 +64,7 @@ SecureConnectionFactory::create_0_10(sys::OutputControl& out, const std::string&
{
SecureConnectionPtr sc(new SecureConnection());
CodecPtr c(new qpid::amqp_0_10::Connection(out, id, brokerActsAsClient));
- ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, brokerActsAsClient));
+ ConnectionPtr i(new broker::amqp_0_10::Connection(c.get(), broker, id, external, brokerActsAsClient));
i->setSecureConnection(sc.get());
c->setInputHandler(InputPtr(i.release()));
sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c));
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 54069df591..dd7a25aaa4 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -22,7 +22,7 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/DeliverableMessage.h"
#include "qpid/broker/DtxAck.h"
#include "qpid/broker/DtxTimeout.h"
@@ -83,7 +83,7 @@ SemanticState::SemanticState(SessionState& ss)
authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()),
userID(getSession().getConnection().getUserId()),
closeComplete(false),
- connectionId(getSession().getConnection().getUrl())
+ connectionId(getSession().getConnection().getMgmtId())
{}
SemanticState::~SemanticState() {
diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
index f7ca4890b4..2d4868628f 100644
--- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -18,7 +18,7 @@
#include "qpid/broker/SessionAdapter.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/DtxTimeout.h"
#include "qpid/broker/Queue.h"
#include "qpid/Exception.h"
@@ -96,14 +96,14 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const
try{
std::pair<Exchange::shared_ptr, bool> response =
getBroker().createExchange(exchange, type, durable, alternateExchange, args,
- getConnection().getUserId(), getConnection().getUrl());
+ getConnection().getUserId(), getConnection().getMgmtId());
if (!response.second) {
//exchange already there, not created
checkType(response.first, type);
checkAlternate(response.first, alternate);
QPID_LOG_CAT(debug, model, "Create exchange. name:" << exchange
<< " user:" << getConnection().getUserId()
- << " rhost:" << getConnection().getUrl()
+ << " rhost:" << getConnection().getMgmtId()
<< " type:" << type
<< " alternateExchange:" << alternateExchange
<< " durable:" << (durable ? "T" : "F"));
@@ -134,7 +134,7 @@ void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr ex
void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/)
{
//TODO: implement if-unused
- getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getUrl());
+ getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getMgmtId());
}
ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name)
@@ -156,7 +156,7 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName,
const FieldTable& arguments)
{
getBroker().bind(queueName, exchangeName, routingKey, arguments,
- getConnection().getUserId(), getConnection().getUrl());
+ getConnection().getUserId(), getConnection().getMgmtId());
state.addBinding(queueName, exchangeName, routingKey, arguments);
}
@@ -166,7 +166,7 @@ void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName,
{
state.removeBinding(queueName, exchangeName, routingKey);
getBroker().unbind(queueName, exchangeName, routingKey,
- getConnection().getUserId(), getConnection().getUrl());
+ getConnection().getUserId(), getConnection().getMgmtId());
}
ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName,
@@ -209,7 +209,7 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string
SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session)
: HandlerHelper(session), broker(getBroker()),
//record connection id and userid for deleting exclsuive queues after session has ended:
- connectionId(getConnection().getUrl()), userId(getConnection().getUserId())
+ connectionId(getConnection().getMgmtId()), userId(getConnection().getUserId())
{}
@@ -302,7 +302,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
exclusive ? &session : 0,
alternateExchange,
getConnection().getUserId(),
- getConnection().getUrl());
+ getConnection().getMgmtId());
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
@@ -316,7 +316,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string&
}
QPID_LOG_CAT(debug, model, "Create queue. name:" << name
<< " user:" << getConnection().getUserId()
- << " rhost:" << getConnection().getUrl()
+ << " rhost:" << getConnection().getMgmtId()
<< " durable:" << (durable ? "T" : "F")
<< " exclusive:" << (exclusive ? "T" : "F")
<< " autodelete:" << (autoDelete ? "T" : "F")
@@ -363,7 +363,7 @@ void SessionAdapter::QueueHandlerImpl::checkDelete(Queue::shared_ptr queue, bool
void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty)
{
- getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(),
+ getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getMgmtId(),
boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty));
}
@@ -429,12 +429,12 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName,
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
- agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(),
+ agent->raiseEvent(_qmf::EventSubscribe(getConnection().getMgmtId(), getConnection().getUserId(),
queueName, destination, exclusive, ManagementAgent::toMap(arguments)));
QPID_LOG_CAT(debug, model, "Create subscription. queue:" << queueName
<< " destination:" << destination
<< " user:" << getConnection().getUserId()
- << " rhost:" << getConnection().getUrl()
+ << " rhost:" << getConnection().getMgmtId()
<< " exclusive:" << (exclusive ? "T" : "F")
);
}
@@ -448,10 +448,10 @@ SessionAdapter::MessageHandlerImpl::cancel(const string& destination )
ManagementAgent* agent = getBroker().getManagementAgent();
if (agent)
- agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination));
+ agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getMgmtId(), getConnection().getUserId(), destination));
QPID_LOG_CAT(debug, model, "Delete subscription. destination:" << destination
<< " user:" << getConnection().getUserId()
- << " rhost:" << getConnection().getUrl() );
+ << " rhost:" << getConnection().getMgmtId() );
}
void
diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h
index 25b8e22949..92a3dcecc2 100644
--- a/qpid/cpp/src/qpid/broker/SessionContext.h
+++ b/qpid/cpp/src/qpid/broker/SessionContext.h
@@ -36,14 +36,16 @@ class AMQP_ClientProxy;
namespace broker {
class Broker;
+namespace amqp_0_10 {
class Connection;
+}
class SessionContext : public OwnershipToken
{
public:
virtual ~SessionContext(){}
virtual bool isAttached() const = 0;
- virtual Connection& getConnection() = 0;
+ virtual amqp_0_10::Connection& getConnection() = 0;
virtual framing::AMQP_ClientProxy& getProxy() = 0;
virtual Broker& getBroker() = 0;
virtual uint16_t getChannel() const = 0;
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
index 8cbecbc6f7..93977c8a6e 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp
@@ -20,7 +20,7 @@
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/SessionState.h"
#include "qpid/log/Statement.h"
#include "qpid/sys/ConnectionOutputHandler.h"
@@ -33,7 +33,7 @@ using namespace framing;
using namespace std;
using namespace qpid::sys;
-SessionHandler::SessionHandler(Connection& c, ChannelId ch)
+SessionHandler::SessionHandler(amqp_0_10::Connection& c, ChannelId ch)
: qpid::amqp_0_10::SessionHandler(&c.getOutput(), ch),
connection(c),
proxy(out)
@@ -64,9 +64,9 @@ void SessionHandler::executionException(
errorListener->executionException(code, msg);
}
-Connection& SessionHandler::getConnection() { return connection; }
+amqp_0_10::Connection& SessionHandler::getConnection() { return connection; }
-const Connection& SessionHandler::getConnection() const { return connection; }
+const amqp_0_10::Connection& SessionHandler::getConnection() const { return connection; }
void SessionHandler::handleDetach() {
qpid::amqp_0_10::SessionHandler::handleDetach();
diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h
index da57fb103e..3ee1538ccd 100644
--- a/qpid/cpp/src/qpid/broker/SessionHandler.h
+++ b/qpid/cpp/src/qpid/broker/SessionHandler.h
@@ -31,8 +31,9 @@ namespace qpid {
class SessionState;
namespace broker {
-
+namespace amqp_0_10 {
class Connection;
+}
class SessionState;
/**
@@ -57,15 +58,15 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler {
/**
*@param e must not be deleted until ErrorListener::detach has been called */
- SessionHandler(Connection&, framing::ChannelId);
+ SessionHandler(amqp_0_10::Connection&, framing::ChannelId);
~SessionHandler();
/** Get broker::SessionState */
SessionState* getSession() { return session.get(); }
const SessionState* getSession() const { return session.get(); }
- Connection& getConnection();
- const Connection& getConnection() const;
+ amqp_0_10::Connection& getConnection();
+ const amqp_0_10::Connection& getConnection() const;
framing::AMQP_ClientProxy& getProxy() { return proxy; }
const framing::AMQP_ClientProxy& getProxy() const { return proxy; }
@@ -93,7 +94,7 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler {
: framing::AMQP_ClientProxy(setChannel), setChannel(ch, out) {}
};
- Connection& connection;
+ amqp_0_10::Connection& connection;
framing::AMQP_ClientProxy proxy;
std::auto_ptr<SessionState> session;
boost::shared_ptr<ErrorListener> errorListener;
diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp
index f9b84dc9fb..421dc008a9 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.cpp
+++ b/qpid/cpp/src/qpid/broker/SessionState.cpp
@@ -96,7 +96,7 @@ uint16_t SessionState::getChannel() const {
return handler->getChannel();
}
-Connection& SessionState::getConnection() {
+amqp_0_10::Connection& SessionState::getConnection() {
assert(isAttached());
return handler->getConnection();
}
diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h
index eef9cf70c7..daf3767969 100644
--- a/qpid/cpp/src/qpid/broker/SessionState.h
+++ b/qpid/cpp/src/qpid/broker/SessionState.h
@@ -89,7 +89,7 @@ class SessionState : public qpid::SessionState,
uint16_t getChannel() const;
/** @pre isAttached() */
- Connection& getConnection();
+ amqp_0_10::Connection& getConnection();
bool isLocal(const OwnershipToken* t) const;
Broker& getBroker();
diff --git a/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp b/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp
new file mode 100644
index 0000000000..10b7fa5f7a
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Authorise.h"
+#include "Exception.h"
+#include "Filter.h"
+#include "qpid/amqp/descriptors.h"
+#include "qpid/broker/AclModule.h"
+#include "qpid/broker/Exchange.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
+#include "qpid/types/Variant.h"
+#include <map>
+#include <boost/lexical_cast.hpp>
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+namespace {
+const std::string B_TRUE("true");
+const std::string B_FALSE("false");
+const std::string POLICY_TYPE("qpid.policy_type");
+}
+
+Authorise::Authorise(const std::string& u, AclModule* a) : user(u), acl(a) {}
+void Authorise::access(boost::shared_ptr<Exchange> exchange)
+{
+ if (acl) {
+ std::map<acl::Property, std::string> params;
+ params.insert(std::make_pair(acl::PROP_TYPE, exchange->getType()));
+ params.insert(std::make_pair(acl::PROP_DURABLE, exchange->isDurable() ? B_TRUE : B_FALSE));
+ if (!acl->authorise(user, acl::ACT_ACCESS, acl::OBJ_EXCHANGE, exchange->getName(), &params))
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied exchange access request from " << user));
+ }
+}
+void Authorise::access(boost::shared_ptr<Queue> queue)
+{
+ if (acl) {
+ const QueueSettings& settings = queue->getSettings();
+ std::map<acl::Property, std::string> params;
+ boost::shared_ptr<Exchange> altex = queue->getAlternateExchange();
+ if (altex)
+ params.insert(std::make_pair(acl::PROP_ALTERNATE, altex->getName()));
+ params.insert(std::make_pair(acl::PROP_DURABLE, settings.durable ? B_TRUE : B_FALSE));
+ params.insert(std::make_pair(acl::PROP_EXCLUSIVE, queue->hasExclusiveOwner() ? B_TRUE : B_FALSE));
+ params.insert(std::make_pair(acl::PROP_AUTODELETE, settings.autodelete ? B_TRUE : B_FALSE));
+ qpid::types::Variant::Map::const_iterator i = settings.original.find(POLICY_TYPE);
+ if (i != settings.original.end())
+ params.insert(std::make_pair(acl::PROP_POLICYTYPE, i->second.asString()));
+ if (settings.maxDepth.hasCount())
+ params.insert(std::make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<std::string>(settings.maxDepth.getCount())));
+ if (settings.maxDepth.hasCount())
+ params.insert(std::make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<std::string>(settings.maxDepth.getSize())));
+ if (!acl->authorise(user, acl::ACT_ACCESS, acl::OBJ_QUEUE, queue->getName(), &params) )
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied queue access request from " << user));
+ }
+}
+
+void Authorise::incoming(boost::shared_ptr<Exchange> exchange)
+{
+ access(exchange);
+ //can't check publish permission here as do not yet know routing key
+}
+void Authorise::incoming(boost::shared_ptr<Queue> queue)
+{
+ access(queue);
+ if (acl) {
+ if (!acl->authorise(user, acl::ACT_PUBLISH, acl::OBJ_EXCHANGE, std::string()/*default exchange*/, queue->getName()))
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG(user << " cannot publish to queue " << queue->getName()));
+ }
+}
+void Authorise::outgoing(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue, const Filter& filter)
+{
+ access(exchange);
+ if (acl) {
+ std::map<qpid::acl::Property, std::string> params;
+ params.insert(std::make_pair(acl::PROP_QUEUENAME, queue->getName()));
+ params.insert(std::make_pair(acl::PROP_ROUTINGKEY, filter.getBindingKey(exchange)));
+
+ if (!acl->authorise(user, acl::ACT_BIND, acl::OBJ_EXCHANGE, exchange->getName(), &params))
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied exchange bind request from " << user));
+
+ if (!acl->authorise(user, acl::ACT_CONSUME, acl::OBJ_QUEUE, queue->getName(), NULL))
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied queue subscribe request from " << user));
+ }
+}
+
+void Authorise::outgoing(boost::shared_ptr<Queue> queue)
+{
+ access(queue);
+ if (acl) {
+ if (!acl->authorise(user, acl::ACT_CONSUME, acl::OBJ_QUEUE, queue->getName(), NULL))
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied queue subscribe request from " << user));
+ }
+}
+
+void Authorise::route(boost::shared_ptr<Exchange> exchange, const Message& msg)
+{
+ if (acl && acl->doTransferAcl()) {
+ if (!acl->authorise(user, acl::ACT_PUBLISH, acl::OBJ_EXCHANGE, exchange->getName(), msg.getRoutingKey()))
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG(user << " cannot publish to " << exchange->getName() << " with routing-key " << msg.getRoutingKey()));
+ }
+}
+
+void Authorise::interlink()
+{
+ if (acl) {
+ if (!acl->authorise(user, acl::ACT_CREATE, acl::OBJ_LINK, "")){
+ throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied " << user << " a AMQP 1.0 link"));
+ }
+ }
+}
+
+}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Authorise.h b/qpid/cpp/src/qpid/broker/amqp/Authorise.h
new file mode 100644
index 0000000000..7bdb75375f
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/Authorise.h
@@ -0,0 +1,57 @@
+#ifndef QPID_BROKER_AMQP_AUTHORISE_H
+#define QPID_BROKER_AMQP_AUTHORISE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+class AclModule;
+class Exchange;
+class Message;
+class Queue;
+namespace amqp {
+class Filter;
+
+/**
+ * Class to handle authorisation requests (and hide the ACL mess behind)
+ */
+class Authorise
+{
+ public:
+ Authorise(const std::string& user, AclModule*);
+ void access(boost::shared_ptr<Exchange>);
+ void access(boost::shared_ptr<Queue>);
+ void incoming(boost::shared_ptr<Exchange>);
+ void incoming(boost::shared_ptr<Queue>);
+ void outgoing(boost::shared_ptr<Exchange>, boost::shared_ptr<Queue>, const Filter&);
+ void outgoing(boost::shared_ptr<Queue>);
+ void route(boost::shared_ptr<Exchange>, const Message&);
+ void interlink();
+ private:
+ const std::string user;
+ AclModule* const acl;
+
+};
+}}} // namespace qpid::broker::amqp
+
+#endif /*!QPID_BROKER_AMQP_AUTHORISE_H*/
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
index 51576e9577..4433419402 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -19,9 +19,12 @@
*
*/
#include "Connection.h"
+#include "DataReader.h"
#include "Session.h"
-#include "qpid/Exception.h"
+#include "Exception.h"
+#include "qpid/broker/AclModule.h"
#include "qpid/broker/Broker.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/framing/Buffer.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/ProtocolVersion.h"
@@ -36,7 +39,6 @@ extern "C" {
namespace qpid {
namespace broker {
namespace amqp {
-
Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, Interconnects& interconnects_, bool saslInUse, const std::string& d)
: ManagedConnection(b, i),
connection(pn_connection()),
@@ -52,6 +54,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::
QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
if (enableTrace) pn_transport_trace(transport, PN_TRACE_FRM);
+ broker.getConnectionObservers().connection(*this);
if (!saslInUse) {
//feed in a dummy AMQP 1.0 header as engine expects one, but
//we already read it (if sasl is in use we read the sasl
@@ -62,15 +65,14 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::
pi.encode(buffer);
pn_transport_input(transport, &protocolHeader[0], protocolHeader.size());
- //wont get a userid, so set a dummy one on the ManagedConnection to trigger event
- setUserid("no authentication used");
+ setUserId("none");
}
}
Connection::~Connection()
{
-
+ broker.getConnectionObservers().closed(*this);
pn_transport_free(transport);
pn_connection_free(connection);
}
@@ -97,8 +99,17 @@ size_t Connection::decode(const char* buffer, size_t size)
QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size);
try {
process();
+ } catch (const Exception& e) {
+ QPID_LOG(error, id << ": " << e.what());
+ pn_condition_t* error = pn_connection_condition(connection);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ close();
} catch (const std::exception& e) {
QPID_LOG(error, id << ": " << e.what());
+ pn_condition_t* error = pn_connection_condition(connection);
+ pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
+ pn_condition_set_description(error, e.what());
close();
}
pn_transport_tick(transport, 0);
@@ -108,7 +119,7 @@ size_t Connection::decode(const char* buffer, size_t size)
}
return n;
} else if (n == PN_ERR) {
- throw qpid::Exception(QPID_MSG("Error on input: " << getError()));
+ throw Exception(qpid::amqp::error_conditions::DECODE_ERROR, QPID_MSG("Error on input: " << getError()));
} else {
return 0;
}
@@ -126,7 +137,7 @@ size_t Connection::encode(char* buffer, size_t size)
haveOutput = size;
return size;//Is this right?
} else if (n == PN_ERR) {
- throw qpid::Exception(QPID_MSG("Error on output: " << getError()));
+ throw Exception(qpid::amqp::error_conditions::INTERNAL_ERROR, QPID_MSG("Error on output: " << getError()));
} else {
haveOutput = false;
return 0;
@@ -139,8 +150,17 @@ bool Connection::canEncode()
if (i->second->dispatch()) haveOutput = true;
}
process();
+ } catch (const Exception& e) {
+ QPID_LOG(error, id << ": " << e.what());
+ pn_condition_t* error = pn_connection_condition(connection);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ close();
} catch (const std::exception& e) {
QPID_LOG(error, id << ": " << e.what());
+ pn_condition_t* error = pn_connection_condition(connection);
+ pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
+ pn_condition_set_description(error, e.what());
close();
}
//TODO: proper handling of time in and out of tick
@@ -148,6 +168,28 @@ bool Connection::canEncode()
QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput)
return haveOutput;
}
+
+void Connection::open()
+{
+ readPeerProperties();
+
+ pn_connection_set_container(connection, broker.getFederationTag().c_str());
+ pn_connection_open(connection);
+ out.connectionEstablished();
+ opened();
+ broker.getConnectionObservers().opened(*this);
+}
+
+void Connection::readPeerProperties()
+{
+ /**
+ * TODO: enable when proton 0.5 has been released:
+ qpid::types::Variant::Map properties;
+ DataReader::read(pn_connection_remote_properties(connection), properties);
+ setPeerProperties(properties);
+ */
+}
+
void Connection::closed()
{
for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
@@ -178,10 +220,8 @@ void Connection::process()
QPID_LOG(trace, id << " process()");
if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) {
QPID_LOG_CAT(debug, model, id << " connection opened");
- pn_connection_set_container(connection, broker.getFederationTag().c_str());
+ open();
setContainerId(pn_connection_remote_container(connection));
- pn_connection_open(connection);
- out.connectionEstablished();
}
for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) {
@@ -200,9 +240,17 @@ void Connection::process()
try {
session->second->attach(l);
QPID_LOG_CAT(debug, protocol, id << " link " << l << " attached on " << pn_link_session(l));
+ } catch (const Exception& e) {
+ QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
+ pn_condition_t* error = pn_link_condition(l);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ pn_link_close(l);
} catch (const std::exception& e) {
QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what());
- //TODO: set error details on detach when that is exposed via engine API
+ pn_condition_t* error = pn_link_condition(l);
+ pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str());
+ pn_condition_set_description(error, e.what());
pn_link_close(l);
}
}
@@ -214,7 +262,15 @@ void Connection::process()
if (pn_link_is_receiver(link)) {
Sessions::iterator i = sessions.find(pn_link_session(link));
if (i != sessions.end()) {
- i->second->readable(link, delivery);
+ try {
+ i->second->readable(link, delivery);
+ } catch (const Exception& e) {
+ QPID_LOG_CAT(error, protocol, "Error on publish: " << e.what());
+ pn_condition_t* error = pn_link_condition(link);
+ pn_condition_set_name(error, e.symbol());
+ pn_condition_set_description(error, e.what());
+ pn_link_close(link);
+ }
} else {
pn_delivery_update(delivery, PN_REJECTED);
}
@@ -271,4 +327,19 @@ std::string Connection::getDomain() const
{
return domain;
}
+
+void Connection::abort()
+{
+ out.abort();
+}
+
+void Connection::setUserId(const std::string& user)
+{
+ ManagedConnection::setUserId(user);
+ AclModule* acl = broker.getAcl();
+ if (acl && !acl->approveConnection(*this))
+ {
+ throw Exception(qpid::amqp::error_conditions::RESOURCE_LIMIT_EXCEEDED, "User connection denied by configured limit");
+ }
+}
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h
index d61db82e60..d460f972d2 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h
@@ -58,6 +58,9 @@ class Connection : public sys::ConnectionCodec, public ManagedConnection
pn_transport_t* getTransport();
Interconnects& getInterconnects();
std::string getDomain() const;
+ void setUserId(const std::string&);
+ void abort();
+
protected:
typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions;
pn_connection_t* connection;
@@ -73,6 +76,8 @@ class Connection : public sys::ConnectionCodec, public ManagedConnection
virtual void process();
std::string getError();
void close();
+ void open();
+ void readPeerProperties();
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp b/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp
index 519dd71c9c..1140032174 100644
--- a/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp
@@ -21,6 +21,7 @@
#include "DataReader.h"
#include "qpid/amqp/CharSequence.h"
#include "qpid/amqp/Descriptor.h"
+#include "qpid/amqp/MapBuilder.h"
#include "qpid/log/Statement.h"
#include <string>
extern "C" {
@@ -52,11 +53,6 @@ DataReader::DataReader(qpid::amqp::Reader& r) : reader(r) {}
void DataReader::read(pn_data_t* data)
{
- /*
- while (pn_data_next(data)) {
- readOne(data);
- }
- */
do {
readOne(data);
} while (pn_data_next(data));
@@ -184,4 +180,12 @@ void DataReader::readMap(pn_data_t* data, const qpid::amqp::Descriptor* descript
pn_data_exit(data);
reader.onEndMap(count, descriptor);
}
+
+void DataReader::read(pn_data_t* data, std::map<std::string, qpid::types::Variant>& out)
+{
+ qpid::amqp::MapBuilder builder;
+ DataReader reader(builder);
+ reader.read(data);
+ out = builder.getMap();
+}
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/DataReader.h b/qpid/cpp/src/qpid/broker/amqp/DataReader.h
index 024507e7f2..99ff77b3dd 100644
--- a/qpid/cpp/src/qpid/broker/amqp/DataReader.h
+++ b/qpid/cpp/src/qpid/broker/amqp/DataReader.h
@@ -22,10 +22,15 @@
*
*/
#include "qpid/amqp/Reader.h"
+#include <map>
+#include <string>
struct pn_data_t;
namespace qpid {
+namespace types {
+class Variant;
+}
namespace amqp {
struct Descriptor;
}
@@ -40,6 +45,7 @@ class DataReader
public:
DataReader(qpid::amqp::Reader& reader);
void read(pn_data_t*);
+ static void read(pn_data_t*, std::map<std::string, qpid::types::Variant>&);
private:
qpid::amqp::Reader& reader;
diff --git a/qpid/cpp/src/qpid/broker/amqp/Exception.cpp b/qpid/cpp/src/qpid/broker/amqp/Exception.cpp
new file mode 100644
index 0000000000..6b874aa272
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp/Exception.cpp
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Exception.h"
+
+namespace qpid {
+namespace broker {
+namespace amqp {
+Exception::Exception(const std::string& n, const std::string& d) : name(n), description(d) {}
+Exception::~Exception() throw() {}
+const char* Exception::what() const throw() { return description.c_str(); }
+const char* Exception::symbol() const throw() { return name.c_str(); }
+}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/ConnectionIdentity.h b/qpid/cpp/src/qpid/broker/amqp/Exception.h
index 4e28ca11e3..c2fe470e55 100644
--- a/qpid/cpp/src/qpid/broker/ConnectionIdentity.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Exception.h
@@ -1,5 +1,5 @@
-#ifndef QPID_BROKER_CONNECTIONIDENTITY_H
-#define QPID_BROKER_CONNECTIONIDENTITY_H
+#ifndef QPID_BROKER_AMQP_EXCEPTION_H
+#define QPID_BROKER_AMQP_EXCEPTION_H
/*
*
@@ -21,31 +21,25 @@
* under the License.
*
*/
-
#include <string>
namespace qpid {
-
-namespace management {
-class ObjectId;
-}
-
namespace broker {
-
-class OwnershipToken;
-
-// Interface used to hold Connection authentication and object details for use when authenticating
-// publihed management requests.
-class ConnectionIdentity {
-protected:
- virtual ~ConnectionIdentity() {}
-
-public:
- virtual const OwnershipToken* getOwnership() const = 0;
- virtual const management::ObjectId getObjectId() const = 0;
- virtual const std::string& getUserId() const = 0;
- virtual const std::string& getUrl() const = 0;
+namespace amqp {
+/**
+ * Exception to signal various AMQP 1.0 defined conditions
+ */
+class Exception : public std::exception
+{
+ public:
+ Exception(const std::string& name, const std::string& description);
+ virtual ~Exception() throw();
+ const char* what() const throw();
+ const char* symbol() const throw();
+ private:
+ std::string name;
+ std::string description;
};
+}}} // namespace qpid::broker::amqp
-}}
-#endif // QPID_BROKER_CONNECTIONIDENTITY_H
+#endif /*!QPID_BROKER_AMQP_EXCEPTION_H*/
diff --git a/qpid/cpp/src/qpid/broker/amqp/Filter.cpp b/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
index 48d9334387..9b110b219d 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Filter.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/amqp/Filter.h"
+#include "qpid/broker/amqp/Authorise.h"
#include "qpid/broker/amqp/DataReader.h"
#include "qpid/broker/amqp/Outgoing.h"
#include "qpid/broker/DirectExchange.h"
@@ -235,6 +236,15 @@ void Filter::configure(QueueSettings& settings)
}
}
+std::string Filter::getBindingKey(boost::shared_ptr<Exchange> exchange) const
+{
+ if (subjectFilter.value.empty() && exchange->getType() == TopicExchange::typeName) {
+ return WILDCARD;
+ } else {
+ return subjectFilter.value;
+ }
+}
+
void Filter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue)
{
qpid::framing::FieldTable bindingArgs;
@@ -379,5 +389,12 @@ void Filter::MapFilter::writeValue(pn_data_t* data)
pn_data_exit(data);
}
+void Filter::write(std::map<std::string, qpid::types::Variant> source, pn_data_t* target)
+{
+ MapFilter dummy;
+ dummy.value = source;
+ dummy.writeValue(target);
+}
+
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Filter.h b/qpid/cpp/src/qpid/broker/amqp/Filter.h
index a76928eb01..3f395c9c19 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Filter.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Filter.h
@@ -43,6 +43,7 @@ class Filter : qpid::amqp::MapReader
Filter();
void read(pn_data_t*);
void write(pn_data_t*);
+ std::string getBindingKey(boost::shared_ptr<Exchange> exchange) const;
/**
* Apply filters where source is a queue
@@ -57,6 +58,11 @@ class Filter : qpid::amqp::MapReader
* Bind subscription queue for case where source is an exchange
*/
void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue);
+
+ /**
+ * Not really the ideal place for this, but the logic is already implemented here...
+ */
+ static void write(std::map<std::string, qpid::types::Variant> source, pn_data_t* target);
private:
struct FilterBase
{
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
index 14614b0b87..119d05af60 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp
@@ -19,8 +19,10 @@
*
*/
#include "Incoming.h"
+#include "Exception.h"
#include "Message.h"
#include "Session.h"
+#include "qpid/amqp/descriptors.h"
#include "qpid/broker/AsyncCompletion.h"
#include "qpid/broker/Message.h"
@@ -60,6 +62,30 @@ void Incoming::wakeup()
{
session.wakeup();
}
+
+void Incoming::verify(const std::string& u, const std::string& r)
+{
+ userid.init(u, r);
+}
+
+Incoming::UserId::UserId() : inDefaultRealm(false) {}
+void Incoming::UserId::init(const std::string& u, const std::string& defaultRealm)
+{
+ userid = u;
+ size_t at = userid.find('@');
+ if (at != std::string::npos) {
+ unqualified = userid.substr(0, at);
+ inDefaultRealm = defaultRealm == userid.substr(at+1);
+ }
+}
+void Incoming::UserId::verify(const std::string& claimed)
+{
+ if(!userid.empty() && !claimed.empty() && userid != claimed && !(inDefaultRealm && claimed == unqualified)) {
+ throw Exception(qpid::amqp::error_conditions::NOT_ALLOWED, QPID_MSG("Authenticated user id is " << userid << " but user id in message declared as " << claimed));
+ }
+}
+
+
namespace {
class Transfer : public qpid::broker::AsyncCompletion::Callback
{
@@ -89,7 +115,7 @@ void DecodingIncoming::readable(pn_delivery_t* delivery)
pn_link_advance(link);
qpid::broker::Message message(received, received);
-
+ userid.verify(message.getUserId());
handle(message);
--window;
received->begin();
diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h
index a7c706aed9..8852766eda 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Incoming.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h
@@ -43,12 +43,28 @@ class Incoming : public ManagedIncomingLink
virtual bool haveWork();//called when handling input to see whether any output work is needed
virtual void detached();
virtual void readable(pn_delivery_t* delivery) = 0;
+ void verify(const std::string& userid, const std::string& defaultRealm);
void wakeup();
protected:
+ class UserId
+ {
+ public:
+ UserId();
+ void init(const std::string& userid, const std::string& defaultRealm);
+ void verify(const std::string& claimed);
+ private:
+ std::string userid;
+ bool inDefaultRealm;
+ std::string unqualified;
+ };
+
const uint32_t credit;
uint32_t window;
+
+
pn_link_t* link;
Session& session;
+ UserId userid;
virtual uint32_t getCredit();
};
diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
index f7f12be6c4..4741130bd1 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp
@@ -82,9 +82,7 @@ void Interconnect::process()
} else {
if ((pn_connection_state(connection) & UNINIT) == UNINIT) {
QPID_LOG_CAT(debug, model, id << " interconnect opened");
- pn_connection_set_container(connection, broker.getFederationTag().c_str());
- pn_connection_open(connection);
- out.connectionEstablished();
+ open();
pn_session_t* s = pn_session(connection);
pn_session_open(s);
@@ -116,4 +114,9 @@ void Interconnect::transportDeleted()
registry.remove(name);
}
+bool Interconnect::isLink() const
+{
+ return true;
+}
+
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.h b/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
index 230abbc667..64d037dae5 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.h
@@ -44,6 +44,7 @@ class Interconnect : public Connection
size_t encode(char* buffer, size_t size);
void deletedFromRegistry();
void transportDeleted();
+ bool isLink() const;
private:
bool incoming;
std::string name;
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
index b62a07d067..d1595b47cc 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp
@@ -54,11 +54,17 @@ ManagedConnection::~ManagedConnection()
QPID_LOG_CAT(debug, model, "Delete connection. user:" << userid << " rhost:" << id);
}
-void ManagedConnection::setUserid(const std::string& uid)
+void ManagedConnection::setUserId(const std::string& uid)
{
userid = uid;
- if (agent && connection) {
+ if (connection) {
connection->set_authIdentity(userid);
+ }
+}
+
+void ManagedConnection::opened()
+{
+ if (agent) {
agent->raiseEvent(_qmf::EventClientConnect(id, userid, connection->get_remoteProperties()));
}
QPID_LOG_CAT(debug, model, "Create connection. user:" << userid << " rhost:" << id );
@@ -78,13 +84,20 @@ void ManagedConnection::setSaslSsf(int ssf)
}
}
+void ManagedConnection::setPeerProperties(std::map<std::string, types::Variant>& p)
+{
+ peerProperties = p;
+ if (connection) {
+ connection->set_remoteProperties(peerProperties);
+ }
+}
+
void ManagedConnection::setContainerId(const std::string& container)
{
containerid = container;
+ peerProperties["container-id"] = containerid;
if (connection) {
- qpid::types::Variant::Map props;
- props["container-id"] = containerid;
- connection->set_remoteProperties(props);
+ connection->set_remoteProperties(peerProperties);
}
}
const std::string& ManagedConnection::getContainerId() const
@@ -98,7 +111,31 @@ qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementO
}
std::string ManagedConnection::getId() const { return id; }
-std::string ManagedConnection::getUserid() const { return userid; }
+
+const OwnershipToken* ManagedConnection::getOwnership() const
+{
+ return this;
+}
+const management::ObjectId ManagedConnection::getObjectId() const
+{
+ return GetManagementObject()->getObjectId();
+}
+const std::string& ManagedConnection::getUserId() const
+{
+ return userid;
+}
+const std::string& ManagedConnection::getMgmtId() const
+{
+ return id;
+}
+const std::map<std::string, types::Variant>& ManagedConnection::getClientProperties() const
+{
+ return connection->get_remoteProperties();
+}
+bool ManagedConnection::isLink() const
+{
+ return false;
+}
bool ManagedConnection::isLocal(const OwnershipToken* t) const
{
diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
index 634c0fca99..a9f90cefcf 100644
--- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h
@@ -22,7 +22,9 @@
*
*/
#include "qpid/management/Manageable.h"
+#include "qpid/broker/Connection.h"
#include "qpid/broker/OwnershipToken.h"
+#include "qpid/types/Variant.h"
#include "qmf/org/apache/qpid/broker/Connection.h"
namespace qpid {
@@ -34,28 +36,38 @@ namespace broker {
class Broker;
namespace amqp {
-class ManagedConnection : public qpid::management::Manageable, public OwnershipToken
+class ManagedConnection : public qpid::management::Manageable, public OwnershipToken, public qpid::broker::Connection
{
public:
ManagedConnection(Broker& broker, const std::string id);
virtual ~ManagedConnection();
- void setUserid(const std::string&);
+ virtual void setUserId(const std::string&);
std::string getId() const;
- std::string getUserid() const;
void setSaslMechanism(const std::string&);
void setSaslSsf(int);
void setContainerId(const std::string&);
const std::string& getContainerId() const;
+ void setPeerProperties(std::map<std::string, types::Variant>&);
qpid::management::ManagementObject::shared_ptr GetManagementObject() const;
bool isLocal(const OwnershipToken* t) const;
void incomingMessageReceived();
void outgoingMessageSent();
+
+ //ConnectionIdentity
+ const OwnershipToken* getOwnership() const;
+ const management::ObjectId getObjectId() const;
+ const std::string& getUserId() const;
+ const std::string& getMgmtId() const;
+ const std::map<std::string, types::Variant>& getClientProperties() const;
+ virtual bool isLink() const;
+ void opened();
private:
const std::string id;
std::string userid;
std::string containerid;
qmf::org::apache::qpid::broker::Connection::shared_ptr connection;
qpid::management::ManagementAgent* agent;
+ std::map<std::string, types::Variant> peerProperties;
};
}}} // namespace qpid::broker::amqp
diff --git a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
index 820aaf87d4..1ce5586ace 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp
@@ -139,7 +139,7 @@ void Sasl::respond(qpid::SaslServer::Status status, const std::string& chllnge)
{
switch (status) {
case qpid::SaslServer::OK:
- connection.setUserid(authenticator->getUserid());
+ connection.setUserId(authenticator->getUserid());
completed(true);
//can't set authenticated & failed until we have actually sent the outcome
state = SUCCESS_PENDING;
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index d1bab7f775..f90bfd1cd9 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -120,14 +120,16 @@ class IncomingToQueue : public DecodingIncoming
class IncomingToExchange : public DecodingIncoming
{
public:
- IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source) : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e) {}
+ IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source)
+ : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e), authorise(p.getAuthorise()) {}
void handle(qpid::broker::Message& m);
private:
boost::shared_ptr<qpid::broker::Exchange> exchange;
+ Authorise& authorise;
};
Session::Session(pn_session_t* s, qpid::broker::Broker& b, Connection& c, qpid::sys::OutputControl& o)
- : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false) {}
+ : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false), authorise(connection.getUserId(), broker.getAcl()) {}
Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming)
@@ -140,11 +142,11 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te
//is it a queue or an exchange?
node.properties.read(pn_terminus_properties(terminus));
if (node.properties.isQueue()) {
- node.queue = broker.createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first;
+ node.queue = broker.createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first;
} else {
qpid::framing::FieldTable args;
node.exchange = broker.createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(),
- args, connection.getUserid(), connection.getId()).first;
+ args, connection.getUserId(), connection.getId()).first;
}
} else {
size_t i = name.find('@');
@@ -236,8 +238,13 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s
{
ResolvedNode node = resolve(name, target, true);
//set capabilities
- if (node.queue) setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.queue);
- else if (node.exchange) setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange);
+ if (node.queue) {
+ setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.queue);
+ authorise.incoming(node.queue);
+ } else if (node.exchange) {
+ setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange);
+ authorise.incoming(node.exchange);
+ }
const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link));
if (!sourceAddress) {
@@ -260,6 +267,7 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s
pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
throw qpid::Exception("Node not found: " + name);/*not-found*/
}
+ if (broker.getOptions().auth && !connection.isLink()) incoming[link]->verify(connection.getUserId(), broker.getOptions().realm);
QPID_LOG(debug, "Incoming link attached");
}
@@ -282,11 +290,13 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
if (node.queue) {
+ authorise.outgoing(node.queue);
boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, node.queue, link, *this, out, false));
q->init();
filter.apply(q);
outgoing[link] = q;
} else if (node.exchange) {
+ authorise.access(node.exchange);//do separate access check before trying to create the queue
bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source));
bool durable = pn_terminus_get_durability(source);
QueueSettings settings(durable, !durable);
@@ -295,7 +305,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
std::stringstream queueName;
if (shared) {
//just use link name (TODO: could allow this to be
- //overridden when acces to link properties is provided
+ //overridden when access to link properties is provided
//(PROTON-335))
queueName << pn_link_name(link);
} else {
@@ -303,9 +313,9 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s
queueName << connection.getContainerId() << "_" << pn_link_name(link);
}
boost::shared_ptr<qpid::broker::Queue> queue
- = broker.createQueue(queueName.str(), settings, this, "", connection.getUserid(), connection.getId()).first;
+ = broker.createQueue(queueName.str(), settings, this, "", connection.getUserId(), connection.getId()).first;
if (!shared) queue->setExclusiveOwner(this);
-
+ authorise.outgoing(node.exchange, queue, filter);
filter.bind(node.exchange, queue);
boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, !shared));
outgoing[link] = q;
@@ -460,6 +470,11 @@ void Session::wakeup()
out.activateOutput();
}
+Authorise& Session::getAuthorise()
+{
+ return authorise;
+}
+
void IncomingToQueue::handle(qpid::broker::Message& message)
{
queue->deliver(message);
@@ -467,6 +482,7 @@ void IncomingToQueue::handle(qpid::broker::Message& message)
void IncomingToExchange::handle(qpid::broker::Message& message)
{
+ authorise.route(exchange, message);
DeliverableMessage deliverable(message, 0);
exchange->route(deliverable);
}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h
index 19922f3ee1..78d44a1a18 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.h
@@ -23,6 +23,7 @@
*/
#include "qpid/sys/Mutex.h"
#include "qpid/sys/OutputControl.h"
+#include "qpid/broker/amqp/Authorise.h"
#include "qpid/broker/amqp/ManagedSession.h"
#include "qpid/broker/amqp/NodeProperties.h"
#include <deque>
@@ -75,6 +76,8 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
void accepted(pn_delivery_t*, bool sync);
void wakeup();
+
+ Authorise& getAuthorise();
private:
typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > OutgoingLinks;
typedef std::map<pn_link_t*, boost::shared_ptr<Incoming> > IncomingLinks;
@@ -88,6 +91,8 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses
bool deleted;
qpid::sys::Mutex lock;
std::set< boost::shared_ptr<Queue> > exclusiveQueues;
+ Authorise authorise;
+
struct ResolvedNode
{
boost::shared_ptr<qpid::broker::Exchange> exchange;
diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp
index a127f9bee2..6732b66ed4 100644
--- a/qpid/cpp/src/qpid/broker/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp
@@ -18,7 +18,7 @@
* under the License.
*
*/
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/SessionOutputException.h"
@@ -59,6 +59,7 @@ namespace _qmf = qmf::org::apache::qpid::broker;
namespace qpid {
namespace broker {
+namespace amqp_0_10 {
struct ConnectionTimeoutTask : public sys::TimerTask {
sys::Timer& timer;
@@ -160,7 +161,6 @@ Connection::Connection(ConnectionOutputHandler* out_,
mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false, "AMQP 0-10"));
agent->addObject(mgmtObject, objectId);
}
- setUrl(mgmtId);
}
}
@@ -542,4 +542,4 @@ void Connection::restartTimeout()
bool Connection::isOpen() { return adapter.isOpen(); }
-}}
+}}}
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
new file mode 100644
index 0000000000..5411b883ef
--- /dev/null
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h
@@ -0,0 +1,235 @@
+#ifndef QPID_BROKER_AMQP_0_10_CONNECTION_H
+#define QPID_BROKER_AMQP_0_10_CONNECTION_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <memory>
+#include <sstream>
+#include <vector>
+#include <queue>
+
+#include "qpid/broker/BrokerImportExport.h"
+
+#include "qpid/broker/ConnectionHandler.h"
+#include "qpid/broker/Connection.h"
+#include "qpid/broker/OwnershipToken.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/sys/AggregateOutput.h"
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/SecuritySettings.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/types/Variant.h"
+#include "qpid/RefCounted.h"
+#include "qpid/Url.h"
+#include "qpid/ptr_map.h"
+
+#include "qmf/org/apache/qpid/broker/Connection.h"
+
+#include <boost/ptr_container/ptr_map.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/bind.hpp>
+
+#include <algorithm>
+
+namespace qpid {
+namespace sys {
+class ConnectionOutputHandler;
+class Timer;
+class TimerTask;
+}
+namespace broker {
+
+class Broker;
+class LinkRegistry;
+class Queue;
+class SecureConnection;
+class SessionHandler;
+
+namespace amqp_0_10 {
+struct ConnectionTimeoutTask;
+
+class Connection : public sys::ConnectionInputHandler, public qpid::broker::Connection,
+ public OwnershipToken, public management::Manageable,
+ public RefCounted
+{
+ public:
+ uint32_t getFrameMax() const { return framemax; }
+ uint16_t getHeartbeat() const { return heartbeat; }
+ uint16_t getHeartbeatMax() const { return heartbeatmax; }
+
+ void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); }
+ void setHeartbeat(uint16_t hb) { heartbeat = hb; }
+ void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; }
+
+
+ const OwnershipToken* getOwnership() const { return this; };
+ const management::ObjectId getObjectId() const { return GetManagementObject()->getObjectId(); };
+ const std::string& getUserId() const { return userId; }
+
+ void setUserProxyAuth(const bool b);
+ bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids
+ bool isFederationLink() const { return federationPeerTag.size() > 0; }
+ void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); }
+ const std::string& getFederationPeerTag() const { return federationPeerTag; }
+ std::vector<Url>& getKnownHosts() { return knownHosts; }
+
+ /**@return true if user is the authenticated user on this connection.
+ * If id has the default realm will also compare plain username.
+ */
+ bool isAuthenticatedUser(const std::string& id) const {
+ return (id == userId || (isDefaultRealm && id == userName));
+ }
+
+ Broker& getBroker() { return broker; }
+
+ sys::ConnectionOutputHandler& getOutput() { return *out; }
+ void activateOutput();
+ void addOutputTask(OutputTask*);
+ void removeOutputTask(OutputTask*);
+ framing::ProtocolVersion getVersion() const { return version; }
+
+ Connection(sys::ConnectionOutputHandler* out,
+ Broker& broker,
+ const std::string& mgmtId,
+ const qpid::sys::SecuritySettings&,
+ bool isLink = false,
+ uint64_t objectId = 0);
+
+ ~Connection ();
+
+ /** Get the SessionHandler for channel. Create if it does not already exist */
+ SessionHandler& getChannel(framing::ChannelId channel);
+
+ /** Close the connection. Waits for the client to respond with close-ok
+ * before actually destroying the connection.
+ */
+ QPID_BROKER_EXTERN void close(
+ framing::connection::CloseCode code, const std::string& text);
+
+ /** Abort the connection. Close abruptly and immediately. */
+ QPID_BROKER_EXTERN void abort();
+
+ // ConnectionInputHandler methods
+ void received(framing::AMQFrame& frame);
+ bool doOutput();
+ void closed();
+
+ void closeChannel(framing::ChannelId channel);
+
+ // Manageable entry points
+ management::ManagementObject::shared_ptr GetManagementObject(void) const;
+ management::Manageable::status_t
+ ManagementMethod (uint32_t methodId, management::Args& args, std::string&);
+
+ void requestIOProcessing (boost::function0<void>);
+ void recordFromServer (const framing::AMQFrame& frame);
+ void recordFromClient (const framing::AMQFrame& frame);
+
+ // gets for configured federation links
+ std::string getAuthMechanism();
+ std::string getAuthCredentials();
+ std::string getUsername();
+ std::string getPassword();
+ std::string getHost();
+ uint16_t getPort();
+
+ void notifyConnectionForced(const std::string& text);
+ void setUserId(const std::string& uid);
+
+ // credentials for connected client
+ const std::string& getMgmtId() const { return mgmtId; }
+ management::ManagementAgent* getAgent() const { return agent; }
+
+ void setHeartbeatInterval(uint16_t heartbeat);
+ void sendHeartbeat();
+ void restartTimeout();
+
+ void setSecureConnection(SecureConnection* secured);
+
+ const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
+ {
+ return securitySettings;
+ }
+
+ /** @return true if the initial connection negotiation is complete. */
+ bool isOpen();
+
+ bool isLink() const { return link; }
+ void startLinkHeartbeatTimeoutTask();
+
+ void setClientProperties(const types::Variant::Map& cp) { clientProperties = cp; }
+ const types::Variant::Map& getClientProperties() const { return clientProperties; }
+
+ private:
+ // Management object is used in the constructor so must be early
+ qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject;
+
+ //contained output tasks
+ sys::AggregateOutput outputTasks;
+
+ boost::scoped_ptr<framing::FrameHandler> outboundTracker;
+ boost::scoped_ptr<sys::ConnectionOutputHandler> out;
+
+ Broker& broker;
+
+ framing::ProtocolVersion version;
+ uint32_t framemax;
+ uint16_t heartbeat;
+ uint16_t heartbeatmax;
+ std::string userId;
+ bool userProxyAuth;
+ std::string federationPeerTag;
+ std::vector<Url> knownHosts;
+ std::string userName;
+ bool isDefaultRealm;
+
+ typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
+
+ ChannelMap channels;
+ qpid::sys::SecuritySettings securitySettings;
+ const bool link;
+ ConnectionHandler adapter;
+ bool mgmtClosing;
+ const std::string mgmtId;
+ sys::Mutex ioCallbackLock;
+ std::queue<boost::function0<void> > ioCallbacks;
+ LinkRegistry& links;
+ management::ManagementAgent* agent;
+ sys::Timer& timer;
+ boost::intrusive_ptr<sys::TimerTask> heartbeatTimer, linkHeartbeatTimer;
+ boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
+ uint64_t objectId;
+ types::Variant::Map clientProperties;
+
+friend class OutboundFrameTracker;
+
+ void sent(const framing::AMQFrame& f);
+ void doIoCallbacks();
+
+ public:
+
+ qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; }
+};
+
+}}}
+
+#endif /*!QPID_BROKER_AMQP_0_10_CONNECTION_H*/
diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
index bb706e53b1..42ced2988d 100644
--- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
+++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
@@ -22,7 +22,7 @@
#include "HaBroker.h"
#include "QueueReplicator.h"
#include "qpid/broker/Broker.h"
-#include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
#include "qpid/broker/ConnectionObserver.h"
#include "qpid/broker/Queue.h"
#include "qpid/broker/QueueSettings.h"
@@ -378,7 +378,7 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler)
connection = link->getConnection();
assert(connection);
userId = link->getConnection()->getUserId();
- remoteHost = link->getConnection()->getUrl();
+ remoteHost = link->getConnection()->getMgmtId();
link->getRemoteAddress(primary);
string queueName = bridge.getQueueName();
diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
index 66e841e988..c9c5c2e576 100644
--- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
+++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp
@@ -23,7 +23,7 @@
#include "BrokerInfo.h"
#include "HaBroker.h"
#include "qpid/Url.h"
-#include "qpid/framing/FieldTable.h"
+#include "qpid/types/Variant.h"
#include "qpid/broker/Connection.h"
#include "qpid/log/Statement.h"
@@ -34,21 +34,23 @@ ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid)
: haBroker(hb), logPrefix("Backup: "), self(uuid) {}
bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) {
- framing::FieldTable ft;
- if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) {
- info = BrokerInfo(ft);
+ qpid::types::Variant::Map::const_iterator i = connection.getClientProperties().find(ConnectionObserver::BACKUP_TAG);
+ if (i != connection.getClientProperties().end() && i->second.getType() == qpid::types::VAR_MAP) {
+ info = BrokerInfo(i->second.asMap());
return true;
}
return false;
}
bool ConnectionObserver::getAddress(const broker::Connection& connection, Address& addr) {
- Url url;
- url.parseNoThrow(
- connection.getClientProperties().getAsString(ConnectionObserver::ADDRESS_TAG).c_str());
- if (!url.empty()) {
- addr = url[0];
- return true;
+ qpid::types::Variant::Map::const_iterator i = connection.getClientProperties().find(ConnectionObserver::ADDRESS_TAG);
+ if (i != connection.getClientProperties().end()) {
+ Url url;
+ url.parseNoThrow(i->second.asString().c_str());
+ if (!url.empty()) {
+ addr = url[0];
+ return true;
+ }
}
return false;
}
@@ -86,7 +88,7 @@ void ConnectionObserver::opened(broker::Connection& connection) {
return;
}
if (connection.isLink()) return; // Allow outgoing links.
- if (connection.getClientProperties().isSet(ADMIN_TAG)) {
+ if (connection.getClientProperties().find(ADMIN_TAG) != connection.getClientProperties().end()) {
QPID_LOG(debug, logPrefix << "Accepted admin connection: "
<< connection.getMgmtId());
return; // No need to call observer, always allow admins.
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index c5c979bfb0..5f653939ec 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -88,10 +88,27 @@ const string keyifyNameStr(const string& name)
struct ScopedManagementContext
{
- ScopedManagementContext(const ConnectionIdentity& p)
+ const Connection* context;
+
+ ScopedManagementContext(const Connection* p) : context(p)
+ {
+ if (p) setManagementExecutionContext(*p);
+ }
+
+ management::ObjectId getObjectId() const
+ {
+ return context ? context->getObjectId() : management::ObjectId();
+ }
+ std::string getUserId() const
+ {
+ return context ? context->getUserId() : std::string();
+ }
+ std::string getMgmtId() const
{
- setManagementExecutionContext(p);
+ return context ? context->getMgmtId() : std::string();
}
+
+
~ScopedManagementContext()
{
resetManagementExecutionContext();
@@ -2288,7 +2305,7 @@ void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal)
}
if (opcode == "_method_request")
- return handleMethodRequest(body, rte, rtk, cid, msg.getPublisherUserId(), viaLocal);
+ return handleMethodRequest(body, rte, rtk, cid, context.getUserId(), viaLocal);
else if (opcode == "_query_request")
return handleGetQuery(body, rte, rtk, cid, viaLocal);
else if (opcode == "_agent_locate_request")
@@ -2311,9 +2328,9 @@ void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal)
else if (opcode == 'q') handleClassInd (inBuffer, rtk, sequence);
else if (opcode == 'S') handleSchemaRequest (inBuffer, rte, rtk, sequence);
else if (opcode == 's') handleSchemaResponse (inBuffer, rtk, sequence);
- else if (opcode == 'A') handleAttachRequest (inBuffer, rtk, sequence, msg.getPublisherObjectId());
+ else if (opcode == 'A') handleAttachRequest (inBuffer, rtk, sequence, context.getObjectId());
else if (opcode == 'G') handleGetQuery (inBuffer, rtk, sequence);
- else if (opcode == 'M') handleMethodRequest (inBuffer, rtk, sequence, msg.getPublisherUserId());
+ else if (opcode == 'M') handleMethodRequest (inBuffer, rtk, sequence, context.getMgmtId());
}
}
@@ -2752,10 +2769,10 @@ ManagementAgent::EventQueue::Batch::const_iterator ManagementAgent::sendEvents(
}
namespace {
-QPID_TSS const ConnectionIdentity* currentPublisher = 0;
+QPID_TSS const Connection* currentPublisher = 0;
}
-void setManagementExecutionContext(const ConnectionIdentity& p)
+void setManagementExecutionContext(const Connection& p)
{
currentPublisher = &p;
}
@@ -2765,7 +2782,7 @@ void resetManagementExecutionContext()
currentPublisher = 0;
}
-const ConnectionIdentity* getCurrentPublisher()
+const Connection* getCurrentPublisher()
{
return currentPublisher;
}
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index cb8bb588b9..d2869a705f 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -44,7 +44,6 @@
namespace qpid {
namespace broker {
class Connection;
-class ConnectionIdentity;
}
namespace sys {
class Timer;
@@ -379,9 +378,9 @@ private:
std::auto_ptr<EventQueue> sendQueue;
};
-void setManagementExecutionContext(const broker::ConnectionIdentity&);
+void setManagementExecutionContext(const broker::Connection&);
void resetManagementExecutionContext();
-const broker::ConnectionIdentity* getCurrentPublisher();
+const broker::Connection* getCurrentPublisher();
}}
#endif /*!_ManagementAgent_*/
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 869d0caebc..4553ebddb3 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -34,6 +34,7 @@
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/SystemInfo.h"
#include "qpid/sys/Time.h"
#include <vector>
extern "C" {
@@ -125,6 +126,7 @@ void ConnectionContext::open()
}
QPID_LOG(debug, id << " Opening...");
+ setProperties();
pn_connection_open(connection);
wakeupDriver(); //want to write
while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
@@ -148,7 +150,7 @@ void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn)
//wait for outstanding sends to settle
while (!ssn->settled()) {
QPID_LOG(debug, "Waiting for sends to settle before closing");
- wait();//wait until message has been confirmed
+ wait(ssn);//wait until message has been confirmed
}
pn_session_close(ssn->session);
@@ -165,7 +167,7 @@ void ConnectionContext::close()
//wait for outstanding sends to settle
while (!i->second->settled()) {
QPID_LOG(debug, "Waiting for sends to settle before closing");
- wait();//wait until message has been confirmed
+ wait(i->second);//wait until message has been confirmed
}
@@ -304,6 +306,7 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha
QPID_LOG(debug, "Dynamic target name set to " << lnk->address.getName());
}
lnk->verify(t);
+ checkClosed(ssn, lnk);
QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget());
}
@@ -322,6 +325,7 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha
QPID_LOG(debug, "Dynamic source name set to " << lnk->address.getName());
}
lnk->verify(s);
+ checkClosed(ssn, lnk);
QPID_LOG(debug, "Attach succeeded from " << lnk->getSource());
}
@@ -471,8 +475,15 @@ void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_li
{
checkClosed(ssn);
if ((pn_link_state(lnk) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ pn_condition_t* error = pn_link_remote_condition(lnk);
+ std::stringstream text;
+ if (pn_condition_is_set(error)) {
+ text << "Link detached by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error);
+ } else {
+ text << "Link detached by peer";
+ }
pn_link_close(lnk);
- throw qpid::messaging::LinkError("Link detached by peer");
+ throw qpid::messaging::LinkError(text.str());
} else if ((pn_link_state(lnk) & IS_CLOSED) == IS_CLOSED) {
throw qpid::messaging::LinkError("Link is not attached");
}
@@ -692,5 +703,39 @@ bool ConnectionContext::CodecSwitch::canEncode()
return parent.canEncode();
}
+namespace {
+const std::string CLIENT_PROCESS_NAME("qpid.client_process");
+const std::string CLIENT_PID("qpid.client_pid");
+const std::string CLIENT_PPID("qpid.client_ppid");
+pn_bytes_t convert(const std::string& s)
+{
+ pn_bytes_t result;
+ result.start = const_cast<char*>(s.data());
+ result.size = s.size();
+ return result;
+}
+}
+void ConnectionContext::setProperties()
+{
+ /**
+ * Enable when proton 0.5 is released and qpidc has been updated
+ * to use it
+ *
+ pn_data_t* data = pn_connection_properties(connection);
+ pn_data_put_map(data);
+ pn_data_enter(data);
+
+ pn_data_put_symbol(data, convert(CLIENT_PROCESS_NAME));
+ std::string processName = sys::SystemInfo::getProcessName();
+ pn_data_put_string(data, convert(processName));
+
+ pn_data_put_symbol(data, convert(CLIENT_PID));
+ pn_data_put_int(data, sys::SystemInfo::getProcessId());
+
+ pn_data_put_symbol(data, convert(CLIENT_PPID));
+ pn_data_put_int(data, sys::SystemInfo::getParentProcessId());
+ pn_data_exit(data);
+ **/
+}
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index fbff27c288..5627bd903d 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -150,6 +150,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
std::size_t writeProtocolHeader(char* buffer, std::size_t size);
std::string getError();
bool useSasl();
+ void setProperties();
};
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 0fa97ab933..d4a5ca4292 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -93,8 +93,24 @@ SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& mes
}
}
+void SenderContext::check()
+{
+ if (pn_link_state(sender) & PN_REMOTE_CLOSED && !(pn_link_state(sender) & PN_LOCAL_CLOSED)) {
+ pn_condition_t* error = pn_link_remote_condition(sender);
+ std::stringstream text;
+ if (pn_condition_is_set(error)) {
+ text << "Link detached by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error);
+ } else {
+ text << "Link detached by peer";
+ }
+ pn_link_close(sender);
+ throw qpid::messaging::LinkError(text.str());
+ }
+}
+
uint32_t SenderContext::processUnsettled()
{
+ check();
//remove messages from front of deque once peer has confirmed receipt
while (!deliveries.empty() && deliveries.front().delivered()) {
deliveries.front().settle();
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
index 81d306bab3..e389cd2e35 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
@@ -74,6 +74,7 @@ class SenderContext
Delivery* send(const qpid::messaging::Message& message);
void configure();
void verify(pn_terminus_t*);
+ void check();
bool settled();
Address getAddress() const;
private:
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
index 64462215f3..9815721fa0 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
@@ -144,7 +144,12 @@ bool SessionContext::settled()
{
bool result = true;
for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
- if (!i->second->settled()) result = false;
+ try {
+ if (!i->second->settled()) result = false;
+ } catch (const std::exception&) {
+ senders.erase(i);
+ throw;
+ }
}
return result;
}