diff options
| author | Gordon Sim <gsim@apache.org> | 2013-06-26 18:32:47 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-06-26 18:32:47 +0000 |
| commit | 2a9141ac23a2c8459a0ac684f6522898d9abb6e7 (patch) | |
| tree | 955587601cb1427450ef149c88ede566fa3574c2 /qpid/cpp/src | |
| parent | 3440b9189772b355b72b24ced083b0dac71cacb7 (diff) | |
| download | qpid-python-2a9141ac23a2c8459a0ac684f6522898d9abb6e7.tar.gz | |
QPID-4919: Allow definition of topics in AMQP 1.0, composed of an exchange and subscription queue configuration
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1497036 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
29 files changed, 729 insertions, 113 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 3ecbac5cc2..12b7045cf1 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1262,6 +1262,7 @@ set (qpidbroker_SOURCES qpid/broker/FifoDistributor.cpp qpid/broker/MessageGroupManager.cpp qpid/broker/PersistableMessage.cpp + qpid/broker/PersistableObject.cpp qpid/broker/Bridge.cpp qpid/broker/amqp_0_10/Connection.cpp qpid/broker/ConnectionHandler.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 942575ec3d..6e3ee6af43 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -517,6 +517,7 @@ libqpidcommon_la_SOURCES += \ qpid/sys/Waitable.h \ qpid/sys/uuid.h \ qpid/sys/unordered_map.h \ + qpid/amqp_0_10/CodecsInternal.h \ qpid/amqp_0_10/Codecs.cpp \ qpid/amqp/CharSequence.h \ qpid/amqp/CharSequence.cpp \ @@ -691,6 +692,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/PersistableExchange.h \ qpid/broker/PersistableMessage.cpp \ qpid/broker/PersistableMessage.h \ + qpid/broker/PersistableObject.h \ + qpid/broker/PersistableObject.cpp \ qpid/broker/PersistableQueue.h \ qpid/broker/Queue.cpp \ qpid/broker/Queue.h \ @@ -805,6 +808,8 @@ amqp_la_LIBADD = libqpidcommon.la amqp_la_SOURCES = \ qpid/broker/amqp/Authorise.h \ qpid/broker/amqp/Authorise.cpp \ + qpid/broker/amqp/BrokerContext.h \ + qpid/broker/amqp/BrokerContext.cpp \ qpid/broker/amqp/Connection.h \ qpid/broker/amqp/Connection.cpp \ qpid/broker/amqp/DataReader.h \ @@ -846,6 +851,8 @@ amqp_la_SOURCES = \ qpid/broker/amqp/SaslClient.cpp \ qpid/broker/amqp/Session.h \ qpid/broker/amqp/Session.cpp \ + qpid/broker/amqp/Topic.h \ + qpid/broker/amqp/Topic.cpp \ qpid/broker/amqp/Translation.h \ qpid/broker/amqp/Translation.cpp diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake index 5875dd9f9d..2d0ca06250 100644 --- a/qpid/cpp/src/amqp.cmake +++ b/qpid/cpp/src/amqp.cmake @@ -55,6 +55,8 @@ if (BUILD_AMQP) set (amqp_SOURCES qpid/broker/amqp/Authorise.h qpid/broker/amqp/Authorise.cpp + qpid/broker/amqp/BrokerContext.h + qpid/broker/amqp/BrokerContext.cpp qpid/broker/amqp/Connection.h qpid/broker/amqp/Connection.cpp qpid/broker/amqp/DataReader.h @@ -96,6 +98,8 @@ if (BUILD_AMQP) qpid/broker/amqp/SaslClient.cpp qpid/broker/amqp/Session.h qpid/broker/amqp/Session.cpp + qpid/broker/amqp/Topic.h + qpid/broker/amqp/Topic.cpp qpid/broker/amqp/Translation.h qpid/broker/amqp/Translation.cpp ) diff --git a/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp b/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp index ba736c87ea..930f402f85 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/Codecs.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/amqp_0_10/Codecs.h" +#include "qpid/amqp_0_10/CodecsInternal.h" #include "qpid/framing/Array.h" #include "qpid/framing/Buffer.h" #include "qpid/framing/FieldTable.h" @@ -188,10 +189,6 @@ template <class T, class U, class F> void _decode(const std::string& data, U& va convert(t, value, f); } -uint32_t encodedSize(const Variant::Map& values); -uint32_t encodedSize(const Variant::List& values); -uint32_t encodedSize(const std::string& value); - uint32_t encodedSize(const Variant& value) { switch (value.getType()) { @@ -290,9 +287,6 @@ void encode(const std::string& value, const std::string& encoding, qpid::framing } } -void encode(const Variant::Map& map, uint32_t len, qpid::framing::Buffer& buffer); -void encode(const Variant::List& list, uint32_t len, qpid::framing::Buffer& buffer); - void encode(const Variant& value, qpid::framing::Buffer& buffer) { switch (value.getType()) { diff --git a/qpid/cpp/src/qpid/amqp_0_10/CodecsInternal.h b/qpid/cpp/src/qpid/amqp_0_10/CodecsInternal.h new file mode 100644 index 0000000000..98b03c581e --- /dev/null +++ b/qpid/cpp/src/qpid/amqp_0_10/CodecsInternal.h @@ -0,0 +1,41 @@ +#ifndef QPID_AMQP_0_10_CODECSINTERNAL_H +#define QPID_AMQP_0_10_CODECSINTERNAL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/types/Variant.h" + +namespace qpid { +namespace framing { +class Buffer; +} +namespace amqp_0_10 { +void encode(const qpid::types::Variant::Map& map, uint32_t len, qpid::framing::Buffer& buffer); +void encode(const qpid::types::Variant::List& list, uint32_t len, qpid::framing::Buffer& buffer); +void encode(const qpid::types::Variant& value, qpid::framing::Buffer& buffer); +void encode(const std::string& value, const std::string& encoding, qpid::framing::Buffer& buffer); +uint32_t encodedSize(const qpid::types::Variant::Map& values); +uint32_t encodedSize(const qpid::types::Variant::List& values); +uint32_t encodedSize(const std::string& value); + +}} // namespace qpid::amqp_0_10 + +#endif /*!QPID_AMQP_0_10_CODECSINTERNAL_H*/ diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 4f1ead352b..69894e1633 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -35,6 +35,7 @@ #include "qpid/broker/TopicExchange.h" #include "qpid/broker/Link.h" #include "qpid/broker/ExpiryPolicy.h" +#include "qpid/broker/PersistableObject.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/QueueSettings.h" #include "qpid/broker/MessageGroupManager.h" @@ -295,9 +296,10 @@ Broker::Broker(const Broker::Options& conf) : // Default exchnge is not replicated. exchanges.declare(empty, DirectExchange::typeName, false, noReplicateArgs()); + RecoveredObjects objects; if (store.get() != 0) { RecoveryManagerImpl recoverer( - queues, exchanges, links, dtxManager, protocolRegistry); + queues, exchanges, links, dtxManager, protocolRegistry, objects); recoveryInProgress = true; store->recover(recoverer); recoveryInProgress = false; @@ -349,6 +351,8 @@ Broker::Broker(const Broker::Options& conf) : // Initialize plugins Plugin::initializeAll(*this); + //recover any objects via object factories + objects.restore(*this); if(conf.enableMgmt) { if (getAcl()) { diff --git a/qpid/cpp/src/qpid/broker/ObjectFactory.cpp b/qpid/cpp/src/qpid/broker/ObjectFactory.cpp index 5bd516c351..2dd182c428 100644 --- a/qpid/cpp/src/qpid/broker/ObjectFactory.cpp +++ b/qpid/cpp/src/qpid/broker/ObjectFactory.cpp @@ -44,6 +44,16 @@ bool ObjectFactoryRegistry::deleteObject(Broker& broker, const std::string& type return false; } +bool ObjectFactoryRegistry::recoverObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + uint64_t persistenceId) +{ + for (Factories::iterator i = factories.begin(); i != factories.end(); ++i) + { + if ((*i)->recoverObject(broker, type, name, properties, persistenceId)) return true; + } + return false; +} + ObjectFactoryRegistry::~ObjectFactoryRegistry() { for (Factories::iterator i = factories.begin(); i != factories.end(); ++i) diff --git a/qpid/cpp/src/qpid/broker/ObjectFactory.h b/qpid/cpp/src/qpid/broker/ObjectFactory.h index 7a48be3caa..1b8f0356d6 100644 --- a/qpid/cpp/src/qpid/broker/ObjectFactory.h +++ b/qpid/cpp/src/qpid/broker/ObjectFactory.h @@ -41,6 +41,7 @@ class ObjectFactory const std::string& userId, const std::string& connectionId) = 0; virtual bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, const std::string& userId, const std::string& connectionId) = 0; + virtual bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, uint64_t persistenceId) = 0; virtual ~ObjectFactory() {} private: }; @@ -52,6 +53,7 @@ class ObjectFactoryRegistry : public ObjectFactory const std::string& userId, const std::string& connectionId); bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, const std::string& userId, const std::string& connectionId); + bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, uint64_t persistenceId); ~ObjectFactoryRegistry(); void add(ObjectFactory*); diff --git a/qpid/cpp/src/qpid/broker/PersistableObject.cpp b/qpid/cpp/src/qpid/broker/PersistableObject.cpp new file mode 100644 index 0000000000..822f795954 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PersistableObject.cpp @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "PersistableObject.h" +#include "Broker.h" +#include "qpid/amqp_0_10/Codecs.h" +#include "qpid/amqp_0_10/CodecsInternal.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace broker { +namespace { +const std::string UTF8("utf8"); +} +PersistableObject::PersistableObject(const std::string& n, const std::string& t, const qpid::types::Variant::Map p) : name(n), type(t), properties(p), id(0) {} +PersistableObject::PersistableObject() : id(0) {} +PersistableObject::~PersistableObject() {} +const std::string& PersistableObject::getName() const { return name; } +void PersistableObject::setPersistenceId(uint64_t i) const { id = i; } +uint64_t PersistableObject::getPersistenceId() const { return id; } +void PersistableObject::encode(framing::Buffer& buffer) const +{ + buffer.putShortString(type); + buffer.putMediumString(name); + qpid::amqp_0_10::encode(properties, qpid::amqp_0_10::encodedSize(properties), buffer); +} +uint32_t PersistableObject::encodedSize() const +{ + return type.size()+1 + name.size()+2 + qpid::amqp_0_10::encodedSize(properties); +} +void PersistableObject::decode(framing::Buffer& buffer) +{ + buffer.getShortString(type); + buffer.getMediumString(name); + qpid::framing::FieldTable ft; + buffer.get(ft); + qpid::amqp_0_10::translate(ft, properties); +} +bool PersistableObject::recover(Broker& broker) +{ + return broker.getObjectFactoryRegistry().recoverObject(broker, type, name, properties, id); +} + +namespace { +class RecoverableObject : public RecoverableConfig +{ + public: + RecoverableObject(boost::shared_ptr<PersistableObject> o) : object(o) {} + void setPersistenceId(uint64_t id) { object->setPersistenceId(id); } + private: + boost::shared_ptr<PersistableObject> object; +}; +} +boost::shared_ptr<RecoverableConfig> RecoveredObjects::recover(framing::Buffer& buffer) +{ + boost::shared_ptr<PersistableObject> object(new PersistableObject()); + object->decode(buffer); + objects.push_back(object); + return boost::shared_ptr<RecoverableConfig>(new RecoverableObject(object)); +} +void RecoveredObjects::restore(Broker& broker) +{ + //recover objects created through ObjectFactory + for (Objects::iterator i = objects.begin(); i != objects.end(); ++i) { + if (!(*i)->recover(broker)) { + QPID_LOG(warning, "Failed to recover object " << (*i)->name << " of type " << (*i)->type); + } + } +} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/PersistableObject.h b/qpid/cpp/src/qpid/broker/PersistableObject.h new file mode 100644 index 0000000000..c6aff466f7 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/PersistableObject.h @@ -0,0 +1,71 @@ +#ifndef QPID_BROKER_PERSISTABLEOBJECT_H +#define QPID_BROKER_PERSISTABLEOBJECT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "PersistableConfig.h" +#include "qpid/types/Variant.h" +#include <vector> +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { +class Broker; +class RecoverableConfig; +/** + * Generic persistence support for objects created through the brokers + * create method. + */ +class PersistableObject : public PersistableConfig +{ + public: + PersistableObject(const std::string& name, const std::string& type, const qpid::types::Variant::Map properties); + virtual ~PersistableObject(); + const std::string& getName() const; + void setPersistenceId(uint64_t id) const; + uint64_t getPersistenceId() const; + void encode(framing::Buffer& buffer) const; + uint32_t encodedSize() const; + friend class RecoveredObjects; + private: + std::string name; + std::string type; + qpid::types::Variant::Map properties; + mutable uint64_t id; + + PersistableObject(); + void decode(framing::Buffer& buffer); + bool recover(Broker&); +}; + +class RecoveredObjects +{ + public: + boost::shared_ptr<RecoverableConfig> recover(framing::Buffer&); + void restore(Broker&); + private: + typedef std::vector<boost::shared_ptr<PersistableObject> > Objects; + Objects objects; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_PERSISTABLEOBJECT_H*/ diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index bfac48a5d3..4bfdfe76f4 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -22,6 +22,7 @@ #include "qpid/broker/Message.h" #include "qpid/broker/PersistableMessage.h" +#include "qpid/broker/PersistableObject.h" #include "qpid/broker/Queue.h" #include "qpid/broker/Link.h" #include "qpid/broker/Bridge.h" @@ -40,8 +41,8 @@ namespace qpid { namespace broker { RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, LinkRegistry& _links, - DtxManager& _dtxMgr, ProtocolRegistry& p) - : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), protocols(p) {} + DtxManager& _dtxMgr, ProtocolRegistry& p, RecoveredObjects& o) + : queues(_queues), exchanges(_exchanges), links(_links), dtxMgr(_dtxMgr), protocols(p), objects(o) {} RecoveryManagerImpl::~RecoveryManagerImpl() {} @@ -145,7 +146,7 @@ RecoverableConfig::shared_ptr RecoveryManagerImpl::recoverConfig(framing::Buffer else if (Bridge::isEncodedBridge(kind)) return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer))); - return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead + return objects.recover(buffer); } void RecoveryManagerImpl::recoveryComplete() diff --git a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h index 60ca28092d..22ea6f8d04 100644 --- a/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h +++ b/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.h @@ -22,6 +22,7 @@ #define _RecoveryManagerImpl_ #include <list> +#include <vector> #include "qpid/broker/DtxManager.h" #include "qpid/broker/ExchangeRegistry.h" #include "qpid/broker/QueueRegistry.h" @@ -30,17 +31,21 @@ namespace qpid { namespace broker { +class Broker; +class PersistableObject; class ProtocolRegistry; +class RecoveredObjects; - class RecoveryManagerImpl : public RecoveryManager{ + class RecoveryManagerImpl : public RecoveryManager { QueueRegistry& queues; ExchangeRegistry& exchanges; LinkRegistry& links; DtxManager& dtxMgr; ProtocolRegistry& protocols; + RecoveredObjects& objects; public: RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, LinkRegistry& links, - DtxManager& dtxMgr, ProtocolRegistry&); + DtxManager& dtxMgr, ProtocolRegistry&, RecoveredObjects&); ~RecoveryManagerImpl(); RecoverableExchange::shared_ptr recoverExchange(framing::Buffer& buffer); diff --git a/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp b/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp new file mode 100644 index 0000000000..b109da961e --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/BrokerContext.cpp @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "BrokerContext.h" + +namespace qpid { +namespace broker { +namespace amqp { +BrokerContext::BrokerContext(Broker& b, Interconnects& i, TopicRegistry& t, const std::string& d) : broker(b), interconnects(i), topics(t), domain(d) {} +BrokerContext::BrokerContext(BrokerContext& c) : broker(c.broker), interconnects(c.interconnects), topics(c.topics), domain(c.domain) {} +Broker& BrokerContext::getBroker() { return broker; } +Interconnects& BrokerContext::getInterconnects() { return interconnects; } +TopicRegistry& BrokerContext::getTopics() { return topics; } +std::string BrokerContext::getDomain() { return domain; } +}}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h b/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h new file mode 100644 index 0000000000..81c449c68d --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/BrokerContext.h @@ -0,0 +1,52 @@ +#ifndef QPID_BROKER_AMQP_BROKERCONTEXT_H +#define QPID_BROKER_AMQP_BROKERCONTEXT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> + +namespace qpid { +namespace broker { +class Broker; +namespace amqp { +class Interconnects; +class TopicRegistry; +/** + * Context providing access to broker scoped entities. + */ +class BrokerContext +{ + public: + BrokerContext(Broker&, Interconnects&, TopicRegistry&, const std::string&); + BrokerContext(BrokerContext&); + Broker& getBroker(); + Interconnects& getInterconnects(); + TopicRegistry& getTopics(); + std::string getDomain(); + private: + Broker& broker; + Interconnects& interconnects; + TopicRegistry& topics; + std::string domain; +}; +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_BROKERCONTEXT_H*/ diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index 4433419402..fa0d719bf9 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -39,11 +39,12 @@ extern "C" { namespace qpid { namespace broker { namespace amqp { -Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid::broker::Broker& b, Interconnects& interconnects_, bool saslInUse, const std::string& d) - : ManagedConnection(b, i), + +Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, BrokerContext& b, bool saslInUse) + : BrokerContext(b), ManagedConnection(getBroker(), i), connection(pn_connection()), transport(pn_transport()), - out(o), id(i), broker(b), haveOutput(true), interconnects(interconnects_), domain(d) + out(o), id(i), haveOutput(true) { if (pn_transport_bind(transport, connection)) { //error @@ -54,7 +55,7 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid:: QPID_LOG_TEST_CAT(trace, protocol, enableTrace); if (enableTrace) pn_transport_trace(transport, PN_TRACE_FRM); - broker.getConnectionObservers().connection(*this); + getBroker().getConnectionObservers().connection(*this); if (!saslInUse) { //feed in a dummy AMQP 1.0 header as engine expects one, but //we already read it (if sasl is in use we read the sasl @@ -72,15 +73,11 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, qpid:: Connection::~Connection() { - broker.getConnectionObservers().closed(*this); + getBroker().getConnectionObservers().closed(*this); pn_transport_free(transport); pn_connection_free(connection); } -Interconnects& Connection::getInterconnects() -{ - return interconnects; -} pn_transport_t* Connection::getTransport() { return transport; @@ -173,11 +170,11 @@ void Connection::open() { readPeerProperties(); - pn_connection_set_container(connection, broker.getFederationTag().c_str()); + pn_connection_set_container(connection, getBroker().getFederationTag().c_str()); pn_connection_open(connection); out.connectionEstablished(); opened(); - broker.getConnectionObservers().opened(*this); + getBroker().getConnectionObservers().opened(*this); } void Connection::readPeerProperties() @@ -227,7 +224,7 @@ void Connection::process() for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) { QPID_LOG_CAT(debug, model, id << " session begun"); pn_session_open(s); - boost::shared_ptr<Session> ssn(new Session(s, broker, *this, out)); + boost::shared_ptr<Session> ssn(new Session(s, *this, out)); sessions[s] = ssn; } for (pn_link_t* l = pn_link_head(connection, REQUIRES_OPEN); l; l = pn_link_next(l, REQUIRES_OPEN)) { @@ -323,11 +320,6 @@ std::string Connection::getError() return text.str(); } -std::string Connection::getDomain() const -{ - return domain; -} - void Connection::abort() { out.abort(); @@ -336,7 +328,7 @@ void Connection::abort() void Connection::setUserId(const std::string& user) { ManagedConnection::setUserId(user); - AclModule* acl = broker.getAcl(); + AclModule* acl = getBroker().getAcl(); if (acl && !acl->approveConnection(*this)) { throw Exception(qpid::amqp::error_conditions::RESOURCE_LIMIT_EXCEEDED, "User connection denied by configured limit"); diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h index d460f972d2..1384e3560d 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h @@ -22,6 +22,7 @@ * */ #include "qpid/sys/ConnectionCodec.h" +#include "qpid/broker/amqp/BrokerContext.h" #include "qpid/broker/amqp/ManagedConnection.h" #include <map> #include <boost/shared_ptr.hpp> @@ -42,10 +43,10 @@ class Session; /** * AMQP 1.0 protocol support for broker */ -class Connection : public sys::ConnectionCodec, public ManagedConnection +class Connection : public BrokerContext, public sys::ConnectionCodec, public ManagedConnection { public: - Connection(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, bool saslInUse, const std::string& domain); + Connection(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& context, bool saslInUse); virtual ~Connection(); size_t decode(const char* buffer, size_t size); virtual size_t encode(char* buffer, size_t size); @@ -56,22 +57,16 @@ class Connection : public sys::ConnectionCodec, public ManagedConnection framing::ProtocolVersion getVersion() const; pn_transport_t* getTransport(); - Interconnects& getInterconnects(); - std::string getDomain() const; void setUserId(const std::string&); void abort(); - protected: typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions; pn_connection_t* connection; pn_transport_t* transport; qpid::sys::OutputControl& out; const std::string id; - qpid::broker::Broker& broker; bool haveOutput; Sessions sessions; - Interconnects& interconnects; - std::string domain; virtual void process(); std::string getError(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp index 57a86da43b..500ec034f4 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp @@ -50,6 +50,7 @@ const std::string SASL_MECHANISMS("sasl_mechanisms"); const std::string SASL_SERVICE("sasl_service"); const std::string MIN_SSF("min_ssf"); const std::string MAX_SSF("max_ssf"); +const std::string DURABLE("durable"); class Wrapper : public qpid::sys::ConnectionCodec { public: @@ -119,14 +120,23 @@ bool get(qpid::Url& url, const std::string& key, const qpid::types::Variant::Map return false; } } +bool get(const std::string& key, const qpid::types::Variant::Map& map) +{ + qpid::types::Variant::Map::const_iterator i = map.find(key); + if (i != map.end()) { + return i->second.asBool(); + } else { + return false; + } +} } -class InterconnectFactory : public qpid::sys::ConnectionCodec::Factory, public boost::enable_shared_from_this<InterconnectFactory> +class InterconnectFactory : public BrokerContext, public qpid::sys::ConnectionCodec::Factory, public boost::enable_shared_from_this<InterconnectFactory> { public: - InterconnectFactory(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Domain&, Broker&, Interconnects&); + InterconnectFactory(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Domain&, BrokerContext&); InterconnectFactory(bool incoming, const std::string& name, const std::string& source, const std::string& target, - Domain&, Broker&, Interconnects&, boost::shared_ptr<Relay>); + Domain&, BrokerContext&, boost::shared_ptr<Relay>); qpid::sys::ConnectionCodec* create(framing::ProtocolVersion, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); qpid::sys::ConnectionCodec* create(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); bool connect(); @@ -140,14 +150,12 @@ class InterconnectFactory : public qpid::sys::ConnectionCodec::Factory, public b qpid::Url::iterator next; std::string hostname; Domain& domain; - Broker& broker; - Interconnects& registry; qpid::Address address; boost::shared_ptr<Relay> relay; }; -InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const qpid::types::Variant::Map& properties, Domain& d, Broker& b, Interconnects& r) - : incoming(i), name(n), url(d.getUrl()), domain(d), broker(b), registry(r) +InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const qpid::types::Variant::Map& properties, Domain& d, BrokerContext& c) + : BrokerContext(c), incoming(i), name(n), url(d.getUrl()), domain(d) { get(source, SOURCE, properties); get(target, TARGET, properties); @@ -155,8 +163,8 @@ InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const qpi } InterconnectFactory::InterconnectFactory(bool i, const std::string& n, const std::string& source_, const std::string& target_, - Domain& d, Broker& b, Interconnects& r, boost::shared_ptr<Relay> relay_) - : incoming(i), name(n), source(source_), target(target_), url(d.getUrl()), domain(d), broker(b), registry(r), relay(relay_) + Domain& d, BrokerContext& c, boost::shared_ptr<Relay> relay_) + : BrokerContext(c), incoming(i), name(n), source(source_), target(target_), url(d.getUrl()), domain(d), relay(relay_) { next = url.begin(); } @@ -168,8 +176,8 @@ qpid::sys::ConnectionCodec* InterconnectFactory::create(qpid::framing::ProtocolV qpid::sys::ConnectionCodec* InterconnectFactory::create(qpid::sys::OutputControl& out, const std::string& id, const qpid::sys::SecuritySettings& t) { bool useSasl = domain.getMechanisms() != NONE; - boost::shared_ptr<Interconnect> connection(new Interconnect(out, id, broker, useSasl, incoming, name, source, target, domain, registry)); - if (!relay) registry.add(name, connection); + boost::shared_ptr<Interconnect> connection(new Interconnect(out, id, *this, useSasl, incoming, name, source, target, domain)); + if (!relay) getInterconnects().add(name, connection); else connection->setRelay(relay); std::auto_ptr<qpid::sys::ConnectionCodec> codec; @@ -191,7 +199,7 @@ bool InterconnectFactory::connect() next++; hostname = address.host; QPID_LOG (info, "Inter-broker connection initiated (" << address << ")"); - broker.connect(name, address.host, boost::lexical_cast<std::string>(address.port), address.protocol, this, boost::bind(&InterconnectFactory::failed, this, _1, _2)); + getBroker().connect(name, address.host, boost::lexical_cast<std::string>(address.port), address.protocol, this, boost::bind(&InterconnectFactory::failed, this, _1, _2)); return true; } @@ -204,7 +212,7 @@ void InterconnectFactory::failed(int, std::string text) } Domain::Domain(const std::string& n, const qpid::types::Variant::Map& properties, Broker& b) - : name(n), durable(false), broker(b), mechanisms("ANONYMOUS"), service(qpid::saslName), minSsf(0), maxSsf(0), agent(b.getManagementAgent()) + : PersistableObject(n, "domain", properties), name(n), durable(get(DURABLE, properties)), broker(b), mechanisms("ANONYMOUS"), service(qpid::saslName), minSsf(0), maxSsf(0), agent(b.getManagementAgent()) { if (!get(url, URL, properties)) { QPID_LOG(error, "No URL specified for domain " << name << "!"); @@ -212,7 +220,6 @@ Domain::Domain(const std::string& n, const qpid::types::Variant::Map& properties } else { QPID_LOG(notice, "Created domain " << name << " with url " << url << " from " << properties); } - //TODO: durable get(username, USERNAME, properties); get(password, PASSWORD, properties); get(mechanisms, SASL_MECHANISMS, properties); @@ -249,21 +256,26 @@ qpid::Url Domain::getUrl() const return url; } +bool Domain::isDurable() const +{ + return durable; +} + std::auto_ptr<qpid::Sasl> Domain::sasl(const std::string& hostname) { return qpid::SaslFactory::getInstance().create(username, password, service, hostname, minSsf, maxSsf, false); } -void Domain::connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Interconnects& registry) +void Domain::connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, BrokerContext& context) { - boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, properties, *this, broker, registry)); + boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, properties, *this, context)); factory->connect(); addPending(factory); } -void Domain::connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, Interconnects& registry, boost::shared_ptr<Relay> relay) +void Domain::connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, BrokerContext& context, boost::shared_ptr<Relay> relay) { - boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, source, target, *this, broker, registry, relay)); + boost::shared_ptr<InterconnectFactory> factory(new InterconnectFactory(incoming, name, source, target, *this, context, relay)); factory->connect(); addPending(factory); } diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.h b/qpid/cpp/src/qpid/broker/amqp/Domain.h index d23b3308fe..ce7c590cfb 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Domain.h +++ b/qpid/cpp/src/qpid/broker/amqp/Domain.h @@ -25,6 +25,7 @@ #include "qpid/types/Variant.h" #include "qpid/Url.h" #include "qpid/Version.h" +#include "qpid/broker/PersistableObject.h" #include "qpid/management/Manageable.h" #include "qpid/sys/Mutex.h" #include "qmf/org/apache/qpid/broker/Domain.h" @@ -42,19 +43,20 @@ namespace broker { class Broker; namespace amqp { class InterconnectFactory; -class Interconnects; +class BrokerContext; class Relay; -class Domain : public qpid::management::Manageable +class Domain : public PersistableObject, public qpid::management::Manageable { public: Domain(const std::string& name, const qpid::types::Variant::Map& properties, Broker&); ~Domain(); - void connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, Interconnects&); - void connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, Interconnects&, boost::shared_ptr<Relay>); + void connect(bool incoming, const std::string& name, const qpid::types::Variant::Map& properties, BrokerContext&); + void connect(bool incoming, const std::string& name, const std::string& source, const std::string& target, BrokerContext&, boost::shared_ptr<Relay>); std::auto_ptr<qpid::Sasl> sasl(const std::string& hostname); const std::string& getMechanisms() const; qpid::Url getUrl() const; + bool isDurable() const; void addPending(boost::shared_ptr<InterconnectFactory>); void removePending(boost::shared_ptr<InterconnectFactory>); boost::shared_ptr<qpid::management::ManagementObject> GetManagementObject() const; diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp index 4741130bd1..c99166e481 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.cpp @@ -39,9 +39,9 @@ namespace qpid { namespace broker { namespace amqp { -Interconnect::Interconnect(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse, - bool i, const std::string& n, const std::string& s, const std::string& t, Domain& d, Interconnects& r) - : Connection(out, id, broker, r, true, std::string()), incoming(i), name(n), source(s), target(t), domain(d), registry(r), headerDiscarded(saslInUse), +Interconnect::Interconnect(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& broker, bool saslInUse, + bool i, const std::string& n, const std::string& s, const std::string& t, Domain& d) + : Connection(out, id, broker, true), incoming(i), name(n), source(s), target(t), domain(d), headerDiscarded(saslInUse), closeRequested(false), isTransportDeleted(false) {} @@ -83,10 +83,9 @@ void Interconnect::process() if ((pn_connection_state(connection) & UNINIT) == UNINIT) { QPID_LOG_CAT(debug, model, id << " interconnect opened"); open(); - pn_session_t* s = pn_session(connection); pn_session_open(s); - boost::shared_ptr<Session> ssn(new Session(s, broker, *this, out)); + boost::shared_ptr<Session> ssn(new Session(s, *this, out)); sessions[s] = ssn; pn_link_t* l = incoming ? pn_receiver(s, name.c_str()) : pn_sender(s, name.c_str()); @@ -111,7 +110,7 @@ void Interconnect::deletedFromRegistry() void Interconnect::transportDeleted() { isTransportDeleted = true; - registry.remove(name); + getInterconnects().remove(name); } bool Interconnect::isLink() const diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnect.h b/qpid/cpp/src/qpid/broker/amqp/Interconnect.h index 64d037dae5..86b80ad129 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Interconnect.h +++ b/qpid/cpp/src/qpid/broker/amqp/Interconnect.h @@ -37,8 +37,8 @@ class Relay; class Interconnect : public Connection { public: - Interconnect(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, bool saslInUse, - bool incoming, const std::string& name, const std::string& source, const std::string& target, Domain&, Interconnects&); + Interconnect(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& broker, bool saslInUse, + bool incoming, const std::string& name, const std::string& source, const std::string& target, Domain&); void setRelay(boost::shared_ptr<Relay>); ~Interconnect(); size_t encode(char* buffer, size_t size); @@ -51,7 +51,6 @@ class Interconnect : public Connection std::string source; std::string target; Domain& domain; - Interconnects& registry; bool headerDiscarded; boost::shared_ptr<Relay> relay; bool closeRequested; diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp b/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp index 8b0891ef3a..8c1cb246ec 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Interconnects.cpp @@ -30,6 +30,7 @@ #include "qpid/sys/OutputControl.h" #include "qpid/log/Statement.h" #include <boost/shared_ptr.hpp> +#include <assert.h> namespace qpid { namespace broker { @@ -50,6 +51,7 @@ bool Interconnects::createObject(Broker& broker, const std::string& type, const if (i == domains.end()) { boost::shared_ptr<Domain> domain(new Domain(name, properties, broker)); domains[name] = domain; + if (domain->isDurable()) broker.getStore().create(*domain); return true; } else { return false; @@ -72,20 +74,23 @@ bool Interconnects::createObject(Broker& broker, const std::string& type, const throw qpid::Exception(QPID_MSG("Domain must be specified")); } } - domain->connect(type == INCOMING_TYPE, name, properties, *this); + domain->connect(type == INCOMING_TYPE, name, properties, *context); return true; } else { return false; } } -bool Interconnects::deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& /*properties*/, +bool Interconnects::deleteObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& /*properties*/, const std::string& /*userId*/, const std::string& /*connectionId*/) { if (type == DOMAIN_TYPE) { + boost::shared_ptr<Domain> domain; qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); DomainMap::iterator i = domains.find(name); if (i != domains.end()) { + domain = i->second; domains.erase(i); + if (domain->isDurable()) broker.getStore().destroy(*domain); return true; } else { throw qpid::Exception(QPID_MSG("No such domain: " << name)); @@ -109,6 +114,20 @@ bool Interconnects::deleteObject(Broker&, const std::string& type, const std::st } } +bool Interconnects::recoverObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + uint64_t persistenceId) +{ + if (type == DOMAIN_TYPE) { + boost::shared_ptr<Domain> domain(new Domain(name, properties, broker)); + domain->setPersistenceId(persistenceId); + qpid::sys::ScopedLock<qpid::sys::Mutex> l(lock); + domains[name] = domain; + return true; + } else { + return false; + } +} + bool Interconnects::add(const std::string& name, boost::shared_ptr<Interconnect> connection) { @@ -145,7 +164,13 @@ boost::shared_ptr<Domain> Interconnects::findDomain(const std::string& name) } else { return i->second; } - } +void Interconnects::setContext(BrokerContext& c) +{ + context = &c; + assert(&(context->getInterconnects()) == this); +} + +Interconnects::Interconnects() : context(0) {} }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Interconnects.h b/qpid/cpp/src/qpid/broker/amqp/Interconnects.h index 4a7263c8f5..030d6d6446 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Interconnects.h +++ b/qpid/cpp/src/qpid/broker/amqp/Interconnects.h @@ -30,7 +30,7 @@ namespace qpid { namespace broker { namespace amqp { - +class BrokerContext; class Domain; class Interconnect; /** @@ -43,19 +43,23 @@ class Interconnects : public ObjectFactory const std::string& userId, const std::string& connectionId); bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, const std::string& userId, const std::string& connectionId); - + bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + uint64_t persistenceId); bool add(const std::string&, boost::shared_ptr<Interconnect>); boost::shared_ptr<Interconnect> get(const std::string&); bool remove(const std::string&); boost::shared_ptr<Domain> findDomain(const std::string&); + void setContext(BrokerContext&); + Interconnects(); private: typedef std::map<std::string, boost::shared_ptr<Interconnect> > InterconnectMap; typedef std::map<std::string, boost::shared_ptr<Domain> > DomainMap; InterconnectMap interconnects; DomainMap domains; qpid::sys::Mutex lock; + BrokerContext* context; }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp index 91008d4075..d0311c34d2 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp @@ -30,6 +30,7 @@ #include "qpid/broker/amqp/Interconnects.h" #include "qpid/broker/amqp/Message.h" #include "qpid/broker/amqp/Sasl.h" +#include "qpid/broker/amqp/Topic.h" #include "qpid/broker/amqp/Translation.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/framing/Buffer.h" @@ -50,20 +51,20 @@ struct Options : public qpid::Options { } }; -class ProtocolImpl : public Protocol +class ProtocolImpl : public BrokerContext, public Protocol { public: - ProtocolImpl(Interconnects* i, Broker& b, const std::string& d) : interconnects(i), broker(b), domain(d) + ProtocolImpl(Interconnects* interconnects, TopicRegistry* topics, Broker& broker, const std::string& domain) + : BrokerContext(broker, *interconnects, *topics, domain) { + interconnects->setContext(*this); broker.getObjectFactoryRegistry().add(interconnects);//registry deletes on shutdown + broker.getObjectFactoryRegistry().add(topics);//registry deletes on shutdown } qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const qpid::broker::Message&); boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&); private: - Interconnects* interconnects; - Broker& broker; - std::string domain; }; struct ProtocolPlugin : public Plugin @@ -76,7 +77,7 @@ struct ProtocolPlugin : public Plugin //need to register protocol before recovery from store broker::Broker* broker = dynamic_cast<qpid::broker::Broker*>(&target); if (broker) { - ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), *broker, options.domain); + ProtocolImpl* impl = new ProtocolImpl(new Interconnects(), new TopicRegistry(), *broker, options.domain); broker->getProtocolRegistry().add("AMQP 1.0", impl);//registry deletes on shutdown } } @@ -90,22 +91,21 @@ qpid::sys::ConnectionCodec* ProtocolImpl::create(const qpid::framing::ProtocolVe { if (v == qpid::framing::ProtocolVersion(1, 0)) { if (v.getProtocol() == qpid::framing::ProtocolVersion::SASL) { - if (broker.getOptions().auth) { + if (getBroker().getOptions().auth) { QPID_LOG(info, "Using AMQP 1.0 (with SASL layer)"); - return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects, - qpid::SaslFactory::getInstance().createServer(broker.getOptions().realm,broker.getOptions().requireEncrypted, external), - domain); + return new qpid::broker::amqp::Sasl(out, id, *this, + qpid::SaslFactory::getInstance().createServer(getBroker().getOptions().realm,getBroker().getOptions().requireEncrypted, external)); } else { - std::auto_ptr<SaslServer> authenticator(new qpid::NullSaslServer(broker.getOptions().realm)); + std::auto_ptr<SaslServer> authenticator(new qpid::NullSaslServer(getBroker().getOptions().realm)); QPID_LOG(info, "Using AMQP 1.0 (with dummy SASL layer)"); - return new qpid::broker::amqp::Sasl(out, id, broker, *interconnects, authenticator, domain); + return new qpid::broker::amqp::Sasl(out, id, *this, authenticator); } } else { - if (broker.getOptions().auth) { + if (getBroker().getOptions().auth) { throw qpid::Exception("SASL layer required!"); } else { QPID_LOG(info, "Using AMQP 1.0 (no SASL layer)"); - return new qpid::broker::amqp::Connection(out, id, broker, *interconnects, false, domain); + return new qpid::broker::amqp::Connection(out, id, *this, false); } } } @@ -114,7 +114,7 @@ qpid::sys::ConnectionCodec* ProtocolImpl::create(const qpid::framing::ProtocolVe boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolImpl::translate(const qpid::broker::Message& m) { - qpid::broker::amqp::Translation t(m, &broker); + qpid::broker::amqp::Translation t(m, &getBroker()); return t.getTransfer(); } diff --git a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp index 1ce5586ace..44ac33c896 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Sasl.cpp @@ -31,8 +31,8 @@ namespace qpid { namespace broker { namespace amqp { -Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, qpid::broker::Broker& broker, Interconnects& i, std::auto_ptr<qpid::SaslServer> auth, const std::string& domain) - : qpid::amqp::SaslServer(id), out(o), connection(out, id, broker, i, true, domain), +Sasl::Sasl(qpid::sys::OutputControl& o, const std::string& id, BrokerContext& context, std::auto_ptr<qpid::SaslServer> auth) + : qpid::amqp::SaslServer(id), out(o), connection(out, id, context, true), authenticator(auth), state(INCOMPLETE), writeHeader(true), haveOutput(true) { diff --git a/qpid/cpp/src/qpid/broker/amqp/Sasl.h b/qpid/cpp/src/qpid/broker/amqp/Sasl.h index 194ab0a0d5..859cfb2d34 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Sasl.h +++ b/qpid/cpp/src/qpid/broker/amqp/Sasl.h @@ -39,7 +39,7 @@ namespace amqp { class Sasl : public sys::ConnectionCodec, qpid::amqp::SaslServer { public: - Sasl(qpid::sys::OutputControl& out, const std::string& id, qpid::broker::Broker& broker, Interconnects&, std::auto_ptr<qpid::SaslServer> authenticator, const std::string& domain); + Sasl(qpid::sys::OutputControl& out, const std::string& id, BrokerContext& context, std::auto_ptr<qpid::SaslServer> authenticator); ~Sasl(); size_t decode(const char* buffer, size_t size); diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index f90bfd1cd9..ddfbc7de52 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -26,6 +26,7 @@ #include "Domain.h" #include "Interconnects.h" #include "Relay.h" +#include "Topic.h" #include "qpid/broker/Broker.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/Exchange.h" @@ -128,24 +129,27 @@ class IncomingToExchange : public DecodingIncoming Authorise& authorise; }; -Session::Session(pn_session_t* s, qpid::broker::Broker& b, Connection& c, qpid::sys::OutputControl& o) - : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false), authorise(connection.getUserId(), broker.getAcl()) {} +Session::Session(pn_session_t* s, Connection& c, qpid::sys::OutputControl& o) + : ManagedSession(c.getBroker(), c, (boost::format("%1%") % s).str()), session(s), connection(c), out(o), deleted(false), + authorise(connection.getUserId(), connection.getBroker().getAcl()) {} Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus, bool incoming) { ResolvedNode node; - node.exchange = broker.getExchanges().find(name); - node.queue = broker.getQueues().find(name); + node.exchange = connection.getBroker().getExchanges().find(name); + node.queue = connection.getBroker().getQueues().find(name); + node.topic = connection.getTopics().get(name); + if (node.topic) node.exchange = node.topic->getExchange(); if (!node.queue && !node.exchange) { if (pn_terminus_is_dynamic(terminus) || is_capability_requested(CREATE_ON_DEMAND, pn_terminus_capabilities(terminus))) { //is it a queue or an exchange? node.properties.read(pn_terminus_properties(terminus)); if (node.properties.isQueue()) { - node.queue = broker.createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first; + node.queue = connection.getBroker().createQueue(name, node.properties.getQueueSettings(), this, node.properties.getAlternateExchange(), connection.getUserId(), connection.getId()).first; } else { qpid::framing::FieldTable args; - node.exchange = broker.createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(), + node.exchange = connection.getBroker().createExchange(name, node.properties.getExchangeType(), node.properties.isDurable(), node.properties.getAlternateExchange(), args, connection.getUserId(), connection.getId()).first; } } else { @@ -159,13 +163,16 @@ Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* te if (d) { node.relay = boost::shared_ptr<Relay>(new Relay(1000)); if (incoming) { - d->connect(false, id, name, local, connection.getInterconnects(), node.relay); + d->connect(false, id, name, local, connection, node.relay); } else { - d->connect(true, id, local, name, connection.getInterconnects(), node.relay); + d->connect(true, id, local, name, connection, node.relay); } } } } + } else if (node.queue && node.topic) { + QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or topic, assuming topic"); + node.queue.reset(); } else if (node.queue && node.exchange) { QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue"); node.exchange.reset(); @@ -255,19 +262,20 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s source = sourceAddress; } if (node.queue) { - boost::shared_ptr<Incoming> q(new IncomingToQueue(broker, *this, node.queue, link, source)); + boost::shared_ptr<Incoming> q(new IncomingToQueue(connection.getBroker(), *this, node.queue, link, source)); incoming[link] = q; } else if (node.exchange) { - boost::shared_ptr<Incoming> e(new IncomingToExchange(broker, *this, node.exchange, link, source)); + boost::shared_ptr<Incoming> e(new IncomingToExchange(connection.getBroker(), *this, node.exchange, link, source)); incoming[link] = e; } else if (node.relay) { - boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, source, name, pn_link_name(link), node.relay)); + boost::shared_ptr<Incoming> in(new IncomingToRelay(link, connection.getBroker(), *this, source, name, pn_link_name(link), node.relay)); incoming[link] = in; } else { pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); throw qpid::Exception("Node not found: " + name);/*not-found*/ } - if (broker.getOptions().auth && !connection.isLink()) incoming[link]->verify(connection.getUserId(), broker.getOptions().realm); + if (connection.getBroker().getOptions().auth && !connection.isLink()) + incoming[link]->verify(connection.getUserId(), connection.getBroker().getOptions().realm); QPID_LOG(debug, "Incoming link attached"); } @@ -291,7 +299,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s if (node.queue) { authorise.outgoing(node.queue); - boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, node.queue, link, *this, out, false)); + boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, node.queue, link, *this, out, false)); q->init(); filter.apply(q); outgoing[link] = q; @@ -300,6 +308,11 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source)); bool durable = pn_terminus_get_durability(source); QueueSettings settings(durable, !durable); + if (node.topic) { + settings = node.topic->getPolicy(); + settings.durable = durable; + settings.autodelete = !durable; + } filter.configure(settings); //TODO: populate settings from source details when available from engine std::stringstream queueName; @@ -313,15 +326,15 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s queueName << connection.getContainerId() << "_" << pn_link_name(link); } boost::shared_ptr<qpid::broker::Queue> queue - = broker.createQueue(queueName.str(), settings, this, "", connection.getUserId(), connection.getId()).first; + = connection.getBroker().createQueue(queueName.str(), settings, this, "", connection.getUserId(), connection.getId()).first; if (!shared) queue->setExclusiveOwner(this); authorise.outgoing(node.exchange, queue, filter); filter.bind(node.exchange, queue); - boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(broker, name, target, queue, link, *this, out, !shared)); + boost::shared_ptr<Outgoing> q(new OutgoingFromQueue(connection.getBroker(), name, target, queue, link, *this, out, !shared)); outgoing[link] = q; q->init(); } else if (node.relay) { - boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, name, target, pn_link_name(link), node.relay)); + boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, connection.getBroker(), *this, name, target, pn_link_name(link), node.relay)); outgoing[link] = out; out->init(); } else { @@ -344,11 +357,11 @@ void Session::attach(pn_link_t* link, const std::string& src, const std::string& if (relay) { if (pn_link_is_sender(link)) { - boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, broker, *this, src, tgt, pn_link_name(link), relay)); + boost::shared_ptr<Outgoing> out(new OutgoingFromRelay(link, connection.getBroker(), *this, src, tgt, pn_link_name(link), relay)); outgoing[link] = out; out->init(); } else { - boost::shared_ptr<Incoming> in(new IncomingToRelay(link, broker, *this, src, tgt, pn_link_name(link), relay)); + boost::shared_ptr<Incoming> in(new IncomingToRelay(link, connection.getBroker(), *this, src, tgt, pn_link_name(link), relay)); incoming[link] = in; } } else { diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.h b/qpid/cpp/src/qpid/broker/amqp/Session.h index 78d44a1a18..a991ac9e3e 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.h +++ b/qpid/cpp/src/qpid/broker/amqp/Session.h @@ -50,13 +50,14 @@ class Connection; class Incoming; class Outgoing; class Relay; +class Topic; /** * */ class Session : public ManagedSession, public boost::enable_shared_from_this<Session> { public: - Session(pn_session_t*, qpid::broker::Broker&, Connection&, qpid::sys::OutputControl&); + Session(pn_session_t*, Connection&, qpid::sys::OutputControl&); /** * called for links initiated by the peer */ @@ -82,7 +83,6 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses typedef std::map<pn_link_t*, boost::shared_ptr<Outgoing> > OutgoingLinks; typedef std::map<pn_link_t*, boost::shared_ptr<Incoming> > IncomingLinks; pn_session_t* session; - qpid::broker::Broker& broker; Connection& connection; qpid::sys::OutputControl& out; IncomingLinks incoming; @@ -97,6 +97,7 @@ class Session : public ManagedSession, public boost::enable_shared_from_this<Ses { boost::shared_ptr<qpid::broker::Exchange> exchange; boost::shared_ptr<qpid::broker::Queue> queue; + boost::shared_ptr<qpid::broker::amqp::Topic> topic; boost::shared_ptr<Relay> relay; NodeProperties properties; }; diff --git a/qpid/cpp/src/qpid/broker/amqp/Topic.cpp b/qpid/cpp/src/qpid/broker/amqp/Topic.cpp new file mode 100644 index 0000000000..7d77343f26 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/Topic.cpp @@ -0,0 +1,174 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/amqp/Topic.h" +#include "qpid/broker/Broker.h" +#include "qpid/management/ManagementAgent.h" + +namespace _qmf = qmf::org::apache::qpid::broker; + +namespace qpid { +namespace broker { +namespace amqp { +namespace { +const std::string TOPIC("topic"); +const std::string EXCHANGE("exchange"); +const std::string DURABLE("durable"); +const std::string EMPTY; + +std::string getProperty(const std::string& k, const qpid::types::Variant::Map& m) +{ + qpid::types::Variant::Map::const_iterator i = m.find(k); + if (i == m.end()) return EMPTY; + else return i->second; +} + +bool testProperty(const std::string& k, const qpid::types::Variant::Map& m) +{ + qpid::types::Variant::Map::const_iterator i = m.find(k); + if (i == m.end()) return false; + else return i->second; +} + +} + +Topic::Topic(Broker& broker, const std::string& n, const qpid::types::Variant::Map& properties) + : PersistableObject(n, TOPIC, properties), name(n), durable(testProperty(DURABLE, properties)), exchange(broker.getExchanges().get(getProperty(EXCHANGE, properties))) +{ + if (exchange->getName().empty()) throw qpid::Exception("Exchange must be specified."); + + qpid::types::Variant::Map unused; + policy.populate(properties, unused); + + qpid::management::ManagementAgent* agent = broker.getManagementAgent(); + if (agent != 0) { + topic = _qmf::Topic::shared_ptr(new _qmf::Topic(agent, this, name, exchange->GetManagementObject()->getObjectId(), durable)); + topic->set_properties(policy.asMap()); + agent->addObject(topic); + } +} + +bool Topic::isDurable() const +{ + return durable; +} + +Topic::~Topic() +{ + if (topic != 0) topic->resourceDestroy(); +} + +boost::shared_ptr<qpid::management::ManagementObject> Topic::GetManagementObject() const +{ + return topic; +} + +const QueueSettings& Topic::getPolicy() const +{ + return policy; +} +boost::shared_ptr<Exchange> Topic::getExchange() +{ + return exchange; +} +const std::string& Topic::getName() const +{ + return name; +} + +boost::shared_ptr<Topic> TopicRegistry::createTopic(Broker& broker, const std::string& name, const qpid::types::Variant::Map& properties) +{ + boost::shared_ptr<Topic> topic(new Topic(broker, name, properties)); + add(topic); + return topic; +} + +bool TopicRegistry::createObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& props, + const std::string& /*userId*/, const std::string& /*connectionId*/) +{ + if (type == TOPIC) { + boost::shared_ptr<Topic> topic = createTopic(broker, name, props); + if (topic->isDurable()) broker.getStore().create(*topic); + return true; + } else { + return false; + } +} + +bool TopicRegistry::deleteObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& /*properties*/, + const std::string& /*userId*/, const std::string& /*connectionId*/) +{ + if (type == TOPIC) { + boost::shared_ptr<Topic> topic = remove(name); + if (topic->isDurable()) broker.getStore().destroy(*topic); + return true; + } else { + return false; + } +} + +bool TopicRegistry::recoverObject(Broker& broker, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + uint64_t persistenceId) +{ + if (type == TOPIC) { + boost::shared_ptr<Topic> topic = createTopic(broker, name, properties); + topic->setPersistenceId(persistenceId); + return true; + } else { + return false; + } +} + +bool TopicRegistry::add(boost::shared_ptr<Topic> topic) +{ + qpid::sys::Mutex::ScopedLock l(lock); + Topics::const_iterator i = topics.find(topic->getName()); + if (i == topics.end()) { + topics.insert(Topics::value_type(topic->getName(), topic)); + return true; + } else { + return false; + } + +} +boost::shared_ptr<Topic> TopicRegistry::remove(const std::string& name) +{ + boost::shared_ptr<Topic> result; + qpid::sys::Mutex::ScopedLock l(lock); + Topics::iterator i = topics.find(name); + if (i != topics.end()) { + result = i->second; + topics.erase(i); + } + return result; +} + +boost::shared_ptr<Topic> TopicRegistry::get(const std::string& name) +{ + qpid::sys::Mutex::ScopedLock l(lock); + Topics::const_iterator i = topics.find(name); + if (i == topics.end()) { + return boost::shared_ptr<Topic>(); + } else { + return i->second; + } +} + +}}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Topic.h b/qpid/cpp/src/qpid/broker/amqp/Topic.h new file mode 100644 index 0000000000..decebdb0d4 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/amqp/Topic.h @@ -0,0 +1,88 @@ +#ifndef QPID_BROKER_AMQP_TOPIC_H +#define QPID_BROKER_AMQP_TOPIC_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/ObjectFactory.h" +#include "qpid/broker/PersistableObject.h" +#include "qpid/broker/QueueSettings.h" +#include "qpid/sys/Mutex.h" +#include "qpid/types/Variant.h" +#include "qpid/management/Manageable.h" +#include "qmf/org/apache/qpid/broker/Exchange.h" +#include "qmf/org/apache/qpid/broker/Topic.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { +class Broker; +class Exchange; +class QueueDepth; + +namespace amqp { + +/** + * A topic is a node supporting a pub-sub style. It is at present + * implemented by an exchange with an additional policy for handling + * subscription queues. + */ +class Topic : public PersistableObject, public management::Manageable +{ + public: + Topic(Broker&, const std::string& name, const qpid::types::Variant::Map& properties); + ~Topic(); + const std::string& getName() const; + const QueueSettings& getPolicy() const; + boost::shared_ptr<Exchange> getExchange(); + bool isDurable() const; + boost::shared_ptr<qpid::management::ManagementObject> GetManagementObject() const; + private: + std::string name; + bool durable; + boost::shared_ptr<Exchange> exchange; + QueueSettings policy; + qmf::org::apache::qpid::broker::Topic::shared_ptr topic; +}; + +class TopicRegistry : public ObjectFactory +{ + public: + bool createObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& userId, const std::string& connectionId); + bool deleteObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + const std::string& userId, const std::string& connectionId); + bool recoverObject(Broker&, const std::string& type, const std::string& name, const qpid::types::Variant::Map& properties, + uint64_t persistenceId); + + bool add(boost::shared_ptr<Topic> topic); + boost::shared_ptr<Topic> remove(const std::string& name); + boost::shared_ptr<Topic> get(const std::string& name); + private: + typedef std::map<std::string, boost::shared_ptr<Topic> > Topics; + qpid::sys::Mutex lock; + Topics topics; + + boost::shared_ptr<Topic> createTopic(Broker&, const std::string& name, const qpid::types::Variant::Map& properties); +}; + +}}} // namespace qpid::broker::amqp + +#endif /*!QPID_BROKER_AMQP_TOPIC_H*/ |
