diff options
| author | Gordon Sim <gsim@apache.org> | 2013-06-25 13:28:15 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-06-25 13:28:15 +0000 |
| commit | 3615070a058ee43b3305d6b4464ee3a6e39e7b99 (patch) | |
| tree | 6bcdc2593132f88e02f7c3ecbc35c6e827322531 /qpid/cpp/src | |
| parent | 59b8d464a2a3b36f0985c10c057e14b284e3bc7c (diff) | |
| download | qpid-python-3615070a058ee43b3305d6b4464ee3a6e39e7b99.tar.gz | |
QPID-4712: authorisation for AMQP 1.0 connections
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1496466 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
61 files changed, 1222 insertions, 410 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 48279a2aed..3ecbac5cc2 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1034,6 +1034,8 @@ set (qpidcommon_SOURCES qpid/amqp/MapEncoder.cpp qpid/amqp/MapSizeCalculator.h qpid/amqp/MapSizeCalculator.cpp + qpid/amqp/MapBuilder.h + qpid/amqp/MapBuilder.cpp qpid/amqp/MapReader.h qpid/amqp/MapReader.cpp qpid/amqp/MessageEncoder.h @@ -1261,7 +1263,7 @@ set (qpidbroker_SOURCES qpid/broker/MessageGroupManager.cpp qpid/broker/PersistableMessage.cpp qpid/broker/Bridge.cpp - qpid/broker/Connection.cpp + qpid/broker/amqp_0_10/Connection.cpp qpid/broker/ConnectionHandler.cpp qpid/broker/DeliverableMessage.cpp qpid/broker/DeliveryRecord.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index a3825f7508..942575ec3d 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -536,6 +536,8 @@ libqpidcommon_la_SOURCES += \ qpid/amqp/MapEncoder.cpp \ qpid/amqp/MapSizeCalculator.h \ qpid/amqp/MapSizeCalculator.cpp \ + qpid/amqp/MapBuilder.h \ + qpid/amqp/MapBuilder.cpp \ qpid/amqp/MapReader.h \ qpid/amqp/MapReader.cpp \ qpid/amqp/MessageEncoder.h \ @@ -598,15 +600,15 @@ libqpidbroker_la_SOURCES = \ qpid/broker/Broker.cpp \ qpid/broker/Broker.h \ qpid/broker/BrokerImportExport.h \ - qpid/broker/Connection.cpp \ - qpid/broker/Connection.h \ + qpid/broker/amqp_0_10/Connection.cpp \ + qpid/broker/amqp_0_10/Connection.h \ qpid/broker/ConnectionHandler.cpp \ qpid/broker/ConnectionHandler.h \ qpid/broker/Consumer.h \ qpid/broker/Credit.h \ qpid/broker/Credit.cpp \ qpid/broker/ConsumerFactory.h \ - qpid/broker/ConnectionIdentity.h \ + qpid/broker/Connection.h \ qpid/broker/ConnectionObserver.h \ qpid/broker/ConnectionObservers.h \ qpid/broker/ConfigurationObserver.h \ @@ -801,12 +803,16 @@ if HAVE_PROTON dmoduleexec_LTLIBRARIES += amqp.la amqp_la_LIBADD = libqpidcommon.la amqp_la_SOURCES = \ + qpid/broker/amqp/Authorise.h \ + qpid/broker/amqp/Authorise.cpp \ qpid/broker/amqp/Connection.h \ qpid/broker/amqp/Connection.cpp \ qpid/broker/amqp/DataReader.h \ qpid/broker/amqp/DataReader.cpp \ qpid/broker/amqp/Domain.h \ qpid/broker/amqp/Domain.cpp \ + qpid/broker/amqp/Exception.h \ + qpid/broker/amqp/Exception.cpp \ qpid/broker/amqp/Filter.h \ qpid/broker/amqp/Filter.cpp \ qpid/broker/amqp/Header.h \ diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake index a2ee1d1cb6..5875dd9f9d 100644 --- a/qpid/cpp/src/amqp.cmake +++ b/qpid/cpp/src/amqp.cmake @@ -53,12 +53,16 @@ if (BUILD_AMQP) set (amqp_SOURCES + qpid/broker/amqp/Authorise.h + qpid/broker/amqp/Authorise.cpp qpid/broker/amqp/Connection.h qpid/broker/amqp/Connection.cpp qpid/broker/amqp/DataReader.h qpid/broker/amqp/DataReader.cpp qpid/broker/amqp/Domain.h qpid/broker/amqp/Domain.cpp + qpid/broker/amqp/Exception.h + qpid/broker/amqp/Exception.cpp qpid/broker/amqp/Filter.h qpid/broker/amqp/Filter.cpp qpid/broker/amqp/Header.h diff --git a/qpid/cpp/src/qpid/amqp/MapBuilder.cpp b/qpid/cpp/src/qpid/amqp/MapBuilder.cpp new file mode 100644 index 0000000000..a554497791 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MapBuilder.cpp @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MapBuilder.h" +#include <assert.h> + +namespace qpid { +namespace amqp { +namespace { +const std::string BINARY("binary"); +const std::string UTF8("utf8"); +const std::string ASCII("ascii"); +} + +qpid::types::Variant::Map MapBuilder::getMap() +{ + return map; +} +const qpid::types::Variant::Map MapBuilder::getMap() const +{ + return map; +} + +void MapBuilder::onNullValue(const CharSequence& key, const Descriptor*) +{ + map[std::string(key.data, key.size)] = qpid::types::Variant(); +} +void MapBuilder::onBooleanValue(const CharSequence& key, bool value, const Descriptor*) +{ + map[std::string(key.data, key.size)] = value; +} +void MapBuilder::onUByteValue(const CharSequence& key, uint8_t value, const Descriptor*) +{ + map[std::string(key.data, key.size)] = value; +} + +void MapBuilder::onUShortValue(const CharSequence& key, uint16_t value, const Descriptor*) +{ + map[std::string(key.data, key.size)] = value; +} + +void MapBuilder::onUIntValue(const CharSequence& key, uint32_t value, const Descriptor*) +{ + map[std::string(key.data, key.size)] = value; +} + +void MapBuilder::onULongValue(const CharSequence& key, uint64_t value, const Descriptor*) +{ + map[std::string(key.data, key.size)] = value; +} + +void MapBuilder::onByteValue(const CharSequence& key, int8_t value, const Descriptor*) +{ + map[std::string(key.data, key.size)] = value; +} + +void MapBuilder::onShortValue(const CharSequence& key, int16_t value, const Descriptor*) +{ + map[std::string(key.data, key.size)] = value; +} + +void MapBuilder::onIntValue(const CharSequence& key, int32_t value, const Descriptor*) +{ + map[std::string(key.data, key.size)] = value; +} + +void MapBuilder::onLongValue(const CharSequence& key, int64_t value, const Descriptor*) +{ + map[std::string(key.data, key.size)] = value; +} + +void MapBuilder::onFloatValue(const CharSequence& key, float value, const Descriptor*) +{ + map[std::string(key.data, key.size)] = value; +} + +void MapBuilder::onDoubleValue(const CharSequence& key, double value, const Descriptor*) +{ + map[std::string(key.data, key.size)] = value; +} + +void MapBuilder::onUuidValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +{ + assert(value.size == 16); + map[std::string(key.data, key.size)] = qpid::types::Uuid(value.data); +} + +void MapBuilder::onTimestampValue(const CharSequence& key, int64_t value, const Descriptor*) +{ + map[std::string(key.data, key.size)] = value; +} + +void MapBuilder::onBinaryValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +{ + qpid::types::Variant& v = map[std::string(key.data, key.size)]; + v = std::string(value.data, value.size); + v.setEncoding(BINARY); +} + +void MapBuilder::onStringValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +{ + qpid::types::Variant& v = map[std::string(key.data, key.size)]; + v = std::string(value.data, value.size); + v.setEncoding(UTF8); +} + +void MapBuilder::onSymbolValue(const CharSequence& key, const CharSequence& value, const Descriptor*) +{ + qpid::types::Variant& v = map[std::string(key.data, key.size)]; + v = std::string(value.data, value.size); + v.setEncoding(ASCII); +} +}} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/MapBuilder.h b/qpid/cpp/src/qpid/amqp/MapBuilder.h new file mode 100644 index 0000000000..0e3b95f633 --- /dev/null +++ b/qpid/cpp/src/qpid/amqp/MapBuilder.h @@ -0,0 +1,63 @@ +#ifndef QPID_AMQP_MAPBUILDER_H +#define QPID_AMQP_MAPBUILDER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "MapReader.h" +#include "qpid/types/Variant.h" + +namespace qpid { +namespace amqp { + +/** + * Utility to build a Variant::Map from a data stream (doesn't handle + * nested maps or lists yet) + */ +class MapBuilder : public MapReader +{ + public: + void onNullValue(const CharSequence& /*key*/, const Descriptor*); + void onBooleanValue(const CharSequence& /*key*/, bool, const Descriptor*); + void onUByteValue(const CharSequence& /*key*/, uint8_t, const Descriptor*); + void onUShortValue(const CharSequence& /*key*/, uint16_t, const Descriptor*); + void onUIntValue(const CharSequence& /*key*/, uint32_t, const Descriptor*); + void onULongValue(const CharSequence& /*key*/, uint64_t, const Descriptor*); + void onByteValue(const CharSequence& /*key*/, int8_t, const Descriptor*); + void onShortValue(const CharSequence& /*key*/, int16_t, const Descriptor*); + void onIntValue(const CharSequence& /*key*/, int32_t, const Descriptor*); + void onLongValue(const CharSequence& /*key*/, int64_t, const Descriptor*); + void onFloatValue(const CharSequence& /*key*/, float, const Descriptor*); + void onDoubleValue(const CharSequence& /*key*/, double, const Descriptor*); + void onUuidValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*); + void onTimestampValue(const CharSequence& /*key*/, int64_t, const Descriptor*); + + void onBinaryValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*); + void onStringValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*); + void onSymbolValue(const CharSequence& /*key*/, const CharSequence&, const Descriptor*); + + qpid::types::Variant::Map getMap(); + const qpid::types::Variant::Map getMap() const; + private: + qpid::types::Variant::Map map; +}; +}} // namespace qpid::amqp + +#endif /*!QPID_AMQP_MAPBUILDER_H*/ diff --git a/qpid/cpp/src/qpid/amqp/descriptors.h b/qpid/cpp/src/qpid/amqp/descriptors.h index 6545433947..2a5691beaf 100644 --- a/qpid/cpp/src/qpid/amqp/descriptors.h +++ b/qpid/cpp/src/qpid/amqp/descriptors.h @@ -89,6 +89,15 @@ const uint64_t SELECTOR_FILTER_CODE(0x0000468C00000004ULL); const uint64_t XQUERY_FILTER_CODE(0x0000468C00000005ULL); } +namespace error_conditions { +//note these are not actually descriptors +const std::string INTERNAL_ERROR("amqp:internal-error"); +const std::string NOT_FOUND("amqp:not-found"); +const std::string UNAUTHORIZED_ACCESS("amqp:unauthorized-access"); +const std::string DECODE_ERROR("amqp:decode-error"); +const std::string NOT_ALLOWED("amqp:not-allowed"); +const std::string RESOURCE_LIMIT_EXCEEDED("amqp:resource-limit-exceeded"); +} }} // namespace qpid::amqp #endif /*!QPID_AMQP_DESCRIPTORS_H*/ diff --git a/qpid/cpp/src/qpid/broker/Bridge.cpp b/qpid/cpp/src/qpid/broker/Bridge.cpp index 0c2655f507..6b34898158 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.cpp +++ b/qpid/cpp/src/qpid/broker/Bridge.cpp @@ -22,7 +22,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/FedOps.h" -#include "qpid/broker/Connection.h" +#include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/broker/Link.h" #include "qpid/broker/LinkRegistry.h" #include "qpid/broker/SessionState.h" @@ -100,7 +100,7 @@ Bridge::~Bridge() mgmtObject->resourceDestroy(); } -void Bridge::create(Connection& c) +void Bridge::create(amqp_0_10::Connection& c) { detached = false; // Reset detached in case we are recovering. conn = &c; @@ -200,7 +200,7 @@ void Bridge::create(Connection& c) if (args.i_srcIsLocal) sessionHandler.getSession()->enableReceiverTracking(); } -void Bridge::cancel(Connection&) +void Bridge::cancel(amqp_0_10::Connection&) { if (resetProxy()) { peer->getMessage().cancel(args.i_dest); diff --git a/qpid/cpp/src/qpid/broker/Bridge.h b/qpid/cpp/src/qpid/broker/Bridge.h index 54a5f1600a..604a8473f3 100644 --- a/qpid/cpp/src/qpid/broker/Bridge.h +++ b/qpid/cpp/src/qpid/broker/Bridge.h @@ -39,8 +39,9 @@ namespace qpid { namespace broker { - +namespace amqp_0_10 { class Connection; +} class Link; class LinkRegistry; @@ -115,9 +116,9 @@ class Bridge : public PersistableConfig, void setErrorListener(boost::shared_ptr<ErrorListener> e) { errorListener = e; } private: struct PushHandler : framing::FrameHandler { - PushHandler(Connection* c) { conn = c; } + PushHandler(amqp_0_10::Connection* c) { conn = c; } void handle(framing::AMQFrame& frame); - Connection* conn; + amqp_0_10::Connection* conn; }; std::auto_ptr<PushHandler> pushHandler; @@ -134,14 +135,14 @@ class Bridge : public PersistableConfig, std::string queueName; std::string altEx; mutable uint64_t persistenceId; - Connection* conn; + amqp_0_10::Connection* conn; InitializeCallback initialize; bool detached; // Set when session is detached. bool resetProxy(); // connection Management (called by owning Link) - void create(Connection& c); - void cancel(Connection& c); + void create(amqp_0_10::Connection& c); + void cancel(amqp_0_10::Connection& c); void closed(); friend class Link; // to call create, cancel, closed() boost::shared_ptr<ErrorListener> errorListener; diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 8cdfd42f02..bbcd7de017 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -22,7 +22,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/AclModule.h" -#include "qpid/broker/ConnectionIdentity.h" +#include "qpid/broker/Connection.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/HeadersExchange.h" @@ -707,13 +707,13 @@ struct InvalidParameter : public qpid::Exception }; void Broker::createObject(const std::string& type, const std::string& name, - const Variant::Map& properties, bool /*strict*/, const ConnectionIdentity* context) + const Variant::Map& properties, bool /*strict*/, const Connection* context) { std::string userId; std::string connectionId; if (context) { userId = context->getUserId(); - connectionId = context->getUrl(); + connectionId = context->getMgmtId(); } //TODO: implement 'strict' option (check there are no unrecognised properties) QPID_LOG (debug, "Broker::create(" << type << ", " << name << "," << properties << ")"); @@ -898,13 +898,13 @@ void Broker::createObject(const std::string& type, const std::string& name, } void Broker::deleteObject(const std::string& type, const std::string& name, - const Variant::Map& options, const ConnectionIdentity* context) + const Variant::Map& options, const Connection* context) { std::string userId; std::string connectionId; if (context) { userId = context->getUserId(); - connectionId = context->getUrl(); + connectionId = context->getMgmtId(); } QPID_LOG (debug, "Broker::delete(" << type << ", " << name << "," << options << ")"); if (objectFactory.deleteObject(*this, type, name, options, userId, connectionId)) { @@ -952,13 +952,13 @@ void Broker::checkDeleteQueue(Queue::shared_ptr queue, bool ifUnused, bool ifEmp Manageable::status_t Broker::queryObject(const std::string& type, const std::string& name, Variant::Map& results, - const ConnectionIdentity* context) + const Connection* context) { std::string userId; std::string connectionId; if (context) { userId = context->getUserId(); - connectionId = context->getUrl(); + connectionId = context->getMgmtId(); } QPID_LOG (debug, "Broker::query(" << type << ", " << name << ")"); @@ -994,7 +994,7 @@ Manageable::status_t Broker::queryQueue( const std::string& name, } Manageable::status_t Broker::getTimestampConfig(bool& receive, - const ConnectionIdentity* context) + const Connection* context) { std::string name; // none needed for broker std::string userId = context->getUserId(); @@ -1006,7 +1006,7 @@ Manageable::status_t Broker::getTimestampConfig(bool& receive, } Manageable::status_t Broker::setTimestampConfig(const bool receive, - const ConnectionIdentity* context) + const Connection* context) { std::string name; // none needed for broker std::string userId = context->getUserId(); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 20e0c16e70..52b79a0944 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -149,20 +149,20 @@ class Broker : public sys::Runnable, public Plugin::Target, void setLogHiresTimestamp(bool enabled); bool getLogHiresTimestamp(); void createObject(const std::string& type, const std::string& name, - const qpid::types::Variant::Map& properties, bool strict, const ConnectionIdentity* context); + const qpid::types::Variant::Map& properties, bool strict, const Connection* context); void deleteObject(const std::string& type, const std::string& name, - const qpid::types::Variant::Map& options, const ConnectionIdentity* context); + const qpid::types::Variant::Map& options, const Connection* context); void checkDeleteQueue(boost::shared_ptr<Queue> queue, bool ifUnused, bool ifEmpty); Manageable::status_t queryObject(const std::string& type, const std::string& name, - qpid::types::Variant::Map& results, const ConnectionIdentity* context); + qpid::types::Variant::Map& results, const Connection* context); Manageable::status_t queryQueue( const std::string& name, const std::string& userId, const std::string& connectionId, qpid::types::Variant::Map& results); Manageable::status_t getTimestampConfig(bool& receive, - const ConnectionIdentity* context); + const Connection* context); Manageable::status_t setTimestampConfig(const bool receive, - const ConnectionIdentity* context); + const Connection* context); Manageable::status_t queueRedirect(const std::string& srcQueue, const std::string& tgtQueue); void queueRedirectDestroy(boost::shared_ptr<Queue> srcQ, boost::shared_ptr<Queue> tgtQ, bool moveMsgs); boost::shared_ptr<sys::Poller> poller; diff --git a/qpid/cpp/src/qpid/broker/Connection.h b/qpid/cpp/src/qpid/broker/Connection.h index 0f94a32fbf..ecc48123cf 100644 --- a/qpid/cpp/src/qpid/broker/Connection.h +++ b/qpid/cpp/src/qpid/broker/Connection.h @@ -21,215 +21,35 @@ * under the License. * */ - -#include <memory> -#include <sstream> -#include <vector> -#include <queue> - -#include "qpid/broker/BrokerImportExport.h" - -#include "qpid/broker/ConnectionHandler.h" -#include "qpid/broker/ConnectionIdentity.h" -#include "qpid/broker/OwnershipToken.h" -#include "qpid/management/Manageable.h" -#include "qpid/sys/AggregateOutput.h" -#include "qpid/sys/ConnectionInputHandler.h" -#include "qpid/sys/SecuritySettings.h" -#include "qpid/sys/Mutex.h" -#include "qpid/RefCounted.h" -#include "qpid/Url.h" -#include "qpid/ptr_map.h" - -#include "qmf/org/apache/qpid/broker/Connection.h" - -#include <boost/ptr_container/ptr_map.hpp> -#include <boost/scoped_ptr.hpp> -#include <boost/bind.hpp> - -#include <algorithm> +#include <map> +#include <string> namespace qpid { -namespace sys { -class ConnectionOutputHandler; -class Timer; -class TimerTask; +namespace management { +class ObjectId; +} +namespace types { +class Variant; } -namespace broker { - -class Broker; -class LinkRegistry; -class Queue; -class SecureConnection; -class SessionHandler; -struct ConnectionTimeoutTask; - -class Connection : public sys::ConnectionInputHandler, public ConnectionIdentity, - public OwnershipToken, public management::Manageable, - public RefCounted -{ - public: - uint32_t getFrameMax() const { return framemax; } - uint16_t getHeartbeat() const { return heartbeat; } - uint16_t getHeartbeatMax() const { return heartbeatmax; } - - void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); } - void setHeartbeat(uint16_t hb) { heartbeat = hb; } - void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; } - - void setUrl(const std::string& _url) { url = _url; } - - const OwnershipToken* getOwnership() const { return this; }; - const management::ObjectId getObjectId() const { return GetManagementObject()->getObjectId(); }; - const std::string& getUserId() const { return userId; } - const std::string& getUrl() const { return url; } - - void setUserProxyAuth(const bool b); - bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids - bool isFederationLink() const { return federationPeerTag.size() > 0; } - void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); } - const std::string& getFederationPeerTag() const { return federationPeerTag; } - std::vector<Url>& getKnownHosts() { return knownHosts; } - - /**@return true if user is the authenticated user on this connection. - * If id has the default realm will also compare plain username. - */ - bool isAuthenticatedUser(const std::string& id) const { - return (id == userId || (isDefaultRealm && id == userName)); - } - - Broker& getBroker() { return broker; } - - sys::ConnectionOutputHandler& getOutput() { return *out; } - void activateOutput(); - void addOutputTask(OutputTask*); - void removeOutputTask(OutputTask*); - framing::ProtocolVersion getVersion() const { return version; } - - Connection(sys::ConnectionOutputHandler* out, - Broker& broker, - const std::string& mgmtId, - const qpid::sys::SecuritySettings&, - bool isLink = false, - uint64_t objectId = 0); - - ~Connection (); - - /** Get the SessionHandler for channel. Create if it does not already exist */ - SessionHandler& getChannel(framing::ChannelId channel); - - /** Close the connection. Waits for the client to respond with close-ok - * before actually destroying the connection. - */ - QPID_BROKER_EXTERN void close( - framing::connection::CloseCode code, const std::string& text); - - /** Abort the connection. Close abruptly and immediately. */ - QPID_BROKER_EXTERN void abort(); - - // ConnectionInputHandler methods - void received(framing::AMQFrame& frame); - bool doOutput(); - void closed(); - - void closeChannel(framing::ChannelId channel); - - // Manageable entry points - management::ManagementObject::shared_ptr GetManagementObject(void) const; - management::Manageable::status_t - ManagementMethod (uint32_t methodId, management::Args& args, std::string&); - - void requestIOProcessing (boost::function0<void>); - void recordFromServer (const framing::AMQFrame& frame); - void recordFromClient (const framing::AMQFrame& frame); - - // gets for configured federation links - std::string getAuthMechanism(); - std::string getAuthCredentials(); - std::string getUsername(); - std::string getPassword(); - std::string getHost(); - uint16_t getPort(); - - void notifyConnectionForced(const std::string& text); - void setUserId(const std::string& uid); - - // credentials for connected client - const std::string& getMgmtId() const { return mgmtId; } - management::ManagementAgent* getAgent() const { return agent; } - - void setHeartbeatInterval(uint16_t heartbeat); - void sendHeartbeat(); - void restartTimeout(); - - void setSecureConnection(SecureConnection* secured); - - const qpid::sys::SecuritySettings& getExternalSecuritySettings() const - { - return securitySettings; - } - - /** @return true if the initial connection negotiation is complete. */ - bool isOpen(); - - bool isLink() { return link; } - void startLinkHeartbeatTimeoutTask(); - - void setClientProperties(const framing::FieldTable& cp) { clientProperties = cp; } - const framing::FieldTable& getClientProperties() const { return clientProperties; } - - private: - // Management object is used in the constructor so must be early - qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject; - - //contained output tasks - sys::AggregateOutput outputTasks; - - boost::scoped_ptr<framing::FrameHandler> outboundTracker; - boost::scoped_ptr<sys::ConnectionOutputHandler> out; - - Broker& broker; - - framing::ProtocolVersion version; - uint32_t framemax; - uint16_t heartbeat; - uint16_t heartbeatmax; - std::string userId; - std::string url; - bool userProxyAuth; - std::string federationPeerTag; - std::vector<Url> knownHosts; - std::string userName; - bool isDefaultRealm; - - typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; - - ChannelMap channels; - qpid::sys::SecuritySettings securitySettings; - const bool link; - ConnectionHandler adapter; - bool mgmtClosing; - const std::string mgmtId; - sys::Mutex ioCallbackLock; - std::queue<boost::function0<void> > ioCallbacks; - LinkRegistry& links; - management::ManagementAgent* agent; - sys::Timer& timer; - boost::intrusive_ptr<sys::TimerTask> heartbeatTimer, linkHeartbeatTimer; - boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer; - uint64_t objectId; - framing::FieldTable clientProperties; - -friend class OutboundFrameTracker; - void sent(const framing::AMQFrame& f); - void doIoCallbacks(); +namespace broker { - public: +class OwnershipToken; - qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; } +/** + * Protocol independent connection abstraction. + */ +class Connection { +public: + virtual ~Connection() {} + virtual const OwnershipToken* getOwnership() const = 0; + virtual const management::ObjectId getObjectId() const = 0; + virtual const std::string& getUserId() const = 0; + virtual const std::string& getMgmtId() const = 0; + virtual const std::map<std::string, types::Variant>& getClientProperties() const = 0; + virtual bool isLink() const = 0; + virtual void abort() = 0; }; - -}} +}} // namespace qpid::broker #endif /*!QPID_BROKER_CONNECTION_H*/ diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 40393f1920..fd4af963ad 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -24,7 +24,7 @@ #include "qpid/SaslFactory.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/Connection.h" +#include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/broker/SecureConnection.h" #include "qpid/Url.h" #include "qpid/framing/AllInvoker.h" @@ -109,10 +109,10 @@ void ConnectionHandler::setSecureConnection(SecureConnection* secured) handler->secured = secured; } -ConnectionHandler::ConnectionHandler(Connection& connection, bool isClient) : +ConnectionHandler::ConnectionHandler(amqp_0_10::Connection& connection, bool isClient) : handler(new Handler(connection, isClient)) {} -ConnectionHandler::Handler::Handler(Connection& c, bool isClient) : +ConnectionHandler::Handler::Handler(amqp_0_10::Connection& c, bool isClient) : proxy(c.getOutput()), connection(c), serverMode(!isClient), secured(0), isOpen(false) @@ -153,14 +153,14 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body) { const framing::FieldTable& clientProperties = body.getClientProperties(); qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject = connection.getMgmtObject(); + types::Variant::Map properties; + qpid::amqp_0_10::translate(clientProperties, properties); if (mgmtObject != 0) { string procName = clientProperties.getAsString(CLIENT_PROCESS_NAME); uint32_t pid = clientProperties.getAsInt(CLIENT_PID); uint32_t ppid = clientProperties.getAsInt(CLIENT_PPID); - types::Variant::Map properties; - qpid::amqp_0_10::translate(clientProperties, properties); mgmtObject->set_remoteProperties(properties); if (!procName.empty()) mgmtObject->set_remoteProcessName(procName); @@ -192,7 +192,7 @@ void ConnectionHandler::Handler::startOk(const ConnectionStartOkBody& body) throw; } - connection.setClientProperties(clientProperties); + connection.setClientProperties(properties); if (clientProperties.isSet(QPID_FED_TAG)) { connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG)); } diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h index 9346e7b1ac..7af2fe3cb4 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h @@ -47,7 +47,9 @@ struct SecuritySettings; namespace broker { +namespace amqp_0_10 { class Connection; +} class SecureConnection; class ConnectionHandler : public framing::FrameHandler @@ -55,13 +57,13 @@ class ConnectionHandler : public framing::FrameHandler struct Handler : public framing::AMQP_AllOperations::ConnectionHandler { framing::AMQP_AllProxy::Connection proxy; - Connection& connection; + amqp_0_10::Connection& connection; bool serverMode; std::auto_ptr<SaslAuthenticator> authenticator; SecureConnection* secured; bool isOpen; - Handler(Connection& connection, bool isClient); + Handler(amqp_0_10::Connection& connection, bool isClient); ~Handler(); void startOk(const qpid::framing::ConnectionStartOkBody& body); void startOk(const qpid::framing::FieldTable& clientProperties, @@ -99,7 +101,7 @@ class ConnectionHandler : public framing::FrameHandler bool handle(const qpid::framing::AMQMethodBody& method); public: - ConnectionHandler(Connection& connection, bool isClient ); + ConnectionHandler(amqp_0_10::Connection& connection, bool isClient ); void close(framing::connection::CloseCode code, const std::string& text); void heartbeat(); void handle(framing::AMQFrame& frame); diff --git a/qpid/cpp/src/qpid/broker/HandlerImpl.h b/qpid/cpp/src/qpid/broker/HandlerImpl.h index 72bfb1c474..c41438fd90 100644 --- a/qpid/cpp/src/qpid/broker/HandlerImpl.h +++ b/qpid/cpp/src/qpid/broker/HandlerImpl.h @@ -21,7 +21,7 @@ #include "qpid/broker/SemanticState.h" #include "qpid/broker/SessionContext.h" -#include "qpid/broker/Connection.h" +#include "qpid/broker/amqp_0_10/Connection.h" namespace qpid { namespace broker { @@ -40,7 +40,7 @@ class HandlerImpl { HandlerImpl(SemanticState& s) : state(s), session(s.getSession()) {} framing::AMQP_ClientProxy& getProxy() { return session.getProxy(); } - Connection& getConnection() { return session.getConnection(); } + amqp_0_10::Connection& getConnection() { return session.getConnection(); } Broker& getBroker() { return session.getConnection().getBroker(); } }; diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index c6ac6832c0..31685eb1de 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -22,7 +22,7 @@ #include "qpid/broker/Link.h" #include "qpid/broker/LinkRegistry.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/Connection.h" +#include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/sys/Timer.h" #include "qmf/org/apache/qpid/broker/EventBrokerLinkUp.h" #include "qmf/org/apache/qpid/broker/EventBrokerLinkDown.h" @@ -233,7 +233,7 @@ void Link::startConnectionLH () } } -void Link::established(Connection* c) +void Link::established(qpid::broker::amqp_0_10::Connection* c) { stringstream addr; addr << host << ":" << port; diff --git a/qpid/cpp/src/qpid/broker/Link.h b/qpid/cpp/src/qpid/broker/Link.h index 01ddc68d97..d9924feec3 100644 --- a/qpid/cpp/src/qpid/broker/Link.h +++ b/qpid/cpp/src/qpid/broker/Link.h @@ -45,8 +45,10 @@ namespace broker { class LinkRegistry; class Broker; -class Connection; class LinkExchange; +namespace amqp_0_10 { +class Connection; +} class Link : public PersistableConfig, public management::Manageable { private: @@ -83,7 +85,7 @@ class Link : public PersistableConfig, public management::Manageable { Bridges cancellations; // Bridges pending cancellation framing::ChannelId nextFreeChannel; RangeSet<framing::ChannelId> freeChannels; - Connection* connection; + amqp_0_10::Connection* connection; management::ManagementAgent* agent; boost::function<void(Link*)> listener; boost::intrusive_ptr<sys::TimerTask> timerTask; @@ -109,7 +111,7 @@ class Link : public PersistableConfig, public management::Manageable { void reconnectLH(const Address&); //called by LinkRegistry // connection management (called by LinkRegistry) - void established(Connection*); // Called when connection is created + void established(amqp_0_10::Connection*); // Called when connection is created void opened(); // Called when connection is open (after create) void closed(int, std::string); // Called when connection goes away void notifyConnectionForced(const std::string text); @@ -194,7 +196,7 @@ class Link : public PersistableConfig, public management::Manageable { /** The current connction for this link. Note returns 0 if the link is not * presently connected. */ - Connection* getConnection() { return connection; } + amqp_0_10::Connection* getConnection() { return connection; } }; } } diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp index 8642294d06..ed1e314bda 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.cpp +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.cpp @@ -21,7 +21,7 @@ #include "qpid/broker/LinkRegistry.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/Connection.h" +#include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/broker/Link.h" #include "qpid/log/Statement.h" #include <iostream> @@ -53,10 +53,26 @@ class LinkRegistryConnectionObserver : public ConnectionObserver { LinkRegistry& links; public: LinkRegistryConnectionObserver(LinkRegistry& l) : links(l) {} - void connection(Connection& c) { links.notifyConnection(c.getMgmtId(), &c); } - void opened(Connection& c) { links.notifyOpened(c.getMgmtId()); } - void closed(Connection& c) { links.notifyClosed(c.getMgmtId()); } - void forced(Connection& c, const string& text) { links.notifyConnectionForced(c.getMgmtId(), text); } + void connection(Connection& in) + { + amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in); + if (c) links.notifyConnection(c->getMgmtId(), c); + } + void opened(Connection& in) + { + amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in); + if (c) links.notifyOpened(c->getMgmtId()); + } + void closed(Connection& in) + { + amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in); + if (c) links.notifyClosed(c->getMgmtId()); + } + void forced(Connection& in, const string& text) + { + amqp_0_10::Connection* c = dynamic_cast<amqp_0_10::Connection*>(&in); + if (c) links.notifyConnectionForced(c->getMgmtId(), text); + } }; LinkRegistry::LinkRegistry (Broker* _broker) : @@ -287,7 +303,7 @@ Link::shared_ptr LinkRegistry::findLink(const std::string& connId) return Link::shared_ptr(); } -void LinkRegistry::notifyConnection(const std::string& key, Connection* c) +void LinkRegistry::notifyConnection(const std::string& key, amqp_0_10::Connection* c) { // find a link that is attempting to connect to the remote, and // create a mapping from connection id to link diff --git a/qpid/cpp/src/qpid/broker/LinkRegistry.h b/qpid/cpp/src/qpid/broker/LinkRegistry.h index e5b1c40781..a156a53624 100644 --- a/qpid/cpp/src/qpid/broker/LinkRegistry.h +++ b/qpid/cpp/src/qpid/broker/LinkRegistry.h @@ -35,10 +35,11 @@ namespace qpid { namespace broker { - +namespace amqp_0_10 { + class Connection; +} class Link; class Broker; - class Connection; class LinkRegistry { typedef std::map<std::string, boost::shared_ptr<Link> > LinkMap; typedef std::map<std::string, Bridge::shared_ptr> BridgeMap; @@ -58,7 +59,7 @@ namespace broker { boost::shared_ptr<Link> findLink(const std::string& key); // Methods called by the connection observer, key is connection identifier - void notifyConnection (const std::string& key, Connection* c); + void notifyConnection (const std::string& key, amqp_0_10::Connection* c); void notifyOpened (const std::string& key); void notifyClosed (const std::string& key); void notifyConnectionForced (const std::string& key, const std::string& text); diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 1d901025b6..ec44404793 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -23,7 +23,8 @@ #include "qpid/amqp/CharSequence.h" #include "qpid/amqp/MapHandler.h" -#include "qpid/broker/ConnectionIdentity.h" +#include "qpid/broker/Connection.h" +#include "qpid/broker/OwnershipToken.h" #include "qpid/management/ManagementObject.h" #include "qpid/management/Manageable.h" #include "qpid/StringUtils.h" @@ -203,10 +204,10 @@ uint8_t Message::getPriority() const bool Message::getIsManagementMessage() const { return isManagementMessage; } void Message::setIsManagementMessage(bool b) { isManagementMessage = b; } -const OwnershipToken* Message::getPublisherOwnership() const { return publisher->getOwnership(); } -const management::ObjectId Message::getPublisherObjectId() const { return publisher->getObjectId(); } -const std::string& Message::getPublisherUserId() const { return publisher->getUserId(); } -const std::string& Message::getPublisherUrl() const { return publisher->getUrl(); } +const Connection* Message::getPublisher() const { return publisher; } +void Message::setPublisher(const Connection& p) { publisher = &p; } +bool Message::isLocalTo(const OwnershipToken* token) const { return token && publisher && token->isLocal(publisher->getOwnership()); } + qpid::framing::SequenceNumber Message::getSequence() const { diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index 41ce2ec1a2..8f12b06a9d 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -47,7 +47,7 @@ class Manageable; namespace broker { class OwnershipToken; -class ConnectionIdentity; +class Connection; enum MessageState { @@ -85,12 +85,9 @@ public: int getDeliveryCount() const { return deliveryCount; } void resetDeliveryCount() { deliveryCount = -1; } - void setPublisher(const ConnectionIdentity& p) { publisher = &p; } - const ConnectionIdentity& getPublisher() const { return *publisher; } - const OwnershipToken* getPublisherOwnership() const; - const management::ObjectId getPublisherObjectId() const; - const std::string& getPublisherUserId() const; - const std::string& getPublisherUrl() const; + void setPublisher(const Connection& p); + const Connection* getPublisher() const; + bool isLocalTo(const OwnershipToken*) const; QPID_BROKER_EXTERN std::string getRoutingKey() const; QPID_BROKER_EXTERN bool isPersistent() const; @@ -148,7 +145,7 @@ public: boost::intrusive_ptr<Encoding> encoding; boost::intrusive_ptr<PersistableMessage> persistentContext; int deliveryCount; - const ConnectionIdentity* publisher; + const Connection* publisher; qpid::sys::AbsTime expiration; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; uint64_t timestamp; diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index e1782b01ce..c402e3e016 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -220,18 +220,13 @@ Queue::~Queue() { } -bool isLocalTo(const OwnershipToken* token, const Message& msg) -{ - return token && token->isLocal(msg.getPublisherOwnership()); -} - bool Queue::isLocal(const Message& msg) { //message is considered local if it was published on the same //connection as that of the session which declared this queue //exclusive (owner) or which has an exclusive subscription //(exclusive) - return settings.noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg)); + return settings.noLocal && (msg.isLocalTo(owner) || msg.isLocalTo(exclusive)); } bool Queue::isExcluded(const Message& msg) diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp index a5ef8c560c..e5d6db1a04 100644 --- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp +++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.cpp @@ -21,7 +21,7 @@ #include "qpid/broker/AclModule.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/Connection.h" +#include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/framing/FieldValue.h" #include "qpid/log/Statement.h" @@ -54,12 +54,12 @@ namespace broker { class NullAuthenticator : public SaslAuthenticator { - Connection& connection; + amqp_0_10::Connection& connection; framing::AMQP_ClientProxy::Connection client; std::string realm; const bool encrypt; public: - NullAuthenticator(Connection& connection, bool encrypt); + NullAuthenticator(amqp_0_10::Connection& connection, bool encrypt); ~NullAuthenticator(); void getMechanisms(framing::Array& mechanisms); void start(const std::string& mechanism, const std::string* response); @@ -74,7 +74,7 @@ public: class CyrusAuthenticator : public SaslAuthenticator { sasl_conn_t *sasl_conn; - Connection& connection; + amqp_0_10::Connection& connection; framing::AMQP_ClientProxy::Connection client; const bool encrypt; @@ -82,7 +82,7 @@ class CyrusAuthenticator : public SaslAuthenticator bool getUsername(std::string& uid); public: - CyrusAuthenticator(Connection& connection, bool encrypt); + CyrusAuthenticator(amqp_0_10::Connection& connection, bool encrypt); ~CyrusAuthenticator(); void init(); void getMechanisms(framing::Array& mechanisms); @@ -167,7 +167,7 @@ void SaslAuthenticator::fini(void) #endif -std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connection& c ) +std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(amqp_0_10::Connection& c ) { if (c.getBroker().getOptions().auth) { return std::auto_ptr<SaslAuthenticator>( @@ -179,7 +179,7 @@ std::auto_ptr<SaslAuthenticator> SaslAuthenticator::createAuthenticator(Connecti } -NullAuthenticator::NullAuthenticator(Connection& c, bool e) : connection(c), client(c.getOutput()), +NullAuthenticator::NullAuthenticator(amqp_0_10::Connection& c, bool e) : connection(c), client(c.getOutput()), realm(c.getBroker().getOptions().realm), encrypt(e) {} NullAuthenticator::~NullAuthenticator() {} @@ -246,7 +246,7 @@ std::auto_ptr<SecurityLayer> NullAuthenticator::getSecurityLayer(uint16_t) #if HAVE_SASL -CyrusAuthenticator::CyrusAuthenticator(Connection& c, bool _encrypt) : +CyrusAuthenticator::CyrusAuthenticator(amqp_0_10::Connection& c, bool _encrypt) : sasl_conn(0), connection(c), client(c.getOutput()), encrypt(_encrypt) { init(); diff --git a/qpid/cpp/src/qpid/broker/SaslAuthenticator.h b/qpid/cpp/src/qpid/broker/SaslAuthenticator.h index e5ecc9f6ec..97434d6ffe 100644 --- a/qpid/cpp/src/qpid/broker/SaslAuthenticator.h +++ b/qpid/cpp/src/qpid/broker/SaslAuthenticator.h @@ -34,7 +34,9 @@ namespace qpid { namespace broker { +namespace amqp_0_10 { class Connection; +} class SaslAuthenticator { @@ -54,7 +56,7 @@ public: static void init(const std::string& saslName, std::string const & saslConfigPath ); static void fini(void); - static std::auto_ptr<SaslAuthenticator> createAuthenticator(Connection& connection); + static std::auto_ptr<SaslAuthenticator> createAuthenticator(amqp_0_10::Connection& connection); virtual void callUserIdCallbacks() { } }; diff --git a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp index 6b4f6b3025..0258350043 100644 --- a/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp +++ b/qpid/cpp/src/qpid/broker/SecureConnectionFactory.cpp @@ -22,7 +22,7 @@ #include "qpid/amqp_0_10/Connection.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/Connection.h" +#include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/broker/SecureConnection.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/log/Statement.h" @@ -35,7 +35,7 @@ using framing::ProtocolVersion; using qpid::sys::SecuritySettings; typedef std::auto_ptr<qpid::amqp_0_10::Connection> CodecPtr; typedef std::auto_ptr<SecureConnection> SecureConnectionPtr; -typedef std::auto_ptr<Connection> ConnectionPtr; +typedef std::auto_ptr<qpid::broker::amqp_0_10::Connection> ConnectionPtr; typedef std::auto_ptr<sys::ConnectionInputHandler> InputPtr; SecureConnectionFactory::SecureConnectionFactory(Broker& b) : broker(b) {} @@ -64,7 +64,7 @@ SecureConnectionFactory::create_0_10(sys::OutputControl& out, const std::string& { SecureConnectionPtr sc(new SecureConnection()); CodecPtr c(new qpid::amqp_0_10::Connection(out, id, brokerActsAsClient)); - ConnectionPtr i(new broker::Connection(c.get(), broker, id, external, brokerActsAsClient)); + ConnectionPtr i(new broker::amqp_0_10::Connection(c.get(), broker, id, external, brokerActsAsClient)); i->setSecureConnection(sc.get()); c->setInputHandler(InputPtr(i.release())); sc->setCodec(std::auto_ptr<sys::ConnectionCodec>(c)); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 54069df591..dd7a25aaa4 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -22,7 +22,7 @@ #include "qpid/broker/SessionState.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/Connection.h" +#include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/DtxAck.h" #include "qpid/broker/DtxTimeout.h" @@ -83,7 +83,7 @@ SemanticState::SemanticState(SessionState& ss) authMsg(getSession().getBroker().getOptions().auth && !getSession().getConnection().isUserProxyAuth()), userID(getSession().getConnection().getUserId()), closeComplete(false), - connectionId(getSession().getConnection().getUrl()) + connectionId(getSession().getConnection().getMgmtId()) {} SemanticState::~SemanticState() { diff --git a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp index f7ca4890b4..2d4868628f 100644 --- a/qpid/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/qpid/cpp/src/qpid/broker/SessionAdapter.cpp @@ -18,7 +18,7 @@ #include "qpid/broker/SessionAdapter.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/Connection.h" +#include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/broker/DtxTimeout.h" #include "qpid/broker/Queue.h" #include "qpid/Exception.h" @@ -96,14 +96,14 @@ void SessionAdapter::ExchangeHandlerImpl::declare(const string& exchange, const try{ std::pair<Exchange::shared_ptr, bool> response = getBroker().createExchange(exchange, type, durable, alternateExchange, args, - getConnection().getUserId(), getConnection().getUrl()); + getConnection().getUserId(), getConnection().getMgmtId()); if (!response.second) { //exchange already there, not created checkType(response.first, type); checkAlternate(response.first, alternate); QPID_LOG_CAT(debug, model, "Create exchange. name:" << exchange << " user:" << getConnection().getUserId() - << " rhost:" << getConnection().getUrl() + << " rhost:" << getConnection().getMgmtId() << " type:" << type << " alternateExchange:" << alternateExchange << " durable:" << (durable ? "T" : "F")); @@ -134,7 +134,7 @@ void SessionAdapter::ExchangeHandlerImpl::checkAlternate(Exchange::shared_ptr ex void SessionAdapter::ExchangeHandlerImpl::delete_(const string& name, bool /*ifUnused*/) { //TODO: implement if-unused - getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getUrl()); + getBroker().deleteExchange(name, getConnection().getUserId(), getConnection().getMgmtId()); } ExchangeQueryResult SessionAdapter::ExchangeHandlerImpl::query(const string& name) @@ -156,7 +156,7 @@ void SessionAdapter::ExchangeHandlerImpl::bind(const string& queueName, const FieldTable& arguments) { getBroker().bind(queueName, exchangeName, routingKey, arguments, - getConnection().getUserId(), getConnection().getUrl()); + getConnection().getUserId(), getConnection().getMgmtId()); state.addBinding(queueName, exchangeName, routingKey, arguments); } @@ -166,7 +166,7 @@ void SessionAdapter::ExchangeHandlerImpl::unbind(const string& queueName, { state.removeBinding(queueName, exchangeName, routingKey); getBroker().unbind(queueName, exchangeName, routingKey, - getConnection().getUserId(), getConnection().getUrl()); + getConnection().getUserId(), getConnection().getMgmtId()); } ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string& exchangeName, @@ -209,7 +209,7 @@ ExchangeBoundResult SessionAdapter::ExchangeHandlerImpl::bound(const std::string SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : HandlerHelper(session), broker(getBroker()), //record connection id and userid for deleting exclsuive queues after session has ended: - connectionId(getConnection().getUrl()), userId(getConnection().getUserId()) + connectionId(getConnection().getMgmtId()), userId(getConnection().getUserId()) {} @@ -302,7 +302,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& exclusive ? &session : 0, alternateExchange, getConnection().getUserId(), - getConnection().getUrl()); + getConnection().getMgmtId()); queue = queue_created.first; assert(queue); if (queue_created.second) { // This is a new queue @@ -316,7 +316,7 @@ void SessionAdapter::QueueHandlerImpl::declare(const string& name, const string& } QPID_LOG_CAT(debug, model, "Create queue. name:" << name << " user:" << getConnection().getUserId() - << " rhost:" << getConnection().getUrl() + << " rhost:" << getConnection().getMgmtId() << " durable:" << (durable ? "T" : "F") << " exclusive:" << (exclusive ? "T" : "F") << " autodelete:" << (autoDelete ? "T" : "F") @@ -363,7 +363,7 @@ void SessionAdapter::QueueHandlerImpl::checkDelete(Queue::shared_ptr queue, bool void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnused, bool ifEmpty) { - getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getUrl(), + getBroker().deleteQueue(queue, getConnection().getUserId(), getConnection().getMgmtId(), boost::bind(&SessionAdapter::QueueHandlerImpl::checkDelete, this, _1, ifUnused, ifEmpty)); } @@ -429,12 +429,12 @@ SessionAdapter::MessageHandlerImpl::subscribe(const string& queueName, ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) - agent->raiseEvent(_qmf::EventSubscribe(getConnection().getUrl(), getConnection().getUserId(), + agent->raiseEvent(_qmf::EventSubscribe(getConnection().getMgmtId(), getConnection().getUserId(), queueName, destination, exclusive, ManagementAgent::toMap(arguments))); QPID_LOG_CAT(debug, model, "Create subscription. queue:" << queueName << " destination:" << destination << " user:" << getConnection().getUserId() - << " rhost:" << getConnection().getUrl() + << " rhost:" << getConnection().getMgmtId() << " exclusive:" << (exclusive ? "T" : "F") ); } @@ -448,10 +448,10 @@ SessionAdapter::MessageHandlerImpl::cancel(const string& destination ) ManagementAgent* agent = getBroker().getManagementAgent(); if (agent) - agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getUrl(), getConnection().getUserId(), destination)); + agent->raiseEvent(_qmf::EventUnsubscribe(getConnection().getMgmtId(), getConnection().getUserId(), destination)); QPID_LOG_CAT(debug, model, "Delete subscription. destination:" << destination << " user:" << getConnection().getUserId() - << " rhost:" << getConnection().getUrl() ); + << " rhost:" << getConnection().getMgmtId() ); } void diff --git a/qpid/cpp/src/qpid/broker/SessionContext.h b/qpid/cpp/src/qpid/broker/SessionContext.h index 25b8e22949..92a3dcecc2 100644 --- a/qpid/cpp/src/qpid/broker/SessionContext.h +++ b/qpid/cpp/src/qpid/broker/SessionContext.h @@ -36,14 +36,16 @@ class AMQP_ClientProxy; namespace broker { class Broker; +namespace amqp_0_10 { class Connection; +} class SessionContext : public OwnershipToken { public: virtual ~SessionContext(){} virtual bool isAttached() const = 0; - virtual Connection& getConnection() = 0; + virtual amqp_0_10::Connection& getConnection() = 0; virtual framing::AMQP_ClientProxy& getProxy() = 0; virtual Broker& getBroker() = 0; virtual uint16_t getChannel() const = 0; diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.cpp b/qpid/cpp/src/qpid/broker/SessionHandler.cpp index 8cbecbc6f7..93977c8a6e 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/SessionHandler.cpp @@ -20,7 +20,7 @@ #include "qpid/broker/SessionHandler.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/Connection.h" +#include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/broker/SessionState.h" #include "qpid/log/Statement.h" #include "qpid/sys/ConnectionOutputHandler.h" @@ -33,7 +33,7 @@ using namespace framing; using namespace std; using namespace qpid::sys; -SessionHandler::SessionHandler(Connection& c, ChannelId ch) +SessionHandler::SessionHandler(amqp_0_10::Connection& c, ChannelId ch) : qpid::amqp_0_10::SessionHandler(&c.getOutput(), ch), connection(c), proxy(out) @@ -64,9 +64,9 @@ void SessionHandler::executionException( errorListener->executionException(code, msg); } -Connection& SessionHandler::getConnection() { return connection; } +amqp_0_10::Connection& SessionHandler::getConnection() { return connection; } -const Connection& SessionHandler::getConnection() const { return connection; } +const amqp_0_10::Connection& SessionHandler::getConnection() const { return connection; } void SessionHandler::handleDetach() { qpid::amqp_0_10::SessionHandler::handleDetach(); diff --git a/qpid/cpp/src/qpid/broker/SessionHandler.h b/qpid/cpp/src/qpid/broker/SessionHandler.h index da57fb103e..3ee1538ccd 100644 --- a/qpid/cpp/src/qpid/broker/SessionHandler.h +++ b/qpid/cpp/src/qpid/broker/SessionHandler.h @@ -31,8 +31,9 @@ namespace qpid { class SessionState; namespace broker { - +namespace amqp_0_10 { class Connection; +} class SessionState; /** @@ -57,15 +58,15 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler { /** *@param e must not be deleted until ErrorListener::detach has been called */ - SessionHandler(Connection&, framing::ChannelId); + SessionHandler(amqp_0_10::Connection&, framing::ChannelId); ~SessionHandler(); /** Get broker::SessionState */ SessionState* getSession() { return session.get(); } const SessionState* getSession() const { return session.get(); } - Connection& getConnection(); - const Connection& getConnection() const; + amqp_0_10::Connection& getConnection(); + const amqp_0_10::Connection& getConnection() const; framing::AMQP_ClientProxy& getProxy() { return proxy; } const framing::AMQP_ClientProxy& getProxy() const { return proxy; } @@ -93,7 +94,7 @@ class SessionHandler : public qpid::amqp_0_10::SessionHandler { : framing::AMQP_ClientProxy(setChannel), setChannel(ch, out) {} }; - Connection& connection; + amqp_0_10::Connection& connection; framing::AMQP_ClientProxy proxy; std::auto_ptr<SessionState> session; boost::shared_ptr<ErrorListener> errorListener; diff --git a/qpid/cpp/src/qpid/broker/SessionState.cpp b/qpid/cpp/src/qpid/broker/SessionState.cpp index f9b84dc9fb..421dc008a9 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.cpp +++ b/qpid/cpp/src/qpid/broker/SessionState.cpp @@ -96,7 +96,7 @@ uint16_t SessionState::getChannel() const { return handler->getChannel(); } -Connection& SessionState::getConnection() { +amqp_0_10::Connection& SessionState::getConnection() { assert(isAttached()); return handler->getConnection(); } diff --git a/qpid/cpp/src/qpid/broker/SessionState.h b/qpid/cpp/src/qpid/broker/SessionState.h index eef9cf70c7..daf3767969 100644 --- a/qpid/cpp/src/qpid/broker/SessionState.h +++ b/qpid/cpp/src/qpid/broker/SessionState.h @@ -89,7 +89,7 @@ class SessionState : public qpid::SessionState, uint16_t getChannel() const; /** @pre isAttached() */ - Connection& getConnection(); + amqp_0_10::Connection& getConnection(); bool isLocal(const OwnershipToken* t) const; Broker& getBroker(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp b/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp new file mode 100644 index 0000000000..10b7fa5f7a --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/Authorise.cpp @@ -0,0 +1,131 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Authorise.h" +#include "Exception.h" +#include "Filter.h" +#include "qpid/amqp/descriptors.h" +#include "qpid/broker/AclModule.h" +#include "qpid/broker/Exchange.h" +#include "qpid/broker/Message.h" +#include "qpid/broker/Queue.h" +#include "qpid/types/Variant.h" +#include <map> +#include <boost/lexical_cast.hpp> + +namespace qpid { +namespace broker { +namespace amqp { +namespace { +const std::string B_TRUE("true"); +const std::string B_FALSE("false"); +const std::string POLICY_TYPE("qpid.policy_type"); +} + +Authorise::Authorise(const std::string& u, AclModule* a) : user(u), acl(a) {} +void Authorise::access(boost::shared_ptr<Exchange> exchange) +{ + if (acl) { + std::map<acl::Property, std::string> params; + params.insert(std::make_pair(acl::PROP_TYPE, exchange->getType())); + params.insert(std::make_pair(acl::PROP_DURABLE, exchange->isDurable() ? B_TRUE : B_FALSE)); + if (!acl->authorise(user, acl::ACT_ACCESS, acl::OBJ_EXCHANGE, exchange->getName(), ¶ms)) + throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied exchange access request from " << user)); + } +} +void Authorise::access(boost::shared_ptr<Queue> queue) +{ + if (acl) { + const QueueSettings& settings = queue->getSettings(); + std::map<acl::Property, std::string> params; + boost::shared_ptr<Exchange> altex = queue->getAlternateExchange(); + if (altex) + params.insert(std::make_pair(acl::PROP_ALTERNATE, altex->getName())); + params.insert(std::make_pair(acl::PROP_DURABLE, settings.durable ? B_TRUE : B_FALSE)); + params.insert(std::make_pair(acl::PROP_EXCLUSIVE, queue->hasExclusiveOwner() ? B_TRUE : B_FALSE)); + params.insert(std::make_pair(acl::PROP_AUTODELETE, settings.autodelete ? B_TRUE : B_FALSE)); + qpid::types::Variant::Map::const_iterator i = settings.original.find(POLICY_TYPE); + if (i != settings.original.end()) + params.insert(std::make_pair(acl::PROP_POLICYTYPE, i->second.asString())); + if (settings.maxDepth.hasCount()) + params.insert(std::make_pair(acl::PROP_MAXQUEUECOUNT, boost::lexical_cast<std::string>(settings.maxDepth.getCount()))); + if (settings.maxDepth.hasCount()) + params.insert(std::make_pair(acl::PROP_MAXQUEUESIZE, boost::lexical_cast<std::string>(settings.maxDepth.getSize()))); + if (!acl->authorise(user, acl::ACT_ACCESS, acl::OBJ_QUEUE, queue->getName(), ¶ms) ) + throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied queue access request from " << user)); + } +} + +void Authorise::incoming(boost::shared_ptr<Exchange> exchange) +{ + access(exchange); + //can't check publish permission here as do not yet know routing key +} +void Authorise::incoming(boost::shared_ptr<Queue> queue) +{ + access(queue); + if (acl) { + if (!acl->authorise(user, acl::ACT_PUBLISH, acl::OBJ_EXCHANGE, std::string()/*default exchange*/, queue->getName())) + throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG(user << " cannot publish to queue " << queue->getName())); + } +} +void Authorise::outgoing(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue, const Filter& filter) +{ + access(exchange); + if (acl) { + std::map<qpid::acl::Property, std::string> params; + params.insert(std::make_pair(acl::PROP_QUEUENAME, queue->getName())); + params.insert(std::make_pair(acl::PROP_ROUTINGKEY, filter.getBindingKey(exchange))); + + if (!acl->authorise(user, acl::ACT_BIND, acl::OBJ_EXCHANGE, exchange->getName(), ¶ms)) + throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied exchange bind request from " << user)); + + if (!acl->authorise(user, acl::ACT_CONSUME, acl::OBJ_QUEUE, queue->getName(), NULL)) + throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied queue subscribe request from " << user)); + } +} + +void Authorise::outgoing(boost::shared_ptr<Queue> queue) +{ + access(queue); + if (acl) { + if (!acl->authorise(user, acl::ACT_CONSUME, acl::OBJ_QUEUE, queue->getName(), NULL)) + throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied queue subscribe request from " << user)); + } +} + +void Authorise::route(boost::shared_ptr<Exchange> exchange, const Message& msg) +{ + if (acl && acl->doTransferAcl()) { + if (!acl->authorise(user, acl::ACT_PUBLISH, acl::OBJ_EXCHANGE, exchange->getName(), msg.getRoutingKey())) + throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG(user << " cannot publish to " << exchange->getName() << " with routing-key " << msg.getRoutingKey())); + } +} + +void Authorise::interlink() +{ + if (acl) { + if (!acl->authorise(user, acl::ACT_CREATE, acl::OBJ_LINK, "")){ + throw Exception(qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS, QPID_MSG("ACL denied " << user << " a AMQP 1.0 link")); + } + } +} + +}}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Authorise.h b/qpid/cpp/src/qpid/broker/amqp/Authorise.h new file mode 100644 index 0000000000..7bdb75375f --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/Authorise.h @@ -0,0 +1,57 @@ +#ifndef QPID_BROKER_AMQP_AUTHORISE_H +#define QPID_BROKER_AMQP_AUTHORISE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { +class AclModule; +class Exchange; +class Message; +class Queue; +namespace amqp { +class Filter; + +/** + * Class to handle authorisation requests (and hide the ACL mess behind) + */ +class Authorise +{ + public: + Authorise(const std::string& user, AclModule*); + void access(boost::shared_ptr<Exchange>); + void access(boost::shared_ptr<Queue>); + void incoming(boost::shared_ptr<Exchange>); + void incoming(boost::shared_ptr<Queue>); + void outgoing(boost::shared_ptr<Exchange>, boost::shared_ptr<Queue>, const Filter&); + void outgoing(boost::shared_ptr<Queue>); + void route(boost::shared_ptr<Exchange>, const Message&); + void interlink(); + private: + const std::string user; + AclModule* const acl; + +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_AUTHORISE_H*/ diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index 51576e9577..4433419402 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -19,9 +19,12 @@ * */ #include "Connection.h" +#include "DataReader.h" #include "Session.h" -#include "qpid/Exception.h" +#include "Exception.h" +#include "qpid/broker/AclModule.h" #include "qpid/broker/Broker.h" +#include "qpid/amqp/descriptors.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/ProtocolInitiation.h" #include "qpid/framing/ProtocolVersion.h" @@ -36,7 +39,6 @@ extern "C" { namespace qpid { namespace broker { namespace amqp { - Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, Interconnects& interconnects_, bool saslInUse, const std::string& d) : ManagedConnection(b, i), connection(pn_connection()), @@ -52,6 +54,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid:: QPID_LOG_TEST_CAT(trace, protocol, enableTrace); if (enableTrace) pn_transport_trace(transport, PN_TRACE_FRM); + broker.getConnectionObservers().connection(*this); if (!saslInUse) { //feed in a dummy AMQP 1.0 header as engine expects one, but //we already read it (if sasl is in use we read the sasl @@ -62,15 +65,14 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid:: pi.encode(buffer); pn_transport_input(transport, &protocolHeader[0], protocolHeader.size()); - //wont get a userid, so set a dummy one on the ManagedConnection to trigger event - setUserid("no authentication used"); + setUserId("none"); } } Connection::~Connection() { - + broker.getConnectionObservers().closed(*this); pn_transport_free(transport); pn_connection_free(connection); } @@ -97,8 +99,17 @@ size_t Connection::decode(const char* buffer, size_t size) QPID_LOG_CAT(debug, network, id << " decoded " << n << " bytes from " << size); try { process(); + } catch (const Exception& e) { + QPID_LOG(error, id << ": " << e.what()); + pn_condition_t* error = pn_connection_condition(connection); + pn_condition_set_name(error, e.symbol()); + pn_condition_set_description(error, e.what()); + close(); } catch (const std::exception& e) { QPID_LOG(error, id << ": " << e.what()); + pn_condition_t* error = pn_connection_condition(connection); + pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str()); + pn_condition_set_description(error, e.what()); close(); } pn_transport_tick(transport, 0); @@ -108,7 +119,7 @@ size_t Connection::decode(const char* buffer, size_t size) } return n; } else if (n == PN_ERR) { - throw qpid::Exception(QPID_MSG("Error on input: " << getError())); + throw Exception(qpid::amqp::error_conditions::DECODE_ERROR, QPID_MSG("Error on input: " << getError())); } else { return 0; } @@ -126,7 +137,7 @@ size_t Connection::encode(char* buffer, size_t size) haveOutput = size; return size;//Is this right? } else if (n == PN_ERR) { - throw qpid::Exception(QPID_MSG("Error on output: " << getError())); + throw Exception(qpid::amqp::error_conditions::INTERNAL_ERROR, QPID_MSG("Error on output: " << getError())); } else { haveOutput = false; return 0; @@ -139,8 +150,17 @@ bool Connection::canEncode() if (i->second->dispatch()) haveOutput = true; } process(); + } catch (const Exception& e) { + QPID_LOG(error, id << ": " << e.what()); + pn_condition_t* error = pn_connection_condition(connection); + pn_condition_set_name(error, e.symbol()); + pn_condition_set_description(error, e.what()); + close(); } catch (const std::exception& e) { QPID_LOG(error, id << ": " << e.what()); + pn_condition_t* error = pn_connection_condition(connection); + pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str()); + pn_condition_set_description(error, e.what()); close(); } //TODO: proper handling of time in and out of tick @@ -148,6 +168,28 @@ bool Connection::canEncode() QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput) return haveOutput; } + +void Connection::open() +{ + readPeerProperties(); + + pn_connection_set_container(connection, broker.getFederationTag().c_str()); + pn_connection_open(connection); + out.connectionEstablished(); + opened(); + broker.getConnectionObservers().opened(*this); +} + +void Connection::readPeerProperties() +{ + /** + * TODO: enable when proton 0.5 has been released: + qpid::types::Variant::Map properties; + DataReader::read(pn_connection_remote_properties(connection), properties); + setPeerProperties(properties); + */ +} + void Connection::closed() { for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { @@ -178,10 +220,8 @@ void Connection::process() QPID_LOG(trace, id << " process()"); if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) { QPID_LOG_CAT(debug, model, id << " connection opened"); - pn_connection_set_container(connection, broker.getFederationTag().c_str()); + open(); setContainerId(pn_connection_remote_container(connection)); - pn_connection_open(connection); - out.connectionEstablished(); } for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) { @@ -200,9 +240,17 @@ void Connection::process() try { session->second->attach(l); QPID_LOG_CAT(debug, protocol, id << " link " << l << " attached on " << pn_link_session(l)); + } catch (const Exception& e) { + QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); + pn_condition_t* error = pn_link_condition(l); + pn_condition_set_name(error, e.symbol()); + pn_condition_set_description(error, e.what()); + pn_link_close(l); } catch (const std::exception& e) { QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); - //TODO: set error details on detach when that is exposed via engine API + pn_condition_t* error = pn_link_condition(l); + pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str()); + pn_condition_set_description(error, e.what()); pn_link_close(l); } } @@ -214,7 +262,15 @@ void Connection::process() if (pn_link_is_receiver(link)) { Sessions::iterator i = sessions.find(pn_link_session(link)); if (i != sessions.end()) { - i->second->readable(link, delivery); + try { + i->second->readable(link, delivery); + } catch (const Exception& e) { + QPID_LOG_CAT(error, protocol, "Error on publish: " << e.what()); + pn_condition_t* error = pn_link_condition(link); + pn_condition_set_name(error, e.symbol()); + pn_condition_set_description(error, e.what()); + pn_link_close(link); + } } else { pn_delivery_update(delivery, PN_REJECTED); } @@ -271,4 +327,19 @@ std::string Connection::getDomain() const { return domain; } + +void Connection::abort() +{ + out.abort(); +} + +void Connection::setUserId(const std::string& user) +{ + ManagedConnection::setUserId(user); + AclModule* acl = broker.getAcl(); + if (acl && !acl->approveConnection(*this)) + { + throw Exception(qpid::amqp::error_conditions::RESOURCE_LIMIT_EXCEEDED, "User connection denied by configured limit"); + } +} }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h index d61db82e60..d460f972d2 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h @@ -58,6 +58,9 @@ class Connection : public sys::ConnectionCodec, public ManagedConnection pn_transport_t* getTransport(); Interconnects& getInterconnects(); std::string getDomain() const; + void setUserId(const std::string&); + void abort(); + protected: typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions; pn_connection_t* connection; @@ -73,6 +76,8 @@ class Connection : public sys::ConnectionCodec, public ManagedConnection virtual void process(); std::string getError(); void close(); + void open(); + void readPeerProperties(); }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp b/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp index 519dd71c9c..1140032174 100644 --- a/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/DataReader.cpp @@ -21,6 +21,7 @@ #include "DataReader.h" #include "qpid/amqp/CharSequence.h" #include "qpid/amqp/Descriptor.h" +#include "qpid/amqp/MapBuilder.h" #include "qpid/log/Statement.h" #include <string> extern "C" { @@ -52,11 +53,6 @@ DataReader::DataReader(qpid::amqp::Reader& r) : reader(r) {} void DataReader::read(pn_data_t* data) { - /* - while (pn_data_next(data)) { - readOne(data); - } - */ do { readOne(data); } while (pn_data_next(data)); @@ -184,4 +180,12 @@ void DataReader::readMap(pn_data_t* data, const qpid::amqp::Descriptor* descript pn_data_exit(data); reader.onEndMap(count, descriptor); } + +void DataReader::read(pn_data_t* data, std::map<std::string, qpid::types::Variant>& out) +{ + qpid::amqp::MapBuilder builder; + DataReader reader(builder); + reader.read(data); + out = builder.getMap(); +} }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/DataReader.h b/qpid/cpp/src/qpid/broker/amqp/DataReader.h index 024507e7f2..99ff77b3dd 100644 --- a/qpid/cpp/src/qpid/broker/amqp/DataReader.h +++ b/qpid/cpp/src/qpid/broker/amqp/DataReader.h @@ -22,10 +22,15 @@ * */ #include "qpid/amqp/Reader.h" +#include <map> +#include <string> struct pn_data_t; namespace qpid { +namespace types { +class Variant; +} namespace amqp { struct Descriptor; } @@ -40,6 +45,7 @@ class DataReader public: DataReader(qpid::amqp::Reader& reader); void read(pn_data_t*); + static void read(pn_data_t*, std::map<std::string, qpid::types::Variant>&); private: qpid::amqp::Reader& reader; diff --git a/qpid/cpp/src/qpid/broker/amqp/Exception.cpp b/qpid/cpp/src/qpid/broker/amqp/Exception.cpp new file mode 100644 index 0000000000..6b874aa272 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/Exception.cpp @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "Exception.h" + +namespace qpid { +namespace broker { +namespace amqp { +Exception::Exception(const std::string& n, const std::string& d) : name(n), description(d) {} +Exception::~Exception() throw() {} +const char* Exception::what() const throw() { return description.c_str(); } +const char* Exception::symbol() const throw() { return name.c_str(); } +}}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/ConnectionIdentity.h b/qpid/cpp/src/qpid/broker/amqp/Exception.h index 4e28ca11e3..c2fe470e55 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionIdentity.h +++ b/qpid/cpp/src/qpid/broker/amqp/Exception.h @@ -1,5 +1,5 @@ -#ifndef QPID_BROKER_CONNECTIONIDENTITY_H -#define QPID_BROKER_CONNECTIONIDENTITY_H +#ifndef QPID_BROKER_AMQP_EXCEPTION_H +#define QPID_BROKER_AMQP_EXCEPTION_H /* * @@ -21,31 +21,25 @@ * under the License. * */ - #include <string> namespace qpid { - -namespace management { -class ObjectId; -} - namespace broker { - -class OwnershipToken; - -// Interface used to hold Connection authentication and object details for use when authenticating -// publihed management requests. -class ConnectionIdentity { -protected: - virtual ~ConnectionIdentity() {} - -public: - virtual const OwnershipToken* getOwnership() const = 0; - virtual const management::ObjectId getObjectId() const = 0; - virtual const std::string& getUserId() const = 0; - virtual const std::string& getUrl() const = 0; +namespace amqp { +/** + * Exception to signal various AMQP 1.0 defined conditions + */ +class Exception : public std::exception +{ + public: + Exception(const std::string& name, const std::string& description); + virtual ~Exception() throw(); + const char* what() const throw(); + const char* symbol() const throw(); + private: + std::string name; + std::string description; }; +}}} // namespace qpid::broker::amqp -}} -#endif // QPID_BROKER_CONNECTIONIDENTITY_H +#endif /*!QPID_BROKER_AMQP_EXCEPTION_H*/ diff --git a/qpid/cpp/src/qpid/broker/amqp/Filter.cpp b/qpid/cpp/src/qpid/broker/amqp/Filter.cpp index 48d9334387..9b110b219d 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Filter.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Filter.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/broker/amqp/Filter.h" +#include "qpid/broker/amqp/Authorise.h" #include "qpid/broker/amqp/DataReader.h" #include "qpid/broker/amqp/Outgoing.h" #include "qpid/broker/DirectExchange.h" @@ -235,6 +236,15 @@ void Filter::configure(QueueSettings& settings) } } +std::string Filter::getBindingKey(boost::shared_ptr<Exchange> exchange) const +{ + if (subjectFilter.value.empty() && exchange->getType() == TopicExchange::typeName) { + return WILDCARD; + } else { + return subjectFilter.value; + } +} + void Filter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue) { qpid::framing::FieldTable bindingArgs; @@ -379,5 +389,12 @@ void Filter::MapFilter::writeValue(pn_data_t* data) pn_data_exit(data); } +void Filter::write(std::map<std::string, qpid::types::Variant> source, pn_data_t* target) +{ + MapFilter dummy; + dummy.value = source; + dummy.writeValue(target); +} + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Filter.h b/qpid/cpp/src/qpid/broker/amqp/Filter.h index a76928eb01..3f395c9c19 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Filter.h +++ b/qpid/cpp/src/qpid/broker/amqp/Filter.h @@ -43,6 +43,7 @@ class Filter : qpid::amqp::MapReader Filter(); void read(pn_data_t*); void write(pn_data_t*); + std::string getBindingKey(boost::shared_ptr<Exchange> exchange) const; /** * Apply filters where source is a queue @@ -57,6 +58,11 @@ class Filter : qpid::amqp::MapReader * Bind subscription queue for case where source is an exchange */ void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue); + + /** + * Not really the ideal place for this, but the logic is already implemented here... + */ + static void write(std::map<std::string, qpid::types::Variant> source, pn_data_t* target); private: struct FilterBase { diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp index 14614b0b87..119d05af60 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp @@ -19,8 +19,10 @@ * */ #include "Incoming.h" +#include "Exception.h" #include "Message.h" #include "Session.h" +#include "qpid/amqp/descriptors.h" #include "qpid/broker/AsyncCompletion.h" #include "qpid/broker/Message.h" @@ -60,6 +62,30 @@ void Incoming::wakeup() { session.wakeup(); } + +void Incoming::verify(const std::string& u, const std::string& r) +{ + userid.init(u, r); +} + +Incoming::UserId::UserId() : inDefaultRealm(false) {} +void Incoming::UserId::init(const std::string& u, const std::string& defaultRealm) +{ + userid = u; + size_t at = userid.find('@'); + if (at != std::string::npos) { + unqualified = userid.substr(0, at); + inDefaultRealm = defaultRealm == userid.substr(at+1); + } +} +void Incoming::UserId::verify(const std::string& claimed) +{ + if(!userid.empty() && !claimed.empty() && userid != claimed && !(inDefaultRealm && claimed == unqualified)) { + throw Exception(qpid::amqp::error_conditions::NOT_ALLOWED, QPID_MSG("Authenticated user id is " << userid << " but user id in message declared as " << claimed)); + } +} + + namespace { class Transfer : public qpid::broker::AsyncCompletion::Callback { @@ -89,7 +115,7 @@ void DecodingIncoming::readable(pn_delivery_t* delivery) pn_link_advance(link); qpid::broker::Message message(received, received); - + userid.verify(message.getUserId()); handle(message); --window; received->begin(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h index a7c706aed9..8852766eda 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.h +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h @@ -43,12 +43,28 @@ class Incoming : public ManagedIncomingLink virtual bool haveWork();//called when handling input to see whether any output work is needed virtual void detached(); virtual void readable(pn_delivery_t* delivery) = 0; + void verify(const std::string& userid, const std::string& defaultRealm); void wakeup(); protected: + class UserId + { + public: + UserId(); + void init(const std::string& userid, const std::string& defaultRealm); + void verify(const std::string& claimed); + private: + std::string userid; + bool inDefaultRealm; + std::string unqualified; + }; + const uint32_t credit; uint32_t window; + + pn_link_t* link; Session& session; + UserId userid; virtual uint32_t getCredit(); }; diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp index f7f12be6c4..4741130bd1 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp @@ -82,9 +82,7 @@ void Interconnect::process() } else { if ((pn_connection_state(connection) & UNINIT) == UNINIT) { QPID_LOG_CAT(debug, model, id << " interconnect opened"); - pn_connection_set_container(connection, broker.getFederationTag().c_str()); - pn_connection_open(connection); - out.connectionEstablished(); + open(); pn_session_t* s = pn_session(connection); pn_session_open(s); @@ -116,4 +114,9 @@ void Interconnect::transportDeleted() registry.remove(name); } +bool Interconnect::isLink() const +{ + return true; +} + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.h b/qpid/cpp/src/qpid/broker/amqp/Interconnect.h index 230abbc667..64d037dae5 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Interconnect.h +++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.h @@ -44,6 +44,7 @@ class Interconnect : public Connection size_t encode(char* buffer, size_t size); void deletedFromRegistry(); void transportDeleted(); + bool isLink() const; private: bool incoming; std::string name; diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp index b62a07d067..d1595b47cc 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.cpp @@ -54,11 +54,17 @@ ManagedConnection::~ManagedConnection() QPID_LOG_CAT(debug, model, "Delete connection. user:" << userid << " rhost:" << id); } -void ManagedConnection::setUserid(const std::string& uid) +void ManagedConnection::setUserId(const std::string& uid) { userid = uid; - if (agent && connection) { + if (connection) { connection->set_authIdentity(userid); + } +} + +void ManagedConnection::opened() +{ + if (agent) { agent->raiseEvent(_qmf::EventClientConnect(id, userid, connection->get_remoteProperties())); } QPID_LOG_CAT(debug, model, "Create connection. user:" << userid << " rhost:" << id ); @@ -78,13 +84,20 @@ void ManagedConnection::setSaslSsf(int ssf) } } +void ManagedConnection::setPeerProperties(std::map<std::string, types::Variant>& p) +{ + peerProperties = p; + if (connection) { + connection->set_remoteProperties(peerProperties); + } +} + void ManagedConnection::setContainerId(const std::string& container) { containerid = container; + peerProperties["container-id"] = containerid; if (connection) { - qpid::types::Variant::Map props; - props["container-id"] = containerid; - connection->set_remoteProperties(props); + connection->set_remoteProperties(peerProperties); } } const std::string& ManagedConnection::getContainerId() const @@ -98,7 +111,31 @@ qpid::management::ManagementObject::shared_ptr ManagedConnection::GetManagementO } std::string ManagedConnection::getId() const { return id; } -std::string ManagedConnection::getUserid() const { return userid; } + +const OwnershipToken* ManagedConnection::getOwnership() const +{ + return this; +} +const management::ObjectId ManagedConnection::getObjectId() const +{ + return GetManagementObject()->getObjectId(); +} +const std::string& ManagedConnection::getUserId() const +{ + return userid; +} +const std::string& ManagedConnection::getMgmtId() const +{ + return id; +} +const std::map<std::string, types::Variant>& ManagedConnection::getClientProperties() const +{ + return connection->get_remoteProperties(); +} +bool ManagedConnection::isLink() const +{ + return false; +} bool ManagedConnection::isLocal(const OwnershipToken* t) const { diff --git a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h index 634c0fca99..a9f90cefcf 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h +++ b/qpid/cpp/src/qpid/broker/amqp/ManagedConnection.h @@ -22,7 +22,9 @@ * */ #include "qpid/management/Manageable.h" +#include "qpid/broker/Connection.h" #include "qpid/broker/OwnershipToken.h" +#include "qpid/types/Variant.h" #include "qmf/org/apache/qpid/broker/Connection.h" namespace qpid { @@ -34,28 +36,38 @@ namespace broker { class Broker; namespace amqp { -class ManagedConnection : public qpid::management::Manageable, public OwnershipToken +class ManagedConnection : public qpid::management::Manageable, public OwnershipToken, public qpid::broker::Connection { public: ManagedConnection(Broker& broker, const std::string id); virtual ~ManagedConnection(); - void setUserid(const std::string&); + virtual void setUserId(const std::string&); std::string getId() const; - std::string getUserid() const; void setSaslMechanism(const std::string&); void setSaslSsf(int); void setContainerId(const std::string&); const std::string& getContainerId() const; + void setPeerProperties(std::map<std::string, types::Variant>&); qpid::management::ManagementObject::shared_ptr GetManagementObject() const; bool isLocal(const OwnershipToken* t) const; void incomingMessageReceived(); void outgoingMessageSent(); + + //ConnectionIdentity + const OwnershipToken* getOwnership() const; + const management::ObjectId getObjectId() const; + const std::string& getUserId() const; + const std::string& getMgmtId() const; + const std::map<std::string, types::Variant>& getClientProperties() const; + virtual bool isLink() const; + void opened(); private: const std::string id; std::string userid; std::string containerid; qmf::org::apache::qpid::broker::Connection::shared_ptr connection; qpid::management::ManagementAgent* agent; + std::map<std::string, types::Variant> peerProperties; }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp index 820aaf87d4..1ce5586ace 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp @@ -139,7 +139,7 @@ void Sasl::respond(qpid::SaslServer::Status status, const std::string& chllnge) { switch (status) { case qpid::SaslServer::OK: - connection.setUserid(authenticator->getUserid()); + connection.setUserId(authenticator->getUserid()); completed(true); //can't set authenticated & failed until we have actually sent the outcome state = SUCCESS_PENDING; diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index d1bab7f775..f90bfd1cd9 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -120,14 +120,16 @@ class IncomingToQueue : public DecodingIncoming class IncomingToExchange : public DecodingIncoming { public: - IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source) : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e) {} + IncomingToExchange(Broker& b, Session& p, boost::shared_ptr<qpid::broker::Exchange> e, pn_link_t* l, const std::string& source) + : DecodingIncoming(l, b, p, source, e->getName(), pn_link_name(l)), exchange(e), authorise(p.getAuthorise()) {} void handle(qpid::broker::Message& m); private: boost::shared_ptr<qpid::broker::Exchange> exchange; + Authorise& authorise; }; Session::Session(pn_session_t* s, qpid::broker::Broker& b, Connection& c, qpid::sys::OutputControl& o) - : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false) {} + : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false), authorise(connection.getUserId(), broker.getAcl()) {} Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming) @@ -140,11 +142,11 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te //is it a queue or an exchange? node.properties.read(pn_terminus_properties(terminus)); if (node.properties.isQueue()) { - node.queue = broker.createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first; + node.queue = broker.createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first; } else { qpid::framing::FieldTable args; node.exchange = broker.createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(), - args, connection.getUserid(), connection.getId()).first; + args, connection.getUserId(), connection.getId()).first; } } else { size_t i = name.find('@'); @@ -236,8 +238,13 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s { ResolvedNode node = resolve(name, target, true); //set capabilities - if (node.queue) setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.queue); - else if (node.exchange) setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange); + if (node.queue) { + setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.queue); + authorise.incoming(node.queue); + } else if (node.exchange) { + setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange); + authorise.incoming(node.exchange); + } const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link)); if (!sourceAddress) { @@ -260,6 +267,7 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); throw qpid::Exception("Node not found: " + name);/*not-found*/ } + if (broker.getOptions().auth && !connection.isLink()) incoming[link]->verify(connection.getUserId(), broker.getOptions().realm); QPID_LOG(debug, "Incoming link attached"); } @@ -282,11 +290,13 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s if (node.queue) { + authorise.outgoing(node.queue); boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, node.queue, link, *this, out, false)); q->init(); filter.apply(q); outgoing[link] = q; } else if (node.exchange) { + authorise.access(node.exchange);//do separate access check before trying to create the queue bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source)); bool durable = pn_terminus_get_durability(source); QueueSettings settings(durable, !durable); @@ -295,7 +305,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s std::stringstream queueName; if (shared) { //just use link name (TODO: could allow this to be - //overridden when acces to link properties is provided + //overridden when access to link properties is provided //(PROTON-335)) queueName << pn_link_name(link); } else { @@ -303,9 +313,9 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s queueName << connection.getContainerId() << "_" << pn_link_name(link); } boost::shared_ptr<qpid::broker::Queue> queue - = broker.createQueue(queueName.str(), settings, this, "", connection.getUserid(), connection.getId()).first; + = broker.createQueue(queueName.str(), settings, this, "", connection.getUserId(), connection.getId()).first; if (!shared) queue->setExclusiveOwner(this); - + authorise.outgoing(node.exchange, queue, filter); filter.bind(node.exchange, queue); boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, !shared)); outgoing[link] = q; @@ -460,6 +470,11 @@ void Session::wakeup() out.activateOutput(); } +Authorise& Session::getAuthorise() +{ + return authorise; +} + void IncomingToQueue::handle(qpid::broker::Message& message) { queue->deliver(message); @@ -467,6 +482,7 @@ void IncomingToQueue::handle(qpid::broker::Message& message) void IncomingToExchange::handle(qpid::broker::Message& message) { + authorise.route(exchange, message); DeliverableMessage deliverable(message, 0); exchange->route(deliverable); } diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h index 19922f3ee1..78d44a1a18 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.h +++ b/qpid/cpp/src/qpid/broker/amqp/Session.h @@ -23,6 +23,7 @@ */ #include "qpid/sys/Mutex.h" #include "qpid/sys/OutputControl.h" +#include "qpid/broker/amqp/Authorise.h" #include "qpid/broker/amqp/ManagedSession.h" #include "qpid/broker/amqp/NodeProperties.h" #include <deque> @@ -75,6 +76,8 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses void accepted(pn_delivery_t*, bool sync); void wakeup(); + + Authorise& getAuthorise(); private: typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > OutgoingLinks; typedef std::map<pn_link_t*, boost::shared_ptr<Incoming> > IncomingLinks; @@ -88,6 +91,8 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses bool deleted; qpid::sys::Mutex lock; std::set< boost::shared_ptr<Queue> > exclusiveQueues; + Authorise authorise; + struct ResolvedNode { boost::shared_ptr<qpid::broker::Exchange> exchange; diff --git a/qpid/cpp/src/qpid/broker/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp index a127f9bee2..6732b66ed4 100644 --- a/qpid/cpp/src/qpid/broker/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp @@ -18,7 +18,7 @@ * under the License. * */ -#include "qpid/broker/Connection.h" +#include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/broker/ConnectionObserver.h" #include "qpid/broker/SessionOutputException.h" @@ -59,6 +59,7 @@ namespace _qmf = qmf::org::apache::qpid::broker; namespace qpid { namespace broker { +namespace amqp_0_10 { struct ConnectionTimeoutTask : public sys::TimerTask { sys::Timer& timer; @@ -160,7 +161,6 @@ Connection::Connection(ConnectionOutputHandler* out_, mgmtObject = _qmf::Connection::shared_ptr(new _qmf::Connection(agent, this, parent, mgmtId, !link, false, "AMQP 0-10")); agent->addObject(mgmtObject, objectId); } - setUrl(mgmtId); } } @@ -542,4 +542,4 @@ void Connection::restartTimeout() bool Connection::isOpen() { return adapter.isOpen(); } -}} +}}} diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h new file mode 100644 index 0000000000..5411b883ef --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.h @@ -0,0 +1,235 @@ +#ifndef QPID_BROKER_AMQP_0_10_CONNECTION_H +#define QPID_BROKER_AMQP_0_10_CONNECTION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <memory> +#include <sstream> +#include <vector> +#include <queue> + +#include "qpid/broker/BrokerImportExport.h" + +#include "qpid/broker/ConnectionHandler.h" +#include "qpid/broker/Connection.h" +#include "qpid/broker/OwnershipToken.h" +#include "qpid/management/Manageable.h" +#include "qpid/sys/AggregateOutput.h" +#include "qpid/sys/ConnectionInputHandler.h" +#include "qpid/sys/SecuritySettings.h" +#include "qpid/sys/Mutex.h" +#include "qpid/types/Variant.h" +#include "qpid/RefCounted.h" +#include "qpid/Url.h" +#include "qpid/ptr_map.h" + +#include "qmf/org/apache/qpid/broker/Connection.h" + +#include <boost/ptr_container/ptr_map.hpp> +#include <boost/scoped_ptr.hpp> +#include <boost/bind.hpp> + +#include <algorithm> + +namespace qpid { +namespace sys { +class ConnectionOutputHandler; +class Timer; +class TimerTask; +} +namespace broker { + +class Broker; +class LinkRegistry; +class Queue; +class SecureConnection; +class SessionHandler; + +namespace amqp_0_10 { +struct ConnectionTimeoutTask; + +class Connection : public sys::ConnectionInputHandler, public qpid::broker::Connection, + public OwnershipToken, public management::Manageable, + public RefCounted +{ + public: + uint32_t getFrameMax() const { return framemax; } + uint16_t getHeartbeat() const { return heartbeat; } + uint16_t getHeartbeatMax() const { return heartbeatmax; } + + void setFrameMax(uint32_t fm) { framemax = std::max(fm, (uint32_t) 4096); } + void setHeartbeat(uint16_t hb) { heartbeat = hb; } + void setHeartbeatMax(uint16_t hbm) { heartbeatmax = hbm; } + + + const OwnershipToken* getOwnership() const { return this; }; + const management::ObjectId getObjectId() const { return GetManagementObject()->getObjectId(); }; + const std::string& getUserId() const { return userId; } + + void setUserProxyAuth(const bool b); + bool isUserProxyAuth() const { return userProxyAuth || federationPeerTag.size() > 0; } // links can proxy msgs with non-matching auth ids + bool isFederationLink() const { return federationPeerTag.size() > 0; } + void setFederationPeerTag(const std::string& tag) { federationPeerTag = std::string(tag); } + const std::string& getFederationPeerTag() const { return federationPeerTag; } + std::vector<Url>& getKnownHosts() { return knownHosts; } + + /**@return true if user is the authenticated user on this connection. + * If id has the default realm will also compare plain username. + */ + bool isAuthenticatedUser(const std::string& id) const { + return (id == userId || (isDefaultRealm && id == userName)); + } + + Broker& getBroker() { return broker; } + + sys::ConnectionOutputHandler& getOutput() { return *out; } + void activateOutput(); + void addOutputTask(OutputTask*); + void removeOutputTask(OutputTask*); + framing::ProtocolVersion getVersion() const { return version; } + + Connection(sys::ConnectionOutputHandler* out, + Broker& broker, + const std::string& mgmtId, + const qpid::sys::SecuritySettings&, + bool isLink = false, + uint64_t objectId = 0); + + ~Connection (); + + /** Get the SessionHandler for channel. Create if it does not already exist */ + SessionHandler& getChannel(framing::ChannelId channel); + + /** Close the connection. Waits for the client to respond with close-ok + * before actually destroying the connection. + */ + QPID_BROKER_EXTERN void close( + framing::connection::CloseCode code, const std::string& text); + + /** Abort the connection. Close abruptly and immediately. */ + QPID_BROKER_EXTERN void abort(); + + // ConnectionInputHandler methods + void received(framing::AMQFrame& frame); + bool doOutput(); + void closed(); + + void closeChannel(framing::ChannelId channel); + + // Manageable entry points + management::ManagementObject::shared_ptr GetManagementObject(void) const; + management::Manageable::status_t + ManagementMethod (uint32_t methodId, management::Args& args, std::string&); + + void requestIOProcessing (boost::function0<void>); + void recordFromServer (const framing::AMQFrame& frame); + void recordFromClient (const framing::AMQFrame& frame); + + // gets for configured federation links + std::string getAuthMechanism(); + std::string getAuthCredentials(); + std::string getUsername(); + std::string getPassword(); + std::string getHost(); + uint16_t getPort(); + + void notifyConnectionForced(const std::string& text); + void setUserId(const std::string& uid); + + // credentials for connected client + const std::string& getMgmtId() const { return mgmtId; } + management::ManagementAgent* getAgent() const { return agent; } + + void setHeartbeatInterval(uint16_t heartbeat); + void sendHeartbeat(); + void restartTimeout(); + + void setSecureConnection(SecureConnection* secured); + + const qpid::sys::SecuritySettings& getExternalSecuritySettings() const + { + return securitySettings; + } + + /** @return true if the initial connection negotiation is complete. */ + bool isOpen(); + + bool isLink() const { return link; } + void startLinkHeartbeatTimeoutTask(); + + void setClientProperties(const types::Variant::Map& cp) { clientProperties = cp; } + const types::Variant::Map& getClientProperties() const { return clientProperties; } + + private: + // Management object is used in the constructor so must be early + qmf::org::apache::qpid::broker::Connection::shared_ptr mgmtObject; + + //contained output tasks + sys::AggregateOutput outputTasks; + + boost::scoped_ptr<framing::FrameHandler> outboundTracker; + boost::scoped_ptr<sys::ConnectionOutputHandler> out; + + Broker& broker; + + framing::ProtocolVersion version; + uint32_t framemax; + uint16_t heartbeat; + uint16_t heartbeatmax; + std::string userId; + bool userProxyAuth; + std::string federationPeerTag; + std::vector<Url> knownHosts; + std::string userName; + bool isDefaultRealm; + + typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap; + + ChannelMap channels; + qpid::sys::SecuritySettings securitySettings; + const bool link; + ConnectionHandler adapter; + bool mgmtClosing; + const std::string mgmtId; + sys::Mutex ioCallbackLock; + std::queue<boost::function0<void> > ioCallbacks; + LinkRegistry& links; + management::ManagementAgent* agent; + sys::Timer& timer; + boost::intrusive_ptr<sys::TimerTask> heartbeatTimer, linkHeartbeatTimer; + boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer; + uint64_t objectId; + types::Variant::Map clientProperties; + +friend class OutboundFrameTracker; + + void sent(const framing::AMQFrame& f); + void doIoCallbacks(); + + public: + + qmf::org::apache::qpid::broker::Connection::shared_ptr getMgmtObject() { return mgmtObject; } +}; + +}}} + +#endif /*!QPID_BROKER_AMQP_0_10_CONNECTION_H*/ diff --git a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp index bb706e53b1..42ced2988d 100644 --- a/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp +++ b/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp @@ -22,7 +22,7 @@ #include "HaBroker.h" #include "QueueReplicator.h" #include "qpid/broker/Broker.h" -#include "qpid/broker/Connection.h" +#include "qpid/broker/amqp_0_10/Connection.h" #include "qpid/broker/ConnectionObserver.h" #include "qpid/broker/Queue.h" #include "qpid/broker/QueueSettings.h" @@ -378,7 +378,7 @@ void BrokerReplicator::connected(Bridge& bridge, SessionHandler& sessionHandler) connection = link->getConnection(); assert(connection); userId = link->getConnection()->getUserId(); - remoteHost = link->getConnection()->getUrl(); + remoteHost = link->getConnection()->getMgmtId(); link->getRemoteAddress(primary); string queueName = bridge.getQueueName(); diff --git a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp index 66e841e988..c9c5c2e576 100644 --- a/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp +++ b/qpid/cpp/src/qpid/ha/ConnectionObserver.cpp @@ -23,7 +23,7 @@ #include "BrokerInfo.h" #include "HaBroker.h" #include "qpid/Url.h" -#include "qpid/framing/FieldTable.h" +#include "qpid/types/Variant.h" #include "qpid/broker/Connection.h" #include "qpid/log/Statement.h" @@ -34,21 +34,23 @@ ConnectionObserver::ConnectionObserver(HaBroker& hb, const types::Uuid& uuid) : haBroker(hb), logPrefix("Backup: "), self(uuid) {} bool ConnectionObserver::getBrokerInfo(const broker::Connection& connection, BrokerInfo& info) { - framing::FieldTable ft; - if (connection.getClientProperties().getTable(ConnectionObserver::BACKUP_TAG, ft)) { - info = BrokerInfo(ft); + qpid::types::Variant::Map::const_iterator i = connection.getClientProperties().find(ConnectionObserver::BACKUP_TAG); + if (i != connection.getClientProperties().end() && i->second.getType() == qpid::types::VAR_MAP) { + info = BrokerInfo(i->second.asMap()); return true; } return false; } bool ConnectionObserver::getAddress(const broker::Connection& connection, Address& addr) { - Url url; - url.parseNoThrow( - connection.getClientProperties().getAsString(ConnectionObserver::ADDRESS_TAG).c_str()); - if (!url.empty()) { - addr = url[0]; - return true; + qpid::types::Variant::Map::const_iterator i = connection.getClientProperties().find(ConnectionObserver::ADDRESS_TAG); + if (i != connection.getClientProperties().end()) { + Url url; + url.parseNoThrow(i->second.asString().c_str()); + if (!url.empty()) { + addr = url[0]; + return true; + } } return false; } @@ -86,7 +88,7 @@ void ConnectionObserver::opened(broker::Connection& connection) { return; } if (connection.isLink()) return; // Allow outgoing links. - if (connection.getClientProperties().isSet(ADMIN_TAG)) { + if (connection.getClientProperties().find(ADMIN_TAG) != connection.getClientProperties().end()) { QPID_LOG(debug, logPrefix << "Accepted admin connection: " << connection.getMgmtId()); return; // No need to call observer, always allow admins. diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index c5c979bfb0..5f653939ec 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -88,10 +88,27 @@ const string keyifyNameStr(const string& name) struct ScopedManagementContext { - ScopedManagementContext(const ConnectionIdentity& p) + const Connection* context; + + ScopedManagementContext(const Connection* p) : context(p) + { + if (p) setManagementExecutionContext(*p); + } + + management::ObjectId getObjectId() const + { + return context ? context->getObjectId() : management::ObjectId(); + } + std::string getUserId() const + { + return context ? context->getUserId() : std::string(); + } + std::string getMgmtId() const { - setManagementExecutionContext(p); + return context ? context->getMgmtId() : std::string(); } + + ~ScopedManagementContext() { resetManagementExecutionContext(); @@ -2288,7 +2305,7 @@ void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal) } if (opcode == "_method_request") - return handleMethodRequest(body, rte, rtk, cid, msg.getPublisherUserId(), viaLocal); + return handleMethodRequest(body, rte, rtk, cid, context.getUserId(), viaLocal); else if (opcode == "_query_request") return handleGetQuery(body, rte, rtk, cid, viaLocal); else if (opcode == "_agent_locate_request") @@ -2311,9 +2328,9 @@ void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal) else if (opcode == 'q') handleClassInd (inBuffer, rtk, sequence); else if (opcode == 'S') handleSchemaRequest (inBuffer, rte, rtk, sequence); else if (opcode == 's') handleSchemaResponse (inBuffer, rtk, sequence); - else if (opcode == 'A') handleAttachRequest (inBuffer, rtk, sequence, msg.getPublisherObjectId()); + else if (opcode == 'A') handleAttachRequest (inBuffer, rtk, sequence, context.getObjectId()); else if (opcode == 'G') handleGetQuery (inBuffer, rtk, sequence); - else if (opcode == 'M') handleMethodRequest (inBuffer, rtk, sequence, msg.getPublisherUserId()); + else if (opcode == 'M') handleMethodRequest (inBuffer, rtk, sequence, context.getMgmtId()); } } @@ -2752,10 +2769,10 @@ ManagementAgent::EventQueue::Batch::const_iterator ManagementAgent::sendEvents( } namespace { -QPID_TSS const ConnectionIdentity* currentPublisher = 0; +QPID_TSS const Connection* currentPublisher = 0; } -void setManagementExecutionContext(const ConnectionIdentity& p) +void setManagementExecutionContext(const Connection& p) { currentPublisher = &p; } @@ -2765,7 +2782,7 @@ void resetManagementExecutionContext() currentPublisher = 0; } -const ConnectionIdentity* getCurrentPublisher() +const Connection* getCurrentPublisher() { return currentPublisher; } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index cb8bb588b9..d2869a705f 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -44,7 +44,6 @@ namespace qpid { namespace broker { class Connection; -class ConnectionIdentity; } namespace sys { class Timer; @@ -379,9 +378,9 @@ private: std::auto_ptr<EventQueue> sendQueue; }; -void setManagementExecutionContext(const broker::ConnectionIdentity&); +void setManagementExecutionContext(const broker::Connection&); void resetManagementExecutionContext(); -const broker::ConnectionIdentity* getCurrentPublisher(); +const broker::Connection* getCurrentPublisher(); }} #endif /*!_ManagementAgent_*/ diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 869d0caebc..4553ebddb3 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -34,6 +34,7 @@ #include "qpid/framing/ProtocolInitiation.h" #include "qpid/framing/Uuid.h" #include "qpid/log/Statement.h" +#include "qpid/sys/SystemInfo.h" #include "qpid/sys/Time.h" #include <vector> extern "C" { @@ -125,6 +126,7 @@ void ConnectionContext::open() } QPID_LOG(debug, id << " Opening..."); + setProperties(); pn_connection_open(connection); wakeupDriver(); //want to write while (pn_connection_state(connection) & PN_REMOTE_UNINIT) { @@ -148,7 +150,7 @@ void ConnectionContext::endSession(boost::shared_ptr<SessionContext> ssn) //wait for outstanding sends to settle while (!ssn->settled()) { QPID_LOG(debug, "Waiting for sends to settle before closing"); - wait();//wait until message has been confirmed + wait(ssn);//wait until message has been confirmed } pn_session_close(ssn->session); @@ -165,7 +167,7 @@ void ConnectionContext::close() //wait for outstanding sends to settle while (!i->second->settled()) { QPID_LOG(debug, "Waiting for sends to settle before closing"); - wait();//wait until message has been confirmed + wait(i->second);//wait until message has been confirmed } @@ -304,6 +306,7 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha QPID_LOG(debug, "Dynamic target name set to " << lnk->address.getName()); } lnk->verify(t); + checkClosed(ssn, lnk); QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget()); } @@ -322,6 +325,7 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha QPID_LOG(debug, "Dynamic source name set to " << lnk->address.getName()); } lnk->verify(s); + checkClosed(ssn, lnk); QPID_LOG(debug, "Attach succeeded from " << lnk->getSource()); } @@ -471,8 +475,15 @@ void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_li { checkClosed(ssn); if ((pn_link_state(lnk) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { + pn_condition_t* error = pn_link_remote_condition(lnk); + std::stringstream text; + if (pn_condition_is_set(error)) { + text << "Link detached by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error); + } else { + text << "Link detached by peer"; + } pn_link_close(lnk); - throw qpid::messaging::LinkError("Link detached by peer"); + throw qpid::messaging::LinkError(text.str()); } else if ((pn_link_state(lnk) & IS_CLOSED) == IS_CLOSED) { throw qpid::messaging::LinkError("Link is not attached"); } @@ -692,5 +703,39 @@ bool ConnectionContext::CodecSwitch::canEncode() return parent.canEncode(); } +namespace { +const std::string CLIENT_PROCESS_NAME("qpid.client_process"); +const std::string CLIENT_PID("qpid.client_pid"); +const std::string CLIENT_PPID("qpid.client_ppid"); +pn_bytes_t convert(const std::string& s) +{ + pn_bytes_t result; + result.start = const_cast<char*>(s.data()); + result.size = s.size(); + return result; +} +} +void ConnectionContext::setProperties() +{ + /** + * Enable when proton 0.5 is released and qpidc has been updated + * to use it + * + pn_data_t* data = pn_connection_properties(connection); + pn_data_put_map(data); + pn_data_enter(data); + + pn_data_put_symbol(data, convert(CLIENT_PROCESS_NAME)); + std::string processName = sys::SystemInfo::getProcessName(); + pn_data_put_string(data, convert(processName)); + + pn_data_put_symbol(data, convert(CLIENT_PID)); + pn_data_put_int(data, sys::SystemInfo::getProcessId()); + + pn_data_put_symbol(data, convert(CLIENT_PPID)); + pn_data_put_int(data, sys::SystemInfo::getParentProcessId()); + pn_data_exit(data); + **/ +} }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index fbff27c288..5627bd903d 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -150,6 +150,7 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag std::size_t writeProtocolHeader(char* buffer, std::size_t size); std::string getError(); bool useSasl(); + void setProperties(); }; }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 0fa97ab933..d4a5ca4292 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -93,8 +93,24 @@ SenderContext::Delivery* SenderContext::send(const qpid::messaging::Message& mes } } +void SenderContext::check() +{ + if (pn_link_state(sender) & PN_REMOTE_CLOSED && !(pn_link_state(sender) & PN_LOCAL_CLOSED)) { + pn_condition_t* error = pn_link_remote_condition(sender); + std::stringstream text; + if (pn_condition_is_set(error)) { + text << "Link detached by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error); + } else { + text << "Link detached by peer"; + } + pn_link_close(sender); + throw qpid::messaging::LinkError(text.str()); + } +} + uint32_t SenderContext::processUnsettled() { + check(); //remove messages from front of deque once peer has confirmed receipt while (!deliveries.empty() && deliveries.front().delivered()) { deliveries.front().settle(); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h index 81d306bab3..e389cd2e35 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -74,6 +74,7 @@ class SenderContext Delivery* send(const qpid::messaging::Message& message); void configure(); void verify(pn_terminus_t*); + void check(); bool settled(); Address getAddress() const; private: diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 64462215f3..9815721fa0 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -144,7 +144,12 @@ bool SessionContext::settled() { bool result = true; for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { - if (!i->second->settled()) result = false; + try { + if (!i->second->settled()) result = false; + } catch (const std::exception&) { + senders.erase(i); + throw; + } } return result; } |
