diff options
author | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-19 15:03:16 +0000 |
---|---|---|
committer | Kenneth Anthony Giusti <kgiusti@apache.org> | 2011-02-19 15:03:16 +0000 |
commit | 81584c84fadc886b0ad53dceb479073e56bf8cdd (patch) | |
tree | f48206d10d52fdbb5a4ce93ec8068f0de4fbc9f5 | |
parent | ccd0e27fdf0c5a90a7f85099dac4f63dbd7a5d15 (diff) | |
download | qpid-python-81584c84fadc886b0ad53dceb479073e56bf8cdd.tar.gz |
QPID-2935: merge producer flow control (C++ broker).
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1072356 13f79535-47bb-0310-9956-ffa450edef68
40 files changed, 2038 insertions, 469 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 9fb133f4d0..d0ca2d9c2b 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -985,7 +985,6 @@ set (qpidbroker_SOURCES qpid/broker/ExchangeRegistry.cpp qpid/broker/FanOutExchange.cpp qpid/broker/HeadersExchange.cpp - qpid/broker/IncompleteMessageList.cpp qpid/broker/Link.cpp qpid/broker/LinkRegistry.cpp qpid/broker/Message.cpp @@ -998,6 +997,7 @@ set (qpidbroker_SOURCES qpid/broker/QueueEvents.cpp qpid/broker/QueuePolicy.cpp qpid/broker/QueueRegistry.cpp + qpid/broker/QueueFlowLimit.cpp qpid/broker/RateTracker.cpp qpid/broker/RecoveryManagerImpl.cpp qpid/broker/RecoveredEnqueue.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 2cd6ad462f..dfb2547613 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -561,8 +561,7 @@ libqpidbroker_la_SOURCES = \ qpid/broker/HandlerImpl.h \ qpid/broker/HeadersExchange.cpp \ qpid/broker/HeadersExchange.h \ - qpid/broker/IncompleteMessageList.cpp \ - qpid/broker/IncompleteMessageList.h \ + qpid/broker/AsyncCompletion.h \ qpid/broker/LegacyLVQ.h \ qpid/broker/LegacyLVQ.cpp \ qpid/broker/Link.cpp \ @@ -612,6 +611,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/QueueRegistry.cpp \ qpid/broker/QueueRegistry.h \ qpid/broker/QueuedMessage.h \ + qpid/broker/QueueFlowLimit.h \ + qpid/broker/QueueFlowLimit.cpp \ qpid/broker/RateFlowcontrol.h \ qpid/broker/RateTracker.cpp \ qpid/broker/RateTracker.h \ diff --git a/cpp/src/qpid/broker/AsyncCompletion.h b/cpp/src/qpid/broker/AsyncCompletion.h new file mode 100644 index 0000000000..1f3d11e0ee --- /dev/null +++ b/cpp/src/qpid/broker/AsyncCompletion.h @@ -0,0 +1,170 @@ +#ifndef _AsyncCompletion_ +#define _AsyncCompletion_ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/BrokerImportExport.h" +#include "qpid/sys/AtomicValue.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Monitor.h" + +namespace qpid { +namespace broker { + +/** + * Class to implement asynchronous notification of completion. + * + * Use-case: An "initiator" needs to wait for a set of "completers" to + * finish a unit of work before an action can occur. This object + * tracks the progress of the set of completers, and allows the action + * to occur once all completers have signalled that they are done. + * + * The initiator and completers may be running in separate threads. + * + * The initiating thread is the thread that initiates the action, + * i.e. the connection read thread. + * + * A completing thread is any thread that contributes to completion, + * e.g. a store thread that does an async write. + * There may be zero or more completers. + * + * When the work is complete, a callback is invoked. The callback + * may be invoked in the Initiator thread, or one of the Completer + * threads. The callback is passed a flag indicating whether or not + * the callback is running under the context of the Initiator thread. + * + * Use model: + * 1) Initiator thread invokes begin() + * 2) After begin() has been invoked, zero or more Completers invoke + * startCompleter(). Completers may be running in the same or + * different thread as the Initiator, as long as they guarantee that + * startCompleter() is invoked at least once before the Initiator invokes end(). + * 3) Completers may invoke finishCompleter() at any time, even after the + * initiator has invoked end(). finishCompleter() may be called from any + * thread. + * 4) startCompleter()/finishCompleter() calls "nest": for each call to + * startCompleter(), a corresponding call to finishCompleter() must be made. + * Once the last finishCompleter() is called, the Completer must no longer + * reference the completion object. + * 5) The Initiator invokes end() at the point where it has finished + * dispatching work to the Completers, and is prepared for the callback + * handler to be invoked. Note: if there are no outstanding Completers + * pending when the Initiator invokes end(), the callback will be invoked + * directly, and the sync parameter will be set true. This indicates to the + * Initiator that the callback is executing in the context of the end() call, + * and the Initiator is free to optimize the handling of the completion, + * assuming no need for synchronization with Completer threads. + */ + +class AsyncCompletion +{ + private: + mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded; + mutable qpid::sys::Monitor callbackLock; + bool inCallback, active; + + void invokeCallback(bool sync) { + qpid::sys::Mutex::ScopedLock l(callbackLock); + if (active) { + inCallback = true; + { + qpid::sys::Mutex::ScopedUnlock ul(callbackLock); + completed(sync); + } + inCallback = false; + active = false; + callbackLock.notifyAll(); + } + } + + protected: + /** Invoked when all completers have signalled that they have completed + * (via calls to finishCompleter()). bool == true if called via end() + */ + virtual void completed(bool) = 0; + + public: + AsyncCompletion() : completionsNeeded(0), inCallback(false), active(true) {}; + virtual ~AsyncCompletion() { cancel(); } + + /** True when all outstanding operations have compeleted + */ + bool isDone() + { + qpid::sys::Mutex::ScopedLock l(callbackLock); + return !active; + } + + /** Called to signal the start of an asynchronous operation. The operation + * is considered pending until finishCompleter() is called. + * E.g. called when initiating an async store operation. + */ + void startCompleter() { ++completionsNeeded; } + + /** Called by completer to signal that it has finished the operation started + * when startCompleter() was invoked. + * e.g. called when async write complete. + */ + void finishCompleter() + { + if (--completionsNeeded == 0) { + invokeCallback(false); + } + } + + /** called by initiator before any calls to startCompleter can be done. + */ + void begin() + { + qpid::sys::Mutex::ScopedLock l(callbackLock); + ++completionsNeeded; + } + + /** called by initiator after all potential completers have called + * startCompleter(). + */ + void end() + { + assert(completionsNeeded.get() > 0); // ensure begin() has been called! + if (--completionsNeeded == 0) { + invokeCallback(true); + } + } + + /** may be called by Initiator to cancel the callback. Will wait for + * callback to complete if in progress. + */ + virtual void cancel() { + qpid::sys::Mutex::ScopedLock l(callbackLock); + while (inCallback) callbackLock.wait(); + active = false; + } + + /** may be called by Initiator after all completers have been added but + * prior to calling end(). Allows initiator to determine if it _really_ + * needs to wait for pending Completers (e.g. count > 1). + */ + //uint32_t getPendingCompleters() { return completionsNeeded.get(); } +}; + +}} // qpid::broker:: +#endif /*!_AsyncCompletion_*/ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index fbd7dd3361..7879989bf0 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -32,6 +32,7 @@ #include "qpid/broker/TopicExchange.h" #include "qpid/broker/Link.h" #include "qpid/broker/ExpiryPolicy.h" +#include "qpid/broker/QueueFlowLimit.h" #include "qmf/org/apache/qpid/broker/Package.h" #include "qmf/org/apache/qpid/broker/ArgsBrokerCreate.h" @@ -118,7 +119,9 @@ Broker::Options::Options(const std::string& name) : maxSessionRate(0), asyncQueueEvents(false), // Must be false in a cluster. qmf2Support(true), - qmf1Support(true) + qmf1Support(true), + queueFlowStopRatio(80), + queueFlowResumeRatio(70) { int c = sys::SystemInfo::concurrency(); workerThreads=c+1; @@ -151,7 +154,9 @@ Broker::Options::Options(const std::string& name) : ("known-hosts-url", optValue(knownHosts, "URL or 'none'"), "URL to send as 'known-hosts' to clients ('none' implies empty list)") ("sasl-config", optValue(saslConfigPath, "FILE"), "gets sasl config from nonstandard location") ("max-session-rate", optValue(maxSessionRate, "MESSAGES/S"), "Sets the maximum message rate per session (0=unlimited)") - ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication"); + ("async-queue-events", optValue(asyncQueueEvents, "yes|no"), "Set Queue Events async, used for services like replication") + ("default-flow-stop-threshold", optValue(queueFlowStopRatio, "%MESSAGES"), "Queue capacity level at which flow control is activated.") + ("default-flow-resume-threshold", optValue(queueFlowResumeRatio, "%MESSAGES"), "Queue capacity level at which flow control is de-activated."); } const std::string empty; @@ -182,8 +187,9 @@ Broker::Broker(const Broker::Options& conf) : conf.replayHardLimit*1024), *this), queueCleaner(queues, timer), - queueEvents(poller,!conf.asyncQueueEvents), + queueEvents(poller,!conf.asyncQueueEvents), recovery(true), + inCluster(false), clusterUpdatee(false), expiryPolicy(new ExpiryPolicy), connectionCounter(conf.maxConnections), @@ -240,8 +246,16 @@ Broker::Broker(const Broker::Options& conf) : // Early-Initialize plugins Plugin::earlyInitAll(*this); + /** todo KAG - remove once cluster support for flow control done */ + if (isInCluster()) { + QPID_LOG(info, "Producer Flow Control TBD for clustered brokers - queue flow control disabled by default."); + QueueFlowLimit::setDefaults(0, 0, 0); + } else { + QueueFlowLimit::setDefaults(conf.queueLimit, conf.queueFlowStopRatio, conf.queueFlowResumeRatio); + } + // If no plugin store module registered itself, set up the null store. - if (NullMessageStore::isNullStore(store.get())) + if (NullMessageStore::isNullStore(store.get())) setStore(); exchanges.declare(empty, DirectExchange::typeName); // Default exchange. @@ -360,14 +374,14 @@ void Broker::run() { Dispatcher d(poller); int numIOThreads = config.workerThreads; std::vector<Thread> t(numIOThreads-1); - + // Run n-1 io threads for (int i=0; i<numIOThreads-1; ++i) t[i] = Thread(d); - + // Run final thread d.run(); - + // Now wait for n-1 io threads to exit for (int i=0; i<numIOThreads-1; ++i) { t[i].join(); @@ -414,9 +428,9 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, { case _qmf::Broker::METHOD_ECHO : QPID_LOG (debug, "Broker::echo(" - << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence - << ", " - << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body + << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_sequence + << ", " + << dynamic_cast<_qmf::ArgsBrokerEcho&>(args).io_body << ")"); status = Manageable::STATUS_OK; break; diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 9af9020c8f..d85927c43c 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -118,29 +118,31 @@ public: bool asyncQueueEvents; bool qmf2Support; bool qmf1Support; + uint queueFlowStopRatio; // producer flow control: on + uint queueFlowResumeRatio; // producer flow control: off private: std::string getHome(); }; - + class ConnectionCounter { int maxConnections; int connectionCount; sys::Mutex connectionCountLock; public: ConnectionCounter(int mc): maxConnections(mc),connectionCount(0) {}; - void inc_connectionCount() { - sys::ScopedLock<sys::Mutex> l(connectionCountLock); + void inc_connectionCount() { + sys::ScopedLock<sys::Mutex> l(connectionCountLock); connectionCount++; - } - void dec_connectionCount() { - sys::ScopedLock<sys::Mutex> l(connectionCountLock); + } + void dec_connectionCount() { + sys::ScopedLock<sys::Mutex> l(connectionCountLock); connectionCount--; } bool allowConnection() { - sys::ScopedLock<sys::Mutex> l(connectionCountLock); + sys::ScopedLock<sys::Mutex> l(connectionCountLock); return (maxConnections <= connectionCount); - } + } }; private: @@ -182,7 +184,7 @@ public: const boost::intrusive_ptr<Message>& msg); std::string federationTag; bool recovery; - bool clusterUpdatee; + bool inCluster, clusterUpdatee; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; ConnectionCounter connectionCounter; @@ -241,7 +243,7 @@ public: QPID_BROKER_EXTERN void accept(); /** Create a connection to another broker. */ - void connect(const std::string& host, uint16_t port, + void connect(const std::string& host, uint16_t port, const std::string& transport, boost::function2<void, int, std::string> failed, sys::ConnectionCodec::Factory* =0); @@ -253,9 +255,9 @@ public: /** Move messages from one queue to another. A zero quantity means to move all messages */ - uint32_t queueMoveMessages( const std::string& srcQueue, + uint32_t queueMoveMessages( const std::string& srcQueue, const std::string& destQueue, - uint32_t qty); + uint32_t qty); boost::shared_ptr<sys::ProtocolFactory> getProtocolFactory(const std::string& name = TCP_TRANSPORT) const; @@ -279,8 +281,17 @@ public: void setRecovery(bool set) { recovery = set; } bool getRecovery() const { return recovery; } - void setClusterUpdatee(bool set) { clusterUpdatee = set; } + /** True of this broker is part of a cluster. + * Only valid after early initialization of plugins is complete. + */ + bool isInCluster() const { return inCluster; } + void setInCluster(bool set) { inCluster = set; } + + /** True if this broker is joining a cluster and in the process of + * receiving a state update. + */ bool isClusterUpdatee() const { return clusterUpdatee; } + void setClusterUpdatee(bool set) { clusterUpdatee = set; } management::ManagementAgent* getManagementAgent() { return managementAgent.get(); } diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 460799280e..67713a6eb7 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -278,8 +278,7 @@ void Connection::setUserId(const string& userId) ConnectionState::setUserId(userId); // In a cluster, the cluster code will raise the connect event // when the connection is replicated to the cluster. - if (!sys::isCluster()) - raiseConnectEvent(); + if (!broker.isInCluster()) raiseConnectEvent(); } void Connection::raiseConnectEvent() { diff --git a/cpp/src/qpid/broker/IncompleteMessageList.cpp b/cpp/src/qpid/broker/IncompleteMessageList.cpp deleted file mode 100644 index 34d92fa752..0000000000 --- a/cpp/src/qpid/broker/IncompleteMessageList.cpp +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/broker/IncompleteMessageList.h" - -namespace qpid { -namespace broker { - -IncompleteMessageList::IncompleteMessageList() : - callback(boost::bind(&IncompleteMessageList::enqueueComplete, this, _1)) -{} - -IncompleteMessageList::~IncompleteMessageList() -{ - // No lock here. We are relying on Messsag::reset*CompleteCallback - // to ensure no callbacks are in progress before they return. - for (Messages::iterator i = incomplete.begin(); i != incomplete.end(); ++i) { - (*i)->resetEnqueueCompleteCallback(); - (*i)->resetDequeueCompleteCallback(); - } -} - -void IncompleteMessageList::add(boost::intrusive_ptr<Message> msg) -{ - sys::Mutex::ScopedLock l(lock); - msg->setEnqueueCompleteCallback(callback); - incomplete.push_back(msg); -} - -void IncompleteMessageList::enqueueComplete(const boost::intrusive_ptr<Message>& ) { - sys::Mutex::ScopedLock l(lock); - lock.notify(); -} - -void IncompleteMessageList::process(const CompletionListener& listen, bool sync) -{ - sys::Mutex::ScopedLock l(lock); - while (!incomplete.empty()) { - boost::intrusive_ptr<Message>& msg = incomplete.front(); - if (!msg->isEnqueueComplete()) { - if (sync){ - { - sys::Mutex::ScopedUnlock u(lock); - msg->flush(); // Can re-enter IncompleteMessageList::enqueueComplete - } - while (!msg->isEnqueueComplete()) - lock.wait(); - } else { - //leave the message as incomplete for now - return; - } - } - listen(msg); - incomplete.pop_front(); - } -} - -void IncompleteMessageList::each(const CompletionListener& listen) { - Messages snapshot; - { - sys::Mutex::ScopedLock l(lock); - snapshot = incomplete; - } - std::for_each(incomplete.begin(), incomplete.end(), listen); -} - -}} diff --git a/cpp/src/qpid/broker/IncompleteMessageList.h b/cpp/src/qpid/broker/IncompleteMessageList.h deleted file mode 100644 index a4debd1233..0000000000 --- a/cpp/src/qpid/broker/IncompleteMessageList.h +++ /dev/null @@ -1,58 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _IncompleteMessageList_ -#define _IncompleteMessageList_ - -#include "qpid/broker/BrokerImportExport.h" -#include "qpid/sys/Monitor.h" -#include "qpid/broker/Message.h" -#include <boost/intrusive_ptr.hpp> -#include <boost/function.hpp> -#include <list> - -namespace qpid { -namespace broker { - -class IncompleteMessageList -{ - typedef std::list< boost::intrusive_ptr<Message> > Messages; - - void enqueueComplete(const boost::intrusive_ptr<Message>&); - - sys::Monitor lock; - Messages incomplete; - Message::MessageCallback callback; - -public: - typedef Message::MessageCallback CompletionListener; - - QPID_BROKER_EXTERN IncompleteMessageList(); - QPID_BROKER_EXTERN ~IncompleteMessageList(); - - QPID_BROKER_EXTERN void add(boost::intrusive_ptr<Message> msg); - QPID_BROKER_EXTERN void process(const CompletionListener& l, bool sync); - void each(const CompletionListener& l); -}; - - -}} - -#endif diff --git a/cpp/src/qpid/broker/Link.cpp b/cpp/src/qpid/broker/Link.cpp index e1091df724..f3acf7c660 100644 --- a/cpp/src/qpid/broker/Link.cpp +++ b/cpp/src/qpid/broker/Link.cpp @@ -134,7 +134,7 @@ void Link::established () QPID_LOG (info, "Inter-broker link established to " << addr.str()); // Don't raise the management event in a cluster, other members wont't get this call. - if (!sys::isCluster()) + if (broker && !broker->isInCluster()) agent->raiseEvent(_qmf::EventBrokerLinkUp(addr.str())); { @@ -159,7 +159,7 @@ void Link::closed (int, std::string text) stringstream addr; addr << host << ":" << port; QPID_LOG (warning, "Inter-broker link disconnected from " << addr.str()); - if (!sys::isCluster()) + if (broker && !broker->isInCluster()) agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str())); } diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index c589669e5a..122c5b9c1a 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -50,14 +50,15 @@ TransferAdapter Message::TRANSFER; Message::Message(const framing::SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(FAR_FUTURE), enqueueCallback(0), dequeueCallback(0), - inCallback(false), requiredCredit(0) {} + expiration(FAR_FUTURE), dequeueCallback(0), + inCallback(false), requiredCredit(0) +{} Message::Message(const Message& original) : PersistableMessage(), frames(original.frames), persistenceId(0), redelivered(false), loaded(false), staged(false), forcePersistentPolicy(false), publisher(0), adapter(0), - expiration(original.expiration), enqueueCallback(0), dequeueCallback(0), - inCallback(false), requiredCredit(0) + expiration(original.expiration), dequeueCallback(0), + inCallback(false), requiredCredit(0) { setExpiryPolicy(original.expiryPolicy); } @@ -415,30 +416,12 @@ struct ScopedSet { }; } -void Message::allEnqueuesComplete() { - ScopedSet ss(callbackLock, inCallback); - MessageCallback* cb = enqueueCallback; - if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); -} - void Message::allDequeuesComplete() { ScopedSet ss(callbackLock, inCallback); MessageCallback* cb = dequeueCallback; if (cb && *cb) (*cb)(intrusive_ptr<Message>(this)); } -void Message::setEnqueueCompleteCallback(MessageCallback& cb) { - sys::Mutex::ScopedLock l(callbackLock); - while (inCallback) callbackLock.wait(); - enqueueCallback = &cb; -} - -void Message::resetEnqueueCompleteCallback() { - sys::Mutex::ScopedLock l(callbackLock); - while (inCallback) callbackLock.wait(); - enqueueCallback = 0; -} - void Message::setDequeueCompleteCallback(MessageCallback& cb) { sys::Mutex::ScopedLock l(callbackLock); while (inCallback) callbackLock.wait(); diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index f7dd2734b6..2d0de27823 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -154,10 +154,6 @@ public: bool isForcedPersistent(); - /** Call cb when enqueue is complete, may call immediately. Holds cb by reference. */ - void setEnqueueCompleteCallback(MessageCallback& cb); - void resetEnqueueCompleteCallback(); - /** Call cb when dequeue is complete, may call immediately. Holds cb by reference. */ void setDequeueCompleteCallback(MessageCallback& cb); void resetDequeueCompleteCallback(); @@ -166,7 +162,6 @@ public: private: MessageAdapter& getAdapter() const; - void allEnqueuesComplete(); void allDequeuesComplete(); mutable sys::Mutex lock; @@ -187,7 +182,6 @@ public: mutable boost::intrusive_ptr<Message> empty; sys::Monitor callbackLock; - MessageCallback* enqueueCallback; MessageCallback* dequeueCallback; bool inCallback; diff --git a/cpp/src/qpid/broker/PersistableMessage.cpp b/cpp/src/qpid/broker/PersistableMessage.cpp index e5fbb71cbd..7ba28eb293 100644 --- a/cpp/src/qpid/broker/PersistableMessage.cpp +++ b/cpp/src/qpid/broker/PersistableMessage.cpp @@ -34,7 +34,6 @@ class MessageStore; PersistableMessage::~PersistableMessage() {} PersistableMessage::PersistableMessage() : - asyncEnqueueCounter(0), asyncDequeueCounter(0), store(0) {} @@ -68,24 +67,6 @@ bool PersistableMessage::isContentReleased() const return contentReleaseState.released; } -bool PersistableMessage::isEnqueueComplete() { - sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); - return asyncEnqueueCounter == 0; -} - -void PersistableMessage::enqueueComplete() { - bool notify = false; - { - sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); - if (asyncEnqueueCounter > 0) { - if (--asyncEnqueueCounter == 0) { - notify = true; - } - } - } - if (notify) - allEnqueuesComplete(); -} bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ if (store && (queue->getPersistenceId()!=0)) { @@ -109,12 +90,7 @@ void PersistableMessage::addToSyncList(PersistableQueue::shared_ptr queue, Messa void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { addToSyncList(queue, _store); - enqueueAsync(); -} - -void PersistableMessage::enqueueAsync() { - sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock); - asyncEnqueueCounter++; + enqueueStart(); } bool PersistableMessage::isDequeueComplete() { diff --git a/cpp/src/qpid/broker/PersistableMessage.h b/cpp/src/qpid/broker/PersistableMessage.h index 96fb922c1a..a84aa45d76 100644 --- a/cpp/src/qpid/broker/PersistableMessage.h +++ b/cpp/src/qpid/broker/PersistableMessage.h @@ -31,6 +31,7 @@ #include "qpid/framing/amqp_types.h" #include "qpid/sys/Mutex.h" #include "qpid/broker/PersistableQueue.h" +#include "qpid/broker/AsyncCompletion.h" namespace qpid { namespace broker { @@ -43,18 +44,19 @@ class MessageStore; class PersistableMessage : public Persistable { typedef std::list< boost::weak_ptr<PersistableQueue> > syncList; - sys::Mutex asyncEnqueueLock; sys::Mutex asyncDequeueLock; sys::Mutex storeLock; - + /** - * Tracks the number of outstanding asynchronous enqueue - * operations. When the message is enqueued asynchronously the - * count is incremented; when that enqueue completes it is - * decremented. Thus when it is 0, there are no outstanding - * enqueues. + * "Ingress" messages == messages sent _to_ the broker. + * Tracks the number of outstanding asynchronous operations that must + * complete before an inbound message can be considered fully received by the + * broker. E.g. all enqueues have completed, the message has been written + * to store, credit has been replenished, etc. Once all outstanding + * operations have completed, the transfer of this message from the client + * may be considered complete. */ - int asyncEnqueueCounter; + boost::shared_ptr<AsyncCompletion> ingressCompletion; /** * Tracks the number of outstanding asynchronous dequeue @@ -65,7 +67,6 @@ class PersistableMessage : public Persistable */ int asyncDequeueCounter; - void enqueueAsync(); void dequeueAsync(); syncList synclist; @@ -80,8 +81,6 @@ class PersistableMessage : public Persistable ContentReleaseState contentReleaseState; protected: - /** Called when all enqueues are complete for this message. */ - virtual void allEnqueuesComplete() = 0; /** Called when all dequeues are complete for this message. */ virtual void allDequeuesComplete() = 0; @@ -115,9 +114,13 @@ class PersistableMessage : public Persistable virtual QPID_BROKER_EXTERN bool isPersistent() const = 0; - QPID_BROKER_EXTERN bool isEnqueueComplete(); + /** track the progress of a message received by the broker - see ingressCompletion above */ + QPID_BROKER_EXTERN bool isIngressComplete() { return !ingressCompletion || ingressCompletion->isDone(); } + QPID_BROKER_EXTERN boost::shared_ptr<AsyncCompletion>& getIngressCompletion() { return ingressCompletion; } + QPID_BROKER_EXTERN void setIngressCompletion(boost::shared_ptr<AsyncCompletion>& ic) { ingressCompletion = ic; } - QPID_BROKER_EXTERN void enqueueComplete(); + QPID_BROKER_EXTERN void enqueueStart() { if (ingressCompletion) ingressCompletion->startCompleter(); } + QPID_BROKER_EXTERN void enqueueComplete() { if (ingressCompletion) ingressCompletion->finishCompleter(); } QPID_BROKER_EXTERN void enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store); @@ -133,7 +136,6 @@ class PersistableMessage : public Persistable bool isStoredOnQueue(PersistableQueue::shared_ptr queue); void addToSyncList(PersistableQueue::shared_ptr queue, MessageStore* _store); - }; }} diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 40cb80010c..d18b0fcda3 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -31,6 +31,7 @@ #include "qpid/broker/MessageStore.h" #include "qpid/broker/NullMessageStore.h" #include "qpid/broker/QueueRegistry.h" +#include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/ThresholdAlerts.h" #include "qpid/StringUtils.h" @@ -163,13 +164,8 @@ void Queue::deliver(boost::intrusive_ptr<Message> msg){ //drop message QPID_LOG(info, "Dropping excluded message from " << getName()); } else { - // if no store then mark as enqueued - if (!enqueue(0, msg)){ - push(msg); - msg->enqueueComplete(); - }else { - push(msg); - } + enqueue(0, msg); + push(msg); QPID_LOG(debug, "Message " << msg << " enqueued on " << name); } } @@ -546,7 +542,7 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ void isEnqueueComplete(uint32_t* result, const QueuedMessage& message) { - if (message.payload->isEnqueueComplete()) (*result)++; + if (message.payload->isIngressComplete()) (*result)++; } /** function only provided for unit tests, or code not in critical message path */ @@ -819,11 +815,14 @@ void Queue::configure(const FieldTable& _settings, bool recovering) if (autoDeleteTimeout) QPID_LOG(debug, "Configured queue " << getName() << " with qpid.auto_delete_timeout=" << autoDeleteTimeout); - if (mgmtObject != 0) + if (mgmtObject != 0) { mgmtObject->set_arguments(ManagementAgent::toMap(_settings)); + } if ( isDurable() && ! getPersistenceId() && ! recovering ) store->create(*this, _settings); + + QueueFlowLimit::observe(*this, _settings); } void Queue::destroyed() @@ -1176,6 +1175,7 @@ void Queue::flush() if (u.acquired && store) store->flush(*this); } + bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, const qpid::framing::FieldTable& arguments) { @@ -1190,6 +1190,13 @@ bool Queue::bind(boost::shared_ptr<Exchange> exchange, const std::string& key, } } + +const Broker* Queue::getBroker() +{ + return broker; +} + + Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {} bool Queue::UsageBarrier::acquire() diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 1a3b632845..adf2a1dd64 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -363,6 +363,8 @@ class Queue : public boost::enable_shared_from_this<Queue>, void recoverPrepared(boost::intrusive_ptr<Message>& msg); void flush(); + + const Broker* getBroker(); }; } } diff --git a/cpp/src/qpid/broker/QueueFlowLimit.cpp b/cpp/src/qpid/broker/QueueFlowLimit.cpp new file mode 100644 index 0000000000..a99c9de7df --- /dev/null +++ b/cpp/src/qpid/broker/QueueFlowLimit.cpp @@ -0,0 +1,367 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/QueueFlowLimit.h" +#include "qpid/broker/Broker.h" +#include "qpid/broker/Queue.h" +#include "qpid/Exception.h" +#include "qpid/framing/FieldValue.h" +#include "qpid/framing/reply_exceptions.h" +#include "qpid/log/Statement.h" +#include "qpid/sys/Mutex.h" +#include "qpid/broker/SessionState.h" +#include "qpid/sys/ClusterSafe.h" + +#include "qmf/org/apache/qpid/broker/Queue.h" + +#include <sstream> + +using namespace qpid::broker; +using namespace qpid::framing; + +namespace { + /** ensure that the configured flow control stop and resume values are + * valid with respect to the maximum queue capacity, and each other + */ + template <typename T> + void validateFlowConfig(T max, T& stop, T& resume, const std::string& type, const std::string& queue) + { + if (resume > stop) { + throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_resume_" << type + << "=" << resume + << " must be less than qpid.flow_stop_" << type + << "=" << stop)); + } + if (resume == 0) resume = stop; + if (max != 0 && (max < stop)) { + throw InvalidArgumentException(QPID_MSG("Queue \"" << queue << "\": qpid.flow_stop_" << type + << "=" << stop + << " must be less than qpid.max_" << type + << "=" << max)); + } + } + + /** extract a capacity value as passed in an argument map + */ + uint64_t getCapacity(const FieldTable& settings, const std::string& key, uint64_t defaultValue) + { + FieldTable::ValuePtr v = settings.get(key); + + int64_t result = 0; + + if (!v) return defaultValue; + if (v->getType() == 0x23) { + QPID_LOG(debug, "Value for " << key << " specified as float: " << v->get<float>()); + } else if (v->getType() == 0x33) { + QPID_LOG(debug, "Value for " << key << " specified as double: " << v->get<double>()); + } else if (v->convertsTo<int64_t>()) { + result = v->get<int64_t>(); + QPID_LOG(debug, "Got integer value for " << key << ": " << result); + if (result >= 0) return result; + } else if (v->convertsTo<string>()) { + string s(v->get<string>()); + QPID_LOG(debug, "Got string value for " << key << ": " << s); + std::istringstream convert(s); + if (convert >> result && result >= 0) return result; + } + + QPID_LOG(warning, "Cannot convert " << key << " to unsigned integer, using default (" << defaultValue << ")"); + return defaultValue; + } +} + + + +QueueFlowLimit::QueueFlowLimit(Queue *_queue, + uint32_t _flowStopCount, uint32_t _flowResumeCount, + uint64_t _flowStopSize, uint64_t _flowResumeSize) + : queue(_queue), queueName("<unknown>"), + flowStopCount(_flowStopCount), flowResumeCount(_flowResumeCount), + flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize), + flowStopped(false), count(0), size(0), queueMgmtObj(0), broker(0) +{ + uint32_t maxCount(0); + uint64_t maxSize(0); + + if (queue) { + queueName = _queue->getName(); + if (queue->getPolicy()) { + maxSize = _queue->getPolicy()->getMaxSize(); + maxCount = _queue->getPolicy()->getMaxCount(); + } + broker = queue->getBroker(); + queueMgmtObj = dynamic_cast<_qmfBroker::Queue*> (queue->GetManagementObject()); + if (queueMgmtObj) { + queueMgmtObj->set_flowStopped(isFlowControlActive()); + } + } + validateFlowConfig( maxCount, flowStopCount, flowResumeCount, "count", queueName ); + validateFlowConfig( maxSize, flowStopSize, flowResumeSize, "size", queueName ); + QPID_LOG(info, "Queue \"" << queueName << "\": Flow limit created: flowStopCount=" << flowStopCount + << ", flowResumeCount=" << flowResumeCount + << ", flowStopSize=" << flowStopSize << ", flowResumeSize=" << flowResumeSize ); +} + + + +void QueueFlowLimit::enqueued(const QueuedMessage& msg) +{ + if (!msg.payload) return; + + sys::Mutex::ScopedLock l(indexLock); + + ++count; + size += msg.payload->contentSize(); + + if (!flowStopped) { + if (flowStopCount && count > flowStopCount) { + flowStopped = true; + QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopCount << " enqueued messages. Producer flow control activated." ); + } else if (flowStopSize && size > flowStopSize) { + flowStopped = true; + QPID_LOG(info, "Queue \"" << queueName << "\": has reached " << flowStopSize << " enqueued bytes. Producer flow control activated." ); + } + if (flowStopped && queueMgmtObj) + queueMgmtObj->set_flowStopped(true); + } + + /** @todo KAG: - REMOVE ONCE STABLE */ + if (index.find(msg.payload) != index.end()) { + QPID_LOG(error, "Queue \"" << queueName << "\": has enqueued a msg twice: " << msg.position); + } + + if (flowStopped || !index.empty()) { + // ignore flow control if we are populating the queue due to cluster replication: + if (broker && broker->isClusterUpdatee()) { + QPID_LOG(trace, "Queue \"" << queueName << "\": ignoring flow control for msg pos=" << msg.position); + return; + } + QPID_LOG(trace, "Queue \"" << queueName << "\": setting flow control for msg pos=" << msg.position); + msg.payload->getIngressCompletion()->startCompleter(); // don't complete until flow resumes + index.insert(msg.payload); + } +} + + + +void QueueFlowLimit::dequeued(const QueuedMessage& msg) +{ + if (!msg.payload) return; + + sys::Mutex::ScopedLock l(indexLock); + + if (count > 0) { + --count; + } else { + throw Exception(QPID_MSG("Flow limit count underflow on dequeue. Queue=" << queueName)); + } + + uint64_t _size = msg.payload->contentSize(); + if (_size <= size) { + size -= _size; + } else { + throw Exception(QPID_MSG("Flow limit size underflow on dequeue. Queue=" << queueName)); + } + + if (flowStopped && + (flowResumeSize == 0 || size < flowResumeSize) && + (flowResumeCount == 0 || count < flowResumeCount)) { + flowStopped = false; + if (queueMgmtObj) + queueMgmtObj->set_flowStopped(false); + QPID_LOG(info, "Queue \"" << queueName << "\": has drained below the flow control resume level. Producer flow control deactivated." ); + } + + if (!index.empty()) { + if (!flowStopped) { + // flow enabled - release all pending msgs + while (!index.empty()) { + std::set< boost::intrusive_ptr<Message> >::iterator itr = index.begin(); + (*itr)->getIngressCompletion()->finishCompleter(); + index.erase(itr); + } + } else { + // even if flow controlled, we must release this msg as it is being dequeued + std::set< boost::intrusive_ptr<Message> >::iterator itr = index.find(msg.payload); + if (itr != index.end()) { // this msg is flow controlled, release it: + (*itr)->getIngressCompletion()->finishCompleter(); + index.erase(itr); + } + } + } +} + + +/** used by clustering: is the given message's completion blocked due to flow + * control? True if message is blocked. (for the clustering updater: done + * after msgs have been replicated to the updatee). + */ +bool QueueFlowLimit::getState(const QueuedMessage& msg) const +{ + sys::Mutex::ScopedLock l(indexLock); + return (index.find(msg.payload) != index.end()); +} + + +/** artificially force the flow control state of a given message + * (for the clustering updatee: done after msgs have been replicated to + * the updatee's queue) + */ +void QueueFlowLimit::setState(const QueuedMessage& msg, bool blocked) +{ + if (blocked && msg.payload) { + + sys::Mutex::ScopedLock l(indexLock); + assert(index.find(msg.payload) == index.end()); + + QPID_LOG(debug, "Queue \"" << queue->getName() << "\": forcing flow control for msg pos=" << msg.position << " for CLUSTER SYNC"); + index.insert(msg.payload); + } +} + + +void QueueFlowLimit::encode(Buffer& buffer) const +{ + buffer.putLong(flowStopCount); + buffer.putLong(flowResumeCount); + buffer.putLongLong(flowStopSize); + buffer.putLongLong(flowResumeSize); + buffer.putLong(count); + buffer.putLongLong(size); +} + + +void QueueFlowLimit::decode ( Buffer& buffer ) +{ + flowStopCount = buffer.getLong(); + flowResumeCount = buffer.getLong(); + flowStopSize = buffer.getLongLong(); + flowResumeSize = buffer.getLongLong(); + count = buffer.getLong(); + size = buffer.getLongLong(); +} + + +uint32_t QueueFlowLimit::encodedSize() const { + return sizeof(uint32_t) + // flowStopCount + sizeof(uint32_t) + // flowResumecount + sizeof(uint64_t) + // flowStopSize + sizeof(uint64_t) + // flowResumeSize + sizeof(uint32_t) + // count + sizeof(uint64_t); // size +} + + +const std::string QueueFlowLimit::flowStopCountKey("qpid.flow_stop_count"); +const std::string QueueFlowLimit::flowResumeCountKey("qpid.flow_resume_count"); +const std::string QueueFlowLimit::flowStopSizeKey("qpid.flow_stop_size"); +const std::string QueueFlowLimit::flowResumeSizeKey("qpid.flow_resume_size"); +uint64_t QueueFlowLimit::defaultMaxSize; +uint QueueFlowLimit::defaultFlowStopRatio; +uint QueueFlowLimit::defaultFlowResumeRatio; + + +void QueueFlowLimit::setDefaults(uint64_t maxQueueSize, uint flowStopRatio, uint flowResumeRatio) +{ + defaultMaxSize = maxQueueSize; + defaultFlowStopRatio = flowStopRatio; + defaultFlowResumeRatio = flowResumeRatio; + + /** @todo Verify valid range on Broker::Options instead of here */ + if (flowStopRatio > 100 || flowResumeRatio > 100) + throw InvalidArgumentException(QPID_MSG("Default queue flow ratios must be between 0 and 100, inclusive:" + << " flowStopRatio=" << flowStopRatio + << " flowResumeRatio=" << flowResumeRatio)); + if (flowResumeRatio > flowStopRatio) + throw InvalidArgumentException(QPID_MSG("Default queue flow stop ratio must be >= flow resume ratio:" + << " flowStopRatio=" << flowStopRatio + << " flowResumeRatio=" << flowResumeRatio)); +} + + +void QueueFlowLimit::observe(Queue& queue, const qpid::framing::FieldTable& settings) +{ + QueueFlowLimit *ptr = createLimit( &queue, settings ); + if (ptr) { + boost::shared_ptr<QueueFlowLimit> observer(ptr); + queue.addObserver(observer); + } +} + +/** returns ptr to a QueueFlowLimit, else 0 if no limit */ +QueueFlowLimit *QueueFlowLimit::createLimit(Queue *queue, const qpid::framing::FieldTable& settings) +{ + std::string type(QueuePolicy::getType(settings)); + + if (type == QueuePolicy::RING || type == QueuePolicy::RING_STRICT) { + // The size of a RING queue is limited by design - no need for flow control. + return 0; + } + + if (settings.get(flowStopCountKey) || settings.get(flowStopSizeKey)) { + uint32_t flowStopCount = getCapacity(settings, flowStopCountKey, 0); + uint32_t flowResumeCount = getCapacity(settings, flowResumeCountKey, 0); + uint64_t flowStopSize = getCapacity(settings, flowStopSizeKey, 0); + uint64_t flowResumeSize = getCapacity(settings, flowResumeSizeKey, 0); + if (flowStopCount == 0 && flowStopSize == 0) { // disable flow control + return 0; + } + /** @todo KAG - remove once cluster support for flow control done. */ + // TODO aconway 2011-02-16: is queue==0 only in tests? + // TODO kgiusti 2011-02-19: yes! The unit tests test this class in isolation */ + if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) { + QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue " + << queue->getName()); + return 0; + } + return new QueueFlowLimit(queue, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); + } + + if (defaultFlowStopRatio) { + uint64_t maxByteCount = getCapacity(settings, QueuePolicy::maxSizeKey, defaultMaxSize); + uint64_t flowStopSize = (uint64_t)(maxByteCount * (defaultFlowStopRatio/100.0) + 0.5); + uint64_t flowResumeSize = (uint64_t)(maxByteCount * (defaultFlowResumeRatio/100.0)); + + /** todo KAG - remove once cluster support for flow control done. */ + if (queue && queue->getBroker() && queue->getBroker()->isInCluster()) { + QPID_LOG(warning, "Producer Flow Control TBD for clustered brokers - queue flow control disabled for queue " + << queue->getName()); + return 0; + } + + return new QueueFlowLimit(queue, 0, 0, flowStopSize, flowResumeSize); + } + return 0; +} + + +namespace qpid { + namespace broker { + +std::ostream& operator<<(std::ostream& out, const QueueFlowLimit& f) +{ + out << "; flowStopCount=" << f.flowStopCount << ", flowResumeCount=" << f.flowResumeCount; + out << "; flowStopSize=" << f.flowStopSize << ", flowResumeSize=" << f.flowResumeSize; + return out; +} + + } +} + diff --git a/cpp/src/qpid/broker/QueueFlowLimit.h b/cpp/src/qpid/broker/QueueFlowLimit.h new file mode 100644 index 0000000000..4d33007f0d --- /dev/null +++ b/cpp/src/qpid/broker/QueueFlowLimit.h @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _QueueFlowLimit_ +#define _QueueFlowLimit_ + +#include <list> +#include <set> +#include <iostream> +#include <memory> +#include "qpid/broker/BrokerImportExport.h" +#include "qpid/broker/QueuedMessage.h" +#include "qpid/broker/QueueObserver.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/sys/AtomicValue.h" +#include "qpid/sys/Mutex.h" + +namespace qmf { +namespace org { +namespace apache { +namespace qpid { +namespace broker { + class Queue; +}}}}} +namespace _qmfBroker = qmf::org::apache::qpid::broker; + +namespace qpid { +namespace broker { + +class Broker; + +/** + * Producer flow control: when level is > flowStop*, flow control is ON. + * then level is < flowResume*, flow control is OFF. If == 0, flow control + * is not used. If both byte and msg count thresholds are set, then + * passing _either_ level may turn flow control ON, but _both_ must be + * below level before flow control will be turned OFF. + */ + class QueueFlowLimit : public QueueObserver +{ + static uint64_t defaultMaxSize; + static uint defaultFlowStopRatio; + static uint defaultFlowResumeRatio; + + Queue *queue; + std::string queueName; + + uint32_t flowStopCount; + uint32_t flowResumeCount; + uint64_t flowStopSize; + uint64_t flowResumeSize; + bool flowStopped; // true = producers held in flow control + + // current queue utilization + uint32_t count; + uint64_t size; + + public: + static QPID_BROKER_EXTERN const std::string flowStopCountKey; + static QPID_BROKER_EXTERN const std::string flowResumeCountKey; + static QPID_BROKER_EXTERN const std::string flowStopSizeKey; + static QPID_BROKER_EXTERN const std::string flowResumeSizeKey; + + virtual ~QueueFlowLimit() {} + + /** the queue has added QueuedMessage. Returns true if flow state changes */ + void enqueued(const QueuedMessage&); + /** the queue has removed QueuedMessage. Returns true if flow state changes */ + void dequeued(const QueuedMessage&); + + /** for clustering: */ + /** true if the given message is flow controlled, and cannot be completed. */ + bool getState(const QueuedMessage&) const; + void setState(const QueuedMessage&, bool blocked); + + uint32_t getFlowStopCount() const { return flowStopCount; } + uint32_t getFlowResumeCount() const { return flowResumeCount; } + uint64_t getFlowStopSize() const { return flowStopSize; } + uint64_t getFlowResumeSize() const { return flowResumeSize; } + + uint32_t getFlowCount() const { return count; } + uint64_t getFlowSize() const { return size; } + bool isFlowControlActive() const { return flowStopped; } + bool monitorFlowControl() const { return flowStopCount || flowStopSize; } + + void encode(framing::Buffer& buffer) const; + void decode(framing::Buffer& buffer); + uint32_t encodedSize() const; + + static QPID_BROKER_EXTERN void observe(Queue& queue, const qpid::framing::FieldTable& settings); + static QPID_BROKER_EXTERN void setDefaults(uint64_t defaultMaxSize, uint defaultFlowStopRatio, uint defaultFlowResumeRatio); + + friend QPID_BROKER_EXTERN std::ostream& operator<<(std::ostream&, const QueueFlowLimit&); + + protected: + // msgs waiting for flow to become available. + std::set< boost::intrusive_ptr<Message> > index; + mutable qpid::sys::Mutex indexLock; + + _qmfBroker::Queue *queueMgmtObj; + + const Broker *broker; + + QueueFlowLimit(Queue *queue, + uint32_t flowStopCount, uint32_t flowResumeCount, + uint64_t flowStopSize, uint64_t flowResumeSize); + static QueueFlowLimit *createLimit(Queue *queue, const qpid::framing::FieldTable& settings); +}; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index c2f90dce47..9e4516679e 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -24,6 +24,7 @@ #include "qpid/log/Statement.h" #include "qpid/framing/SequenceSet.h" #include "qpid/management/ManagementAgent.h" +#include "qpid/broker/SessionState.h" #include "qmf/org/apache/qpid/broker/EventExchangeDeclare.h" #include "qmf/org/apache/qpid/broker/EventExchangeDelete.h" #include "qmf/org/apache/qpid/broker/EventQueueDeclare.h" @@ -509,7 +510,12 @@ framing::MessageResumeResult SessionAdapter::MessageHandlerImpl::resume(const st -void SessionAdapter::ExecutionHandlerImpl::sync() {} //essentially a no-op +void SessionAdapter::ExecutionHandlerImpl::sync() +{ + session.addPendingExecutionSync(); + /** @todo KAG - need a generic mechanism to allow a command to returning "not completed" status back to SessionState */ + +} void SessionAdapter::ExecutionHandlerImpl::result(const SequenceNumber& /*commandId*/, const string& /*value*/) { diff --git a/cpp/src/qpid/broker/SessionContext.h b/cpp/src/qpid/broker/SessionContext.h index afbbb2cc22..253ce8dcf2 100644 --- a/cpp/src/qpid/broker/SessionContext.h +++ b/cpp/src/qpid/broker/SessionContext.h @@ -46,6 +46,7 @@ class SessionContext : public OwnershipToken, public sys::OutputControl virtual Broker& getBroker() = 0; virtual uint16_t getChannel() const = 0; virtual const SessionId& getSessionId() const = 0; + virtual void addPendingExecutionSync() = 0; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp index 1ca7b6dfc1..11f3e84b70 100644 --- a/cpp/src/qpid/broker/SessionState.cpp +++ b/cpp/src/qpid/broker/SessionState.cpp @@ -60,9 +60,9 @@ SessionState::SessionState( semanticState(*this, *this), adapter(semanticState), msgBuilder(&broker.getStore()), - enqueuedOp(boost::bind(&SessionState::enqueued, this, _1)), mgmtObject(0), - rateFlowcontrol(0) + rateFlowcontrol(0), + scheduledCompleterContext(new ScheduledCompleterContext(this)) { uint32_t maxRate = broker.getOptions().maxSessionRate; if (maxRate) { @@ -101,6 +101,26 @@ SessionState::~SessionState() { if (flowControlTimer) flowControlTimer->cancel(); + + // clean up any outstanding incomplete commands + { + qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > copy(incompleteCmds); + incompleteCmds.clear(); + while (!copy.empty()) { + boost::shared_ptr<IncompleteCommandContext> ref(copy.begin()->second); + copy.erase(copy.begin()); + { + // note: need to drop lock, as callback may attempt to take it. + qpid::sys::ScopedUnlock<Mutex> ul(incompleteCmdsLock); + ref->cancel(); + } + } + } + + // At this point, we are guaranteed no further completion callbacks will be + // made. Cancel any outstanding scheduledCompleter calls... + scheduledCompleterContext->cancel(); } AMQP_ClientProxy& SessionState::getProxy() { @@ -202,15 +222,17 @@ Manageable::status_t SessionState::ManagementMethod (uint32_t methodId, } void SessionState::handleCommand(framing::AMQMethodBody* method, const SequenceNumber& id) { + currentCommandComplete = true; // assumed, can be overridden by invoker method (this sucks). Invoker::Result invocation = invoke(adapter, *method); - receiverCompleted(id); + if (currentCommandComplete) receiverCompleted(id); + if (!invocation.wasHandled()) { throw NotImplementedException(QPID_MSG("Not implemented: " << *method)); } else if (invocation.hasResult()) { getProxy().getExecution().result(id, invocation.getResult()); } - if (method->isSync()) { - incomplete.process(enqueuedOp, true); + + if (method->isSync() && currentCommandComplete) { sendAcceptAndCompletion(); } } @@ -254,22 +276,13 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id) msg->getFrames().append(header); } msg->setPublisher(&getConnection()); + + boost::shared_ptr<AsyncCompletion> ac(boost::dynamic_pointer_cast<AsyncCompletion>(createIngressMsgXferContext(msg))); + msg->setIngressCompletion( ac ); + ac->begin(); semanticState.handle(msg); msgBuilder.end(); - - if (msg->isEnqueueComplete()) { - enqueued(msg); - } else { - incomplete.add(msg); - } - - //hold up execution until async enqueue is complete - if (msg->getFrames().getMethod()->isSync()) { - incomplete.process(enqueuedOp, true); - sendAcceptAndCompletion(); - } else { - incomplete.process(enqueuedOp, false); - } + ac->end(); // allows msg to complete xfer } // Handle producer session flow control @@ -319,11 +332,38 @@ void SessionState::sendAcceptAndCompletion() sendCompletion(); } -void SessionState::enqueued(boost::intrusive_ptr<Message> msg) +/** Invoked when the given inbound message is finished being processed + * by all interested parties (eg. it is done being enqueued to all queues, + * its credit has been accounted for, etc). At this point, msg is considered + * by this receiver as 'completed' (as defined by AMQP 0_10) + */ +void SessionState::completeRcvMsg(SequenceNumber id, + bool requiresAccept, + bool requiresSync) { - receiverCompleted(msg->getCommandId()); - if (msg->requiresAccept()) - accepted.add(msg->getCommandId()); + bool callSendCompletion = false; + receiverCompleted(id); + if (requiresAccept) + // will cause msg's seq to appear in the next message.accept we send. + accepted.add(id); + + // Are there any outstanding Execution.Sync commands pending the + // completion of this msg? If so, complete them. + while (!pendingExecutionSyncs.empty() && + receiverGetIncomplete().front() >= pendingExecutionSyncs.front()) { + const SequenceNumber id = pendingExecutionSyncs.front(); + pendingExecutionSyncs.pop(); + QPID_LOG(debug, getId() << ": delayed execution.sync " << id << " is completed."); + receiverCompleted(id); + callSendCompletion = true; // likely peer is pending for this completion. + } + + // if the sender has requested immediate notification of the completion... + if (requiresSync) { + sendAcceptAndCompletion(); + } else if (callSendCompletion) { + sendCompletion(); + } } void SessionState::handleIn(AMQFrame& frame) { @@ -396,4 +436,126 @@ framing::AMQP_ClientProxy& SessionState::getClusterOrderProxy() { return handler->getClusterOrderProxy(); } + +// Current received command is an execution.sync command. +// Complete this command only when all preceding commands have completed. +// (called via the invoker() in handleCommand() above) +void SessionState::addPendingExecutionSync() +{ + SequenceNumber syncCommandId = receiverGetCurrent(); + if (receiverGetIncomplete().front() < syncCommandId) { + currentCommandComplete = false; + pendingExecutionSyncs.push(syncCommandId); + QPID_LOG(debug, getId() << ": delaying completion of execution.sync " << syncCommandId); + } +} + + +/** factory for creating IncompleteIngressMsgXfer objects which + * can be references from Messages as ingress AsyncCompletion objects. + */ +boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> +SessionState::createIngressMsgXferContext(boost::intrusive_ptr<Message> msg) +{ + SequenceNumber id = msg->getCommandId(); + boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> cmd(new SessionState::IncompleteIngressMsgXfer(this, id, msg)); + qpid::sys::ScopedLock<Mutex> l(incompleteCmdsLock); + incompleteCmds[id] = cmd; + return cmd; +} + + +/** Invoked by the asynchronous completer associated with + * a received msg that is pending Completion. May be invoked + * by the SessionState directly (sync == true), or some external + * entity (!sync). + */ +void SessionState::IncompleteIngressMsgXfer::completed(bool sync) +{ + if (!sync) { + /** note well: this path may execute in any thread. It is safe to access + * the session, as the SessionState destructor will cancel all outstanding + * callbacks before getting destroyed (so we'll never get here). + */ + QPID_LOG(debug, ": async completion callback scheduled for msg seq=" << id); + if (session->scheduledCompleterContext->scheduleCompletion(id)) + session->getConnection().requestIOProcessing(boost::bind(&scheduledCompleter, + session->scheduledCompleterContext)); + } else { // command is being completed in IO thread. + // this path runs only on the IO thread. + qpid::sys::ScopedLock<Mutex> l(session->incompleteCmdsLock); + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; + cmd = session->incompleteCmds.find(id); + if (cmd != session->incompleteCmds.end()) { + boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second); + session->incompleteCmds.erase(cmd); + + if (session->isAttached()) { + QPID_LOG(debug, ": receive completed for msg seq=" << id); + qpid::sys::ScopedUnlock<Mutex> ul(session->incompleteCmdsLock); + session->completeRcvMsg(id, requiresAccept, requiresSync); + return; + } + } + } +} + + +/** Scheduled from incomplete command's completed callback, safely completes all + * completed commands in the IO Thread. Guaranteed not to be running at the same + * time as the message receive code. + */ +void SessionState::scheduledCompleter(boost::shared_ptr<SessionState::ScheduledCompleterContext> ctxt) +{ + ctxt->completeCommands(); +} + + +/** mark a command (sequence) as completed, return True if caller should + * schedule a call to completeCommands() + */ +bool SessionState::ScheduledCompleterContext::scheduleCompletion(SequenceNumber cmd) +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + + completedCmds.push_back(cmd); + return (completedCmds.size() == 1); +} + + +/** Cause the session to complete all completed commands */ +void SessionState::ScheduledCompleterContext::completeCommands() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + + // when session is destroyed, it clears the session pointer via cancel(). + if (!session) return; + + while (!completedCmds.empty()) { + SequenceNumber id = completedCmds.front(); + completedCmds.pop_front(); + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> >::iterator cmd; + { + qpid::sys::ScopedLock<qpid::sys::Mutex> l(session->incompleteCmdsLock); + + cmd = session->incompleteCmds.find(id); + if (cmd !=session->incompleteCmds.end()) { + boost::shared_ptr<IncompleteCommandContext> tmp(cmd->second); + { + qpid::sys::ScopedUnlock<qpid::sys::Mutex> ul(session->incompleteCmdsLock); + tmp->do_completion(); // retakes incompleteCmdslock + } + } + } + } +} + + +/** cancel any pending calls to scheduleComplete */ +void SessionState::ScheduledCompleterContext::cancel() +{ + qpid::sys::ScopedLock<qpid::sys::Mutex> l(completedCmdsLock); + session = 0; +} + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/SessionState.h b/cpp/src/qpid/broker/SessionState.h index be79eb0eab..5e162e6475 100644 --- a/cpp/src/qpid/broker/SessionState.h +++ b/cpp/src/qpid/broker/SessionState.h @@ -30,10 +30,11 @@ #include "qmf/org/apache/qpid/broker/Session.h" #include "qpid/broker/SessionAdapter.h" #include "qpid/broker/DeliveryAdapter.h" -#include "qpid/broker/IncompleteMessageList.h" +#include "qpid/broker/AsyncCompletion.h" #include "qpid/broker/MessageBuilder.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/SemanticState.h" +#include "qpid/sys/Monitor.h" #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> @@ -123,6 +124,10 @@ class SessionState : public qpid::SessionState, const SessionId& getSessionId() const { return getId(); } + // Used by ExecutionHandler sync command processing. Notifies + // the SessionState of a received Execution.Sync command. + void addPendingExecutionSync(); + // Used to delay creation of management object for sessions // belonging to inter-broker bridges void addManagementObject(); @@ -130,7 +135,10 @@ class SessionState : public qpid::SessionState, private: void handleCommand(framing::AMQMethodBody* method, const framing::SequenceNumber& id); void handleContent(framing::AMQFrame& frame, const framing::SequenceNumber& id); - void enqueued(boost::intrusive_ptr<Message> msg); + + // indicate that the given ingress msg has been completely received by the + // broker, and the msg's message.transfer command can be considered completed. + void completeRcvMsg(SequenceNumber id, bool requiresAccept, bool requiresSync); void handleIn(framing::AMQFrame& frame); void handleOut(framing::AMQFrame& frame); @@ -156,8 +164,6 @@ class SessionState : public qpid::SessionState, SemanticState semanticState; SessionAdapter adapter; MessageBuilder msgBuilder; - IncompleteMessageList incomplete; - IncompleteMessageList::CompletionListener enqueuedOp; qmf::org::apache::qpid::broker::Session* mgmtObject; qpid::framing::SequenceSet accepted; @@ -166,6 +172,84 @@ class SessionState : public qpid::SessionState, boost::scoped_ptr<RateFlowcontrol> rateFlowcontrol; boost::intrusive_ptr<sys::TimerTask> flowControlTimer; + // sequence numbers for pending received Execution.Sync commands + std::queue<SequenceNumber> pendingExecutionSyncs; + bool currentCommandComplete; + + /** Abstract class that represents a command that is pending + * completion. + */ + class IncompleteCommandContext : public AsyncCompletion + { + public: + IncompleteCommandContext( SessionState *ss, SequenceNumber _id ) + : id(_id), session(ss) {} + virtual ~IncompleteCommandContext() {} + + /* allows manual invokation of completion, used by IO thread to + * complete a command that was originally finished on a different + * thread. + */ + void do_completion() { completed(true); } + + protected: + SequenceNumber id; + SessionState *session; + }; + + /** incomplete Message.transfer commands - inbound to broker from client + */ + class IncompleteIngressMsgXfer : public SessionState::IncompleteCommandContext + { + public: + IncompleteIngressMsgXfer( SessionState *ss, + SequenceNumber _id, + boost::intrusive_ptr<Message> msg ) + : IncompleteCommandContext(ss, _id), + requiresAccept(msg->requiresAccept()), + requiresSync(msg->getFrames().getMethod()->isSync()) {}; + virtual ~IncompleteIngressMsgXfer() {}; + + protected: + virtual void completed(bool); + + private: + /** meta-info required to complete the message */ + bool requiresAccept; + bool requiresSync; // method's isSync() flag + }; + /** creates a command context suitable for use as an AsyncCompletion in a message */ + boost::shared_ptr<SessionState::IncompleteIngressMsgXfer> createIngressMsgXferContext( boost::intrusive_ptr<Message> msg); + + /* A list of commands that are pending completion. These commands are + * awaiting some set of asynchronous operations to finish (eg: store, + * flow-control, etc). before the command can be completed to the client + */ + std::map<SequenceNumber, boost::shared_ptr<IncompleteCommandContext> > incompleteCmds; + qpid::sys::Mutex incompleteCmdsLock; // locks above container + + /** This context is shared between the SessionState and scheduledCompleter, + * holds the sequence numbers of all commands that have completed asynchronously. + */ + class ScheduledCompleterContext { + private: + std::list<SequenceNumber> completedCmds; + // ordering: take this lock first, then incompleteCmdsLock + qpid::sys::Mutex completedCmdsLock; + SessionState *session; + public: + ScheduledCompleterContext(SessionState *s) : session(s) {}; + bool scheduleCompletion(SequenceNumber cmd); + void completeCommands(); + void cancel(); + }; + boost::shared_ptr<ScheduledCompleterContext> scheduledCompleterContext; + + /** The following method runs the in IO thread and completes commands that + * where finished asynchronously. + */ + static void scheduledCompleter(boost::shared_ptr<ScheduledCompleterContext>); + friend class SessionManager; }; diff --git a/cpp/src/qpid/cluster/Cluster.cpp b/cpp/src/qpid/cluster/Cluster.cpp index dd4882774b..fe5a1c806e 100644 --- a/cpp/src/qpid/cluster/Cluster.cpp +++ b/cpp/src/qpid/cluster/Cluster.cpp @@ -36,28 +36,28 @@ * * IMPORTANT NOTE: any time code is added to the broker that uses timers, * the cluster may need to be updated to take account of this. - * + * * * USE OF TIMESTAMPS IN THE BROKER - * + * * The following are the current areas where broker uses timers or timestamps: - * + * * - Producer flow control: broker::SemanticState uses * connection::getClusterOrderOutput. a FrameHandler that sends * frames to the client via the cluster. Used by broker::SessionState - * + * * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is * implemented by cluster::ExpiryPolicy. - * + * * - Connection heartbeat: sends connection controls, not part of * session command counting so OK to ignore. - * + * * - LinkRegistry: only cluster elder is ever active for links. - * + * * - management::ManagementBroker: uses MessageHandler supplied by cluster * to send messages to the broker via the cluster. - * - * - Dtx: not yet supported with cluster. + * + * - Dtx: not yet supported with cluster. * * cluster::ExpiryPolicy implements the strategy for message expiry. * @@ -65,16 +65,16 @@ * Used for periodic management events. * * <h1>CLUSTER PROTOCOL OVERVIEW</h1> - * + * * Messages sent to/from CPG are called Events. * * An Event carries a ConnectionId, which includes a MemberId and a * connection number. - * + * * Events are either * - Connection events: non-0 connection number and are associated with a connection. * - Cluster Events: 0 connection number, are not associated with a connection. - * + * * Events are further categorized as: * - Control: carries method frame(s) that affect cluster behavior. * - Data: carries raw data received from a client connection. @@ -214,7 +214,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { { cluster.initialStatus( member, version, active, clusterId, - framing::cluster::StoreState(storeState), shutdownId, + framing::cluster::StoreState(storeState), shutdownId, firstConfig, l); } void ready(const std::string& url) { @@ -244,7 +244,7 @@ struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler { }; Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : - settings(set), + settings(set), broker(b), mgmtObject(0), poller(b.getPoller()), @@ -279,6 +279,8 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : updateClosed(false), error(*this) { + broker.setInCluster(true); + // We give ownership of the timer to the broker and keep a plain pointer. // This is OK as it means the timer has the same lifetime as the broker. timer = new ClusterTimer(*this); @@ -299,7 +301,7 @@ Cluster::Cluster(const ClusterSettings& set, broker::Broker& b) : // Load my store status before we go into initialization if (! broker::NullMessageStore::isNullStore(&broker.getStore())) { store.load(); - clusterId = store.getClusterId(); + clusterId = store.getClusterId(); QPID_LOG(notice, "Cluster store state: " << store) } cpg.join(name); @@ -360,14 +362,14 @@ void Cluster::addShadowConnection(const boost::intrusive_ptr<Connection>& c) { // Safe to use connections here because we're pre-catchup, stalled // and discarding, so deliveredFrame is not processing any // connection events. - assert(discarding); + assert(discarding); pair<ConnectionMap::iterator, bool> ib = connections.insert(ConnectionMap::value_type(c->getId(), c)); assert(ib.second); } void Cluster::erase(const ConnectionId& id) { - Lock l(lock); + Lock l(lock); erase(id,l); } @@ -393,9 +395,9 @@ std::vector<Url> Cluster::getUrls() const { std::vector<Url> Cluster::getUrls(Lock&) const { return map.memberUrls(); -} +} -void Cluster::leave() { +void Cluster::leave() { Lock l(lock); leave(l); } @@ -405,7 +407,7 @@ void Cluster::leave() { QPID_LOG(warning, *this << " error leaving cluster: " << e.what()); \ } do {} while(0) -void Cluster::leave(Lock&) { +void Cluster::leave(Lock&) { if (state != LEFT) { state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); @@ -424,7 +426,7 @@ void Cluster::deliver( uint32_t nodeid, uint32_t pid, void* msg, - int msg_len) + int msg_len) { MemberId from(nodeid, pid); framing::Buffer buf(static_cast<char*>(msg), msg_len); @@ -455,7 +457,7 @@ void Cluster::deliveredEvent(const Event& e) { EventFrame ef(e, e.getFrame()); // Stop the deliverEventQueue on update offers. // This preserves the connection decoder fragments for an update. - // Only do this for the two brokers that are directly involved in this + // Only do this for the two brokers that are directly involved in this // offer: the one making the offer, or the one receiving it. const ClusterUpdateOfferBody* offer = castUpdateOffer(ef.frame.getBody()); if (offer && ( e.getMemberId() == self || MemberId(offer->getUpdatee()) == self) ) { @@ -465,7 +467,7 @@ void Cluster::deliveredEvent(const Event& e) { } deliverFrame(ef); } - else if(!discarding) { + else if(!discarding) { if (e.isControl()) deliverFrame(EventFrame(e, e.getFrame())); else { @@ -507,7 +509,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) { // the event queue. e.frame = AMQFrame( ClusterRetractOfferBody(ProtocolVersion(), offer->getUpdatee())); - deliverEventQueue.start(); + deliverEventQueue.start(); } // Process each frame through the error checker. if (error.isUnresolved()) { @@ -515,7 +517,7 @@ void Cluster::deliveredFrame(const EventFrame& efConst) { while (error.canProcess()) // There is a frame ready to process. processFrame(error.getNext(), l); } - else + else processFrame(e, l); } @@ -577,7 +579,7 @@ Cluster::ConnectionVector Cluster::getConnections(Lock&) { } // CPG config-change callback. -void Cluster::configChange ( +void Cluster::configChange ( cpg_handle_t /*handle*/, const cpg_name */*group*/, const cpg_address *members, int nMembers, @@ -607,7 +609,7 @@ void Cluster::setReady(Lock&) { } // Set the management status from the Cluster::state. -// +// // NOTE: Management updates are sent based on property changes. In // order to keep consistency across the cluster, we touch the local // management status property even if it is locally unchanged for any @@ -618,7 +620,7 @@ void Cluster::setMgmtStatus(Lock&) { } void Cluster::initMapCompleted(Lock& l) { - // Called on completion of the initial status map. + // Called on completion of the initial status map. QPID_LOG(debug, *this << " initial status map complete. "); setMgmtStatus(l); if (state == PRE_INIT) { @@ -701,8 +703,8 @@ void Cluster::configChange(const MemberId&, if (initMap.isResendNeeded()) { mcast.mcastControl( ClusterInitialStatusBody( - ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, - store.getState(), store.getShutdownId(), + ProtocolVersion(), CLUSTER_VERSION, state > INIT, clusterId, + store.getState(), store.getShutdownId(), initMap.getFirstConfigStr() ), self); @@ -759,7 +761,7 @@ std::string Cluster::debugSnapshot() { // point we know the poller has stopped so no poller callbacks will be // invoked. We must ensure that CPG has also shut down so no CPG // callbacks will be invoked. -// +// void Cluster::brokerShutdown() { sys::ClusterSafeScope css; // Don't trigger cluster-safe asserts. try { cpg.shutdown(); } @@ -775,7 +777,7 @@ void Cluster::updateRequest(const MemberId& id, const std::string& url, Lock& l) } void Cluster::initialStatus(const MemberId& member, uint32_t version, bool active, - const framing::Uuid& id, + const framing::Uuid& id, framing::cluster::StoreState store, const framing::Uuid& shutdownId, const std::string& firstConfig, @@ -969,7 +971,7 @@ void Cluster::updateOutDone(Lock& l) { void Cluster::updateOutError(const std::exception& e) { Monitor::ScopedLock l(lock); - QPID_LOG(error, *this << " error sending update: " << e.what()); + QPID_LOG(error, *this << " error sending update: " << e.what()); updateOutDone(l); } @@ -1067,7 +1069,7 @@ void Cluster::memberUpdate(Lock& l) { void Cluster::updateMgmtMembership(Lock& l) { if (!mgmtObject) return; std::vector<Url> urls = getUrls(l); - mgmtObject->set_clusterSize(urls.size()); + mgmtObject->set_clusterSize(urls.size()); string urlstr; for(std::vector<Url>::iterator i = urls.begin(); i != urls.end(); i++ ) { if (i != urls.begin()) urlstr += ";"; diff --git a/cpp/src/qpid/cluster/Event.h b/cpp/src/qpid/cluster/Event.h index 07f74d3ba5..c2dca073d1 100644 --- a/cpp/src/qpid/cluster/Event.h +++ b/cpp/src/qpid/cluster/Event.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -53,7 +53,7 @@ class EventHeader { /** Size of payload data, excluding header. */ size_t getSize() const { return size; } - /** Size of header + payload. */ + /** Size of header + payload. */ size_t getStoreSize() const { return size + HEADER_SIZE; } bool isCluster() const { return connectionId.getNumber() == 0; } @@ -62,7 +62,7 @@ class EventHeader { protected: static const size_t HEADER_SIZE; - + EventType type; ConnectionId connectionId; size_t size; @@ -86,7 +86,7 @@ class Event : public EventHeader { /** Create a control event. */ static Event control(const framing::AMQFrame&, const ConnectionId&); - + // Data excluding header. char* getData() { return store + HEADER_SIZE; } const char* getData() const { return store + HEADER_SIZE; } @@ -95,12 +95,12 @@ class Event : public EventHeader { char* getStore() { return store; } const char* getStore() const { return store; } - const framing::AMQFrame& getFrame() const; - + const framing::AMQFrame& getFrame() const; + operator framing::Buffer() const; iovec toIovec() const; - + private: void encodeHeader() const; diff --git a/cpp/src/qpid/cluster/EventFrame.h b/cpp/src/qpid/cluster/EventFrame.h index 61447c5525..6b702a9bf8 100644 --- a/cpp/src/qpid/cluster/EventFrame.h +++ b/cpp/src/qpid/cluster/EventFrame.h @@ -10,9 +10,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -48,7 +48,7 @@ struct EventFrame ConnectionId connectionId; - framing::AMQFrame frame; + framing::AMQFrame frame; int readCredit; ///< last frame in an event, give credit when processed. EventType type; }; diff --git a/cpp/src/qpid/sys/ClusterSafe.cpp b/cpp/src/qpid/sys/ClusterSafe.cpp index c6b527dfdf..b67b04c267 100644 --- a/cpp/src/qpid/sys/ClusterSafe.cpp +++ b/cpp/src/qpid/sys/ClusterSafe.cpp @@ -34,8 +34,6 @@ QPID_TSS bool inContext = false; bool isClusterSafe() { return !inCluster || inContext; } -bool isCluster() { return inCluster; } - void assertClusterSafe() { if (!isClusterSafe()) { QPID_LOG(critical, "Modified cluster state outside of cluster context"); diff --git a/cpp/src/qpid/sys/ClusterSafe.h b/cpp/src/qpid/sys/ClusterSafe.h index 15675e8cc5..42e290f4c8 100644 --- a/cpp/src/qpid/sys/ClusterSafe.h +++ b/cpp/src/qpid/sys/ClusterSafe.h @@ -52,9 +52,6 @@ QPID_COMMON_EXTERN void assertClusterSafe(); */ QPID_COMMON_EXTERN bool isClusterSafe(); -/** Return true in a clustered broker */ -QPID_COMMON_EXTERN bool isCluster(); - /** * Base class for classes that encapsulate state which is replicated * to all members of a cluster. Acts as a marker for clustered state diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index 3b3b232671..405718f12b 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -107,7 +107,6 @@ set(unit_tests_to_build MessagingSessionTests SequenceSet StringUtils - IncompleteMessageList RangeSet AtomicValue QueueTest @@ -119,6 +118,7 @@ set(unit_tests_to_build MessageTest QueueRegistryTest QueuePolicyTest + QueueFlowLimitTest FramingTest HeaderTest SequenceNumberTest diff --git a/cpp/src/tests/IncompleteMessageList.cpp b/cpp/src/tests/IncompleteMessageList.cpp deleted file mode 100644 index 10782572e5..0000000000 --- a/cpp/src/tests/IncompleteMessageList.cpp +++ /dev/null @@ -1,134 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <iostream> -#include <sstream> -#include "qpid/broker/Message.h" -#include "qpid/broker/NullMessageStore.h" -#include "qpid/broker/Queue.h" -#include "qpid/broker/IncompleteMessageList.h" - -#include "unit_test.h" - -namespace qpid { -namespace tests { - -QPID_AUTO_TEST_SUITE(IncompleteMessageListTestSuite) - -using namespace qpid::broker; -using namespace qpid::framing; - -struct Checker -{ - std::list<SequenceNumber> ids; - - Checker() { } - - Checker(uint start, uint end) { - for (uint i = start; i <= end; i++) { - ids.push_back(i); - } - } - - Checker& expect(const SequenceNumber& id) { - ids.push_back(id); - return *this; - } - - void operator()(boost::intrusive_ptr<Message> msg) { - BOOST_CHECK(!ids.empty()); - BOOST_CHECK_EQUAL(msg->getCommandId(), ids.front()); - ids.pop_front(); - } -}; - -QPID_AUTO_TEST_CASE(testProcessSimple) -{ - IncompleteMessageList list; - SequenceNumber counter(1); - //fill up list with messages - for (int i = 0; i < 5; i++) { - boost::intrusive_ptr<Message> msg(new Message(counter++)); - list.add(msg); - } - //process and ensure they are all passed to completion listener - list.process(Checker(1, 5), false); - //process again and ensure none are resent to listener - list.process(Checker(), false); -} - -QPID_AUTO_TEST_CASE(testProcessWithIncomplete) -{ - Queue::shared_ptr queue; - IncompleteMessageList list; - SequenceNumber counter(1); - boost::intrusive_ptr<Message> middle; - //fill up list with messages - for (int i = 0; i < 5; i++) { - boost::intrusive_ptr<Message> msg(new Message(counter++)); - list.add(msg); - if (i == 2) { - //mark a message in the middle as incomplete - msg->enqueueAsync(queue, 0); - middle = msg; - } - } - //process and ensure only message upto incomplete message are passed to listener - list.process(Checker(1, 2), false); - //mark message complete and re-process to get remaining messages sent to listener - middle->enqueueComplete(); - list.process(Checker(3, 5), false); -} - - -struct MockStore : public NullMessageStore -{ - Queue::shared_ptr queue; - boost::intrusive_ptr<Message> msg; - - void flush(const qpid::broker::PersistableQueue& q) { - BOOST_CHECK_EQUAL(queue.get(), &q); - msg->enqueueComplete(); - } -}; - -QPID_AUTO_TEST_CASE(testSyncProcessWithIncomplete) -{ - IncompleteMessageList list; - SequenceNumber counter(1); - MockStore store; - store.queue = Queue::shared_ptr(new Queue("mock-queue", false, &store)); - //fill up list with messages - for (int i = 0; i < 5; i++) { - boost::intrusive_ptr<Message> msg(new Message(counter++)); - list.add(msg); - if (i == 2) { - //mark a message in the middle as incomplete - msg->enqueueAsync(store.queue, &store); - store.msg = msg; - } - } - //process with sync bit specified and ensure that all messages are passed to listener - list.process(Checker(1, 5), true); -} - -QPID_AUTO_TEST_SUITE_END() - -}} // namespace qpid::tests diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 07405bcd8f..9a1c9e51f6 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -87,7 +87,6 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ InlineVector.cpp \ SequenceSet.cpp \ StringUtils.cpp \ - IncompleteMessageList.cpp \ RangeSet.cpp \ AtomicValue.cpp \ QueueTest.cpp \ @@ -99,6 +98,7 @@ unit_test_SOURCES= unit_test.cpp unit_test.h \ MessageTest.cpp \ QueueRegistryTest.cpp \ QueuePolicyTest.cpp \ + QueueFlowLimitTest.cpp \ FramingTest.cpp \ HeaderTest.cpp \ SequenceNumberTest.cpp \ @@ -310,7 +310,9 @@ TESTS_ENVIRONMENT = \ $(srcdir)/run_test system_tests = qpid-client-test quick_perftest quick_topictest run_header_test quick_txtest -TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests run_acl_tests run_cli_tests replication_test dynamic_log_level_test +TESTS += start_broker $(system_tests) python_tests stop_broker run_federation_tests \ + run_acl_tests run_cli_tests replication_test dynamic_log_level_test \ + run_queue_flow_limit_tests EXTRA_DIST += \ run_test vg_check \ @@ -349,7 +351,8 @@ EXTRA_DIST += \ run_test.ps1 \ start_broker.ps1 \ stop_broker.ps1 \ - topictest.ps1 + topictest.ps1 \ + run_queue_flow_limit_tests check_LTLIBRARIES += libdlclose_noop.la libdlclose_noop_la_LDFLAGS = -module -rpath $(abs_builddir) diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h index a1b140d484..baca14cf4e 100644 --- a/cpp/src/tests/MessageUtils.h +++ b/cpp/src/tests/MessageUtils.h @@ -20,6 +20,7 @@ */ #include "qpid/broker/Message.h" +#include "qpid/broker/AsyncCompletion.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/Uuid.h" @@ -28,6 +29,17 @@ using namespace qpid; using namespace broker; using namespace framing; +namespace { + class DummyCompletion : public AsyncCompletion + { + public: + DummyCompletion() {} + virtual ~DummyCompletion() {} + protected: + void completed(bool) {} + }; +} + namespace qpid { namespace tests { @@ -50,6 +62,8 @@ struct MessageUtils msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); if (durable) msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setDeliveryMode(2); + boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion()); + msg->setIngressCompletion(dc); return msg; } diff --git a/cpp/src/tests/QueueFlowLimitTest.cpp b/cpp/src/tests/QueueFlowLimitTest.cpp new file mode 100644 index 0000000000..719d6ca6bd --- /dev/null +++ b/cpp/src/tests/QueueFlowLimitTest.cpp @@ -0,0 +1,461 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <sstream> +#include <deque> +#include "unit_test.h" +#include "test_tools.h" + +#include "qpid/broker/QueuePolicy.h" +#include "qpid/broker/QueueFlowLimit.h" +#include "qpid/sys/Time.h" +#include "qpid/framing/reply_exceptions.h" +#include "MessageUtils.h" +#include "BrokerFixture.h" + +using namespace qpid::broker; +using namespace qpid::framing; + +namespace qpid { +namespace tests { + +QPID_AUTO_TEST_SUITE(QueueFlowLimitTestSuite) + +namespace { + +class TestFlow : public QueueFlowLimit +{ +public: + TestFlow(uint32_t flowStopCount, uint32_t flowResumeCount, + uint64_t flowStopSize, uint64_t flowResumeSize) : + QueueFlowLimit(0, flowStopCount, flowResumeCount, flowStopSize, flowResumeSize) + {} + virtual ~TestFlow() {} + + static TestFlow *createTestFlow(const qpid::framing::FieldTable& settings) + { + FieldTable::ValuePtr v; + + v = settings.get(flowStopCountKey); + uint32_t flowStopCount = (v) ? (uint32_t)v->get<int64_t>() : 0; + v = settings.get(flowResumeCountKey); + uint32_t flowResumeCount = (v) ? (uint32_t)v->get<int64_t>() : 0; + v = settings.get(flowStopSizeKey); + uint64_t flowStopSize = (v) ? (uint64_t)v->get<int64_t>() : 0; + v = settings.get(flowResumeSizeKey); + uint64_t flowResumeSize = (v) ? (uint64_t)v->get<int64_t>() : 0; + + return new TestFlow(flowStopCount, flowResumeCount, flowStopSize, flowResumeSize); + } + + static QueueFlowLimit *getQueueFlowLimit(const qpid::framing::FieldTable& settings) + { + return QueueFlowLimit::createLimit(0, settings); + } +}; + + + +QueuedMessage createMessage(uint32_t size) +{ + QueuedMessage msg; + msg.payload = MessageUtils::createMessage(); + MessageUtils::addContent(msg.payload, std::string (size, 'x')); + return msg; +} +} + +QPID_AUTO_TEST_CASE(testFlowCount) +{ + FieldTable args; + args.setInt(QueueFlowLimit::flowStopCountKey, 7); + args.setInt(QueueFlowLimit::flowResumeCountKey, 5); + + std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args)); + + BOOST_CHECK_EQUAL((uint32_t) 7, flow->getFlowStopCount()); + BOOST_CHECK_EQUAL((uint32_t) 5, flow->getFlowResumeCount()); + BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeSize()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); + + std::deque<QueuedMessage> msgs; + for (size_t i = 0; i < 6; i++) { + msgs.push_back(createMessage(10)); + flow->enqueued(msgs.back()); + BOOST_CHECK(!flow->isFlowControlActive()); + } + BOOST_CHECK(!flow->isFlowControlActive()); // 6 on queue + msgs.push_back(createMessage(10)); + flow->enqueued(msgs.back()); + BOOST_CHECK(!flow->isFlowControlActive()); // 7 on queue + msgs.push_back(createMessage(10)); + flow->enqueued(msgs.back()); + BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue, ON + msgs.push_back(createMessage(10)); + flow->enqueued(msgs.back()); + BOOST_CHECK(flow->isFlowControlActive()); // 9 on queue, no change to flow control + + flow->dequeued(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // 8 on queue + flow->dequeued(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // 7 on queue + flow->dequeued(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // 6 on queue + flow->dequeued(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // 5 on queue, no change + + flow->dequeued(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(!flow->isFlowControlActive()); // 4 on queue, OFF +} + + +QPID_AUTO_TEST_CASE(testFlowSize) +{ + FieldTable args; + args.setUInt64(QueueFlowLimit::flowStopSizeKey, 70); + args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 50); + + std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args)); + + BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount()); + BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount()); + BOOST_CHECK_EQUAL((uint32_t) 70, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint32_t) 50, flow->getFlowResumeSize()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); + + std::deque<QueuedMessage> msgs; + for (size_t i = 0; i < 6; i++) { + msgs.push_back(createMessage(10)); + flow->enqueued(msgs.back()); + BOOST_CHECK(!flow->isFlowControlActive()); + } + BOOST_CHECK(!flow->isFlowControlActive()); // 60 on queue + BOOST_CHECK_EQUAL(6u, flow->getFlowCount()); + BOOST_CHECK_EQUAL(60u, flow->getFlowSize()); + + QueuedMessage msg_9 = createMessage(9); + flow->enqueued(msg_9); + BOOST_CHECK(!flow->isFlowControlActive()); // 69 on queue + QueuedMessage tinyMsg_1 = createMessage(1); + flow->enqueued(tinyMsg_1); + BOOST_CHECK(!flow->isFlowControlActive()); // 70 on queue + + QueuedMessage tinyMsg_2 = createMessage(1); + flow->enqueued(tinyMsg_2); + BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue, ON + msgs.push_back(createMessage(10)); + flow->enqueued(msgs.back()); + BOOST_CHECK(flow->isFlowControlActive()); // 81 on queue + BOOST_CHECK_EQUAL(10u, flow->getFlowCount()); + BOOST_CHECK_EQUAL(81u, flow->getFlowSize()); + + flow->dequeued(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // 71 on queue + flow->dequeued(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // 61 on queue + flow->dequeued(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // 51 on queue + + flow->dequeued(tinyMsg_1); + BOOST_CHECK(flow->isFlowControlActive()); // 50 on queue + flow->dequeued(tinyMsg_2); + BOOST_CHECK(!flow->isFlowControlActive()); // 49 on queue, OFF + + flow->dequeued(msg_9); + BOOST_CHECK(!flow->isFlowControlActive()); // 40 on queue + flow->dequeued(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(!flow->isFlowControlActive()); // 30 on queue + flow->dequeued(msgs.front()); + msgs.pop_front(); + BOOST_CHECK(!flow->isFlowControlActive()); // 20 on queue + BOOST_CHECK_EQUAL(2u, flow->getFlowCount()); + BOOST_CHECK_EQUAL(20u, flow->getFlowSize()); +} + +QPID_AUTO_TEST_CASE(testFlowArgs) +{ + FieldTable args; + const uint64_t stop(0x2FFFFFFFF); + const uint64_t resume(0x1FFFFFFFF); + args.setInt(QueueFlowLimit::flowStopCountKey, 30); + args.setInt(QueueFlowLimit::flowResumeCountKey, 21); + args.setUInt64(QueueFlowLimit::flowStopSizeKey, stop); + args.setUInt64(QueueFlowLimit::flowResumeSizeKey, resume); + + std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args)); + + BOOST_CHECK_EQUAL((uint32_t) 30, flow->getFlowStopCount()); + BOOST_CHECK_EQUAL((uint32_t) 21, flow->getFlowResumeCount()); + BOOST_CHECK_EQUAL(stop, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL(resume, flow->getFlowResumeSize()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); +} + + +QPID_AUTO_TEST_CASE(testFlowCombo) +{ + FieldTable args; + args.setInt(QueueFlowLimit::flowStopCountKey, 10); + args.setInt(QueueFlowLimit::flowResumeCountKey, 5); + args.setUInt64(QueueFlowLimit::flowStopSizeKey, 200); + args.setUInt64(QueueFlowLimit::flowResumeSizeKey, 100); + + std::deque<QueuedMessage> msgs_1; + std::deque<QueuedMessage> msgs_10; + std::deque<QueuedMessage> msgs_50; + std::deque<QueuedMessage> msgs_100; + + QueuedMessage msg; + + std::auto_ptr<TestFlow> flow(TestFlow::createTestFlow(args)); + BOOST_CHECK(!flow->isFlowControlActive()); // count:0 size:0 + + // verify flow control comes ON when only count passes its stop point. + + for (size_t i = 0; i < 10; i++) { + msgs_10.push_back(createMessage(10)); + flow->enqueued(msgs_10.back()); + BOOST_CHECK(!flow->isFlowControlActive()); + } + // count:10 size:100 + + msgs_1.push_back(createMessage(1)); + flow->enqueued(msgs_1.back()); // count:11 size: 101 ->ON + BOOST_CHECK(flow->isFlowControlActive()); + + for (size_t i = 0; i < 6; i++) { + flow->dequeued(msgs_10.front()); + msgs_10.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); + } + // count:5 size: 41 + + flow->dequeued(msgs_1.front()); // count: 4 size: 40 ->OFF + msgs_1.pop_front(); + BOOST_CHECK(!flow->isFlowControlActive()); + + for (size_t i = 0; i < 4; i++) { + flow->dequeued(msgs_10.front()); + msgs_10.pop_front(); + BOOST_CHECK(!flow->isFlowControlActive()); + } + // count:0 size:0 + + // verify flow control comes ON when only size passes its stop point. + + msgs_100.push_back(createMessage(100)); + flow->enqueued(msgs_100.back()); // count:1 size: 100 + BOOST_CHECK(!flow->isFlowControlActive()); + + msgs_50.push_back(createMessage(50)); + flow->enqueued(msgs_50.back()); // count:2 size: 150 + BOOST_CHECK(!flow->isFlowControlActive()); + + msgs_50.push_back(createMessage(50)); + flow->enqueued(msgs_50.back()); // count:3 size: 200 + BOOST_CHECK(!flow->isFlowControlActive()); + + msgs_1.push_back(createMessage(1)); + flow->enqueued(msgs_1.back()); // count:4 size: 201 ->ON + BOOST_CHECK(flow->isFlowControlActive()); + + flow->dequeued(msgs_100.front()); // count:3 size:101 + msgs_100.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); + + flow->dequeued(msgs_1.front()); // count:2 size:100 + msgs_1.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); + + flow->dequeued(msgs_50.front()); // count:1 size:50 ->OFF + msgs_50.pop_front(); + BOOST_CHECK(!flow->isFlowControlActive()); + + // verify flow control remains ON until both thresholds drop below their + // resume point. + + for (size_t i = 0; i < 8; i++) { + msgs_10.push_back(createMessage(10)); + flow->enqueued(msgs_10.back()); + BOOST_CHECK(!flow->isFlowControlActive()); + } + // count:9 size:130 + + msgs_10.push_back(createMessage(10)); + flow->enqueued(msgs_10.back()); // count:10 size: 140 + BOOST_CHECK(!flow->isFlowControlActive()); + + msgs_1.push_back(createMessage(1)); + flow->enqueued(msgs_1.back()); // count:11 size: 141 ->ON + BOOST_CHECK(flow->isFlowControlActive()); + + msgs_100.push_back(createMessage(100)); + flow->enqueued(msgs_100.back()); // count:12 size: 241 (both thresholds crossed) + BOOST_CHECK(flow->isFlowControlActive()); + + // at this point: 9@10 + 1@50 + 1@100 + 1@1 == 12@241 + + flow->dequeued(msgs_50.front()); // count:11 size:191 + msgs_50.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); + + for (size_t i = 0; i < 9; i++) { + flow->dequeued(msgs_10.front()); + msgs_10.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); + } + // count:2 size:101 + flow->dequeued(msgs_1.front()); // count:1 size:100 + msgs_1.pop_front(); + BOOST_CHECK(flow->isFlowControlActive()); // still active due to size + + flow->dequeued(msgs_100.front()); // count:0 size:0 ->OFF + msgs_100.pop_front(); + BOOST_CHECK(!flow->isFlowControlActive()); +} + + +QPID_AUTO_TEST_CASE(testFlowDefaultArgs) +{ + QueueFlowLimit::setDefaults(2950001, // max queue byte count + 80, // 80% stop threshold + 70); // 70% resume threshold + FieldTable args; + QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); + + BOOST_CHECK(ptr); + std::auto_ptr<QueueFlowLimit> flow(ptr); + BOOST_CHECK_EQUAL((uint64_t) 2360001, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint64_t) 2065000, flow->getFlowResumeSize()); + BOOST_CHECK_EQUAL( 0u, flow->getFlowStopCount()); + BOOST_CHECK_EQUAL( 0u, flow->getFlowResumeCount()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); +} + + +QPID_AUTO_TEST_CASE(testFlowOverrideArgs) +{ + QueueFlowLimit::setDefaults(2950001, // max queue byte count + 80, // 80% stop threshold + 70); // 70% resume threshold + { + FieldTable args; + args.setInt(QueueFlowLimit::flowStopCountKey, 35000); + args.setInt(QueueFlowLimit::flowResumeCountKey, 30000); + + QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(ptr); + std::auto_ptr<QueueFlowLimit> flow(ptr); + + BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount()); + BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount()); + BOOST_CHECK_EQUAL((uint64_t) 0, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint64_t) 0, flow->getFlowResumeSize()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); + } + { + FieldTable args; + args.setInt(QueueFlowLimit::flowStopSizeKey, 350000); + args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000); + + QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(ptr); + std::auto_ptr<QueueFlowLimit> flow(ptr); + + BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowStopCount()); + BOOST_CHECK_EQUAL((uint32_t) 0, flow->getFlowResumeCount()); + BOOST_CHECK_EQUAL((uint64_t) 350000, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint64_t) 300000, flow->getFlowResumeSize()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); + } + { + FieldTable args; + args.setInt(QueueFlowLimit::flowStopCountKey, 35000); + args.setInt(QueueFlowLimit::flowResumeCountKey, 30000); + args.setInt(QueueFlowLimit::flowStopSizeKey, 350000); + args.setInt(QueueFlowLimit::flowResumeSizeKey, 300000); + + QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(ptr); + std::auto_ptr<QueueFlowLimit> flow(ptr); + + BOOST_CHECK_EQUAL((uint32_t) 35000, flow->getFlowStopCount()); + BOOST_CHECK_EQUAL((uint32_t) 30000, flow->getFlowResumeCount()); + BOOST_CHECK_EQUAL((uint64_t) 350000, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint64_t) 300000, flow->getFlowResumeSize()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); + } +} + + +QPID_AUTO_TEST_CASE(testFlowOverrideDefaults) +{ + QueueFlowLimit::setDefaults(2950001, // max queue byte count + 97, // stop threshold + 73); // resume threshold + FieldTable args; + QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(ptr); + std::auto_ptr<QueueFlowLimit> flow(ptr); + + BOOST_CHECK_EQUAL((uint32_t) 2861501, flow->getFlowStopSize()); + BOOST_CHECK_EQUAL((uint32_t) 2153500, flow->getFlowResumeSize()); + BOOST_CHECK(!flow->isFlowControlActive()); + BOOST_CHECK(flow->monitorFlowControl()); +} + + +QPID_AUTO_TEST_CASE(testFlowDisable) +{ + { + FieldTable args; + args.setInt(QueueFlowLimit::flowStopCountKey, 0); + QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(!ptr); + } + { + FieldTable args; + args.setInt(QueueFlowLimit::flowStopSizeKey, 0); + QueueFlowLimit *ptr = TestFlow::getQueueFlowLimit(args); + BOOST_CHECK(!ptr); + } +} + + +QPID_AUTO_TEST_SUITE_END() + +}} // namespace qpid::tests diff --git a/cpp/src/tests/QueuePolicyTest.cpp b/cpp/src/tests/QueuePolicyTest.cpp index 90af9c7dd9..f9c058c771 100644 --- a/cpp/src/tests/QueuePolicyTest.cpp +++ b/cpp/src/tests/QueuePolicyTest.cpp @@ -23,6 +23,7 @@ #include "test_tools.h" #include "qpid/broker/QueuePolicy.h" +#include "qpid/broker/QueueFlowLimit.h" #include "qpid/client/QueueOptions.h" #include "qpid/sys/Time.h" #include "qpid/framing/reply_exceptions.h" @@ -38,6 +39,7 @@ namespace tests { QPID_AUTO_TEST_SUITE(QueuePolicyTestSuite) +namespace { QueuedMessage createMessage(uint32_t size) { QueuedMessage msg; @@ -45,7 +47,7 @@ QueuedMessage createMessage(uint32_t size) MessageUtils::addContent(msg.payload, std::string (size, 'x')); return msg; } - +} QPID_AUTO_TEST_CASE(testCount) { @@ -340,6 +342,8 @@ QPID_AUTO_TEST_CASE(testFlowToDiskWithNoStore) //fallback to rejecting messages QueueOptions args; args.setSizePolicy(FLOW_TO_DISK, 0, 5); + // Disable flow control, or else we'll never hit the max limit + args.setInt(QueueFlowLimit::flowStopCountKey, 0); ProxySessionFixture f; std::string q("my-queue"); diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp index 57b344498e..e4e9897195 100644 --- a/cpp/src/tests/QueueTest.cpp +++ b/cpp/src/tests/QueueTest.cpp @@ -36,6 +36,9 @@ #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/broker/QueuePolicy.h" +#include "qpid/broker/QueueFlowLimit.h" + #include <iostream> #include "boost/format.hpp" @@ -85,6 +88,8 @@ intrusive_ptr<Message> create_message(std::string exchange, std::string routingK msg->getFrames().append(method); msg->getFrames().append(header); msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); + boost::shared_ptr<AsyncCompletion>dc(new DummyCompletion()); + msg->setIngressCompletion(dc); return msg; } @@ -508,6 +513,8 @@ QPID_AUTO_TEST_CASE(testLVQAcquire){ client::QueueOptions args; // set queue mode args.setOrdering(client::LVQ); + // disable flow control, as this test violates the enqueue/dequeue sequence. + args.setInt(QueueFlowLimit::flowStopCountKey, 0); Queue::shared_ptr queue(new Queue("my-queue", true )); queue->configure(args); diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp index 6b44d95baa..210abf0a5b 100644 --- a/cpp/src/tests/TxPublishTest.cpp +++ b/cpp/src/tests/TxPublishTest.cpp @@ -74,7 +74,7 @@ QPID_AUTO_TEST_CASE(testPrepare) BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[0].second); BOOST_CHECK_EQUAL(string("queue2"), t.store.enqueued[1].first); BOOST_CHECK_EQUAL(pmsg, t.store.enqueued[1].second); - BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isEnqueueComplete()); + BOOST_CHECK_EQUAL( true, ( boost::static_pointer_cast<PersistableMessage>(t.msg))->isIngressComplete()); } QPID_AUTO_TEST_CASE(testCommit) @@ -87,7 +87,7 @@ QPID_AUTO_TEST_CASE(testCommit) BOOST_CHECK_EQUAL((uint32_t) 1, t.queue1->getMessageCount()); intrusive_ptr<Message> msg_dequeue = t.queue1->get().payload; - BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isEnqueueComplete()); + BOOST_CHECK_EQUAL( true, (boost::static_pointer_cast<PersistableMessage>(msg_dequeue))->isIngressComplete()); BOOST_CHECK_EQUAL(t.msg, msg_dequeue); BOOST_CHECK_EQUAL((uint32_t) 1, t.queue2->getMessageCount()); diff --git a/cpp/src/tests/brokertest.py b/cpp/src/tests/brokertest.py index 98f58ebfdd..6e771bf5d6 100644 --- a/cpp/src/tests/brokertest.py +++ b/cpp/src/tests/brokertest.py @@ -29,6 +29,7 @@ from unittest import TestCase from copy import copy from threading import Thread, Lock, Condition from logging import getLogger +import qmf.console log = getLogger("qpid.brokertest") @@ -327,6 +328,10 @@ class Broker(Popen): log.debug("Started broker %s (%s, %s)" % (self.name, self.pname, self.log)) self._log_ready = False + def startQmf(self, handler=None): + self.qmf_session = qmf.console.Session(handler) + self.qmf_broker = self.qmf_session.addBroker("%s:%s" % (self.host(), self.port())) + def host(self): return self._host def port(self): diff --git a/cpp/src/tests/cluster_tests.py b/cpp/src/tests/cluster_tests.py index cbad4010b4..3e13a3ce8a 100755 --- a/cpp/src/tests/cluster_tests.py +++ b/cpp/src/tests/cluster_tests.py @@ -23,7 +23,7 @@ from qpid import datatypes, messaging from brokertest import * from qpid.harness import Skipped from qpid.messaging import Message, Empty -from threading import Thread, Lock +from threading import Thread, Lock, Condition from logging import getLogger from itertools import chain from tempfile import NamedTemporaryFile @@ -304,6 +304,113 @@ acl allow all all # Verify logs are consistent cluster_test_logs.verify_logs() + class BlockedSend(Thread): + """Send a message, send is expected to block. + Verify that it does block (for a given timeout), then allow + waiting till it unblocks when it is expected to do so.""" + def __init__(self, sender, msg): + self.sender, self.msg = sender, msg + self.blocked = True + self.condition = Condition() + self.timeout = 0.1 # Time to wait for expected results. + Thread.__init__(self) + def run(self): + try: + self.sender.send(self.msg) + self.condition.acquire() + try: + self.blocked = False + self.condition.notify() + finally: self.condition.release() + except Exception,e: print "BlockedSend exception: %s"%e + def start(self): + Thread.start(self) + time.sleep(self.timeout) + assert self.blocked # Expected to block + def assert_blocked(self): assert self.blocked + def wait(self): # Now expecting to unblock + self.condition.acquire() + try: + while self.blocked: + self.condition.wait(self.timeout) + if self.blocked: raise Exception("Timed out waiting for send to unblock") + finally: self.condition.release() + self.join() + + def queue_flowlimit_test(self, brokers): + """Verify that the queue's flowlimit configuration and state are + correctly replicated. + The brokers argument allows this test to run on single broker, + cluster of 2 pre-startd brokers or cluster where second broker + starts after queue is in flow control. + """ + # configure a queue with a specific flow limit on first broker + ssn0 = brokers.first().connect().session() + s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}") + brokers.first().startQmf() + q = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0] + oid = q.getObjectId() + self.assertEqual(q.name, "flq") + self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert not q.flowStopped + + # fill the queue on one broker until flow control is active + for x in range(5): s0.send(Message(str(x))) + sender = ShortTests.BlockedSend(s0, Message(str(6))) + sender.start() # Tests that sender does block + # Verify the broker queue goes into a flowStopped state + deadline = time.time() + 1 + while not q.flowStopped and time.time() < deadline: q.update() + assert q.flowStopped + sender.assert_blocked() # Still blocked + + # Now verify the both brokers in cluster have same configuration + brokers.second().startQmf() + qs = brokers.second().qmf_session.getObjects(_objectId=oid) + self.assertEqual(len(qs), 1) + q = qs[0] + self.assertEqual(q.name, "flq") + self.assertEqual(q.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L}) + assert q.flowStopped + + # now drain the queue using a session to the other broker + ssn1 = brokers.second().connect().session() + r1 = ssn1.receiver("flq", capacity=6) + for x in range(4): + r1.fetch(timeout=0) + ssn1.acknowledge() + sender.wait() # Verify no longer blocked. + + ssn0.connection.close() + ssn1.connection.close() + cluster_test_logs.verify_logs() + + def test_queue_flowlimit(self): + """Test flow limits on a standalone broker""" + broker = self.broker() + class Brokers: + def first(self): return broker + def second(self): return broker + self.queue_flowlimit_test(Brokers()) + + def test_queue_flowlimit_cluster(self): + return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935 + cluster = self.cluster(2) + class Brokers: + def first(self): return cluster[0] + def second(self): return cluster[1] + self.queue_flowlimit_test(Brokers()) + + def test_queue_flowlimit_cluster_join(self): + return # TODO aconway 2011-02-18: disabled till fixed, QPID-2935 + cluster = self.cluster(1) + class Brokers: + def first(self): return cluster[0] + def second(self): + if len(cluster) == 1: cluster.start() + return cluster[1] + self.queue_flowlimit_test(Brokers()) + class LongTests(BrokerTest): """Tests that can run for a long time if -DDURATION=<minutes> is set""" def duration(self): diff --git a/cpp/src/tests/queue_flow_limit_tests.py b/cpp/src/tests/queue_flow_limit_tests.py new file mode 100644 index 0000000000..27fbe2afcc --- /dev/null +++ b/cpp/src/tests/queue_flow_limit_tests.py @@ -0,0 +1,245 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +from qpid.testlib import TestBase010 +from qpid import datatypes, messaging +from qpid.messaging import Message, Empty +from threading import Thread, Lock +from logging import getLogger +from time import sleep +from subprocess import Popen, PIPE +from os import environ + +class QueueFlowLimitTests(TestBase010): + + def _create_queue(self, name, + stop_count=None, resume_count=None, + stop_size=None, resume_size=None): + """ Create a queue with the given flow settings via the queue.declare + command. + """ + args={} + if (stop_count is not None): + args["qpid.flow_stop_count"] = stop_count; + if (resume_count is not None): + args["qpid.flow_resume_count"] = resume_count; + if (stop_size is not None): + args["qpid.flow_stop_size"] = stop_size; + if (resume_size is not None): + args["qpid.flow_resume_size"] = resume_size; + + self.session.queue_declare(queue=name, arguments=args) + + qs = self.qmf.getObjects(_class="queue") + for i in qs: + if i.name == name: + # verify flow settings + if (stop_count is not None): + self.assertEqual(i.arguments.get("qpid.flow_stop_count"), stop_count) + if (resume_count is not None): + self.assertEqual(i.arguments.get("qpid.flow_resume_count"), resume_count) + if (stop_size is not None): + self.assertEqual(i.arguments.get("qpid.flow_stop_size"), stop_size) + if (resume_size is not None): + self.assertEqual(i.arguments.get("qpid.flow_resume_size"), resume_size) + self.assertFalse(i.flowStopped) + return i.getObjectId() + self.fail("Unable to create queue '%s'" % name) + return None + + + def _delete_queue(self, name): + """ Delete a named queue + """ + self.session.queue_delete(queue=name) + + + def _start_qpid_send(self, queue, count, content="X", capacity=10): + """ Use the qpid-send client to generate traffic to a queue. + """ + command = ["qpid-send", + "-b", "%s:%s" % (self.broker.host, self.broker.port), + "-a", str(queue), + "--messages", str(count), + "--content-string", str(content), + "--capacity", str(capacity) + ] + + return Popen(command, stdout=PIPE) + + def _start_qpid_receive(self, queue, count, timeout=5): + """ Use the qpid-receive client to consume from a queue. + Note well: prints one line of text to stdout for each consumed msg. + """ + command = ["qpid-receive", + "-b", "%s:%s" % (self.broker.host, self.broker.port), + "-a", str(queue), + "--messages", str(count), + "--timeout", str(timeout), + "--print-content", "yes" + ] + return Popen(command, stdout=PIPE) + + + + def test_qpid_config_cmd(self): + """ Test the qpid-config command's ability to configure a queue's flow + control thresholds. + """ + tool = environ.get("QPID_CONFIG_EXEC") + if tool: + command = [tool, + "--broker-addr=%s:%s" % (self.broker.host, self.broker.port), + "add", "queue", "test01", + "--flow-stop-count=999", + "--flow-resume-count=55", + "--flow-stop-size=5000000", + "--flow-resume-size=100000"] + #cmd = Popen(command, stdout=PIPE) + cmd = Popen(command) + cmd.wait() + self.assertEqual(cmd.returncode, 0) + + # now verify the settings + self.startQmf(); + qs = self.qmf.getObjects(_class="queue") + for i in qs: + if i.name == "test01": + self.assertEqual(i.arguments.get("qpid.flow_stop_count"), 999) + self.assertEqual(i.arguments.get("qpid.flow_resume_count"), 55) + self.assertEqual(i.arguments.get("qpid.flow_stop_size"), 5000000) + self.assertEqual(i.arguments.get("qpid.flow_resume_size"), 100000) + self.assertFalse(i.flowStopped) + break; + self.assertEqual(i.name, "test01") + self._delete_queue("test01") + + + def test_flow_count(self): + """ Create a queue with count-based flow limit. Spawn several + producers which will exceed the limit. Verify limit exceeded. Consume + all messages. Verify flow control released. + """ + self.startQmf(); + oid = self._create_queue("test-q", stop_count=373, resume_count=229) + + sndr1 = self._start_qpid_send("test-q", count=1213, content="XXX", capacity=50); + sndr2 = self._start_qpid_send("test-q", count=797, content="Y", capacity=13); + sndr3 = self._start_qpid_send("test-q", count=331, content="ZZZZZ", capacity=149); + totalMsgs = 1213 + 797 + 331 + + + # wait until flow control is active + count = 0 + while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \ + count < 10: + sleep(1); + count += 1; + self.assertTrue(self.qmf.getObjects(_objectId=oid)[0].flowStopped) + depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth + self.assertGreater(depth, 373) + + # now wait until the enqueues stop happening - ensure that + # not all msgs have been sent (senders are blocked) + sleep(1) + newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth + while depth != newDepth: + depth = newDepth; + sleep(1) + newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth + self.assertGreater(totalMsgs, depth) + + # drain the queue + rcvr = self._start_qpid_receive("test-q", + count=totalMsgs) + count = 0; + x = rcvr.stdout.readline() # prints a line for each received msg + while x: + count += 1; + x = rcvr.stdout.readline() + + sndr1.wait(); + sndr2.wait(); + sndr3.wait(); + rcvr.wait(); + + self.assertEqual(count, totalMsgs) + self.assertFalse(self.qmf.getObjects(_objectId=oid)[0].flowStopped) + + self._delete_queue("test-q") + + + def test_flow_size(self): + """ Create a queue with size-based flow limit. Spawn several + producers which will exceed the limit. Verify limit exceeded. Consume + all messages. Verify flow control released. + """ + self.startQmf(); + oid = self._create_queue("test-q", stop_size=351133, resume_size=251143) + + sndr1 = self._start_qpid_send("test-q", count=1699, content="X"*439, capacity=53); + sndr2 = self._start_qpid_send("test-q", count=1129, content="Y"*631, capacity=13); + sndr3 = self._start_qpid_send("test-q", count=881, content="Z"*823, capacity=149); + totalMsgs = 1699 + 1129 + 881 + totalBytes = 439 + 631 + 823 + + # wait until flow control is active + count = 0 + while self.qmf.getObjects(_objectId=oid)[0].flowStopped == False and \ + count < 10: + sleep(1); + count += 1; + self.assertTrue(self.qmf.getObjects(_objectId=oid)[0].flowStopped) + self.assertGreater(self.qmf.getObjects(_objectId=oid)[0].byteDepth, 351133) + + # now wait until the enqueues stop happening - ensure that + # not all msgs have been sent (senders are blocked) + depth = self.qmf.getObjects(_objectId=oid)[0].msgDepth + sleep(1) + newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth + while depth != newDepth: + depth = newDepth; + sleep(1) + newDepth = self.qmf.getObjects(_objectId=oid)[0].msgDepth + self.assertGreater(totalMsgs, depth) + + # drain the queue + rcvr = self._start_qpid_receive("test-q", + count=totalMsgs) + count = 0; + x = rcvr.stdout.readline() # prints a line for each received msg + while x: + count += 1; + x = rcvr.stdout.readline() + + sndr1.wait(); + sndr2.wait(); + sndr3.wait(); + rcvr.wait(); + + self.assertEqual(count, totalMsgs) + self.assertFalse(self.qmf.getObjects(_objectId=oid)[0].flowStopped) + + self._delete_queue("test-q") + + + + diff --git a/cpp/src/tests/run_queue_flow_limit_tests b/cpp/src/tests/run_queue_flow_limit_tests new file mode 100755 index 0000000000..9f2f093353 --- /dev/null +++ b/cpp/src/tests/run_queue_flow_limit_tests @@ -0,0 +1,55 @@ +#!/bin/sh + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Run tests against Queue producer flow control. + +source ./test_env.sh +test -d $PYTHON_DIR || { echo "Skipping queue flow control tests, no python dir."; exit 0; } + +LOG_FILE=queue_flow_limit_test.log +PORT="" + +trap stop_broker INT TERM QUIT + +error() { + echo $* + exit 1; +} + +start_broker() { + rm -rf $LOG_FILE + PORT=$($QPIDD_EXEC --auth=no --no-module-dir --daemon --port=0 -t --log-to-file $LOG_FILE) || error "Could not start broker" +} + +stop_broker() { + test -n "$PORT" && $QPIDD_EXEC --no-module-dir --quit --port $PORT +} + +start_broker +echo "Running Queue flow limit tests using broker on port $PORT" +$QPID_PYTHON_TEST -m queue_flow_limit_tests $SKIPTESTS -b localhost:$PORT $@ +RETCODE=$? +stop_broker +if test x$RETCODE != x0; then + echo "FAIL queue flow limit tests"; exit 1; +fi +rm -rf $LOG_FILE + diff --git a/specs/management-schema.xml b/specs/management-schema.xml index 277534250e..e52e3f23cf 100644 --- a/specs/management-schema.xml +++ b/specs/management-schema.xml @@ -175,6 +175,7 @@ <statistic name="bindingCount" type="hilo32" unit="binding" desc="Current bindings"/> <statistic name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/> <statistic name="messageLatency" type="mmaTime" unit="nanosecond" desc="Broker latency through this queue"/> + <statistic name="flowStopped" type="bool" desc="Flow control active."/> <method name="purge" desc="Discard all or some messages on a queue"> <arg name="request" dir="I" type="uint32" desc="0 for all messages or n>0 for n messages"/> diff --git a/tools/src/py/qpid-config b/tools/src/py/qpid-config index 04b31e98ed..9ff405541c 100755 --- a/tools/src/py/qpid-config +++ b/tools/src/py/qpid-config @@ -97,6 +97,10 @@ class Config: self._eventGeneration = None self._file = None self._sasl_mechanism = None + self._flowStopCount = None + self._flowResumeCount = None + self._flowStopSize = None + self._flowResumeSize = None config = Config() @@ -111,6 +115,10 @@ LVQNB = "qpid.last_value_queue_no_browse" MSG_SEQUENCE = "qpid.msg_sequence" IVE = "qpid.ive" QUEUE_EVENT_GENERATION = "qpid.queue_event_generation" +FLOW_STOP_COUNT = "qpid.flow_stop_count" +FLOW_RESUME_COUNT = "qpid.flow_resume_count" +FLOW_STOP_SIZE = "qpid.flow_stop_size" +FLOW_RESUME_SIZE = "qpid.flow_resume_size" class JHelpFormatter(IndentedHelpFormatter): """Format usage and description without stripping newlines from usage strings @@ -162,6 +170,14 @@ def OptionsAndArguments(argv): group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached") group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="<ordering>", help="Queue ordering policy") group3.add_option("--generate-queue-events", action="store", type="int", metavar="<n>", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.") + group3.add_option("--flow-stop-size", action="store", type="int", metavar="<n>", + help="Turn on sender flow control when the number of queued bytes exceeds this value.") + group3.add_option("--flow-resume-size", action="store", type="int", metavar="<n>", + help="Turn off sender flow control when the number of queued bytes drops below this value.") + group3.add_option("--flow-stop-count", action="store", type="int", metavar="<n>", + help="Turn on sender flow control when the number of queued messages exceeds this value.") + group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>", + help="Turn off sender flow control when the number of queued messages drops below this value.") # no option for declaring an exclusive queue - which can only be used by the session that creates it. parser.add_option_group(group3) @@ -233,6 +249,14 @@ def OptionsAndArguments(argv): config._if_unused = False if opts.sasl_mechanism: config._sasl_mechanism = opts.sasl_mechanism + if opts.flow_stop_size: + config._flowStopSize = opts.flow_stop_size + if opts.flow_resume_size: + config._flowResumeSize = opts.flow_resume_size + if opts.flow_stop_count: + config._flowStopCount = opts.flow_stop_count + if opts.flow_resume_count: + config._flowResumeCount = opts.flow_resume_count return args @@ -391,6 +415,10 @@ class BrokerManager: if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION], if q.altExchange: print "--alternate-exchange=%s" % q._altExchange_.name, + if FLOW_STOP_SIZE in args: print "--flow-stop-size=%d" % args[FLOW_STOP_SIZE], + if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%d" % args[FLOW_RESUME_SIZE], + if FLOW_STOP_COUNT in args: print "--flow-stop-count=%d" % args[FLOW_STOP_COUNT], + if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%d" % args[FLOW_RESUME_COUNT], print def QueueListRecurse(self, filter): @@ -468,11 +496,21 @@ class BrokerManager: if config._eventGeneration: declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration + if config._flowStopSize: + declArgs[FLOW_STOP_SIZE] = config._flowStopSize + if config._flowResumeSize: + declArgs[FLOW_RESUME_SIZE] = config._flowResumeSize + if config._flowStopCount: + declArgs[FLOW_STOP_COUNT] = config._flowStopCount + if config._flowResumeCount: + declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount + if config._altern_ex != None: self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs) else: self.broker.getAmqpSession().queue_declare(queue=qname, passive=config._passive, durable=config._durable, arguments=declArgs) + def DelQueue(self, args): if len(args) < 1: Usage() |