From 565b730bf0229bcc2d56b8962e897ac83c585b5d Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 5 Dec 2014 13:39:23 +0000 Subject: Revert "QPID-6252: AMQP 1.0 browsing client generates large number of errors on broker." This reverts commit 78d7f0727227f13da826180b2fe98f799160a93a. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1643275 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h | 1 - qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 2 +- qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp | 4 +--- qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h | 2 -- qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp | 4 ++-- qpid/cpp/src/qpid/messaging/amqp/SessionContext.h | 2 +- 6 files changed, 5 insertions(+), 10 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h index 66aee1ae22..3ee58cad8d 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h @@ -44,7 +44,6 @@ class AddressHelper const qpid::types::Variant::Map& getNodeProperties() const; bool getLinkSource(std::string& out) const; bool getLinkTarget(std::string& out) const; - bool getBrowse() const { return browse; } const qpid::types::Variant::Map& getLinkProperties() const; static std::string getLinkName(const Address& address); private: diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index 9e3f95742b..fedab4286f 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -292,7 +292,7 @@ bool ConnectionContext::get(boost::shared_ptr ssn, boost::shared QPID_LOG(debug, "Received message of " << encoded->getSize() << " bytes: "); encoded->init(impl); impl.setEncoded(encoded); - impl.setInternalId(ssn->record(current, lnk->getBrowse())); + impl.setInternalId(ssn->record(current)); pn_link_advance(lnk->receiver); if (lnk->capacity) { pn_link_flow(lnk->receiver, 1); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 454106149d..08cc130a9e 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -36,9 +36,7 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co address(a), helper(address), receiver(pn_receiver(session, name.c_str())), - capacity(0), used(0) -{} - + capacity(0), used(0) {} ReceiverContext::~ReceiverContext() { //pn_link_free(receiver); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h index 8ded487bf3..2b4e8e1986 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h @@ -60,8 +60,6 @@ class ReceiverContext void verify(); Address getAddress() const; bool hasCurrent(); - bool getBrowse() const { return helper.getBrowse(); } - private: friend class ConnectionContext; const std::string name; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index f2b7b24b4c..4e5d71f788 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -110,10 +110,10 @@ uint32_t SessionContext::getUnsettledAcks() return 0;//TODO } -qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery, bool browse) +qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) { qpid::framing::SequenceNumber id = next++; - if (!browse) unacked[id] = delivery; + unacked[id] = delivery; QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery); return id; } diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h index b347c327c5..8c2bb040a6 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -75,7 +75,7 @@ class SessionContext qpid::framing::SequenceNumber next; std::string name; - qpid::framing::SequenceNumber record(pn_delivery_t*, bool browse); + qpid::framing::SequenceNumber record(pn_delivery_t*); void acknowledge(); void acknowledge(const qpid::framing::SequenceNumber& id, bool cummulative); void acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end); -- cgit v1.2.1 From 32a91b1044c2fa592cd7e33ea8c54c908b8e04b9 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 5 Dec 2014 13:39:28 +0000 Subject: QPID-6252: AMQP 1.0 browsing client generates large number of errors on broker (better fix) This is a simpler and better fix based on the discussion at: http://qpid.2158936.n2.nabble.com/Re-svn-commit-r1642720-in-qpid-trunk-qpid-cpp-src-qpid-messaging-amqp-AddressHelper-h-ConnectionConth-td7617083.html The changes are all client-side: - A browsing address is unreliable by default. An explicit reliability setting is respected. - The client session does not record pre-settled deliveries in it's unacked list. So by default: - Browsing links are unreliable. Broker sends pre-settled, messages are not recorded in unacked list. - The user is not required to acknowledge browsed messages for proper clean-up. - Calling acknowledge() for a browsed message is a no-op, not an error. If the user explicitly requests a reliable browsing link, then we behave exactly as before. I can't see any value in doing this with qpidd but maybe with some other broker there might be a use for being able to control the accept of browsed messages. This does affect non-browsing, unreliable links but it is an improvement. Settling a pre-settled messages is a no-op, so there is no point in recording pre-settled messages in the unacked list since we do nothing when they are acknoweldged. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1643276 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 3 ++- qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index 763deb33c6..c6ad8cdb6c 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -571,7 +571,8 @@ bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const bool AddressHelper::isUnreliable() const { - return reliability == AT_MOST_ONCE || reliability == UNRELIABLE; + return reliability == AT_MOST_ONCE || reliability == UNRELIABLE || + (reliability.empty() && browse); // A browser defaults to unreliable. } const qpid::types::Variant::Map& AddressHelper::getNodeProperties() const diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 4e5d71f788..824b958af3 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -113,7 +113,8 @@ uint32_t SessionContext::getUnsettledAcks() qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) { qpid::framing::SequenceNumber id = next++; - unacked[id] = delivery; + if (!pn_delivery_settled(delivery)) + unacked[id] = delivery; QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery); return id; } -- cgit v1.2.1 From b5dba684ab72f5f8ea3e28672ba7e65668332dbc Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 17 Dec 2014 14:29:13 +0000 Subject: QPID-5003: Use correct property for delayed auto-delete. Explicit 0 timeout should override default. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1646255 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index bbb310b0f4..920dd4172b 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -538,15 +538,16 @@ Subscription::Subscription(const Address& address, const std::string& type) exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)), alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str()) { - const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value; - if (timeout) { + + if ((Opt(address)/LINK).hasKey(TIMEOUT)) { + const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value; if (timeout->asUint32()) queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32()); } else if (durable && !(Opt(address)/LINK/RELIABILITY).value) { //if durable but not explicitly reliable, then set a non-zero //default for the autodelete timeout (previously this would //have defaulted to autodelete immediately anyway, so the risk //of the change causing problems is mitigated) - queueOptions.setInt("qpid.auto_delete_delay", 15*60); + queueOptions.setInt("qpid.auto_delete_timeout", 15*60); } (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions); (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions); -- cgit v1.2.1 From 1412aadeb7b519f1f0bc19ac0274db32cb640ae7 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 17 Dec 2014 14:29:19 +0000 Subject: QPID-6267: honour policy timeout even if it is 0 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1646256 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/amqp/Session.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 85da8c0c21..32a923cac5 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -505,7 +505,7 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s if (!settings.autodelete) settings.autodelete = autodelete; altExchange = node.topic->getAlternateExchange(); } - if (!settings.autoDeleteDelay) { + if (settings.original.find("qpid.auto_delete_timeout") == settings.original.end()) { //only use delay from link if policy didn't specify one settings.autoDeleteDelay = pn_terminus_get_timeout(source); settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay; -- cgit v1.2.1 From 72f126ed32ec0bcdccd7caa6a4dab5f6d12e2495 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 17 Dec 2014 14:29:24 +0000 Subject: QPID-6269: trigger autodelete timer on queue recovery git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1646257 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Queue.cpp | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index d897029c85..658fc26919 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1258,6 +1258,10 @@ Queue::shared_ptr Queue::restore( QueueRegistry& queues, Buffer& buffer ) if (has_userId) result.first->setOwningUser(_userId); + if (result.first->getSettings().autoDeleteDelay) { + result.first->scheduleAutoDelete(); + } + return result.first; } -- cgit v1.2.1 From 39ccbda213efd951ff7355fa1cf2fdc5da58a96b Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 17 Dec 2014 14:29:31 +0000 Subject: QPID-6167: ensure broker responds with a header for an enabled protocol git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1646258 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Protocol.cpp | 23 ++++++++++++++++++++--- qpid/cpp/src/qpid/broker/Protocol.h | 4 +++- qpid/cpp/src/qpid/broker/amqp/Domain.cpp | 4 ++++ qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp | 6 ++++++ qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp | 2 +- qpid/cpp/src/qpid/sys/ConnectionCodec.h | 2 ++ 6 files changed, 36 insertions(+), 5 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/broker/Protocol.cpp b/qpid/cpp/src/qpid/broker/Protocol.cpp index 2ef8c66445..e9e7892499 100644 --- a/qpid/cpp/src/qpid/broker/Protocol.cpp +++ b/qpid/cpp/src/qpid/broker/Protocol.cpp @@ -35,8 +35,10 @@ ProtocolRegistry::ProtocolRegistry(const std::set& e, Broker* b) : qpid::sys::ConnectionCodec* ProtocolRegistry::create(const qpid::framing::ProtocolVersion& v, qpid::sys::OutputControl& o, const std::string& id, const qpid::sys::SecuritySettings& s) { - if (v == qpid::framing::ProtocolVersion(0, 10) && isEnabled(AMQP_0_10)) { - return create_0_10(o, id, s, false); + if (v == qpid::framing::ProtocolVersion(0, 10)) { + if (isEnabled(AMQP_0_10)) { + return create_0_10(o, id, s, false); + } } qpid::sys::ConnectionCodec* codec = 0; for (Protocols::const_iterator i = protocols.begin(); !codec && i != protocols.end(); ++i) { @@ -51,7 +53,22 @@ qpid::sys::ConnectionCodec* ProtocolRegistry::create(qpid::sys::OutputControl& o return create_0_10(o, id, s, true); } -bool ProtocolRegistry::isEnabled(const std::string& name) +qpid::framing::ProtocolVersion ProtocolRegistry::supportedVersion() const +{ + if (isEnabled(AMQP_0_10)) { + return qpid::framing::ProtocolVersion(0,10); + } else { + for (Protocols::const_iterator i = protocols.begin(); i != protocols.end(); ++i) { + if (isEnabled(i->first)) { + return i->second->supportedVersion(); + } + } + } + QPID_LOG(error, "No enabled protocols!"); + return qpid::framing::ProtocolVersion(0,0); +} + +bool ProtocolRegistry::isEnabled(const std::string& name) const { return enabled.empty()/*if nothing is explicitly enabled, assume everything is*/ || enabled.find(name) != enabled.end(); } diff --git a/qpid/cpp/src/qpid/broker/Protocol.h b/qpid/cpp/src/qpid/broker/Protocol.h index 59a631848e..a7dbc98fff 100644 --- a/qpid/cpp/src/qpid/broker/Protocol.h +++ b/qpid/cpp/src/qpid/broker/Protocol.h @@ -61,6 +61,7 @@ class Protocol virtual qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&) = 0; virtual boost::intrusive_ptr translate(const Message&) = 0; virtual boost::shared_ptr recover(qpid::framing::Buffer&) = 0; + virtual qpid::framing::ProtocolVersion supportedVersion() const = 0; private: }; @@ -70,6 +71,7 @@ class ProtocolRegistry : public Protocol, public qpid::sys::ConnectionCodec::Fac public: QPID_BROKER_EXTERN qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); QPID_BROKER_EXTERN qpid::sys::ConnectionCodec* create(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); + QPID_BROKER_EXTERN qpid::framing::ProtocolVersion supportedVersion() const; QPID_BROKER_EXTERN boost::intrusive_ptr translate(const Message&); QPID_BROKER_EXTERN boost::shared_ptr recover(qpid::framing::Buffer&); QPID_BROKER_EXTERN Message decode(qpid::framing::Buffer&); @@ -86,7 +88,7 @@ class ProtocolRegistry : public Protocol, public qpid::sys::ConnectionCodec::Fac Broker* broker; qpid::sys::ConnectionCodec* create_0_10(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&, bool); - bool isEnabled(const std::string&); + bool isEnabled(const std::string&) const; }; }} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp index cc714c0730..c2d4782fc4 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Domain.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Domain.cpp @@ -140,6 +140,10 @@ class InterconnectFactory : public BrokerContext, public qpid::sys::ConnectionCo boost::shared_ptr, BrokerContext&, boost::shared_ptr); qpid::sys::ConnectionCodec* create(const framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); qpid::sys::ConnectionCodec* create(qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); + qpid::framing::ProtocolVersion supportedVersion() const + { + return qpid::framing::ProtocolVersion(1, 0); + } bool connect(); void failed(int, std::string); private: diff --git a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp index 2ea381e2bc..621f25f04b 100644 --- a/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/ProtocolPlugin.cpp @@ -73,6 +73,7 @@ class ProtocolImpl : public BrokerContext, public Protocol qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); boost::intrusive_ptr translate(const qpid::broker::Message&); boost::shared_ptr recover(qpid::framing::Buffer&); + qpid::framing::ProtocolVersion supportedVersion() const; private: }; @@ -158,5 +159,10 @@ boost::shared_ptr ProtocolImpl::recover(qpid::framing::Buffe } } +qpid::framing::ProtocolVersion ProtocolImpl::supportedVersion() const +{ + return qpid::framing::ProtocolVersion(1,0); +} + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp index bb59530883..7be625a1a3 100644 --- a/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp +++ b/qpid/cpp/src/qpid/sys/AsynchIOHandler.cpp @@ -150,7 +150,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) { if (!codec) { //TODO: may still want to revise this... //send valid version header & close connection. - write(framing::ProtocolInitiation(framing::highestProtocolVersion)); + write(framing::ProtocolInitiation(factory->supportedVersion())); readError = true; aio->queueWriteClose(); } else { diff --git a/qpid/cpp/src/qpid/sys/ConnectionCodec.h b/qpid/cpp/src/qpid/sys/ConnectionCodec.h index 969a3877e3..8b5b69cdb4 100644 --- a/qpid/cpp/src/qpid/sys/ConnectionCodec.h +++ b/qpid/cpp/src/qpid/sys/ConnectionCodec.h @@ -60,6 +60,8 @@ class ConnectionCodec : public Codec { virtual ConnectionCodec* create( OutputControl&, const std::string& id, const SecuritySettings& ) = 0; + + virtual framing::ProtocolVersion supportedVersion() const = 0; }; }; -- cgit v1.2.1 From faec291315dc851eefa59b63f3c7107888b75f98 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 17 Dec 2014 14:29:44 +0000 Subject: QPID-6274: Delete subscription queue immediately on link close git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1646260 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Queue.cpp | 8 ++++---- qpid/cpp/src/qpid/broker/Queue.h | 4 ++-- qpid/cpp/src/qpid/broker/amqp/Incoming.cpp | 2 +- qpid/cpp/src/qpid/broker/amqp/Incoming.h | 2 +- qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 10 ++++++---- qpid/cpp/src/qpid/broker/amqp/Outgoing.h | 4 ++-- qpid/cpp/src/qpid/broker/amqp/Relay.cpp | 4 ++-- qpid/cpp/src/qpid/broker/amqp/Relay.h | 4 ++-- qpid/cpp/src/qpid/broker/amqp/Session.cpp | 12 ++++++------ 9 files changed, 26 insertions(+), 24 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 658fc26919..b1f7d0524b 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1300,10 +1300,10 @@ struct AutoDeleteTask : qpid::sys::TimerTask } }; -void Queue::scheduleAutoDelete() +void Queue::scheduleAutoDelete(bool immediate) { if (canAutoDelete()) { - if (settings.autoDeleteDelay) { + if (!immediate && settings.autoDeleteDelay) { AbsTime time(now(), Duration(settings.autoDeleteDelay * TIME_SEC)); autoDeleteTask = boost::intrusive_ptr(new AutoDeleteTask(shared_from_this(), time)); broker->getTimer().add(autoDeleteTask); @@ -1343,7 +1343,7 @@ bool Queue::isExclusiveOwner(const OwnershipToken* const o) const return o == owner; } -void Queue::releaseExclusiveOwnership() +void Queue::releaseExclusiveOwnership(bool immediateExpiry) { bool unused; { @@ -1355,7 +1355,7 @@ void Queue::releaseExclusiveOwnership() unused = !users.isUsed(); } if (unused && settings.autodelete) { - scheduleAutoDelete(); + scheduleAutoDelete(immediateExpiry); } } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 65a91b8729..efca9b9d40 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -379,7 +379,7 @@ class Queue : public boost::enable_shared_from_this, QPID_BROKER_EXTERN uint32_t getConsumerCount() const; inline const std::string& getName() const { return name; } QPID_BROKER_EXTERN bool isExclusiveOwner(const OwnershipToken* const o) const; - QPID_BROKER_EXTERN void releaseExclusiveOwnership(); + QPID_BROKER_EXTERN void releaseExclusiveOwnership(bool immediateExpiry=false); QPID_BROKER_EXTERN bool setExclusiveOwner(const OwnershipToken* const o); QPID_BROKER_EXTERN bool hasExclusiveConsumer() const; QPID_BROKER_EXTERN bool hasExclusiveOwner() const; @@ -389,7 +389,7 @@ class Queue : public boost::enable_shared_from_this, inline bool isAutoDelete() const { return settings.autodelete; } inline bool isBrowseOnly() const { return settings.isBrowseOnly; } QPID_BROKER_EXTERN bool canAutoDelete() const; - QPID_BROKER_EXTERN void scheduleAutoDelete(); + QPID_BROKER_EXTERN void scheduleAutoDelete(bool immediate=false); QPID_BROKER_EXTERN bool isDeleted() const; const QueueBindings& getBindings() const { return bindings; } diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp index ce4c73dead..d4f73fc511 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.cpp @@ -57,7 +57,7 @@ uint32_t Incoming::getCredit() return credit;//TODO: proper flow control } -void Incoming::detached() +void Incoming::detached(bool /*closed*/) { } diff --git a/qpid/cpp/src/qpid/broker/amqp/Incoming.h b/qpid/cpp/src/qpid/broker/amqp/Incoming.h index 1a7064337d..ccf999a256 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Incoming.h +++ b/qpid/cpp/src/qpid/broker/amqp/Incoming.h @@ -42,7 +42,7 @@ class Incoming : public ManagedIncomingLink virtual ~Incoming(); virtual bool doWork();//do anything that requires output virtual bool haveWork();//called when handling input to see whether any output work is needed - virtual void detached(); + virtual void detached(bool closed); virtual void readable(pn_delivery_t* delivery) = 0; void verify(const std::string& userid, const std::string& defaultRealm); void wakeup(); diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index 54993d071e..d0b41c6c90 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -156,7 +156,7 @@ bool OutgoingFromQueue::canDeliver() return deliveries[current].delivery == 0 && pn_link_credit(link); } -void OutgoingFromQueue::detached() +void OutgoingFromQueue::detached(bool closed) { QPID_LOG(debug, "Detaching outgoing link " << getName() << " from " << queue->getName()); queue->cancel(shared_from_this()); @@ -164,12 +164,14 @@ void OutgoingFromQueue::detached() for (size_t i = 0 ; i < deliveries.capacity(); ++i) { if (deliveries[i].msg) queue->release(deliveries[i].cursor, true); } - if (exclusive) queue->releaseExclusiveOwnership(); - else if (isControllingUser) queue->releaseFromUse(true); + if (exclusive) { + queue->releaseExclusiveOwnership(closed); + } else if (isControllingUser) { + queue->releaseFromUse(true); + } cancelled = true; } - OutgoingFromQueue::~OutgoingFromQueue() { if (!cancelled && isControllingUser) queue->releaseFromUse(true); diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h index 27d8205fc8..d3825d0894 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.h @@ -70,7 +70,7 @@ class Outgoing : public ManagedOutgoingLink /** * Signals that this link has been detached */ - virtual void detached() = 0; + virtual void detached(bool closed) = 0; /** * Called when a delivery is writable */ @@ -98,7 +98,7 @@ class OutgoingFromQueue : public Outgoing, public qpid::broker::Consumer, public void write(const char* data, size_t size); void handle(pn_delivery_t* delivery); bool canDeliver(); - void detached(); + void detached(bool closed); //Consumer interface: bool deliver(const QueueCursor& cursor, const qpid::broker::Message& msg); diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp index 83b3e64ee6..495fe800cb 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp @@ -163,7 +163,7 @@ void OutgoingFromRelay::handle(pn_delivery_t* delivery) /** * Signals that this link has been detached */ -void OutgoingFromRelay::detached() +void OutgoingFromRelay::detached(bool /*closed*/) { relay->detached(this); } @@ -221,7 +221,7 @@ uint32_t IncomingToRelay::getCredit() return relay->getCredit(); } -void IncomingToRelay::detached() +void IncomingToRelay::detached(bool /*closed*/) { relay->detached(this); } diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.h b/qpid/cpp/src/qpid/broker/amqp/Relay.h index ef700690fd..32f317bfe1 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Relay.h +++ b/qpid/cpp/src/qpid/broker/amqp/Relay.h @@ -100,7 +100,7 @@ class OutgoingFromRelay : public Outgoing const std::string& target, const std::string& name, boost::shared_ptr); bool doWork(); void handle(pn_delivery_t* delivery); - void detached(); + void detached(bool closed); void init(); void setSubjectFilter(const std::string&); void setSelectorFilter(const std::string&); @@ -118,7 +118,7 @@ class IncomingToRelay : public Incoming bool settle(); bool doWork(); bool haveWork(); - void detached(); + void detached(bool closed); void readable(pn_delivery_t* delivery); uint32_t getCredit(); private: diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 32a923cac5..2e7d30118a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -577,7 +577,7 @@ void Session::detach(pn_link_t* link) if (pn_link_is_sender(link)) { OutgoingLinks::iterator i = outgoing.find(link); if (i != outgoing.end()) { - i->second->detached(); + i->second->detached(true/*TODO: checked whether actually closed; see PROTON-773*/); boost::shared_ptr q = OutgoingFromQueue::getExclusiveSubscriptionQueue(i->second.get()); if (q && !q->isAutoDelete() && !q->isDeleted()) { connection.getBroker().deleteQueue(q->getName(), connection.getUserId(), connection.getMgmtId()); @@ -588,7 +588,7 @@ void Session::detach(pn_link_t* link) } else { IncomingLinks::iterator i = incoming.find(link); if (i != incoming.end()) { - i->second->detached(); + i->second->detached(true/*TODO: checked whether actually closed; see PROTON-773*/); incoming.erase(i); QPID_LOG(debug, "Incoming link detached"); } @@ -653,7 +653,7 @@ bool Session::dispatch() pn_condition_set_name(error, e.symbol()); pn_condition_set_description(error, e.what()); pn_link_close(s->first); - s->second->detached(); + s->second->detached(true); outgoing.erase(s++); output = true; } @@ -678,7 +678,7 @@ bool Session::dispatch() pn_condition_set_name(error, e.symbol()); pn_condition_set_description(error, e.what()); pn_link_close(i->first); - i->second->detached(); + i->second->detached(true); incoming.erase(i++); output = true; } @@ -690,10 +690,10 @@ bool Session::dispatch() void Session::close() { for (OutgoingLinks::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { - i->second->detached(); + i->second->detached(false); } for (IncomingLinks::iterator i = incoming.begin(); i != incoming.end(); ++i) { - i->second->detached(); + i->second->detached(false); } outgoing.clear(); incoming.clear(); -- cgit v1.2.1 From b2e043b69e7049c31fb0a75bb3e41f2550223a24 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 17 Dec 2014 14:29:50 +0000 Subject: QPID-6275: reduce default autodelete timeout for durable subscriptions git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1646261 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 2 +- qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 920dd4172b..6128f4c0fc 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -547,7 +547,7 @@ Subscription::Subscription(const Address& address, const std::string& type) //default for the autodelete timeout (previously this would //have defaulted to autodelete immediately anyway, so the risk //of the change causing problems is mitigated) - queueOptions.setInt("qpid.auto_delete_timeout", 15*60); + queueOptions.setInt("qpid.auto_delete_timeout", 2*60); } (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions); (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(subscriptionOptions); diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index c6ad8cdb6c..e631501977 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -242,7 +242,7 @@ bool replace(Variant::Map& map, const std::string& original, const std::string& } } -const uint32_t DEFAULT_DURABLE_TIMEOUT(15*60);//15 minutes +const uint32_t DEFAULT_DURABLE_TIMEOUT(2*60);//2 minutes const uint32_t DEFAULT_TIMEOUT(0); } -- cgit v1.2.1 From aa51ac52f3bd77d92acf585699bc7429666ad785 Mon Sep 17 00:00:00 2001 From: Ken Giusti Date: Wed, 17 Dec 2014 22:37:37 +0000 Subject: QPID-6255: Use Proton event model in qpidd when available. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1646354 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/amqp.cmake | 3 +- qpid/cpp/src/config.h.cmake | 1 + qpid/cpp/src/qpid/broker/amqp/Connection.cpp | 289 ++++++++++++++++++--------- qpid/cpp/src/qpid/broker/amqp/Connection.h | 15 ++ qpid/cpp/src/qpid/broker/amqp/Relay.cpp | 8 +- 5 files changed, 225 insertions(+), 91 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake index 3be9f520e0..77ade87c8e 100644 --- a/qpid/cpp/src/amqp.cmake +++ b/qpid/cpp/src/amqp.cmake @@ -22,7 +22,7 @@ find_package(Proton 0.5) set (amqp_default ${amqp_force}) -set (maximum_version 0.7) +set (maximum_version 0.8) if (Proton_FOUND) if (Proton_VERSION GREATER ${maximum_version}) message(WARNING "Qpid proton ${Proton_VERSION} is not a tested version and might not be compatible, ${maximum_version} is highest tested; build may not work") @@ -35,6 +35,7 @@ if (Proton_FOUND) endif (NOT Proton_VERSION EQUAL 0.5) if (Proton_VERSION GREATER 0.7) set (USE_PROTON_TRANSPORT_CONDITION 1) + set (HAVE_PROTON_EVENTS 1) endif (Proton_VERSION GREATER 0.7) else () message(STATUS "Qpid proton not found, amqp 1.0 support not enabled") diff --git a/qpid/cpp/src/config.h.cmake b/qpid/cpp/src/config.h.cmake index f8139262d5..dbfc4ced8a 100644 --- a/qpid/cpp/src/config.h.cmake +++ b/qpid/cpp/src/config.h.cmake @@ -58,5 +58,6 @@ #cmakedefine HAVE_LOG_FTP #cmakedefine HAVE_PROTON_TRACER #cmakedefine USE_PROTON_TRANSPORT_CONDITION +#cmakedefine HAVE_PROTON_EVENTS #endif /* QPID_CONFIG_H */ diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index 04bbe8b944..f04cd8eb6e 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -37,6 +37,9 @@ extern "C" { #include #include +#ifdef HAVE_PROTON_EVENTS +#include +#endif } namespace qpid { @@ -117,8 +120,14 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker : BrokerContext(b), ManagedConnection(getBroker(), i, brokerInitiated), connection(pn_connection()), transport(pn_transport()), + collector(0), out(o), id(i), haveOutput(true), closeInitiated(false), closeRequested(false) { +#ifdef HAVE_PROTON_EVENTS + collector = pn_collector(); + pn_connection_collect(connection, collector); +#endif + if (pn_transport_bind(transport, connection)) { //error QPID_LOG(error, "Failed to bind transport to connection: " << getError()); @@ -157,6 +166,9 @@ Connection::~Connection() getBroker().getConnectionObservers().closed(*this); pn_transport_free(transport); pn_connection_free(connection); +#ifdef HAVE_PROTON_EVENTS + pn_collector_free(collector); +#endif } pn_transport_t* Connection::getTransport() @@ -222,10 +234,15 @@ size_t Connection::encode(char* buffer, size_t size) void Connection::doOutput(size_t capacity) { - for (ssize_t n = pn_transport_pending(transport); n > 0 && n < (ssize_t) capacity; n = pn_transport_pending(transport)) { - if (dispatch()) processDeliveries(); - else break; - } + ssize_t n = 0; + do { + if (dispatch()) { + processDeliveries(); + ssize_t next = pn_transport_pending(transport); + if (n == next) break; + n = next; + } else break; + } while (n > 0 && n < (ssize_t) capacity); } bool Connection::dispatch() @@ -327,85 +344,70 @@ framing::ProtocolVersion Connection::getVersion() const { return qpid::framing::ProtocolVersion(1,0); } -namespace { -pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE; -pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED; -} void Connection::process() { QPID_LOG(trace, id << " process()"); +#ifdef HAVE_PROTON_EVENTS + pn_event_t *event = pn_collector_peek(collector); + while (event) { + switch (pn_event_type(event)) { + case PN_CONNECTION_REMOTE_OPEN: + doConnectionRemoteOpen(); + break; + case PN_CONNECTION_REMOTE_CLOSE: + doConnectionRemoteClose(); + break; + case PN_SESSION_REMOTE_OPEN: + doSessionRemoteOpen(pn_event_session(event)); + break; + case PN_SESSION_REMOTE_CLOSE: + doSessionRemoteClose(pn_event_session(event)); + break; + case PN_LINK_REMOTE_OPEN: + doLinkRemoteOpen(pn_event_link(event)); + break; + case PN_LINK_REMOTE_CLOSE: + doLinkRemoteClose(pn_event_link(event)); + break; + case PN_DELIVERY: + doDeliveryUpdated(pn_event_delivery(event)); + break; + default: + break; + } + pn_collector_pop(collector); + event = pn_collector_peek(collector); + } + +#else // !HAVE_PROTON_EVENTS + + const pn_state_t REQUIRES_OPEN = PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE; + const pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED; + if ((pn_connection_state(connection) & REQUIRES_OPEN) == REQUIRES_OPEN) { - QPID_LOG_CAT(debug, model, id << " connection opened"); - open(); - setContainerId(pn_connection_remote_container(connection)); + doConnectionRemoteOpen(); } for (pn_session_t* s = pn_session_head(connection, REQUIRES_OPEN); s; s = pn_session_next(s, REQUIRES_OPEN)) { - QPID_LOG_CAT(debug, model, id << " session begun"); - pn_session_open(s); - boost::shared_ptr ssn(new Session(s, *this, out)); - sessions[s] = ssn; + doSessionRemoteOpen(s); } for (pn_link_t* l = pn_link_head(connection, REQUIRES_OPEN); l; l = pn_link_next(l, REQUIRES_OPEN)) { - pn_link_open(l); - - Sessions::iterator session = sessions.find(pn_link_session(l)); - if (session == sessions.end()) { - QPID_LOG(error, id << " Link attached on unknown session!"); - } else { - try { - session->second->attach(l); - QPID_LOG_CAT(debug, protocol, id << " link " << l << " attached on " << pn_link_session(l)); - } catch (const Exception& e) { - QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); - pn_condition_t* error = pn_link_condition(l); - pn_condition_set_name(error, e.symbol()); - pn_condition_set_description(error, e.what()); - pn_link_close(l); - } catch (const qpid::framing::UnauthorizedAccessException& e) { - QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); - pn_condition_t* error = pn_link_condition(l); - pn_condition_set_name(error, qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS.c_str()); - pn_condition_set_description(error, e.what()); - pn_link_close(l); - } catch (const std::exception& e) { - QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); - pn_condition_t* error = pn_link_condition(l); - pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str()); - pn_condition_set_description(error, e.what()); - pn_link_close(l); - } - } + doLinkRemoteOpen(l); } processDeliveries(); for (pn_link_t* l = pn_link_head(connection, REQUIRES_CLOSE); l; l = pn_link_next(l, REQUIRES_CLOSE)) { - pn_link_close(l); - Sessions::iterator session = sessions.find(pn_link_session(l)); - if (session == sessions.end()) { - QPID_LOG(error, id << " peer attempted to detach link on unknown session!"); - } else { - session->second->detach(l); - QPID_LOG_CAT(debug, model, id << " link detached"); - } + doLinkRemoteClose(l); } for (pn_session_t* s = pn_session_head(connection, REQUIRES_CLOSE); s; s = pn_session_next(s, REQUIRES_CLOSE)) { - pn_session_close(s); - Sessions::iterator i = sessions.find(s); - if (i != sessions.end()) { - i->second->close(); - sessions.erase(i); - QPID_LOG_CAT(debug, model, id << " session ended"); - } else { - QPID_LOG(error, id << " peer attempted to close unrecognised session"); - } + doSessionRemoteClose(s); } if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { - QPID_LOG_CAT(debug, model, id << " connection closed"); - pn_connection_close(connection); + doConnectionRemoteClose(); } +#endif // !HAVE_PROTON_EVENTS } namespace { std::string convert(pn_delivery_tag_t in) @@ -415,34 +417,15 @@ std::string convert(pn_delivery_tag_t in) } void Connection::processDeliveries() { - //handle deliveries +#ifdef HAVE_PROTON_EVENTS + // with the event API, there's no way to selectively process only + // the delivery-related events. We have to process all events: + process(); +#else for (pn_delivery_t* delivery = pn_work_head(connection); delivery; delivery = pn_work_next(delivery)) { - pn_link_t* link = pn_delivery_link(delivery); - try { - if (pn_link_is_receiver(link)) { - Sessions::iterator i = sessions.find(pn_link_session(link)); - if (i != sessions.end()) { - i->second->readable(link, delivery); - } else { - pn_delivery_update(delivery, PN_REJECTED); - } - } else { //i.e. SENDER - Sessions::iterator i = sessions.find(pn_link_session(link)); - if (i != sessions.end()) { - QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link)); - i->second->writable(link, delivery); - } else { - QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link); - } - } - } catch (const Exception& e) { - QPID_LOG_CAT(error, protocol, "Error processing deliveries: " << e.what()); - pn_condition_t* error = pn_link_condition(link); - pn_condition_set_name(error, e.symbol()); - pn_condition_set_description(error, e.what()); - pn_link_close(link); - } + doDeliveryUpdated(delivery); } +#endif } std::string Connection::getError() @@ -470,4 +453,132 @@ void Connection::closedByManagement() closeRequested = true; out.activateOutput(); } + +// the peer has issued an Open performative +void Connection::doConnectionRemoteOpen() +{ + // respond in kind if we haven't yet + if ((pn_connection_state(connection) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) { + QPID_LOG_CAT(debug, model, id << " connection opened"); + open(); + setContainerId(pn_connection_remote_container(connection)); + } +} + +// the peer has issued a Close performative +void Connection::doConnectionRemoteClose() +{ + if ((pn_connection_state(connection) & PN_LOCAL_CLOSED) == 0) { + QPID_LOG_CAT(debug, model, id << " connection closed"); + pn_connection_close(connection); + } +} + +// the peer has issued a Begin performative +void Connection::doSessionRemoteOpen(pn_session_t *session) +{ + if ((pn_session_state(session) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) { + QPID_LOG_CAT(debug, model, id << " session begun"); + pn_session_open(session); + boost::shared_ptr ssn(new Session(session, *this, out)); + sessions[session] = ssn; + } +} + +// the peer has issued an End performative +void Connection::doSessionRemoteClose(pn_session_t *session) +{ + if ((pn_session_state(session) & PN_LOCAL_CLOSED) == 0) { + pn_session_close(session); + Sessions::iterator i = sessions.find(session); + if (i != sessions.end()) { + i->second->close(); + sessions.erase(i); + QPID_LOG_CAT(debug, model, id << " session ended"); + } else { + QPID_LOG(error, id << " peer attempted to close unrecognised session"); + } + } +} + +// the peer has issued an Attach performative +void Connection::doLinkRemoteOpen(pn_link_t *link) +{ + if ((pn_link_state(link) & PN_LOCAL_UNINIT) == PN_LOCAL_UNINIT) { + pn_link_open(link); + Sessions::iterator session = sessions.find(pn_link_session(link)); + if (session == sessions.end()) { + QPID_LOG(error, id << " Link attached on unknown session!"); + } else { + try { + session->second->attach(link); + QPID_LOG_CAT(debug, protocol, id << " link " << link << " attached on " << pn_link_session(link)); + } catch (const Exception& e) { + QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); + pn_condition_t* error = pn_link_condition(link); + pn_condition_set_name(error, e.symbol()); + pn_condition_set_description(error, e.what()); + pn_link_close(link); + } catch (const qpid::framing::UnauthorizedAccessException& e) { + QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); + pn_condition_t* error = pn_link_condition(link); + pn_condition_set_name(error, qpid::amqp::error_conditions::UNAUTHORIZED_ACCESS.c_str()); + pn_condition_set_description(error, e.what()); + pn_link_close(link); + } catch (const std::exception& e) { + QPID_LOG_CAT(error, protocol, "Error on attach: " << e.what()); + pn_condition_t* error = pn_link_condition(link); + pn_condition_set_name(error, qpid::amqp::error_conditions::INTERNAL_ERROR.c_str()); + pn_condition_set_description(error, e.what()); + pn_link_close(link); + } + } + } +} + +// the peer has issued a Detach performative +void Connection::doLinkRemoteClose(pn_link_t *link) +{ + if ((pn_link_state(link) & PN_LOCAL_CLOSED) == 0) { + pn_link_close(link); + Sessions::iterator session = sessions.find(pn_link_session(link)); + if (session == sessions.end()) { + QPID_LOG(error, id << " peer attempted to detach link on unknown session!"); + } else { + session->second->detach(link); + QPID_LOG_CAT(debug, model, id << " link detached"); + } + } +} + +// the status of the delivery has changed +void Connection::doDeliveryUpdated(pn_delivery_t *delivery) +{ + pn_link_t* link = pn_delivery_link(delivery); + try { + if (pn_link_is_receiver(link)) { + Sessions::iterator i = sessions.find(pn_link_session(link)); + if (i != sessions.end()) { + i->second->readable(link, delivery); + } else { + pn_delivery_update(delivery, PN_REJECTED); + } + } else { //i.e. SENDER + Sessions::iterator i = sessions.find(pn_link_session(link)); + if (i != sessions.end()) { + QPID_LOG(trace, id << " handling outgoing delivery for " << link << " on session " << pn_link_session(link)); + i->second->writable(link, delivery); + } else { + QPID_LOG(error, id << " Got delivery for non-existent session: " << pn_link_session(link) << ", link: " << link); + } + } + } catch (const Exception& e) { + QPID_LOG_CAT(error, protocol, "Error processing deliveries: " << e.what()); + pn_condition_t* error = pn_link_condition(link); + pn_condition_set_name(error, e.symbol()); + pn_condition_set_description(error, e.what()); + pn_link_close(link); + } +} + }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h index 17c5b0ecf0..ea4ce06163 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h @@ -31,6 +31,9 @@ struct pn_connection_t; struct pn_session_t; struct pn_transport_t; +struct pn_collector_t; +struct pn_link_t; +struct pn_delivery_t; namespace qpid { namespace sys { @@ -69,6 +72,7 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man typedef std::map > Sessions; pn_connection_t* connection; pn_transport_t* transport; + pn_collector_t* collector; qpid::sys::OutputControl& out; const std::string id; bool haveOutput; @@ -86,6 +90,17 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man void open(); void readPeerProperties(); void closedByManagement(); + + private: + // handle Proton engine events + void doConnectionRemoteOpen(); + void doConnectionRemoteClose(); + void doSessionRemoteOpen(pn_session_t *session); + void doSessionRemoteClose(pn_session_t *session); + void doLinkRemoteOpen(pn_link_t *link); + void doLinkRemoteClose(pn_link_t *link); + void doDeliveryUpdated(pn_delivery_t *delivery); + }; }}} // namespace qpid::broker::amqp diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp index 495fe800cb..5e7a3af889 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp @@ -126,7 +126,13 @@ bool OutgoingFromRelay::doWork() { relay->check(); relay->setCredit(pn_link_credit(link)); - return relay->send(link); + bool worked = relay->send(link); + pn_delivery_t *d = pn_link_current(link); + if (d && pn_delivery_writable(d)) { + handle(d); + return true; + } + return worked; } /** * Called when a delivery is writable -- cgit v1.2.1 From 40e74eaa3f8a345e7bc888e36de79717b7c761d0 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 19 Dec 2014 03:18:57 +0000 Subject: QPID-6278: HA broker abort in TXN soak test The crash appears to be a race condition in async completion exposed by the HA TX code code as follows: 1. Message received and placed on tx-replication queue, completion delayed till backups ack. Completion count goes up for each backup then down as each backup acks. 2. Prepare received, message placed on primary's local persistent queue. Completion count goes up one then down one for local store completion (null store in this case). The race is something like this: - last backup ack arrives (on backup IO thread) and drops completion count to 0. - prepare arrives (on client thread) null store bumps count to 1 and immediately drops to 0. - both threads try to invoke the completion callback, one deletes it while the other is still invoking. The old completion logic assumed that only one thread can see the atomic counter go to 0. It does not handle the count going to 0 in one thread and concurrently being increased and decreased back to 0 in another. This case is introduced by HA transactions because the same message is put onto a tx-replication queue and then put again onto another persistent local queue, so there are two cycles of completion. The new logic fixes this only one call to completion callback is possible in all cases. Also fixed missing lock in ha/Primary.cpp. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1646618 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/AsyncCompletion.h | 5 +++-- qpid/cpp/src/qpid/ha/Primary.cpp | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h index 1ab69f32d3..cb5d58977b 100644 --- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h +++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h @@ -111,13 +111,14 @@ class AsyncCompletion : public virtual RefCounted qpid::sys::Mutex::ScopedLock l(callbackLock); if (active) { if (callback.get()) { + boost::intrusive_ptr save = callback; + callback = boost::intrusive_ptr(); // Nobody else can run callback. inCallback = true; { qpid::sys::Mutex::ScopedUnlock ul(callbackLock); - callback->completed(sync); + save->completed(sync); } inCallback = false; - callback = boost::intrusive_ptr(); callbackLock.notifyAll(); } active = false; diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 0e87346ac1..870e4723b2 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -482,6 +482,7 @@ shared_ptr Primary::makeTxObserver( { shared_ptr observer = PrimaryTxObserver::create(*this, haBroker, txBuffer); + sys::Mutex::ScopedLock l(lock); txMap[observer->getTxQueue()->getName()] = observer; return observer; } -- cgit v1.2.1 From 81fd3962fe649940eb64262a260997a2da064dc6 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Fri, 2 Jan 2015 17:43:14 +0000 Subject: QPID-5671 [linearstore] Add ability to use disk partitions and select per-queue EFPs: WIP, but mostly complete. Needs additional testing. It is now possible to add queues which use a partition other than the broker default by using qpid-config --durable together with --efp--partition-num and/or --efp-pool-file-size git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1649081 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/linearstore/ISSUES | 16 ++-- qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp | 12 +-- .../src/qpid/linearstore/journal/EmptyFilePool.cpp | 99 +++++++++++++--------- .../src/qpid/linearstore/journal/EmptyFilePool.h | 13 ++- .../linearstore/journal/EmptyFilePoolManager.cpp | 7 +- .../linearstore/journal/EmptyFilePoolPartition.cpp | 69 +++++++++------ .../linearstore/journal/EmptyFilePoolPartition.h | 4 +- .../qpid/linearstore/journal/RecoveryManager.cpp | 34 +++++++- qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp | 14 +-- qpid/cpp/src/qpid/linearstore/journal/jerrno.h | 4 +- .../cpp/src/qpid/linearstore/management-schema.xml | 49 +---------- 11 files changed, 178 insertions(+), 143 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES index 2d5a389615..26e62482a5 100644 --- a/qpid/cpp/src/qpid/linearstore/ISSUES +++ b/qpid/cpp/src/qpid/linearstore/ISSUES @@ -25,7 +25,7 @@ Current/pending: ------ ------- ---------------------- 5359 - Linearstore: Implement new management schema and wire into store 5360 - Linearstore: Evaluate and rework logging to produce a consistent log output - 5361 1145359 Linearstore: No tests for linearstore functionality currently exist +* 5361 1145359 Linearstore: No tests for linearstore functionality currently exist svn r.1564893 2014-02-05: Added tx-test-soak.sh svn r.1564935 2014-02-05: Added license text to tx-test-soak.sh svn r.1625283 2014-09-16: Basic python tests from legacystore ported over to linearstore @@ -41,11 +41,12 @@ Current/pending: - 1067480 [LinearStore] Provide a way to limit max count/size of empty files in EFP - 1067429 [LinearStore] last file from deleted queue is not moved to EFP - 1067482 [LinearStore] Provide a way to prealocate empty pages in EFP -* 5671 [linearstore] Add ability to use disk partitions and select per-queue EFPs +* 5671 1160367 [linearstore] Add ability to use disk partitions and select per-queue EFPs svn r.1636598 2014-11-04: WIP: New EFP and journal dir structure using symlinks svn r.1637985 2014-11-10: WIP: Auto-upgrade from old dir structure to new - 6230 1165200 [linearstore] qpid-qls-analyze fails when analyzing empty journal - svn r.1643053 2014-11-18: Proposed fix +* 1148807 [linearstore] Restarting broker with empty journal raises confusing warning +* 1066256 [linearstore] Changing EFP size after using store breaks durable queue creation + @@ -147,7 +148,9 @@ NO-JIRA - Added missing Apache copyright/license text svn r.1631360 2014-10-13 Proposed solution 6157 1150397 linearstore: segfault when 2 journals request new journal file from empty EFP svn r.1632504 2014-10-17 Proposed solution by pmoravec - 6248 1167911 [linearstore] Symlink creation fails if store dir path is not absolute + 6230 1165200 [linearstore] qpid-qls-analyze fails when analyzing empty journal + svn r.1643053 2014-11-18: Proposed fix + 6248 1167911 [linearstore] Symlink creation fails if store dir path is not absolute svn r.1641689 2014-11-25 Proposed solution @@ -194,7 +197,8 @@ no. svn r Q-JIRA RHBZ Date Alt Committer 34. 1632504 6157 1150397 2014-10-17 (pmoravec) 35. 1636598 5671 2014-11-04 36. 1637985 5671 2014-11-10 -37. 1641689 6248 1167911 2014-11-25 +37. 1643053 6230 1165200 2014-11-18 +38. 1641689 6248 1167911 2014-11-25 See above sections for details on these checkins. diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index 77d5703636..68810936e1 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -148,7 +148,7 @@ void MessageStoreImpl::initManagement () mgmtObject = qmf::org::apache::qpid::linearstore::Store::shared_ptr ( new qmf::org::apache::qpid::linearstore::Store(agent, this, broker)); - mgmtObject->set_location(storeDir); + mgmtObject->set_storeDir(storeDir); mgmtObject->set_tplIsInitialized(false); mgmtObject->set_tplDirectory(getTplBaseDir()); mgmtObject->set_tplWritePageSize(tplWCachePgSizeSblks * QLS_SBLK_SIZE_BYTES); @@ -406,7 +406,7 @@ void MessageStoreImpl::create(qpid::broker::PersistableQueue& queue_, if (queue_.getName().size() == 0) { - QLS_LOG(error, "Cannot create store for empty (null) queue name - ignoring and attempting to continue."); + QLS_LOG(error, "Cannot create store for empty (null) queue name - queue create ignored."); return; } @@ -449,15 +449,15 @@ qpid::linearstore::journal::EmptyFilePool* MessageStoreImpl::getEmptyFilePool(const qpid::framing::FieldTable& args_) { qpid::framing::FieldTable::ValuePtr value; qpid::linearstore::journal::efpPartitionNumber_t localEfpPartition = defaultEfpPartitionNumber; - value = args_.get("qpid.efp_partition"); + value = args_.get("qpid.efp_partition_num"); if (value.get() != 0 && !value->empty() && value->convertsTo()) { - localEfpPartition = chkEfpPartition((uint32_t)value->get(), "qpid.efp_partition"); + localEfpPartition = chkEfpPartition((uint32_t)value->get(), "qpid.efp_partition_num"); } qpid::linearstore::journal::efpDataSize_kib_t localEfpFileSizeKib = defaultEfpFileSize_kib; - value = args_.get("qpid.efp_file_size"); + value = args_.get("qpid.efp_pool_file_size"); if (value.get() != 0 && !value->empty() && value->convertsTo()) { - localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get(),"qpid.efp_file_size" ); + localEfpFileSizeKib = chkEfpFileSizeKiB((uint32_t)value->get(), "qpid.efp_pool_file_size"); } return getEmptyFilePool(localEfpPartition, localEfpFileSizeKib); } diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp index 18f4d3afc3..08db3f75bd 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp @@ -59,10 +59,16 @@ EmptyFilePool::EmptyFilePool(const std::string& efpDirectory, EmptyFilePool::~EmptyFilePool() {} void EmptyFilePool::initialize() { -//std::cout << "*** Initializing EFP " << efpDataSize_kib_ << "k in partition " << partitionPtr_->getPartitionNumber() << "; efpDirectory=" << efpDirectory_ << std::endl; // DEBUG - std::vector dirList; + if (::mkdir(efpDirectory_.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH)) { // Create EFP dir if it does not yet exist + if (errno != EEXIST) { + std::ostringstream oss; + oss << "directory=" << efpDirectory_ << " " << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_EFP_MKDIR, oss.str(), "EmptyFilePool", "initialize"); + } + } // Process empty files in main dir + std::vector dirList; jdir::read_dir(efpDirectory_, dirList, false, true, false, false); for (std::vector::iterator i = dirList.begin(); i != dirList.end(); ++i) { size_t dotPos = i->rfind("."); @@ -122,14 +128,14 @@ const efpIdentity_t EmptyFilePool::getIdentity() const { std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) { std::string emptyFileName = popEmptyFile(); - std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); + std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/' std::string symlinkName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/' - if (moveFile(emptyFileName, newFileName)) { + if (!moveFile(emptyFileName, newFileName)) { // Try again with new UUID for file name newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + "/" + getEfpFileName(); - if (moveFile(emptyFileName, newFileName)) { + if (!moveFile(emptyFileName, newFileName)) { //std::cerr << "*** DEBUG: pushEmptyFile " << emptyFileName << "from EmptyFilePool::takeEmptyFile()" << std::endl; // DEBUG - pushEmptyFile(emptyFileName); + pushEmptyFile(emptyFileName); // Return empty file to pool std::ostringstream oss; oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\"" << FORMAT_SYSERR(errno); throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile"); @@ -138,7 +144,7 @@ std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) { if (createSymLink(newFileName, symlinkName)) { std::ostringstream oss; oss << "file=\"" << emptyFileName << "\" dest=\"" << newFileName << "\" symlink=\"" << symlinkName << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile"); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile"); } return symlinkName; } @@ -189,12 +195,27 @@ efpDataSize_kib_t EmptyFilePool::dataSizeFromDirName_kib(const std::string& dirN } // --- protected functions --- +void EmptyFilePool::checkIosState(std::ofstream& ofs, + const uint32_t jerrno, + const std::string& fqFileName, + const std::string& operation, + const std::string& errorMessage, + const std::string& className, + const std::string& fnName) { + if (!ofs.good()) { + if (ofs.is_open()) { + ofs.close(); + } + std::ostringstream oss; + oss << "IO failure: eofbit=" << (ofs.eof()?"T":"F") << " failbit=" << (ofs.fail()?"T":"F") << " badbit=" + << (ofs.bad()?"T":"F") << " file=" << fqFileName << " operation=" << operation << ": " << errorMessage; + throw jexception(jerrno, oss.str(), className, fnName); + } +} std::string EmptyFilePool::createEmptyFile() { std::string efpfn = getEfpFileName(); - if (!overwriteFileContents(efpfn)) { - // TODO: handle failure to prepare new file here - } + overwriteFileContents(efpfn); return efpfn; } @@ -226,24 +247,20 @@ void EmptyFilePool::initializeSubDirectory(const std::string& fqDirName) { } } -bool EmptyFilePool::overwriteFileContents(const std::string& fqFileName) { +void EmptyFilePool::overwriteFileContents(const std::string& fqFileName) { ::file_hdr_t fh; ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr_->getPartitionNumber(), efpDataSize_kib_); std::ofstream ofs(fqFileName.c_str(), std::ofstream::out | std::ofstream::binary); - if (ofs.good()) { - ofs.write((char*)&fh, sizeof(::file_hdr_t)); - uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t); - while (rem--) - ofs.put('\0'); - ofs.close(); - return true; -//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG - } else { - std::ostringstream oss; - oss << "std::ofstream ofs: file=\"" << fqFileName.c_str() << "\"" << " failed to be open"; - throw jexception(jerrno::JERR_EFP_FOPEN, oss.str(), "EmptyFilePool", "overwriteFileContents"); + checkIosState(ofs, jerrno::JERR_EFP_FOPEN, fqFileName, "constructor", "Failed to create file", "EmptyFilePool", "overwriteFileContents"); + ofs.write((char*)&fh, sizeof(::file_hdr_t)); + checkIosState(ofs, jerrno::JERR_EFP_FWRITE, fqFileName, "write()", "Failed to write header", "EmptyFilePool", "overwriteFileContents"); + uint64_t rem = ((efpDataSize_kib_ + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB)) * 1024) - sizeof(::file_hdr_t); + while (rem--) { + ofs.put('\0'); + checkIosState(ofs, jerrno::JERR_EFP_FWRITE, fqFileName, "put()", "Failed to put \0", "EmptyFilePool", "overwriteFileContents"); } - return false; + ofs.close(); +//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG } std::string EmptyFilePool::popEmptyFile() { @@ -271,7 +288,7 @@ void EmptyFilePool::pushEmptyFile(const std::string fqFileName) { void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) { std::string returnedFileName = efpDirectory_ + "/" + s_returnedFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/' - if (moveFile(emptyFileName, returnedFileName)) { + if (!moveFile(emptyFileName, returnedFileName)) { ::unlink(emptyFileName.c_str()); //std::cerr << "*** WARNING: Unable to move file " << emptyFileName << " to " << returnedFileName << "; deleted." << std::endl; // DEBUG } @@ -283,7 +300,7 @@ void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) { overwriteFileContents(returnedFileName); } std::string sanitizedEmptyFileName = efpDirectory_ + returnedFileName.substr(returnedFileName.rfind('/')); // NOTE: substr() includes leading '/' - if (moveFile(returnedFileName, sanitizedEmptyFileName)) { + if (!moveFile(returnedFileName, sanitizedEmptyFileName)) { ::unlink(returnedFileName.c_str()); //std::cerr << "*** WARNING: Unable to move file " << returnedFileName << " to " << sanitizedEmptyFileName << "; deleted." << std::endl; // DEBUG } else { @@ -395,18 +412,6 @@ bool EmptyFilePool::validateEmptyFile(const std::string& emptyFileName) const { return true; } -// static -int EmptyFilePool::moveFile(const std::string& from, - const std::string& to) { - if (::rename(from.c_str(), to.c_str())) { - if (errno == EEXIST) return errno; // File name exists - std::ostringstream oss; - oss << "file=\"" << from << "\" dest=\"" << to << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile"); - } - return 0; -} - //static int EmptyFilePool::createSymLink(const std::string& fqFileName, const std::string& fqLinkName) { @@ -414,7 +419,7 @@ int EmptyFilePool::createSymLink(const std::string& fqFileName, if (errno == EEXIST) return errno; // File name exists std::ostringstream oss; oss << "file=\"" << fqFileName << "\" symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "createSymLink"); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "createSymLink"); } return 0; } @@ -426,7 +431,7 @@ std::string EmptyFilePool::deleteSymlink(const std::string& fqLinkName) { if (len < 0) { std::ostringstream oss; oss << "symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno); - throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink"); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink"); } ::unlink(fqLinkName.c_str()); return std::string(buff, len); @@ -455,4 +460,18 @@ bool EmptyFilePool::isSymlink(const std::string& fqName) { } +// static +bool EmptyFilePool::moveFile(const std::string& from, + const std::string& to) { + if (::rename(from.c_str(), to.c_str())) { + if (errno == EEXIST) { + return false; // File name exists + } + std::ostringstream oss; + oss << "file=\"" << from << "\" dest=\"" << to << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "returnEmptyFile"); + } + return true; +} + }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h index 1a1264fa26..dc567ff917 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h @@ -87,23 +87,30 @@ public: const efpPartitionNumber_t partitionNumber); protected: + void checkIosState(std::ofstream& ofs, + const uint32_t jerrno, + const std::string& fqFileName, + const std::string& operation, + const std::string& errorMessage, + const std::string& className, + const std::string& fnName); std::string createEmptyFile(); std::string getEfpFileName(); void initializeSubDirectory(const std::string& fqDirName); - bool overwriteFileContents(const std::string& fqFileName); + void overwriteFileContents(const std::string& fqFileName); std::string popEmptyFile(); void pushEmptyFile(const std::string fqFileName); void returnEmptyFile(const std::string& emptyFileName); void resetEmptyFileHeader(const std::string& fqFileName); bool validateEmptyFile(const std::string& emptyFileName) const; - static int moveFile(const std::string& fromFqPath, - const std::string& toFqPath); static int createSymLink(const std::string& fqFileName, const std::string& fqLinkName); static std::string deleteSymlink(const std::string& fqLinkName); static bool isFile(const std::string& fqName); static bool isSymlink(const std::string& fqName); + static bool moveFile(const std::string& fromFqPath, + const std::string& toFqPath); }; }}} diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp index 28e1b0b56e..33707578bf 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp @@ -165,9 +165,10 @@ EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpIdentity_t efpIde EmptyFilePool* EmptyFilePoolManager::getEmptyFilePool(const efpPartitionNumber_t partitionNumber, const efpDataSize_kib_t efpDataSize_kib) { EmptyFilePoolPartition* efppp = getEfpPartition(partitionNumber > 0 ? partitionNumber : defaultPartitionNumber_); - if (efppp != 0) - return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib : defaultEfpDataSize_kib_); - return 0; + if (efppp == 0) { + return 0; + } + return efppp->getEmptyFilePool(efpDataSize_kib > 0 ? efpDataSize_kib : defaultEfpDataSize_kib_, true); } void EmptyFilePoolManager::getEmptyFilePools(std::vector& emptyFilePoolList, diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp index a31855e0d8..9b58e8c4ff 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp @@ -85,26 +85,7 @@ EmptyFilePoolPartition::findEmptyFilePools() { } } } - EmptyFilePool* efpp = 0; - try { - efpp = new EmptyFilePool(fqFileName, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_); - { - slock l(efpMapMutex_); - efpMap_[efpp->dataSize_kib()] = efpp; - } - } - catch (const std::exception& e) { - if (efpp != 0) { - delete efpp; - efpp = 0; - } - std::ostringstream oss; - oss << "EmptyFilePool create failed: " << e.what(); - journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); - } - if (efpp != 0) { - efpp->initialize(); - } + createEmptyFilePool(fqFileName); } if (upgradeDirStructureFlag) { std::string oldEfpDir(partitionDir_ + "/efp"); @@ -117,12 +98,17 @@ EmptyFilePoolPartition::findEmptyFilePools() { } } -EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) { - slock l(efpMapMutex_); - efpMapItr_t i = efpMap_.find(efpDataSize_kib); - if (i == efpMap_.end()) - return 0; - return i->second; +EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib, const bool createIfNonExistent) { + { + slock l(efpMapMutex_); + efpMapItr_t i = efpMap_.find(efpDataSize_kib); + if (i != efpMap_.end()) + return i->second; + } + if (createIfNonExistent) { + return createEmptyFilePool(efpDataSize_kib); + } + return 0; } void EmptyFilePoolPartition::getEmptyFilePools(std::vector& efpList) { @@ -183,7 +169,7 @@ std::string EmptyFilePoolPartition::getPartionDirectoryName(const efpPartitionNu //static efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const std::string& name) { if (name.length() == 4 && name[0] == 'p' && ::isdigit(name[1]) && ::isdigit(name[2]) && ::isdigit(name[3])) { - long pn = ::strtol(name.c_str() + 1, 0, 0); + long pn = ::strtol(name.c_str() + 1, 0, 10); if (pn == 0 && errno) { return 0; } else { @@ -195,6 +181,35 @@ efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const std::strin // --- protected functions --- +EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) { + std::string fqEfpDirectoryName(partitionDir_ + "/" + EmptyFilePool::dirNameFromDataSize(efpDataSize_kib)); + return createEmptyFilePool(fqEfpDirectoryName); +} + +EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const std::string fqEfpDirectoryName) { + EmptyFilePool* efpp = 0; + try { + efpp = new EmptyFilePool(fqEfpDirectoryName, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_); + { + slock l(efpMapMutex_); + efpMap_[efpp->dataSize_kib()] = efpp; + } + } + catch (const std::exception& e) { + if (efpp != 0) { + delete efpp; + efpp = 0; + } + std::ostringstream oss; + oss << "EmptyFilePool create failed: " << e.what(); + journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); + } + if (efpp != 0) { + efpp->initialize(); + } + return efpp; +} + void EmptyFilePoolPartition::validatePartitionDir() { if (!jdir::is_dir(partitionDir_)) { std::ostringstream ss; diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h index c653c6be6a..23a541f8f4 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h @@ -59,7 +59,7 @@ public: virtual ~EmptyFilePoolPartition(); void findEmptyFilePools(); - EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib); + EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib, const bool createIfNonExistent); void getEmptyFilePools(std::vector& efpList); void getEmptyFilePoolSizes_kib(std::vector& efpDataSizesList) const; std::string getPartitionDirectory() const; @@ -70,6 +70,8 @@ public: static efpPartitionNumber_t getPartitionNumber(const std::string& name); protected: + EmptyFilePool* createEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib); + EmptyFilePool* createEmptyFilePool(const std::string fqEfpDirectoryName); void validatePartitionDir(); }; diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp index 3f39913422..198b39857c 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp @@ -43,6 +43,7 @@ #include "qpid/linearstore/journal/utils/file_hdr.h" #include #include +#include #include namespace qpid { @@ -101,7 +102,11 @@ void RecoveryManager::analyzeJournals(const std::vector* preparedTr analyzeJournalFileHeaders(efpIdentity); if (journalEmptyFlag_) { - *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(0, 0); // Use default EFP + if (uninitFileList_.empty()) { + *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(0, 0); // Use default EFP + } else { + *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity); + } } else { *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity); if (! *emptyFilePoolPtrPtr) { @@ -294,6 +299,7 @@ void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr, LinearFileController* lfcPtr) { if (journalEmptyFlag_) { if (uninitFileList_.size() > 0) { + // TODO: Handle case if uninitFileList_.size() > 1, but this should not happen in normal operation. Here we assume only one item in the list. std::string uninitFile = uninitFileList_.back(); uninitFileList_.pop_back(); lfcPtr->restoreEmptyFile(uninitFile); @@ -377,11 +383,28 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) { jdir::read_dir(journalDirectory_, directoryList, false, true, false, true); for (stringListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) { bool hdrOk = readJournalFileHeader(*i, fileHeader, headerQueueName); - if (!hdrOk || headerQueueName.empty()) { + bool hdrEmpty = ::is_file_hdr_reset(&fileHeader); + if (!hdrOk) { std::ostringstream oss; - oss << "Journal file " << (*i) << " is uninitialized or corrupted"; + oss << "Journal file " << (*i) << " is corrupted or invalid"; journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str()); + } else if (hdrEmpty) { + // Read symlink, find efp directory name which is efp size in KiB + // TODO: place this bit into a common function as it is also used in EmptyFilePool.cpp::deleteSymlink() + char buff[1024]; + ssize_t len = ::readlink((*i).c_str(), buff, 1024); + if (len < 0) { + std::ostringstream oss; + oss << "symlink=\"" << (*i) << "\"" << FORMAT_SYSERR(errno); + throw jexception(jerrno::JERR__SYMLINK, oss.str(), "RecoveryManager", "analyzeJournalFileHeaders"); + } + // Find second and third '/' from back of string, which contains the EFP directory name + *(::strrchr(buff, '/')) = '\0'; + *(::strrchr(buff, '/')) = '\0'; + int efpDataSize_kib = atoi(::strrchr(buff, '/') + 1); uninitFileList_.push_back(*i); + efpIdentity.pn_ = fileHeader._efp_partition; + efpIdentity.ds_ = efpDataSize_kib; } else if (headerQueueName.compare(queueName_) != 0) { std::ostringstream oss; oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring"; @@ -406,6 +429,7 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) { } } +std::cerr << "*** RecoveryManager::analyzeJournalFileHeaders() fileNumberMap_.size()=" << fileNumberMap_.size() << std::endl; // DEBUG if (fileNumberMap_.empty()) { journalEmptyFlag_ = true; } else { @@ -905,7 +929,9 @@ bool RecoveryManager::readJournalFileHeader(const std::string& journalFileName, } ifs.close(); ::memcpy(&fileHeaderRef, buffer, sizeof(::file_hdr_t)); - if (::file_hdr_check(&fileHeaderRef, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, QLS_MAX_QUEUE_NAME_LEN)) return false; + if (::file_hdr_check(&fileHeaderRef, QLS_FILE_MAGIC, QLS_JRNL_VERSION, 0, QLS_MAX_QUEUE_NAME_LEN)) { + return false; + } queueName.assign(buffer + sizeof(::file_hdr_t), fileHeaderRef._queue_name_len); return true; } diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp index 9d59039220..ce88e7809c 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp @@ -42,6 +42,7 @@ const uint32_t jerrno::JERR__UNEXPRESPONSE = 0x0108; const uint32_t jerrno::JERR__RECNFOUND = 0x0109; const uint32_t jerrno::JERR__NOTIMPL = 0x010a; const uint32_t jerrno::JERR__NULL = 0x010b; +const uint32_t jerrno::JERR__SYMLINK = 0x010c; // class jcntl const uint32_t jerrno::JERR_JCNTL_STOPPED = 0x0200; @@ -112,10 +113,11 @@ const uint32_t jerrno::JERR_EFP_BADPARTITIONDIR = 0x0d02; const uint32_t jerrno::JERR_EFP_BADEFPDIRNAME = 0x0d03; const uint32_t jerrno::JERR_EFP_NOEFP = 0x0d04; const uint32_t jerrno::JERR_EFP_EMPTY = 0x0d05; -const uint32_t jerrno::JERR_EFP_SYMLINK = 0x0d06; -const uint32_t jerrno::JERR_EFP_LSTAT = 0x0d07; -const uint32_t jerrno::JERR_EFP_BADFILETYPE = 0x0d08; -const uint32_t jerrno::JERR_EFP_FOPEN = 0x0d09; +const uint32_t jerrno::JERR_EFP_LSTAT = 0x0d06; +const uint32_t jerrno::JERR_EFP_BADFILETYPE = 0x0d07; +const uint32_t jerrno::JERR_EFP_FOPEN = 0x0d08; +const uint32_t jerrno::JERR_EFP_FWRITE = 0x0d09; +const uint32_t jerrno::JERR_EFP_MKDIR = 0x0d0a; // Negative returns for some functions const int32_t jerrno::AIO_TIMEOUT = -1; @@ -140,6 +142,7 @@ jerrno::__init() _err_map[JERR__RECNFOUND] = "JERR__RECNFOUND: Record not found."; _err_map[JERR__NOTIMPL] = "JERR__NOTIMPL: Not implemented"; _err_map[JERR__NULL] = "JERR__NULL: Operation on null pointer"; + _err_map[JERR__SYMLINK] = "JERR__SYMLINK: Symbolic link operation failed"; // class jcntl _err_map[JERR_JCNTL_STOPPED] = "JERR_JCNTL_STOPPED: Operation on stopped journal."; @@ -210,10 +213,11 @@ jerrno::__init() _err_map[JERR_EFP_BADPARTITIONDIR] = "JERR_EFP_BADPARTITIONDIR: Invalid partition directory"; _err_map[JERR_EFP_NOEFP] = "JERR_EFP_NOEFP: No Empty File Pool found for given partition and empty file size"; _err_map[JERR_EFP_EMPTY] = "JERR_EFP_EMPTY: Empty File Pool is empty"; - _err_map[JERR_EFP_SYMLINK] = "JERR_EFP_SYMLINK: Symbolic link operation failed"; _err_map[JERR_EFP_LSTAT] = "JERR_EFP_LSTAT: lstat() operation failed"; _err_map[JERR_EFP_BADFILETYPE] = "JERR_EFP_BADFILETYPE: File type incorrect for operation"; _err_map[JERR_EFP_FOPEN] = "JERR_EFP_FOPEN: Unable to fopen file for write"; + _err_map[JERR_EFP_FWRITE] = "JERR_EFP_FWRITE: Write failed"; + _err_map[JERR_EFP_MKDIR] = "JERR_EFP_MKDIR: Directory creation failed"; //_err_map[] = ""; diff --git a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h index 77b18b17e8..6e817682ca 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/jerrno.h +++ b/qpid/cpp/src/qpid/linearstore/journal/jerrno.h @@ -60,6 +60,7 @@ namespace journal { static const uint32_t JERR__RECNFOUND; ///< Record not found static const uint32_t JERR__NOTIMPL; ///< Not implemented static const uint32_t JERR__NULL; ///< Operation on null pointer + static const uint32_t JERR__SYMLINK; ///< Symbolic Link operation failed // class jcntl static const uint32_t JERR_JCNTL_STOPPED; ///< Operation on stopped journal @@ -130,10 +131,11 @@ namespace journal { static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory static const uint32_t JERR_EFP_NOEFP; ///< No EFP found for given partition and file size static const uint32_t JERR_EFP_EMPTY; ///< EFP empty - static const uint32_t JERR_EFP_SYMLINK; ///< Symbolic Link operation failed static const uint32_t JERR_EFP_LSTAT; ///< lstat operation failed static const uint32_t JERR_EFP_BADFILETYPE; ///< Bad file type static const uint32_t JERR_EFP_FOPEN; ///< Unable to fopen file for write + static const uint32_t JERR_EFP_FWRITE; ///< Write failed + static const uint32_t JERR_EFP_MKDIR; ///< Directory creation failed // Negative returns for some functions static const int32_t AIO_TIMEOUT; ///< Timeout waiting for AIO return diff --git a/qpid/cpp/src/qpid/linearstore/management-schema.xml b/qpid/cpp/src/qpid/linearstore/management-schema.xml index a55883a255..ebd388593e 100644 --- a/qpid/cpp/src/qpid/linearstore/management-schema.xml +++ b/qpid/cpp/src/qpid/linearstore/management-schema.xml @@ -21,38 +21,24 @@ - - - + - - - - - + - - - - - - - - @@ -64,36 +50,5 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - -- cgit v1.2.1 From 367a7d5bb192ee6efb402f14eda617b2736836e4 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Fri, 2 Jan 2015 17:45:30 +0000 Subject: NO-JIRA: [linearstore] Update of ISSUES file git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1649082 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/linearstore/ISSUES | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES index 26e62482a5..2042136bac 100644 --- a/qpid/cpp/src/qpid/linearstore/ISSUES +++ b/qpid/cpp/src/qpid/linearstore/ISSUES @@ -44,6 +44,7 @@ Current/pending: * 5671 1160367 [linearstore] Add ability to use disk partitions and select per-queue EFPs svn r.1636598 2014-11-04: WIP: New EFP and journal dir structure using symlinks svn r.1637985 2014-11-10: WIP: Auto-upgrade from old dir structure to new + svn r.1649081 2015-01-02: WIP: Specify new queue using qpid-config --durable together with --efp-partition-num and/or --efp-pool-file-size. Needs testing. * 1148807 [linearstore] Restarting broker with empty journal raises confusing warning * 1066256 [linearstore] Changing EFP size after using store breaks durable queue creation @@ -190,15 +191,15 @@ no. svn r Q-JIRA RHBZ Date Alt Committer 28. 1596509 5767 1098118 2014-05-21 0.22-mrg (pmoravec) 29. 1596633 NO-JIRA 1078937 2014-05-21 (includes tools install update) 30. 1599243 5767 1098118 2014-06-02 0.22-mrg -30. 1599243 5767 1098118 2014-06-02 -31. 1614665 5924 1124906 2014-07-30 -32. 1620426 6043 1089652 2014-08-25 -33. 1631360 6147 1152012 2014-10-13 (pmoravec) -34. 1632504 6157 1150397 2014-10-17 (pmoravec) -35. 1636598 5671 2014-11-04 -36. 1637985 5671 2014-11-10 -37. 1643053 6230 1165200 2014-11-18 -38. 1641689 6248 1167911 2014-11-25 +31. 1599243 5767 1098118 2014-06-02 +32. 1614665 5924 1124906 2014-07-30 +33. 1620426 6043 1089652 2014-08-25 +34. 1631360 6147 1152012 2014-10-13 (pmoravec) +35. 1632504 6157 1150397 2014-10-17 (pmoravec) +36. 1636598 5671 2014-11-04 +37. 1637985 5671 2014-11-10 +38. 1643053 6230 1165200 2014-11-18 +39. 1641689 6248 1167911 2014-11-25 See above sections for details on these checkins. -- cgit v1.2.1 From bf0975bc868a3328d467d7e056968d22175f1242 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 7 Jan 2015 23:28:54 +0000 Subject: QPID-6299: allow ring and lvq behaviours to be combined git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1650196 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/CMakeLists.txt | 1 + qpid/cpp/src/qpid/broker/LossyQueue.h | 2 +- qpid/cpp/src/qpid/broker/Lvq.h | 2 +- qpid/cpp/src/qpid/broker/QueueFactory.cpp | 12 ++++++++++-- 4 files changed, 13 insertions(+), 4 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index fe7a809cee..3e5165dfb0 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1077,6 +1077,7 @@ set (qpidbroker_SOURCES qpid/broker/IngressCompletion.cpp qpid/broker/Link.cpp qpid/broker/LinkRegistry.cpp + qpid/broker/LossyLvq.cpp qpid/broker/LossyQueue.cpp qpid/broker/Lvq.cpp qpid/broker/Message.cpp diff --git a/qpid/cpp/src/qpid/broker/LossyQueue.h b/qpid/cpp/src/qpid/broker/LossyQueue.h index 3e62151d6f..705865f449 100644 --- a/qpid/cpp/src/qpid/broker/LossyQueue.h +++ b/qpid/cpp/src/qpid/broker/LossyQueue.h @@ -29,7 +29,7 @@ namespace broker { /** * Drops messages to prevent a breach of any configured maximum depth. */ -class LossyQueue : public Queue +class LossyQueue : public virtual Queue { public: LossyQueue(const std::string&, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*); diff --git a/qpid/cpp/src/qpid/broker/Lvq.h b/qpid/cpp/src/qpid/broker/Lvq.h index 335270a073..26ba2b4914 100644 --- a/qpid/cpp/src/qpid/broker/Lvq.h +++ b/qpid/cpp/src/qpid/broker/Lvq.h @@ -32,7 +32,7 @@ class MessageMap; * conjunction with the MessageMap class. This requires an existing * message to be 'replaced' by a newer message with the same key. */ -class Lvq : public Queue +class Lvq : public virtual Queue { public: Lvq(const std::string&, std::auto_ptr, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*); diff --git a/qpid/cpp/src/qpid/broker/QueueFactory.cpp b/qpid/cpp/src/qpid/broker/QueueFactory.cpp index 8104fff740..16cdea3b0a 100644 --- a/qpid/cpp/src/qpid/broker/QueueFactory.cpp +++ b/qpid/cpp/src/qpid/broker/QueueFactory.cpp @@ -22,6 +22,7 @@ #include "qpid/broker/Broker.h" #include "qpid/broker/QueueSettings.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/LossyLvq.h" #include "qpid/broker/LossyQueue.h" #include "qpid/broker/Lvq.h" #include "qpid/broker/Messages.h" @@ -51,10 +52,17 @@ boost::shared_ptr QueueFactory::create(const std::string& name, const Que boost::shared_ptr flow_ptr(QueueFlowLimit::createLimit(name, settings)); //1. determine Queue type (i.e. whether we are subclassing Queue) - // -> if 'ring' policy is in use then subclass boost::shared_ptr queue; if (settings.dropMessagesAtLimit) { - queue = boost::shared_ptr(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker)); + // -> if 'ring' policy is in use then subclass + if (settings.lvqKey.size()) { + //combination of ring and lvq: + std::auto_ptr map(new MessageMap(settings.lvqKey)); + queue = boost::shared_ptr(new LossyLvq(name, map, settings, settings.durable ? store : 0, parent, broker)); + } else { + //simple ring: + queue = boost::shared_ptr(new LossyQueue(name, settings, settings.durable ? store : 0, parent, broker)); + } } else if (settings.selfDestructAtLimit) { queue = boost::shared_ptr(new SelfDestructQueue(name, settings, settings.durable ? store : 0, parent, broker)); } else if (settings.lvqKey.size()) { -- cgit v1.2.1 From 12552e67e41a794b66c16b9aa0c1ce79a73f98f9 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Thu, 8 Jan 2015 12:04:09 +0000 Subject: QPID-6299: add files missed out from last commit git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1650259 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/LossyLvq.cpp | 29 +++++++++++++++++++++++++ qpid/cpp/src/qpid/broker/LossyLvq.h | 41 +++++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 qpid/cpp/src/qpid/broker/LossyLvq.cpp create mode 100644 qpid/cpp/src/qpid/broker/LossyLvq.h (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/broker/LossyLvq.cpp b/qpid/cpp/src/qpid/broker/LossyLvq.cpp new file mode 100644 index 0000000000..c6b76ecb8c --- /dev/null +++ b/qpid/cpp/src/qpid/broker/LossyLvq.cpp @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "LossyLvq.h" + +namespace qpid { +namespace broker { + +LossyLvq::LossyLvq(const std::string& n, std::auto_ptr m, const QueueSettings& s, MessageStore* const ms, management::Manageable* p, Broker* b) + : Queue(n, s, ms, p, b), Lvq(n, m, s, ms, p, b), LossyQueue(n, s, ms, p, b) {} + +}} // namespace qpid::broker diff --git a/qpid/cpp/src/qpid/broker/LossyLvq.h b/qpid/cpp/src/qpid/broker/LossyLvq.h new file mode 100644 index 0000000000..e0a266ab77 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/LossyLvq.h @@ -0,0 +1,41 @@ +#ifndef QPID_BROKER_LOSSYLVQ_H +#define QPID_BROKER_LOSSYLVQ_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Lvq.h" +#include "qpid/broker/LossyQueue.h" + +namespace qpid { +namespace broker { +class MessageMap; + +/** + * Combination of LossyQueue and Lvq behaviours. + */ +class LossyLvq : public Lvq, public LossyQueue +{ + public: + LossyLvq(const std::string&, std::auto_ptr, const QueueSettings&, MessageStore* const, management::Manageable*, Broker*); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_LOSSYLVQ_H*/ -- cgit v1.2.1 From 6e403d50b2f88a04b04f018ad7c2dc9f492920a9 Mon Sep 17 00:00:00 2001 From: "Charles E. Rolke" Date: Thu, 8 Jan 2015 21:51:49 +0000 Subject: QPID-6298: [C++ Messaging] Closing sender/receiver frees proton link. Tested with proton 0.9, 0.7, and 0.6. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1650389 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp | 2 +- qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 08cc130a9e..5e0707056f 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -39,7 +39,7 @@ ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, co capacity(0), used(0) {} ReceiverContext::~ReceiverContext() { - //pn_link_free(receiver); + pn_link_free(receiver); } void ReceiverContext::setCapacity(uint32_t c) diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 1a254c1846..421b177163 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -49,7 +49,7 @@ SenderContext::SenderContext(pn_session_t* session, const std::string& n, const SenderContext::~SenderContext() { - //pn_link_free(sender); + pn_link_free(sender); } void SenderContext::close() -- cgit v1.2.1 From 22f1697cbe8585c9534052b0ced167ed6c827555 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Mon, 12 Jan 2015 18:42:15 +0000 Subject: NO-JIRA: ensure MapMessage is fully defined when used git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1651158 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/LossyLvq.cpp | 1 + 1 file changed, 1 insertion(+) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/broker/LossyLvq.cpp b/qpid/cpp/src/qpid/broker/LossyLvq.cpp index c6b76ecb8c..f59ecc1925 100644 --- a/qpid/cpp/src/qpid/broker/LossyLvq.cpp +++ b/qpid/cpp/src/qpid/broker/LossyLvq.cpp @@ -19,6 +19,7 @@ * */ #include "LossyLvq.h" +#include "MessageMap.h" namespace qpid { namespace broker { -- cgit v1.2.1 From e757e25f051ab6c0bb2656146a6f4b2b045e11d7 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 13 Jan 2015 10:25:02 +0000 Subject: QPID-6310 ensure session is attached even for unsupported controls git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1651319 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp | 1 + 1 file changed, 1 insertion(+) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp index 43f39c2919..bd0dcbfc85 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp @@ -276,6 +276,7 @@ void SessionHandler::flush(bool expected, bool confirmed, bool completed) { } void SessionHandler::gap(const SequenceSet& /*commands*/) { + checkAttached(); throw NotImplementedException("session.gap not supported"); } -- cgit v1.2.1 From 5a984f552b92187ef0a09d035db9a51009279a7a Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 13 Jan 2015 10:25:08 +0000 Subject: QPID-6310: handle case where content bearing method is not in fact a message-transfer, which is the only one currently supported by qpidd git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1651320 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/MessageBuilder.cpp | 3 +++ 1 file changed, 3 insertions(+) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp index 7cb99514d5..f5e9332052 100644 --- a/qpid/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/qpid/cpp/src/qpid/broker/MessageBuilder.cpp @@ -45,6 +45,9 @@ void MessageBuilder::handle(AMQFrame& frame) switch(state) { case METHOD: checkType(METHOD_BODY, type); + if (!frame.getMethod()->isA()) + throw NotImplementedException(QPID_MSG("Unexpected method: " << *(frame.getMethod()))); + exchange = frame.castBody()->getDestination(); state = HEADER; break; -- cgit v1.2.1 From f457cc314c6bc692731a87e8fed86d049e7c66c6 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 13 Jan 2015 10:25:15 +0000 Subject: QPID-6310: check validity of ranges when decoding sequence set git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1651321 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/framing/SequenceSet.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.cpp b/qpid/cpp/src/qpid/framing/SequenceSet.cpp index 72fcd8a9e2..845bf8bfae 100644 --- a/qpid/cpp/src/qpid/framing/SequenceSet.cpp +++ b/qpid/cpp/src/qpid/framing/SequenceSet.cpp @@ -54,7 +54,11 @@ void SequenceSet::decode(Buffer& buffer) throw IllegalArgumentException(QPID_MSG("Invalid size for sequence set: " << size)); for (uint16_t i = 0; i < count; i++) { - add(SequenceNumber(buffer.getLong()), SequenceNumber(buffer.getLong())); + SequenceNumber a(buffer.getLong()); + SequenceNumber b(buffer.getLong()); + if (b < a) + throw IllegalArgumentException(QPID_MSG("Invalid range in sequence set: " << a << " -> " << b)); + add(a, b); } } -- cgit v1.2.1 From 84d003afa06a4e17ff79f40bfb61d3b54f8e9c91 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Fri, 16 Jan 2015 18:38:19 +0000 Subject: QPID-6303 [linearstore] Roll back auto-upgrade of store directory structure git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1652486 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/linearstore/ISSUES | 43 +++++++++-------- qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp | 6 +-- .../linearstore/journal/EmptyFilePoolManager.cpp | 1 + .../linearstore/journal/EmptyFilePoolPartition.cpp | 55 +++++++--------------- .../linearstore/journal/EmptyFilePoolPartition.h | 2 + .../qpid/linearstore/journal/RecoveryManager.cpp | 2 +- 6 files changed, 46 insertions(+), 63 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/linearstore/ISSUES b/qpid/cpp/src/qpid/linearstore/ISSUES index 2042136bac..4023ba9629 100644 --- a/qpid/cpp/src/qpid/linearstore/ISSUES +++ b/qpid/cpp/src/qpid/linearstore/ISSUES @@ -37,16 +37,21 @@ Current/pending: ** Basic performance tests 5464 - [linearstore] Incompletely created journal files accumulate in EFP - 1088944 [Linearstore] store does not return all files to EFP after purging big queue - 6043 1066256 [LinearStore] changing efp size after using store broke the new durable nodes creation +* - 1066256 [LinearStore] changing efp size after using store broke the new durable nodes creation - 1067480 [LinearStore] Provide a way to limit max count/size of empty files in EFP - 1067429 [LinearStore] last file from deleted queue is not moved to EFP - - 1067482 [LinearStore] Provide a way to prealocate empty pages in EFP -* 5671 1160367 [linearstore] Add ability to use disk partitions and select per-queue EFPs - svn r.1636598 2014-11-04: WIP: New EFP and journal dir structure using symlinks - svn r.1637985 2014-11-10: WIP: Auto-upgrade from old dir structure to new - svn r.1649081 2015-01-02: WIP: Specify new queue using qpid-config --durable together with --efp-partition-num and/or --efp-pool-file-size. Needs testing. -* 1148807 [linearstore] Restarting broker with empty journal raises confusing warning -* 1066256 [linearstore] Changing EFP size after using store breaks durable queue creation + - 1067482 [LinearStore] Provide a way to preallocate empty pages in EFP +* 6303 1180660 [linearstore] Roll back auto-upgrade of store directory structure +* 5362 1145363 Linearstore: No store tools exist for examining the journals + svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up. + svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze + svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze + svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze + svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze + svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze + * Store analysis and status + * Recovery/reading of message content + * Empty file pool status and management @@ -117,16 +122,6 @@ NO-JIRA - Added missing Apache copyright/license text 5651 - [C++ broker] segfault in qpid::linearstore::journal::jdir::clear_dir when declaring durable queue svn r.1582730 2014-03-28 Proposed fix by Pavel Moravec * Bug introduced by r.1578899. - 5362 1145363 Linearstore: No store tools exist for examining the journals - svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up. - svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze - svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze - svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze - svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze - svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze - * Store analysis and status - * Recovery/reading of message content - * Empty file pool status and management 5661 - [linearstore] Set default cmake build to exclude linearstore svn r.1584379 2014-04-03 Proposed solution. * Run ccmake, select BUILD_LINEARSTORE to change its value to ON to build. @@ -153,6 +148,12 @@ NO-JIRA - Added missing Apache copyright/license text svn r.1643053 2014-11-18: Proposed fix 6248 1167911 [linearstore] Symlink creation fails if store dir path is not absolute svn r.1641689 2014-11-25 Proposed solution + 5671 1160367 [linearstore] Add ability to use disk partitions and select per-queue EFPs + svn r.1636598 2014-11-04: WIP: New EFP and journal dir structure using symlinks + svn r.1637985 2014-11-10: WIP: Auto-upgrade from old dir structure to new + svn r.1649081 2015-01-02: WIP: Specify new queue using qpid-config --durable together with --efp-partition-num and/or --efp-pool-file-size. Needs testing. + - 1148807 [linearstore] Restarting broker with empty journal raises confusing warning + Fixed by svn r.1649081 of bug 5671 / 1160367 above Ordered checkin list: @@ -196,10 +197,12 @@ no. svn r Q-JIRA RHBZ Date Alt Committer 33. 1620426 6043 1089652 2014-08-25 34. 1631360 6147 1152012 2014-10-13 (pmoravec) 35. 1632504 6157 1150397 2014-10-17 (pmoravec) -36. 1636598 5671 2014-11-04 -37. 1637985 5671 2014-11-10 +36. 1636598 5671 1160367 2014-11-04 +37. 1637985 5671 1160367 2014-11-10 38. 1643053 6230 1165200 2014-11-18 39. 1641689 6248 1167911 2014-11-25 +40. 1649081 5671 1160367 2015-01-02 +41. 1649082 NO-JIRA - 2015-01-02 See above sections for details on these checkins. diff --git a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp index 68810936e1..70eac27f48 100644 --- a/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp +++ b/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp @@ -1488,21 +1488,21 @@ std::string MessageStoreImpl::getStoreTopLevelDir() { std::string MessageStoreImpl::getJrnlBaseDir() { std::ostringstream dir; - dir << storeDir << "/" << storeTopLevelDir << "/jrnl/" ; + dir << storeDir << "/" << storeTopLevelDir << "/jrnl2/" ; return dir.str(); } std::string MessageStoreImpl::getBdbBaseDir() { std::ostringstream dir; - dir << storeDir << "/" << storeTopLevelDir << "/dat/" ; + dir << storeDir << "/" << storeTopLevelDir << "/dat2/" ; return dir.str(); } std::string MessageStoreImpl::getTplBaseDir() { std::ostringstream dir; - dir << storeDir << "/" << storeTopLevelDir << "/tpl/" ; + dir << storeDir << "/" << storeTopLevelDir << "/tpl2/" ; return dir.str(); } diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp index 33707578bf..a02679736e 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp @@ -74,6 +74,7 @@ void EmptyFilePoolManager::findEfpPartitions() { if (!foundPartition) { std::ostringstream oss1; oss1 << qlsStorePath_ << "/" << EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_) + << "/" << EmptyFilePoolPartition::s_efpTopLevelDir_ << "/" << EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_); jdir::create_dir(oss1.str()); insertPartition(defaultPartitionNumber_, oss1.str()); diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp index 9b58e8c4ff..12d2db74b8 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp @@ -32,6 +32,9 @@ namespace qpid { namespace linearstore { namespace journal { +// static +const std::string EmptyFilePoolPartition::s_efpTopLevelDir_("efp"); // Sets the top-level efp dir within a partition + EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum, const std::string& partitionDir, const bool overwriteBeforeReturnFlag, @@ -57,45 +60,18 @@ EmptyFilePoolPartition::~EmptyFilePoolPartition() { void EmptyFilePoolPartition::findEmptyFilePools() { //std::cout << "*** EmptyFilePoolPartition::findEmptyFilePools(): Reading " << partitionDir_ << std::endl; // DEBUG - std::vector dirList; - bool upgradeDirStructureFlag = false; - std::string oldPartitionDir; - jdir::read_dir(partitionDir_, dirList, true, false, false, false); -//std::cout << "*** dirList.size()=" << dirList.size() << "; dirList.front()=" << dirList.front() << std::endl; // DEBUG - if (dirList.size() == 1 && dirList.front().compare("efp") == 0) { - upgradeDirStructureFlag = true; - oldPartitionDir = partitionDir_ + "/efp"; -//std::cout << "*** oldPartitionDir=" << oldPartitionDir << std::endl; // DEBUG - dirList.clear(); - jdir::read_dir(oldPartitionDir, dirList, true, false, false, false); - } + std::string efpDir(partitionDir_ + "/" + s_efpTopLevelDir_); + if (jdir::is_dir(efpDir)) { + std::vector dirList; + jdir::read_dir(efpDir, dirList, true, false, false, true); for (std::vector::iterator i = dirList.begin(); i != dirList.end(); ++i) { - std::string fqFileName(partitionDir_ + "/" + *i); - if (upgradeDirStructureFlag) { - std::string fqOldFileName(partitionDir_ + "/efp/" + *i); - if (::rename(fqOldFileName.c_str(), fqFileName.c_str())) { - // File move failed - std::ostringstream oss; - oss << "File \'" << fqOldFileName << "\' could not be renamed to \'" << fqFileName << "\' (" << FORMAT_SYSERR(errno) << "); file deleted"; - journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); - if (::unlink(fqOldFileName.c_str())) { - std::ostringstream oss; - oss << "File \'" << fqOldFileName << "\' could not be deleted (" << FORMAT_SYSERR(errno) << "\'; file orphaned"; - journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); - } - } - } - createEmptyFilePool(fqFileName); - } - if (upgradeDirStructureFlag) { - std::string oldEfpDir(partitionDir_ + "/efp"); - if (::rmdir(oldEfpDir.c_str())) { - // Unable to delete old "efp" dir - std::ostringstream oss; - oss << "Unable to delete old EFP directory \'" << oldEfpDir << "\' (" << FORMAT_SYSERR(errno) << "\'; directory orphaned"; - journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); - } + createEmptyFilePool(*i); } + } else { + std::ostringstream oss; + oss << "Partition \"" << partitionDir_ << "\" does not contain top level EFP dir \"" << s_efpTopLevelDir_ << "\""; + journalLogRef_.log(JournalLog::LOG_WARN, oss.str()); + } } EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib, const bool createIfNonExistent) { @@ -182,7 +158,7 @@ efpPartitionNumber_t EmptyFilePoolPartition::getPartitionNumber(const std::strin // --- protected functions --- EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) { - std::string fqEfpDirectoryName(partitionDir_ + "/" + EmptyFilePool::dirNameFromDataSize(efpDataSize_kib)); + std::string fqEfpDirectoryName(partitionDir_ + "/" + EmptyFilePoolPartition::s_efpTopLevelDir_ + "/" + EmptyFilePool::dirNameFromDataSize(efpDataSize_kib)); return createEmptyFilePool(fqEfpDirectoryName); } @@ -211,11 +187,12 @@ EmptyFilePool* EmptyFilePoolPartition::createEmptyFilePool(const std::string fqE } void EmptyFilePoolPartition::validatePartitionDir() { + std::ostringstream ss; if (!jdir::is_dir(partitionDir_)) { - std::ostringstream ss; ss << "Invalid partition directory: \'" << partitionDir_ << "\' is not a directory"; throw jexception(jerrno::JERR_EFP_BADPARTITIONDIR, ss.str(), "EmptyFilePoolPartition", "validatePartitionDir"); } + // TODO: other validity checks here } diff --git a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h index 23a541f8f4..570e2b073f 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h +++ b/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h @@ -37,6 +37,8 @@ class JournalLog; class EmptyFilePoolPartition { +public: + static const std::string s_efpTopLevelDir_; protected: typedef std::map efpMap_t; typedef efpMap_t::iterator efpMapItr_t; diff --git a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp index 198b39857c..73a16f01b7 100644 --- a/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp +++ b/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp @@ -429,7 +429,7 @@ void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) { } } -std::cerr << "*** RecoveryManager::analyzeJournalFileHeaders() fileNumberMap_.size()=" << fileNumberMap_.size() << std::endl; // DEBUG +//std::cerr << "*** RecoveryManager::analyzeJournalFileHeaders() fileNumberMap_.size()=" << fileNumberMap_.size() << std::endl; // DEBUG if (fileNumberMap_.empty()) { journalEmptyFlag_ = true; } else { -- cgit v1.2.1 From e459d2c4c0073217cdd7cd5b73e3aecee0c06ca8 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Mon, 19 Jan 2015 14:12:54 +0000 Subject: QPID-6321: handle change to pn_delivery_tag_t in 0.9 git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1653005 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/amqp.cmake | 5 ++++- qpid/cpp/src/config.h.cmake | 2 +- qpid/cpp/src/qpid/broker/amqp/Connection.cpp | 4 ++++ qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp | 9 +++++++++ qpid/cpp/src/qpid/broker/amqp/Relay.cpp | 9 +++++++++ qpid/cpp/src/qpid/broker/amqp/Session.cpp | 5 +++++ qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp | 5 +++++ 7 files changed, 37 insertions(+), 2 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/amqp.cmake b/qpid/cpp/src/amqp.cmake index 77ade87c8e..b2ff10bd68 100644 --- a/qpid/cpp/src/amqp.cmake +++ b/qpid/cpp/src/amqp.cmake @@ -22,7 +22,7 @@ find_package(Proton 0.5) set (amqp_default ${amqp_force}) -set (maximum_version 0.8) +set (maximum_version 0.9) if (Proton_FOUND) if (Proton_VERSION GREATER ${maximum_version}) message(WARNING "Qpid proton ${Proton_VERSION} is not a tested version and might not be compatible, ${maximum_version} is highest tested; build may not work") @@ -37,6 +37,9 @@ if (Proton_FOUND) set (USE_PROTON_TRANSPORT_CONDITION 1) set (HAVE_PROTON_EVENTS 1) endif (Proton_VERSION GREATER 0.7) + if (Proton_VERSION GREATER 0.8) + set (NO_PROTON_DELIVERY_TAG_T 1) + endif (Proton_VERSION GREATER 0.8) else () message(STATUS "Qpid proton not found, amqp 1.0 support not enabled") endif () diff --git a/qpid/cpp/src/config.h.cmake b/qpid/cpp/src/config.h.cmake index dbfc4ced8a..777fc1b893 100644 --- a/qpid/cpp/src/config.h.cmake +++ b/qpid/cpp/src/config.h.cmake @@ -59,5 +59,5 @@ #cmakedefine HAVE_PROTON_TRACER #cmakedefine USE_PROTON_TRANSPORT_CONDITION #cmakedefine HAVE_PROTON_EVENTS - +#cmakedefine NO_PROTON_DELIVERY_TAG_T #endif /* QPID_CONFIG_H */ diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index f04cd8eb6e..3a93e2aac5 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -412,7 +412,11 @@ void Connection::process() namespace { std::string convert(pn_delivery_tag_t in) { +#ifdef NO_PROTON_DELIVERY_TAG_T + return std::string(in.start, in.size); +#else return std::string(in.bytes, in.size); +#endif } } void Connection::processDeliveries() diff --git a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp index d0b41c6c90..0136d5a0ed 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -32,6 +32,7 @@ #include "qpid/framing/Buffer.h" #include "qpid/framing/reply_exceptions.h" #include "qpid/log/Statement.h" +#include "config.h" namespace qpid { namespace broker { @@ -285,7 +286,11 @@ qpid::broker::OwnershipToken* OutgoingFromQueue::getSession() OutgoingFromQueue::Record::Record() : delivery(0), disposition(0), index(0) { +#ifdef NO_PROTON_DELIVERY_TAG_T + tag.start = tagData; +#else tag.bytes = tagData; +#endif tag.size = TAG_WIDTH; } void OutgoingFromQueue::Record::init(size_t i) @@ -306,7 +311,11 @@ void OutgoingFromQueue::Record::reset() size_t OutgoingFromQueue::Record::getIndex(pn_delivery_tag_t t) { assert(t.size == TAG_WIDTH); +#ifdef NO_PROTON_DELIVERY_TAG_T + qpid::framing::Buffer buffer(const_cast(t.start)/*won't ever be written to*/, t.size); +#else qpid::framing::Buffer buffer(const_cast(t.bytes)/*won't ever be written to*/, t.size); +#endif return (size_t) buffer.getLong(); } diff --git a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp index 5e7a3af889..587a11466a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Relay.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Relay.cpp @@ -23,6 +23,7 @@ #include "qpid/log/Statement.h" #include #include +#include "config.h" namespace qpid { namespace broker { @@ -244,7 +245,11 @@ void BufferedTransfer::initIn(pn_link_t* link, pn_delivery_t* d) //copy delivery tag pn_delivery_tag_t dt = pn_delivery_tag(d); tag.resize(dt.size); +#ifdef NO_PROTON_DELIVERY_TAG_T + ::memmove(&tag[0], dt.start, dt.size); +#else ::memmove(&tag[0], dt.bytes, dt.size); +#endif //set context pn_delivery_set_context(d, this); @@ -264,7 +269,11 @@ bool BufferedTransfer::settle() void BufferedTransfer::initOut(pn_link_t* link) { pn_delivery_tag_t dt; +#ifdef NO_PROTON_DELIVERY_TAG_T + dt.start = &tag[0]; +#else dt.bytes = &tag[0]; +#endif dt.size = tag.size(); out.handle = pn_delivery(link, dt); //set context diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 2e7d30118a..04a691e5d9 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -48,6 +48,7 @@ #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include "qpid/amqp_0_10/Codecs.h" +#include "config.h" #include #include #include @@ -615,7 +616,11 @@ void Session::accepted(pn_delivery_t* delivery, bool sync) void Session::readable(pn_link_t* link, pn_delivery_t* delivery) { pn_delivery_tag_t tag = pn_delivery_tag(delivery); +#ifdef NO_PROTON_DELIVERY_TAG_T + QPID_LOG(debug, "received delivery: " << std::string(tag.start, tag.size)); +#else QPID_LOG(debug, "received delivery: " << std::string(tag.bytes, tag.size)); +#endif incomingMessageReceived(); IncomingLinks::iterator target = incoming.find(link); if (target == incoming.end()) { diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 421b177163..2a48b2241a 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -30,6 +30,7 @@ #include "qpid/messaging/Message.h" #include "qpid/messaging/MessageImpl.h" #include "qpid/log/Statement.h" +#include "config.h" extern "C" { #include } @@ -510,7 +511,11 @@ void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable) { pn_delivery_tag_t tag; tag.size = sizeof(id); +#ifdef NO_PROTON_DELIVERY_TAG_T + tag.start = reinterpret_cast(&id); +#else tag.bytes = reinterpret_cast(&id); +#endif token = pn_delivery(sender, tag); pn_link_send(sender, encoded.getData(), encoded.getSize()); if (unreliable) { -- cgit v1.2.1 From d4b6143a73d60d3ade9b398b8d45c3ba495bc6ec Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Mon, 19 Jan 2015 14:13:00 +0000 Subject: QPID-6322: avoid setting qpid.auto_delete_timeout unnecessarily git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1653006 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/amqp/Session.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 04a691e5d9..538883f29a 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -509,7 +509,8 @@ void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::s if (settings.original.find("qpid.auto_delete_timeout") == settings.original.end()) { //only use delay from link if policy didn't specify one settings.autoDeleteDelay = pn_terminus_get_timeout(source); - settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay; + if (settings.autoDeleteDelay) + settings.original["qpid.auto_delete_timeout"] = settings.autoDeleteDelay; } if (settings.autoDeleteDelay) { settings.autodelete = true; -- cgit v1.2.1 From cd68c04d70cd93d01feec3ae5e034f497f88823d Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Mon, 19 Jan 2015 14:13:06 +0000 Subject: QPID-6323: align default timeout behaviour with 0-10 codepath git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1653007 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp index e631501977..7f19ca7ec0 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp @@ -267,7 +267,7 @@ AddressHelper::AddressHelper(const Address& address) : bind(link, RELIABILITY, reliability); durableNode = test(node, DURABLE); durableLink = test(link, DURABLE); - timeout = get(link, TIMEOUT, durableLink ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT); + timeout = get(link, TIMEOUT, durableLink && reliability != AT_LEAST_ONCE ? DEFAULT_DURABLE_TIMEOUT : DEFAULT_TIMEOUT); std::string mode; if (bind(address, MODE, mode)) { if (mode == BROWSE) { -- cgit v1.2.1 From 9ff0e238e286632fe0996779abbf70b72dada252 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Mon, 19 Jan 2015 14:13:11 +0000 Subject: QPID-5597: honour explicit auto-delete when True (as well as when False) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1653008 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 6128f4c0fc..2d03748ac6 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -534,7 +534,7 @@ Subscription::Subscription(const Address& address, const std::string& type) reliable(durable ? !AddressResolution::is_unreliable(address) : AddressResolution::is_reliable(address)), actualType(type.empty() ? (specifiedType.empty() ? TOPIC_EXCHANGE : specifiedType) : type), exclusiveQueue((Opt(address)/LINK/X_DECLARE/EXCLUSIVE).asBool(true)), - autoDeleteQueue((Opt(address)/LINK/X_DECLARE/AUTO_DELETE).asBool(true)), + autoDeleteQueue((Opt(address)/LINK/X_DECLARE/AUTO_DELETE).asBool(!(durable || reliable))), exclusiveSubscription((Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE).asBool(exclusiveQueue)), alternateExchange((Opt(address)/LINK/X_DECLARE/ALTERNATE_EXCHANGE).str()) { @@ -610,7 +610,7 @@ void Subscription::subscribe(qpid::client::AsyncSession& session, const std::str //create subscription queue: session.queueDeclare(arg::queue=queue, arg::exclusive=exclusiveQueue, - arg::autoDelete=autoDeleteQueue && (!(durable || reliable)), arg::durable=durable, + arg::autoDelete=autoDeleteQueue, arg::durable=durable, arg::alternateExchange=alternateExchange, arg::arguments=queueOptions); //'default' binding: -- cgit v1.2.1 From b086149e61bfd501f316d5ddb43010506da4e78c Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Mon, 19 Jan 2015 14:13:17 +0000 Subject: QPID-6324: fix when default timeout applies git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1653009 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 2d03748ac6..558cad9e54 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -542,11 +542,10 @@ Subscription::Subscription(const Address& address, const std::string& type) if ((Opt(address)/LINK).hasKey(TIMEOUT)) { const Variant* timeout = (Opt(address)/LINK/TIMEOUT).value; if (timeout->asUint32()) queueOptions.setInt("qpid.auto_delete_timeout", timeout->asUint32()); - } else if (durable && !(Opt(address)/LINK/RELIABILITY).value) { - //if durable but not explicitly reliable, then set a non-zero - //default for the autodelete timeout (previously this would - //have defaulted to autodelete immediately anyway, so the risk - //of the change causing problems is mitigated) + } else if (durable && !reliable && !(Opt(address)/LINK/X_DECLARE).hasKey(AUTO_DELETE)) { + //if durable but not reliable, and auto-delete not + //explicitly set, then set a non-zero default for the + //autodelete timeout queueOptions.setInt("qpid.auto_delete_timeout", 2*60); } (Opt(address)/LINK/X_DECLARE/ARGUMENTS).collect(queueOptions); -- cgit v1.2.1 From f759ba5b3a919149622c656a2bb06f21d893ac1e Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 20 Jan 2015 10:00:17 +0000 Subject: QPID-6325: improve 0-10 connection handling logic git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1653216 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/amqp_0_10/Connection.cpp | 2 +- qpid/cpp/src/qpid/broker/ConnectionHandler.cpp | 27 +++++++++++++++++++++++--- 2 files changed, 25 insertions(+), 4 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp index 87085b6d77..866f98071b 100644 --- a/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp +++ b/qpid/cpp/src/qpid/amqp_0_10/Connection.cpp @@ -54,7 +54,7 @@ size_t Connection::decode(const char* buffer, size_t size) { } } framing::AMQFrame frame; - while(frame.decode(in)) { + while(!pushClosed && frame.decode(in)) { QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); connection->received(frame); } diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 2afdc5a61d..eece59d095 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -93,14 +93,14 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) } else if (isOpen()) { handler->connection.getChannel(frame.getChannel()).in(frame); } else { - handler->proxy.close( + handler->connection.close( connection::CLOSE_CODE_FRAMING_ERROR, "Connection not yet open, invalid frame received."); } }catch(ConnectionException& e){ - handler->proxy.close(e.code, e.what()); + handler->connection.close(e.code, e.what()); }catch(std::exception& e){ - handler->proxy.close(541/*internal error*/, e.what()); + handler->connection.close(connection::CLOSE_CODE_CONNECTION_FORCED, e.what()); } } @@ -234,6 +234,10 @@ void ConnectionHandler::Handler::tuneOk(uint16_t /*channelmax*/, void ConnectionHandler::Handler::open(const string& /*virtualHost*/, const framing::Array& /*capabilities*/, bool /*insist*/) { + if (connection.getUserId().empty()) { + throw ConnectionForcedException("Not authenticated!"); + } + if (connection.isFederationLink()) { AclModule* acl = connection.getBroker().getAcl(); if (acl && acl->userAclRules()) { @@ -302,6 +306,11 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties, const framing::Array& supportedMechanisms, const framing::Array& /*locales*/) { + if (serverMode) { + throw ConnectionForcedException("Invalid protocol sequence."); + } + + string requestedMechanism = connection.getAuthMechanism(); std::string username = connection.getUsername(); @@ -388,6 +397,10 @@ void ConnectionHandler::Handler::start(const FieldTable& serverProperties, void ConnectionHandler::Handler::secure(const string& challenge ) { + if (serverMode) { + throw ConnectionForcedException("Invalid protocol sequence."); + } + if (sasl.get()) { string response = sasl->step(challenge); proxy.secureOk(response); @@ -402,6 +415,10 @@ void ConnectionHandler::Handler::tune(uint16_t channelMax, uint16_t /*heartbeatMin*/, uint16_t heartbeatMax) { + if (serverMode) { + throw ConnectionForcedException("Invalid protocol sequence."); + } + maxFrameSize = std::min(maxFrameSize, maxFrameSizeProposed); connection.setFrameMax(maxFrameSize); @@ -420,6 +437,10 @@ void ConnectionHandler::Handler::tune(uint16_t channelMax, void ConnectionHandler::Handler::openOk(const framing::Array& knownHosts) { + if (serverMode) { + throw ConnectionForcedException("Invalid protocol sequence."); + } + for (Array::ValueVector::const_iterator i = knownHosts.begin(); i != knownHosts.end(); ++i) { Url url((*i)->get()); connection.getKnownHosts().push_back(url); -- cgit v1.2.1 From 05b0237a8bbdb66bc299b73837d691bbfa412ce7 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 21 Jan 2015 14:50:52 +0000 Subject: QPID-6325: further improve 0-10 connection handling logic git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1653547 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/ConnectionHandler.cpp | 14 +++++++------- qpid/cpp/src/qpid/broker/ConnectionHandler.h | 3 ++- 2 files changed, 9 insertions(+), 8 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index eece59d095..8972040be5 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -242,17 +242,17 @@ void ConnectionHandler::Handler::open(const string& /*virtualHost*/, AclModule* acl = connection.getBroker().getAcl(); if (acl && acl->userAclRules()) { if (!acl->authorise(connection.getUserId(),acl::ACT_CREATE,acl::OBJ_LINK,"")){ - proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED, - QPID_MSG("ACL denied " << connection.getUserId() - << " creating a federation link")); + connection.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED, + QPID_MSG("ACL denied " << connection.getUserId() + << " creating a federation link")); return; } } else { if (connection.getBroker().isAuthenticating()) { - proxy.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED, - QPID_MSG("User " << connection.getUserId() - << " federation connection denied. Systems with authentication " - "enabled must specify ACL create link rules.")); + connection.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED, + QPID_MSG("User " << connection.getUserId() + << " federation connection denied. Systems with authentication " + "enabled must specify ACL create link rules.")); return; } } diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.h b/qpid/cpp/src/qpid/broker/ConnectionHandler.h index 7af2fe3cb4..30155fb903 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.h +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.h @@ -100,13 +100,14 @@ class ConnectionHandler : public framing::FrameHandler std::auto_ptr handler; bool handle(const qpid::framing::AMQMethodBody& method); + void close(framing::connection::CloseCode code, const std::string& text); public: ConnectionHandler(amqp_0_10::Connection& connection, bool isClient ); - void close(framing::connection::CloseCode code, const std::string& text); void heartbeat(); void handle(framing::AMQFrame& frame); void setSecureConnection(SecureConnection* secured); bool isOpen() { return handler->isOpen; } + friend class amqp_0_10::Connection; }; -- cgit v1.2.1 From 23e9a05cdc97a43363a35ad848ec73a29f71c702 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 21 Jan 2015 14:50:59 +0000 Subject: QPID-6329: slightly more forgiving equivalence check on fieldvalues in assertions on 0-10 path git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1653548 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 10 ++++++++-- qpid/cpp/src/tests/assertions.py | 15 +++++++++++++++ 2 files changed, 23 insertions(+), 2 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 558cad9e54..0225ee74cb 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -55,6 +55,7 @@ using qpid::messaging::AssertionFailed; using qpid::framing::ExchangeBoundResult; using qpid::framing::ExchangeQueryResult; using qpid::framing::FieldTable; +using qpid::framing::FieldValue; using qpid::framing::QueueQueryResult; using qpid::framing::ReplyTo; using qpid::framing::Uuid; @@ -140,6 +141,11 @@ const std::string PREFIX_AMQ("amq."); const std::string PREFIX_QPID("qpid."); const Verifier verifier; + +bool areEquivalent(const FieldValue& a, const FieldValue& b) +{ + return ((a == b) || (a.convertsTo() && b.convertsTo() && a.get() == b.get())); +} } struct Binding @@ -806,7 +812,7 @@ void Queue::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); - } else if (*i->second != *v) { + } else if (!areEquivalent(*i->second, *v)) { throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } @@ -906,7 +912,7 @@ void Exchange::checkAssert(qpid::client::AsyncSession& session, CheckMode mode) FieldTable::ValuePtr v = result.getArguments().get(i->first); if (!v) { throw AssertionFailed((boost::format("Option %1% not set for %2%") % i->first % name).str()); - } else if (*i->second != *v) { + } else if (!areEquivalent(*i->second, *v)) { throw AssertionFailed((boost::format("Option %1% does not match for %2%, expected %3%, got %4%") % i->first % name % *(i->second) % *v).str()); } diff --git a/qpid/cpp/src/tests/assertions.py b/qpid/cpp/src/tests/assertions.py index f1db21b753..930afd124d 100644 --- a/qpid/cpp/src/tests/assertions.py +++ b/qpid/cpp/src/tests/assertions.py @@ -177,3 +177,18 @@ class AssertionTests (VersionTest): assert False, "Expected assertion to fail on unspecified option" except AssertionFailed: None except MessagingError: None + + def test_queue_autodelete_timeout(self): + name = str(uuid4()) + # create subscription queue with 0-10 to be sure of name + ssn_0_10 = self.create_connection("amqp0-10", True).session() + ssn_0_10.receiver("amq.direct; {link:{name:%s,timeout:30}}" % name) + self.ssn.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 30}}}}" % name) + ssn_0_10_other = self.create_connection("amqp0-10", True).session() + ssn_0_10_other.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 30}}}}" % name) + try: + self.ssn.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 60}}}}" % name) + ssn_0_10_other.sender("%s; {assert:always, node:{x-declare:{arguments: {qpid.auto_delete_timeout: 60}}}}" % name) + assert False, "Expected assertion to fail for auto_delete_timeout" + except AssertionFailed: None + except MessagingError: None -- cgit v1.2.1 From 5cdbf1f4f828d5e56605b52549a1b99e292d94ca Mon Sep 17 00:00:00 2001 From: "Darryl L. Pierce" Date: Wed, 21 Jan 2015 19:26:51 +0000 Subject: txshift: switch to uint for workers workaround missing size_t implementation of po::value_semantic* create_value(T& val, const std::string& arg) as it would conflict with when size_t == unsigned int even when it would help for size_t == unsigned long fixes https://issues.apache.org/jira/browse/QPID-6135 Contributed by Dan Horaz git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1653631 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/tests/txshift.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/tests/txshift.cpp b/qpid/cpp/src/tests/txshift.cpp index bf85bee986..6ec28c7233 100644 --- a/qpid/cpp/src/tests/txshift.cpp +++ b/qpid/cpp/src/tests/txshift.cpp @@ -40,7 +40,7 @@ namespace tests { struct Args : public qpid::TestOptions { std::string workQueue; - size_t workers; + uint workers; Args() : workQueue("txshift-control"), workers(1) { @@ -178,7 +178,7 @@ int main(int argc, char** argv) worker.run(); } else { boost::ptr_vector workers; - for (size_t i = 0; i < opts.workers; i++) { + for (uint i = 0; i < opts.workers; i++) { workers.push_back(new Worker(connection, opts.workQueue)); } std::for_each(workers.begin(), workers.end(), boost::bind(&Worker::start, _1)); -- cgit v1.2.1 From e08c11b991309252a19a572024961550e5adaf11 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Fri, 23 Jan 2015 20:25:45 +0000 Subject: QPID-6310: handle maximal range git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1654365 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/framing/SequenceSet.cpp | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) (limited to 'qpid/cpp/src') diff --git a/qpid/cpp/src/qpid/framing/SequenceSet.cpp b/qpid/cpp/src/qpid/framing/SequenceSet.cpp index 845bf8bfae..6510842c58 100644 --- a/qpid/cpp/src/qpid/framing/SequenceSet.cpp +++ b/qpid/cpp/src/qpid/framing/SequenceSet.cpp @@ -33,7 +33,18 @@ namespace framing { namespace { //each range contains 2 numbers, 4 bytes each -uint16_t RANGE_SIZE = 2 * 4; +uint16_t RANGE_SIZE = 2 * 4; +int32_t MAX_RANGE = 2147483647;//2^31-1 + +int32_t gap(const SequenceNumber& a, const SequenceNumber& b) +{ + return a < b ? b - a : a - b; +} + +bool is_max_range(const SequenceNumber& a, const SequenceNumber& b) +{ + return gap(a, b) == MAX_RANGE; +} } void SequenceSet::encode(Buffer& buffer) const @@ -58,7 +69,17 @@ void SequenceSet::decode(Buffer& buffer) SequenceNumber b(buffer.getLong()); if (b < a) throw IllegalArgumentException(QPID_MSG("Invalid range in sequence set: " << a << " -> " << b)); - add(a, b); + if (is_max_range(a, b)) { + //RangeSet holds 'half-closed' ranges, where the end is + //one past the 'highest' value in the range. So if the + //range is already the maximum expressable with a 32bit + //sequence number, we can't represent it as a + //'half-closed' range, so we represent it as two ranges. + add(a, b-1); + add(b); + } else { + add(a, b); + } } } -- cgit v1.2.1