diff options
27 files changed, 1976 insertions, 278 deletions
diff --git a/cpp/include/qpid/sys/MemStat.h b/cpp/include/qpid/sys/MemStat.h new file mode 100644 index 0000000000..d855786cd5 --- /dev/null +++ b/cpp/include/qpid/sys/MemStat.h @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef sys_MemStat +#define sys_MemStat + +#include "qpid/CommonImportExport.h" +#include "qmf/org/apache/qpid/broker/Memory.h" + +namespace qpid { +namespace sys { + + class QPID_COMMON_CLASS_EXTERN MemStat { + public: + QPID_COMMON_EXTERN static void loadMemInfo(qmf::org::apache::qpid::broker::Memory* object); + }; + +}} + +#endif + diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index 41d3cec1f6..1a84f5e79a 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -669,6 +669,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows) qpid/sys/windows/SystemInfo.cpp qpid/sys/windows/Thread.cpp qpid/sys/windows/Time.cpp + qpid/sys/windows/MemStat.cpp qpid/client/windows/SaslFactory.cpp ${sslcommon_windows_SOURCES} ) @@ -740,6 +741,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows) qpid/sys/posix/FileSysDir.cpp qpid/sys/posix/IOHandle.cpp qpid/sys/posix/LockFile.cpp + qpid/sys/posix/MemStat.cpp qpid/sys/posix/Mutex.cpp qpid/sys/posix/PipeHandle.cpp qpid/sys/posix/PollableCondition.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index fb26251da0..9533e37565 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -53,6 +53,7 @@ windows_dist = \ ../include/qpid/sys/windows/Time.h \ qpid/sys/windows/uuid.cpp \ qpid/sys/windows/uuid.h \ + qpid/sys/windows/MemStat.cpp \ windows/QpiddBroker.cpp \ windows/SCM.h \ windows/SCM.cpp \ @@ -163,6 +164,7 @@ libqpidcommon_la_SOURCES += \ qpid/sys/posix/Time.cpp \ qpid/sys/posix/Thread.cpp \ qpid/sys/posix/Shlib.cpp \ + qpid/sys/posix/MemStat.cpp \ qpid/sys/posix/Mutex.cpp \ qpid/sys/posix/Fork.cpp \ qpid/sys/posix/StrError.cpp \ diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 89532ae256..ff6da087c3 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -194,6 +194,7 @@ Broker::Broker(const Broker::Options& conf) : conf.replayFlushLimit*1024, // convert kb to bytes. conf.replayHardLimit*1024), *this), + mgmtObject(0), queueCleaner(queues, &timer), queueEvents(poller,!conf.asyncQueueEvents), recovery(true), diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 0b8fe95d5e..adc145dc84 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -142,6 +142,7 @@ void DeliveryRecord::reject() //just drop it QPID_LOG(info, "Dropping rejected message from " << queue->getName()); } + queue->countRejected(); dequeue(); setEnded(); } diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 5d763bf0da..ecaa492903 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -142,6 +142,8 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b) //QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found"); mgmtExchange->inc_msgDrops (); mgmtExchange->inc_byteDrops (msg.contentSize ()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsNoRoute(); } else { @@ -161,7 +163,7 @@ void Exchange::routeIVE(){ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : name(_name), durable(false), persistenceId(0), sequence(false), - sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false) + sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false) { if (parent != 0 && broker != 0) { @@ -172,6 +174,8 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : mgmtExchange->set_durable(durable); mgmtExchange->set_autoDelete(false); agent->addObject(mgmtExchange, 0, durable); + if (broker) + brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); } } } @@ -179,7 +183,7 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent, Broker* b) : name(_name), durable(_durable), alternateUsers(0), persistenceId(0), - args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false) + args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false) { if (parent != 0 && broker != 0) { @@ -191,6 +195,8 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel mgmtExchange->set_autoDelete(false); mgmtExchange->set_arguments(ManagementAgent::toMap(args)); agent->addObject(mgmtExchange, 0, durable); + if (broker) + brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); } } diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index b12af9a1dd..9179dd5c7c 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -32,6 +32,7 @@ #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Exchange.h" #include "qmf/org/apache/qpid/broker/Binding.h" +#include "qmf/org/apache/qpid/broker/Broker.h" namespace qpid { namespace broker { @@ -158,6 +159,7 @@ protected: }; qmf::org::apache::qpid::broker::Exchange* mgmtExchange; + qmf::org::apache::qpid::broker::Broker* brokerMgmtObject; public: typedef boost::shared_ptr<Exchange> shared_ptr; diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 4bda70d313..142c23f276 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -200,6 +200,8 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons mgmtExchange->inc_byteReceives(msg.contentSize()); mgmtExchange->inc_msgDrops(); mgmtExchange->inc_byteDrops(msg.contentSize()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsNoRoute(); } return; } diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index d13109dad1..ae4503328a 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -270,6 +270,7 @@ void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16 morecontent = getContentFrame(queue, frame, maxContentSize, offset); out.handle(frame); } + queue.countLoadedFromDisk(contentSize()); } else { Count c; frames.map_if(c, TypeFilter<CONTENT_BODY>()); diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index dd23760922..969d510e26 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -109,6 +109,7 @@ Queue::Queue(const string& _name, bool _autodelete, persistenceId(0), policyExceeded(false), mgmtObject(0), + brokerMgmtObject(0), eventMode(0), insertSeqNo(0), broker(b), @@ -123,14 +124,20 @@ Queue::Queue(const string& _name, bool _autodelete, if (agent != 0) { mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0); agent->addObject(mgmtObject, 0, store != 0); + brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject(); + if (brokerMgmtObject) + brokerMgmtObject->inc_queueCount(); } } } Queue::~Queue() { - if (mgmtObject != 0) + if (mgmtObject != 0) { mgmtObject->resourceDestroy(); + if (brokerMgmtObject) + brokerMgmtObject->dec_queueCount(); + } } bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg) @@ -204,6 +211,10 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){ if (mgmtObject != 0){ mgmtObject->inc_msgTxnEnqueues (); mgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); + if (brokerMgmtObject) { + brokerMgmtObject->inc_msgTxnEnqueues (); + brokerMgmtObject->inc_byteTxnEnqueues (msg->contentSize ()); + } } } @@ -221,7 +232,13 @@ void Queue::requeue(const QueuedMessage& msg){ if (alternateExchange.get()) { DeliverableMessage dmsg(msg.payload); alternateExchange->routeWithAlternate(dmsg); + if (brokerMgmtObject) + brokerMgmtObject->inc_abandonedViaAlt(); + } else { + if (brokerMgmtObject) + brokerMgmtObject->inc_abandoned(); } + mgntDeqStats(msg.payload); } else { messages->reinsert(msg); listeners.populate(copy); @@ -234,8 +251,8 @@ void Queue::requeue(const QueuedMessage& msg){ enqueue(0, payload); } } + observeRequeue(msg, locker); } - observeRequeue(msg, locker); } copy.notify(); } @@ -323,6 +340,11 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_ c->setPosition(msg.position); acquire( msg.position, msg, locker); dequeue( 0, msg ); + if (mgmtObject) { + mgmtObject->inc_discardsTtl(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsTtl(); + } continue; } @@ -504,6 +526,15 @@ void Queue::purgeExpired(qpid::sys::Duration lapse) messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1)); } + // + // Report the count of discarded-by-ttl messages + // + if (mgmtObject && !expired.empty()) { + mgmtObject->inc_discardsTtl(expired.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsTtl(expired.size()); + } + for (std::deque<QueuedMessage>::const_iterator i = expired.begin(); i != expired.end(); ++i) { { @@ -638,6 +669,19 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange> Mutex::ScopedLock locker(messageLock); messages->removeIf( boost::bind<bool>(boost::ref(c), _1) ); + + if (mgmtObject && !c.matches.empty()) { + if (dest.get()) { + mgmtObject->inc_reroutes(c.matches.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_reroutes(c.matches.size()); + } else { + mgmtObject->inc_discardsPurge(c.matches.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsPurge(c.matches.size()); + } + } + for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin(); qmsg != c.matches.end(); ++qmsg) { // Update observers and message state: @@ -710,8 +754,14 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence); dequeueRequired = messages->push(qm, removed); - if (dequeueRequired) + if (dequeueRequired) { observeAcquire(removed, locker); + if (mgmtObject) { + mgmtObject->inc_discardsLvq(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsLvq(); + } + } listeners.populate(copy); observeEnqueue(qm, locker); } @@ -799,10 +849,30 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg std::deque<QueuedMessage> dequeues; { Mutex::ScopedLock locker(messageLock); - policy->tryEnqueue(msg); + try { + policy->tryEnqueue(msg); + } catch(ResourceLimitExceededException&) { + if (mgmtObject) { + mgmtObject->inc_discardsOverflow(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsOverflow(); + } + throw; + } policy->getPendingDequeues(dequeues); } //depending on policy, may have some dequeues that need to performed without holding the lock + + // + // Count the dequeues as ring-discards. We know that these aren't rejects because + // policy->tryEnqueue would have thrown an exception. + // + if (mgmtObject && !dequeues.empty()) { + mgmtObject->inc_discardsRing(dequeues.size()); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsRing(dequeues.size()); + } + for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); } @@ -871,6 +941,10 @@ void Queue::dequeueCommitted(const QueuedMessage& msg) if (mgmtObject != 0) { mgmtObject->inc_msgTxnDequeues(); mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); + if (brokerMgmtObject) { + brokerMgmtObject->inc_msgTxnDequeues(); + brokerMgmtObject->inc_byteTxnDequeues(msg.payload->contentSize()); + } } } @@ -893,8 +967,8 @@ void Queue::popAndDequeue(const Mutex::ScopedLock& held) */ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) { - if (policy.get()) policy->dequeued(msg); mgntDeqStats(msg.payload); + if (policy.get()) policy->dequeued(msg); for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->dequeued(msg); @@ -909,6 +983,12 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) */ void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) { + if (mgmtObject) { + mgmtObject->inc_acquires(); + if (brokerMgmtObject) + brokerMgmtObject->inc_acquires(); + } + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->acquired(msg); @@ -923,6 +1003,12 @@ void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&) */ void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&) { + if (mgmtObject) { + mgmtObject->inc_releases(); + if (brokerMgmtObject) + brokerMgmtObject->inc_releases(); + } + for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) { try{ (*i)->requeued(msg); @@ -1079,14 +1165,22 @@ void Queue::configureImpl(const FieldTable& _settings) void Queue::destroyed() { unbind(broker->getExchanges()); - if (alternateExchange.get()) { + { Mutex::ScopedLock locker(messageLock); while(!messages->empty()){ DeliverableMessage msg(messages->front().payload); - alternateExchange->routeWithAlternate(msg); + if (alternateExchange.get()) { + if (brokerMgmtObject) + brokerMgmtObject->inc_abandonedViaAlt(); + alternateExchange->routeWithAlternate(msg); + } else { + if (brokerMgmtObject) + brokerMgmtObject->inc_abandoned(); + } popAndDequeue(locker); } - alternateExchange->decAlternateUsers(); + if (alternateExchange.get()) + alternateExchange->decAlternateUsers(); } if (store) { @@ -1124,6 +1218,8 @@ void Queue::unbind(ExchangeRegistry& exchanges) void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy) { policy = _policy; + if (policy.get()) + policy->setQueue(this); } const QueuePolicy* Queue::getPolicy() @@ -1291,6 +1387,40 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) { } } +void Queue::countRejected() const +{ + if (mgmtObject) { + mgmtObject->inc_discardsSubscriber(); + if (brokerMgmtObject) + brokerMgmtObject->inc_discardsSubscriber(); + } +} + +void Queue::countFlowedToDisk(uint64_t size) const +{ + if (mgmtObject) { + mgmtObject->inc_msgFtdEnqueues(); + mgmtObject->inc_byteFtdEnqueues(size); + if (brokerMgmtObject) { + brokerMgmtObject->inc_msgFtdEnqueues(); + brokerMgmtObject->inc_byteFtdEnqueues(size); + } + } +} + +void Queue::countLoadedFromDisk(uint64_t size) const +{ + if (mgmtObject) { + mgmtObject->inc_msgFtdDequeues(); + mgmtObject->inc_byteFtdDequeues(size); + if (brokerMgmtObject) { + brokerMgmtObject->inc_msgFtdDequeues(); + brokerMgmtObject->inc_byteFtdDequeues(size); + } + } +} + + ManagementObject* Queue::GetManagementObject (void) const { return (ManagementObject*) mgmtObject; diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h index 59ae41e768..5eca1e9b0c 100644 --- a/cpp/src/qpid/broker/Queue.h +++ b/cpp/src/qpid/broker/Queue.h @@ -39,6 +39,7 @@ #include "qpid/sys/Timer.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Queue.h" +#include "qmf/org/apache/qpid/broker/Broker.h" #include "qpid/framing/amqp_types.h" #include <boost/shared_ptr.hpp> @@ -92,7 +93,6 @@ class Queue : public boost::enable_shared_from_this<Queue>, typedef std::set< boost::shared_ptr<QueueObserver> > Observers; enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2}; - const std::string name; const bool autodelete; MessageStore* store; @@ -119,6 +119,7 @@ class Queue : public boost::enable_shared_from_this<Queue>, boost::shared_ptr<Exchange> alternateExchange; framing::SequenceNumber sequence; qmf::org::apache::qpid::broker::Queue* mgmtObject; + qmf::org::apache::qpid::broker::Broker* brokerMgmtObject; sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge. int eventMode; Observers observers; @@ -165,9 +166,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, if (mgmtObject != 0) { mgmtObject->inc_msgTotalEnqueues (); mgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); + brokerMgmtObject->inc_msgTotalEnqueues (); + brokerMgmtObject->inc_byteTotalEnqueues (msg->contentSize ()); if (msg->isPersistent ()) { mgmtObject->inc_msgPersistEnqueues (); mgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); + brokerMgmtObject->inc_msgPersistEnqueues (); + brokerMgmtObject->inc_bytePersistEnqueues (msg->contentSize ()); } } } @@ -176,9 +181,13 @@ class Queue : public boost::enable_shared_from_this<Queue>, if (mgmtObject != 0){ mgmtObject->inc_msgTotalDequeues (); mgmtObject->inc_byteTotalDequeues (msg->contentSize()); + brokerMgmtObject->inc_msgTotalDequeues (); + brokerMgmtObject->inc_byteTotalDequeues (msg->contentSize()); if (msg->isPersistent ()){ mgmtObject->inc_msgPersistDequeues (); mgmtObject->inc_bytePersistDequeues (msg->contentSize()); + brokerMgmtObject->inc_msgPersistDequeues (); + brokerMgmtObject->inc_bytePersistDequeues (msg->contentSize()); } } } @@ -355,6 +364,11 @@ class Queue : public boost::enable_shared_from_this<Queue>, virtual void setExternalQueueStore(ExternalQueueStore* inst); + // Increment the rejected-by-consumer counter. + void countRejected() const; + void countFlowedToDisk(uint64_t size) const; + void countLoadedFromDisk(uint64_t size) const; + // Manageable entry points management::ManagementObject* GetManagementObject (void) const; management::Manageable::status_t diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp index dafcf92a63..d5b4c1ae86 100644 --- a/cpp/src/qpid/broker/QueuePolicy.cpp +++ b/cpp/src/qpid/broker/QueuePolicy.cpp @@ -31,7 +31,7 @@ using namespace qpid::broker; using namespace qpid::framing; QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) : - maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) { + maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), queue(0), name(_name) { QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize); } @@ -204,7 +204,11 @@ FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount, bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m) { - if (!QueuePolicy::checkLimit(m)) m->requestContentRelease(); + if (!QueuePolicy::checkLimit(m)) { + m->requestContentRelease(); + if (queue) + queue->countFlowedToDisk(m->contentSize()); + } return true; } diff --git a/cpp/src/qpid/broker/QueuePolicy.h b/cpp/src/qpid/broker/QueuePolicy.h index ec7f846704..f23b709f18 100644 --- a/cpp/src/qpid/broker/QueuePolicy.h +++ b/cpp/src/qpid/broker/QueuePolicy.h @@ -33,6 +33,8 @@ namespace qpid { namespace broker { +class Queue; + class QueuePolicy { static uint64_t defaultMaxSize; @@ -44,8 +46,8 @@ class QueuePolicy uint64_t size; bool policyExceeded; - protected: + Queue* queue; uint64_t getCurrentQueueSize() const { return size; } public: @@ -72,6 +74,8 @@ class QueuePolicy void decode ( framing::Buffer& buffer ); uint32_t encodedSize() const; virtual void getPendingDequeues(Messages& result); + std::string getType() const { return type; } + void setQueue(Queue* q) { queue = q; } static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings); static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT); diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index ff5271f83b..741ef442b0 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -122,7 +122,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : suppressed(false), disallowAllV1Methods(false), vendorNameKey(defaultVendorName), productNameKey(defaultProductName), qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100), - msgBuffer(MA_BUFFER_SIZE) + msgBuffer(MA_BUFFER_SIZE), memstat(0) { nextObjectId = 1; brokerBank = 1; @@ -132,6 +132,9 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : clientWasAdded = false; attrMap["_vendor"] = defaultVendorName; attrMap["_product"] = defaultProductName; + + memstat = new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker"); + addObject(memstat, "amqp-broker"); } ManagementAgent::~ManagementAgent () @@ -720,6 +723,7 @@ void ManagementAgent::periodicProcessing (void) static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); moveNewObjectsLH(); + qpid::sys::MemStat::loadMemInfo(memstat); // // Clear the been-here flag on all objects in the map. @@ -1834,6 +1838,9 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe string className (value->get<string>()); std::list<ObjectId>matches; + if (className == "memory") + qpid::sys::MemStat::loadMemInfo(memstat); + // build up a set of all objects to be dumped for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); @@ -1946,6 +1953,8 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co packageName = s_iter->second.asString(); } + if (className == "memory") + qpid::sys::MemStat::loadMemInfo(memstat); /* * Unpack the _object_id element of the query if it is present. If it is present, find that one @@ -1968,6 +1977,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co Variant::Map values; Variant::Map oidMap; + object->writeTimestamps(map_); object->mapEncodeValues(values, true, true); // write both stats and properties objId.mapEncode(oidMap); map_["_values"] = values; diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index c21f384433..f68bfe0577 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -32,6 +32,8 @@ #include "qpid/management/ManagementEvent.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Agent.h" +#include "qmf/org/apache/qpid/broker/Memory.h" +#include "qpid/sys/MemStat.h" #include "qpid/types/Variant.h" #include <qpid/framing/AMQFrame.h> #include <qpid/framing/FieldValue.h> @@ -343,6 +345,11 @@ private: char eventBuffer[MA_BUFFER_SIZE]; framing::ResizableBuffer msgBuffer; + // + // Memory statistics object + // + qmf::org::apache::qpid::broker::Memory *memstat; + void writeData (); void periodicProcessing (void); void deleteObjectNowLH(const ObjectId& oid); diff --git a/cpp/src/qpid/sys/posix/MemStat.cpp b/cpp/src/qpid/sys/posix/MemStat.cpp new file mode 100644 index 0000000000..72c53e5886 --- /dev/null +++ b/cpp/src/qpid/sys/posix/MemStat.cpp @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/MemStat.h" +#include <malloc.h> + +void qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory* object) +{ + struct mallinfo info(mallinfo()); + + object->set_malloc_arena(info.arena); + object->set_malloc_ordblks(info.ordblks); + object->set_malloc_hblks(info.hblks); + object->set_malloc_hblkhd(info.hblkhd); + object->set_malloc_uordblks(info.uordblks); + object->set_malloc_fordblks(info.fordblks); + object->set_malloc_keepcost(info.keepcost); +} + + diff --git a/cpp/src/qpid/sys/windows/MemStat.cpp b/cpp/src/qpid/sys/windows/MemStat.cpp new file mode 100644 index 0000000000..4ad73933ad --- /dev/null +++ b/cpp/src/qpid/sys/windows/MemStat.cpp @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/sys/MemStat.h" + +qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory*) +{ + // TODO: Add Windows-specific memory stats to the object and load them here. +} + + diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java index 27345f0a88..4367a04da9 100644 --- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java +++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java @@ -678,6 +678,204 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable return (System.currentTimeMillis() - _obj.getCreateTime()) * 1000000L; } + public Long getQueueCount() + { + // TODO + return 0L; + } + + public Long getMsgTotalEnqueues() + { + // TODO + return 0L; + } + + public Long getMsgTotalDequeues() + { + // TODO + return 0L; + } + + public Long getByteTotalEnqueues() + { + // TODO + return 0L; + } + + public Long getByteTotalDequeues() + { + // TODO + return 0L; + } + + public Long getMsgDepth() + { + // TODO + return 0L; + } + + public Long getByteDepth() + { + // TODO + return 0L; + } + + public Long getMsgPersistEnqueues() + { + // TODO + return 0L; + } + + public Long getMsgPersistDequeues() + { + // TODO + return 0L; + } + + public Long getBytePersistEnqueues() + { + // TODO + return 0L; + } + + public Long getBytePersistDequeues() + { + // TODO + return 0L; + } + + public Long getMsgTxnEnqueues() + { + // TODO + return 0L; + } + + public Long getMsgTxnDequeues() + { + // TODO + return 0L; + } + + public Long getByteTxnEnqueues() + { + // TODO + return 0L; + } + + public Long getByteTxnDequeues() + { + // TODO + return 0L; + } + + public Long getMsgFtdEnqueues() + { + // TODO + return 0L; + } + + public Long getMsgFtdDequeues() + { + // TODO + return 0L; + } + + public Long getByteFtdEnqueues() + { + // TODO + return 0L; + } + + public Long getByteFtdDequeues() + { + // TODO + return 0L; + } + + public Long getMsgFtdDepth() + { + // TODO + return 0L; + } + + public Long getByteFtdDepth() + { + // TODO + return 0L; + } + + public Long getReleases() + { + // TODO + return 0L; + } + + public Long getAcquires() + { + // TODO + return 0L; + } + + public Long getDiscardsNoRoute() + { + // TODO + return 0L; + } + + public Long getDiscardsTtl() + { + // TODO + return 0L; + } + + public Long getDiscardsRing() + { + // TODO + return 0L; + } + + public Long getDiscardsLvq() + { + // TODO + return 0L; + } + + public Long getDiscardsOverflow() + { + // TODO + return 0L; + } + + public Long getDiscardsSubscriber() + { + // TODO + return 0L; + } + + public Long getDiscardsPurge() + { + // TODO + return 0L; + } + + public Long getReroutes() + { + // TODO + return 0L; + } + + public Long getAbandoned() + { + // TODO + return 0L; + } + + public Long getAbandonedViaAlt() + { + // TODO + return 0L; + } + public BrokerSchema.BrokerClass.EchoMethodResponseCommand echo(final BrokerSchema.BrokerClass.EchoMethodResponseCommandFactory factory, final Long sequence, final String body) @@ -1064,6 +1262,96 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable return _obj.getPersistentByteDequeues(); } + public Long getMsgFtdEnqueues() + { + // TODO + return 0L; + } + + public Long getMsgFtdDequeues() + { + // TODO + return 0L; + } + + public Long getByteFtdEnqueues() + { + // TODO + return 0L; + } + + public Long getByteFtdDequeues() + { + // TODO + return 0L; + } + + public Long getMsgFtdDepth() + { + // TODO + return 0L; + } + + public Long getByteFtdDepth() + { + // TODO + return 0L; + } + + public Long getReleases() + { + // TODO + return 0L; + } + + public Long getAcquires() + { + // TODO + return 0L; + } + + public Long getDiscardsTtl() + { + // TODO + return 0L; + } + + public Long getDiscardsRing() + { + // TODO + return 0L; + } + + public Long getDiscardsLvq() + { + // TODO + return 0L; + } + + public Long getDiscardsOverflow() + { + // TODO + return 0L; + } + + public Long getDiscardsSubscriber() + { + // TODO + return 0L; + } + + public Long getDiscardsPurge() + { + // TODO + return 0L; + } + + public Long getReroutes() + { + // TODO + return 0L; + } + public Long getConsumerCount() { return (long) _obj.getConsumerCount(); diff --git a/java/broker/src/xsl/qmf.xsl b/java/broker/src/xsl/qmf.xsl index 1e98c97466..c45d1e419e 100644 --- a/java/broker/src/xsl/qmf.xsl +++ b/java/broker/src/xsl/qmf.xsl @@ -794,9 +794,14 @@ public class <xsl:value-of select="$ClassName"/> extends QMFPackage <xsl:when test="$type='objId'">OBJECTREFERENCE</xsl:when> <xsl:when test="$type='sstr'">STR8</xsl:when> <xsl:when test="$type='lstr'">STR16</xsl:when> +<xsl:when test="$type='uint8'">UINT8</xsl:when> <xsl:when test="$type='uint16'">UINT16</xsl:when> <xsl:when test="$type='uint32'">UINT32</xsl:when> <xsl:when test="$type='uint64'">UINT64</xsl:when> +<xsl:when test="$type='int8'">INT8</xsl:when> +<xsl:when test="$type='int16'">INT16</xsl:when> +<xsl:when test="$type='int32'">INT32</xsl:when> +<xsl:when test="$type='int64'">INT64</xsl:when> <xsl:when test="$type='uuid'">UUID</xsl:when> <xsl:when test="$type='deltaTime'">DELTATIME</xsl:when> <xsl:when test="$type='count32'">UINT32</xsl:when> @@ -813,9 +818,14 @@ public class <xsl:value-of select="$ClassName"/> extends QMFPackage <xsl:when test="$type='objId'"><xsl:value-of select="$referenceType"/>Object</xsl:when> <xsl:when test="$type='sstr'">String</xsl:when> <xsl:when test="$type='lstr'">String</xsl:when> + <xsl:when test="$type='uint8'">Integer</xsl:when> <xsl:when test="$type='uint16'">Integer</xsl:when> <xsl:when test="$type='uint32'">Long</xsl:when> <xsl:when test="$type='uint64'">Long</xsl:when> + <xsl:when test="$type='int8'">Integer</xsl:when> + <xsl:when test="$type='int16'">Integer</xsl:when> + <xsl:when test="$type='int32'">Long</xsl:when> + <xsl:when test="$type='int64'">Long</xsl:when> <xsl:when test="$type='uuid'">UUID</xsl:when> <xsl:when test="$type='deltaTime'">Long</xsl:when> <xsl:when test="$type='count32'">Long</xsl:when> @@ -831,9 +841,14 @@ public class <xsl:value-of select="$ClassName"/> extends QMFPackage <xsl:when test="$type='objId'">writeBin128( <xsl:value-of select="$var"/>.getId() )</xsl:when> <xsl:when test="$type='sstr'">writeStr8( <xsl:value-of select="$var"/> )</xsl:when> <xsl:when test="$type='lstr'">writeStr16( <xsl:value-of select="$var"/> )</xsl:when> + <xsl:when test="$type='uint8'">writeUint8( <xsl:value-of select="$var"/> )</xsl:when> <xsl:when test="$type='uint16'">writeUint16( <xsl:value-of select="$var"/> )</xsl:when> <xsl:when test="$type='uint32'">writeUint32( <xsl:value-of select="$var"/> )</xsl:when> <xsl:when test="$type='uint64'">writeUint64( <xsl:value-of select="$var"/> )</xsl:when> + <xsl:when test="$type='int8'">writeInt8( <xsl:value-of select="$var"/> )</xsl:when> + <xsl:when test="$type='int16'">writeInt16( <xsl:value-of select="$var"/> )</xsl:when> + <xsl:when test="$type='int32'">writeInt32( <xsl:value-of select="$var"/> )</xsl:when> + <xsl:when test="$type='int64'">writeInt64( <xsl:value-of select="$var"/> )</xsl:when> <xsl:when test="$type='uuid'">writeUuid( <xsl:value-of select="$var"/> )</xsl:when> <xsl:when test="$type='deltaTime'">writeUint64( <xsl:value-of select="$var"/> )</xsl:when> <xsl:when test="$type='count32'">writeUint32( <xsl:value-of select="$var"/> )</xsl:when> @@ -849,9 +864,14 @@ public class <xsl:value-of select="$ClassName"/> extends QMFPackage <xsl:when test="$type='objId'">readBin128()</xsl:when> <xsl:when test="$type='sstr'">readStr8()</xsl:when> <xsl:when test="$type='lstr'">readStr16()</xsl:when> + <xsl:when test="$type='uint8'">readUint8()</xsl:when> <xsl:when test="$type='uint16'">readUint16()</xsl:when> <xsl:when test="$type='uint32'">readUint32()</xsl:when> <xsl:when test="$type='uint64'">readUint64()</xsl:when> + <xsl:when test="$type='int8'">readInt8()</xsl:when> + <xsl:when test="$type='int16'">readInt16()</xsl:when> + <xsl:when test="$type='int32'">readInt32()</xsl:when> + <xsl:when test="$type='int64'">readInt64()</xsl:when> <xsl:when test="$type='uuid'">readUuid()</xsl:when> <xsl:when test="$type='deltaTime'">readUint64()</xsl:when> <xsl:when test="$type='count32'">readUint32()</xsl:when> diff --git a/specs/management-schema.xml b/specs/management-schema.xml index 9e2a644c2a..8e3e798af6 100644 --- a/specs/management-schema.xml +++ b/specs/management-schema.xml @@ -57,6 +57,22 @@ <!-- =============================================================== + Memory + =============================================================== + --> + <class name="Memory"> + <property name="name" type="sstr" access="RC" index="y" desc="Index for the broker at this agent"/> + <property name="malloc_arena" type="uint64" access="RO" optional="y" desc="Total size of memory allocated with `sbrk' by `malloc', in bytes"/> + <property name="malloc_ordblks" type="uint64" access="RO" optional="y" desc="The number of chunks not in use"/> + <property name="malloc_hblks" type="uint64" access="RO" optional="y" desc="Total number of chunks allocated with `mmap'"/> + <property name="malloc_hblkhd" type="uint64" access="RO" optional="y" desc="Total size of memory allocated with `mmap', in bytes"/> + <property name="malloc_uordblks" type="uint64" access="RO" optional="y" desc="Total size of memory occupied by chunks handed out by `malloc'"/> + <property name="malloc_fordblks" type="uint64" access="RO" optional="y" desc="Total size of memory occupied by free (not in use) chunks"/> + <property name="malloc_keepcost" type="uint64" access="RO" optional="y" desc="The size of the top-most releasable chunk that normally borders the end of the heap"/> + </class> + + <!-- + =============================================================== Broker =============================================================== --> @@ -73,6 +89,40 @@ <property name="dataDir" type="lstr" access="RO" optional="y" desc="Persistent configuration storage location"/> <statistic name="uptime" type="deltaTime"/> + <statistic name="queueCount" type="count64" unit="queue" desc="Number of queues in the broker"/> + <statistic name="msgTotalEnqueues" type="count64" unit="message" desc="Total messages enqueued to broker"/> + <statistic name="msgTotalDequeues" type="count64" unit="message" desc="Total messages dequeued from broker"/> + <statistic name="byteTotalEnqueues" type="count64" unit="octet" desc="Total bytes enqueued to broker"/> + <statistic name="byteTotalDequeues" type="count64" unit="octet" desc="Total bytes dequeued from broker"/> + <statistic name="msgDepth" type="count64" unit="message" desc="Current number of messages on queues in broker" assign="msgTotalEnqueues - msgTotalDequeues"/> + <statistic name="byteDepth" type="count64" unit="octet" desc="Current number of bytes on queues in broker" assign="byteTotalEnqueues - byteTotalDequeues"/> + <statistic name="msgPersistEnqueues" type="count64" unit="message" desc="Total persistent messages enqueued to broker"/> + <statistic name="msgPersistDequeues" type="count64" unit="message" desc="Total persistent messages dequeued from broker"/> + <statistic name="bytePersistEnqueues" type="count64" unit="octet" desc="Total persistent bytes enqueued to broker"/> + <statistic name="bytePersistDequeues" type="count64" unit="octet" desc="Total persistent bytes dequeued from broker"/> + <statistic name="msgTxnEnqueues" type="count64" unit="message" desc="Total transactional messages enqueued to broker"/> + <statistic name="msgTxnDequeues" type="count64" unit="message" desc="Total transactional messages dequeued from broker"/> + <statistic name="byteTxnEnqueues" type="count64" unit="octet" desc="Total transactional bytes enqueued to broker"/> + <statistic name="byteTxnDequeues" type="count64" unit="octet" desc="Total transactional bytes dequeued from broker"/> + <statistic name="msgFtdEnqueues" type="count64" unit="message" desc="Total message bodies released from memory and flowed-to-disk on broker"/> + <statistic name="msgFtdDequeues" type="count64" unit="message" desc="Total message bodies dequeued from the broker having been flowed-to-disk"/> + <statistic name="byteFtdEnqueues" type="count64" unit="octet" desc="Total bytes released from memory and flowed-to-disk on broker"/> + <statistic name="byteFtdDequeues" type="count64" unit="octet" desc="Total bytes dequeued from the broker having been flowed-to-disk"/> + <statistic name="msgFtdDepth" type="count64" unit="message" desc="Current number of messages flowed-to-disk" assign="msgFtdEnqueues - msgFtdDequeues"/> + <statistic name="byteFtdDepth" type="count64" unit="octet" desc="Current number of bytes flowed-to-disk" assign="byteFtdEnqueues - byteFtdDequeues"/> + <statistic name="releases" type="count64" unit="message" desc="Acquired messages reinserted into the queue"/> + <statistic name="acquires" type="count64" unit="message" desc="Messages acquired from the queue"/> + <statistic name="discardsNoRoute" type="count64" unit="message" desc="Messages discarded due to no-route from exchange"/> + <statistic name="discardsTtl" type="count64" unit="message" desc="Messages discarded due to TTL expiration"/> + <statistic name="discardsRing" type="count64" unit="message" desc="Messages discarded due to ring-queue overflow"/> + <statistic name="discardsLvq" type="count64" unit="message" desc="Messages discarded due to LVQ insert"/> + <statistic name="discardsOverflow" type="count64" unit="message" desc="Messages discarded due to reject-policy overflow"/> + <statistic name="discardsSubscriber" type="count64" unit="message" desc="Messages discarded due to subscriber reject"/> + <statistic name="discardsPurge" type="count64" unit="message" desc="Messages discarded due to management purge"/> + <statistic name="reroutes" type="count64" unit="message" desc="Messages dequeued to management re-route"/> + <statistic name="abandoned" type="count64" unit="message" desc="Messages left in a deleted queue"/> + <statistic name="abandonedViaAlt" type="count64" unit="message" desc="Messages routed to alternate exchange from a deleted queue"/> + <method name="echo" desc="Request a response to test the path to the management broker"> <arg name="sequence" dir="IO" type="uint32" default="0"/> <arg name="body" dir="IO" type="lstr" default=""/> @@ -187,6 +237,31 @@ <statistic name="byteTxnDequeues" type="count64" unit="octet" desc="Transactional messages dequeued"/> <statistic name="bytePersistEnqueues" type="count64" unit="octet" desc="Persistent messages enqueued"/> <statistic name="bytePersistDequeues" type="count64" unit="octet" desc="Persistent messages dequeued"/> + + <!-- Flow-to-disk Statistics --> + + <statistic name="msgFtdEnqueues" type="count64" unit="message" desc="Total message bodies released from memory and flowed-to-disk on broker"/> + <statistic name="msgFtdDequeues" type="count64" unit="message" desc="Total message bodies dequeued from the broker having been flowed-to-disk"/> + <statistic name="byteFtdEnqueues" type="count64" unit="octet" desc="Total bytes released from memory and flowed-to-disk on broker"/> + <statistic name="byteFtdDequeues" type="count64" unit="octet" desc="Total bytes dequeued from the broker having been flowed-to-disk"/> + <statistic name="msgFtdDepth" type="count64" unit="message" desc="Current number of messages flowed-to-disk" assign="msgFtdEnqueues - msgFtdDequeues"/> + <statistic name="byteFtdDepth" type="count64" unit="octet" desc="Current number of bytes flowed-to-disk" assign="byteFtdEnqueues - byteFtdDequeues"/> + + <!-- Acquire and Release Statistics - These do not affect msgDepth since msgDepth includes acquired-but-not-completed messages. --> + + <statistic name="releases" type="count64" unit="message" desc="Acquired messages reinserted into the queue"/> + <statistic name="acquires" type="count64" unit="message" desc="Messages acquired from the queue"/> + + <!-- Dequeue Details - all of these are included in msgTotalDequeues --> + + <statistic name="discardsTtl" type="count64" unit="message" desc="Messages discarded due to TTL expiration"/> + <statistic name="discardsRing" type="count64" unit="message" desc="Messages discarded due to ring-queue overflow"/> + <statistic name="discardsLvq" type="count64" unit="message" desc="Messages discarded due to LVQ insert"/> + <statistic name="discardsOverflow" type="count64" unit="message" desc="Messages discarded due to reject-policy overflow"/> + <statistic name="discardsSubscriber" type="count64" unit="message" desc="Messages discarded due to subscriber reject"/> + <statistic name="discardsPurge" type="count64" unit="message" desc="Messages discarded due to management purge"/> + <statistic name="reroutes" type="count64" unit="message" desc="Messages dequeued to management re-route"/> + <statistic name="consumerCount" type="hilo32" unit="consumer" desc="Current consumers on queue"/> <statistic name="bindingCount" type="hilo32" unit="binding" desc="Current bindings"/> <statistic name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/> diff --git a/tests/src/py/qpid_tests/broker_0_10/__init__.py b/tests/src/py/qpid_tests/broker_0_10/__init__.py index 5b1964ae98..107b34c82b 100644 --- a/tests/src/py/qpid_tests/broker_0_10/__init__.py +++ b/tests/src/py/qpid_tests/broker_0_10/__init__.py @@ -35,3 +35,4 @@ from threshold import * from extensions import * from msg_groups import * from new_api import * +from stats import * diff --git a/tests/src/py/qpid_tests/broker_0_10/stats.py b/tests/src/py/qpid_tests/broker_0_10/stats.py new file mode 100644 index 0000000000..f1f809cf2f --- /dev/null +++ b/tests/src/py/qpid_tests/broker_0_10/stats.py @@ -0,0 +1,489 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import * +from qpid.tests.messaging import Base +import qmf.console +from time import sleep +import os +import sys + +home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("../../../../tools/src/py")) +sys.path.append(os.path.join(home, "python")) + +from qpidtoollibs.broker import BrokerAgent + +# +# Tests the Broker's statistics reporting +# + +class BrokerStatsTests(Base): + """ + Tests of the broker's statistics + """ + + def assertEqual(self, left, right, text=None): + if not left == right: + print "assertEqual failure: %r != %r" % (left, right) + if text: + print " %r" % text + assert None + + def failUnless(self, value, text=None): + if value: + return + print "failUnless failure", + if text: + print ": %r" % text + else: + print + assert None + + def fail(self, text=None): + if text: + print "Fail: %r" % text + assert None + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self, tx=False): + return self.conn.session(transactional=tx) + + def setup_access(self): + return BrokerAgent(self.conn) + + + def test_enqueues_dequeues(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("enqueue_test;{create:always,delete:always}") + rx = sess.receiver("enqueue_test") + + queue = agent.getQueue("enqueue_test") + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.msgTotalEnqueues, 0, "msgTotalEnqueues") + self.assertEqual(queue.byteTotalEnqueues, 0, "byteTotalEnqueues") + self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues") + self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues") + self.assertEqual(queue.msgDepth, 0, "msgDepth") + self.assertEqual(queue.byteDepth, 0, "byteDepth") + + tx.send("0123456789") + tx.send("01234567890123456789") + tx.send("012345678901234567890123456789") + tx.send("0123456789012345678901234567890123456789") + + queue.update() + self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues") + self.assertEqual(queue.byteTotalEnqueues, 100, "byteTotalEnqueues") + self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues") + self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues") + self.assertEqual(queue.msgDepth, 4, "msgDepth") + self.assertEqual(queue.byteDepth, 100, "byteDepth") + + now_broker = agent.getBroker() + self.failUnless((now_broker.msgTotalEnqueues - start_broker.msgTotalEnqueues) >= 4, "broker msgTotalEnqueues") + self.failUnless((now_broker.byteTotalEnqueues - start_broker.byteTotalEnqueues) >= 100, "broker byteTotalEnqueues") + + m = rx.fetch() + m = rx.fetch() + sess.acknowledge() + + queue.update() + self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues") + self.assertEqual(queue.byteTotalEnqueues, 100, "byteTotalEnqueues") + self.assertEqual(queue.msgTotalDequeues, 2, "msgTotalDequeues") + self.assertEqual(queue.byteTotalDequeues, 30, "byteTotalDequeues") + self.assertEqual(queue.msgDepth, 2, "msgDepth") + self.assertEqual(queue.byteDepth, 70, "byteDepth") + + now_broker = agent.getBroker() + self.failUnless((now_broker.msgTotalDequeues - start_broker.msgTotalDequeues) >= 2, "broker msgTotalDequeues") + self.failUnless((now_broker.byteTotalDequeues - start_broker.byteTotalDequeues) >= 30, "broker byteTotalDequeues") + + sess.close() + + now_broker = agent.getBroker() + self.assertEqual(now_broker.abandoned - start_broker.abandoned, 2, "expect 2 abandoned messages") + self.assertEqual(now_broker.msgDepth, start_broker.msgDepth, "expect broker message depth to be unchanged") + self.assertEqual(now_broker.byteDepth, start_broker.byteDepth, "expect broker byte depth to be unchanged") + + + def test_transactional_enqueues_dequeues(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session(True) + tx = sess.sender("tx_enqueue_test;{create:always,delete:always}") + + tx.send("0123456789") + tx.send("0123456789") + tx.send("0123456789") + tx.send("0123456789") + + queue = agent.getQueue("tx_enqueue_test") + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.msgTotalEnqueues, 0, "msgTotalEnqueues pre-tx-commit") + self.assertEqual(queue.byteTotalEnqueues, 0, "byteTotalEnqueues pre-tx-commit") + self.assertEqual(queue.msgTxnEnqueues, 0, "msgTxnEnqueues pre-tx-commit") + self.assertEqual(queue.byteTxnEnqueues, 0, "byteTxnEnqueues pre-tx-commit") + self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues pre-tx-commit") + self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues pre-tx-commit") + self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues pre-tx-commit") + self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues pre-tx-commit") + + sess.commit() + queue.update() + self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues post-tx-commit") + self.assertEqual(queue.byteTotalEnqueues, 40, "byteTotalEnqueues post-tx-commit") + self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues post-tx-commit") + self.assertEqual(queue.byteTxnEnqueues, 40, "byteTxnEnqueues post-tx-commit") + self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues post-tx-commit") + self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues post-tx-commit") + self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues post-tx-commit") + self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues post-tx-commit") + + sess2 = self.setup_session(True) + rx = sess2.receiver("tx_enqueue_test") + + m = rx.fetch() + m = rx.fetch() + m = rx.fetch() + m = rx.fetch() + + queue.update() + self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues pre-rx-commit") + self.assertEqual(queue.byteTotalEnqueues, 40, "byteTotalEnqueues pre-rx-commit") + self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues pre-rx-commit") + self.assertEqual(queue.byteTxnEnqueues, 40, "byteTxnEnqueues pre-rx-commit") + self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues pre-rx-commit") + self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues pre-rx-commit") + self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues pre-rx-commit") + self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues pre-rx-commit") + + sess2.acknowledge() + sess2.commit() + + queue.update() + self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues post-rx-commit") + self.assertEqual(queue.byteTotalEnqueues, 40, "byteTotalEnqueues post-rx-commit") + self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues post-rx-commit") + self.assertEqual(queue.byteTxnEnqueues, 40, "byteTxnEnqueues post-rx-commit") + self.assertEqual(queue.msgTotalDequeues, 4, "msgTotalDequeues post-rx-commit") + self.assertEqual(queue.byteTotalDequeues, 40, "byteTotalDequeues post-rx-commit") + self.assertEqual(queue.msgTxnDequeues, 4, "msgTxnDequeues post-rx-commit") + self.assertEqual(queue.byteTxnDequeues, 40, "byteTxnDequeues post-rx-commit") + + sess.close() + sess2.close() + + now_broker = agent.getBroker() + self.assertEqual(now_broker.msgTxnEnqueues - start_broker.msgTxnEnqueues, 4, "broker msgTxnEnqueues") + self.assertEqual(now_broker.byteTxnEnqueues - start_broker.byteTxnEnqueues, 40, "broker byteTxnEnqueues") + self.assertEqual(now_broker.msgTxnDequeues - start_broker.msgTxnDequeues, 4, "broker msgTxnDequeues") + self.assertEqual(now_broker.byteTxnDequeues - start_broker.byteTxnDequeues, 40, "broker byteTxnDequeues") + + + def test_discards_no_route(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("amq.topic/non.existing.key") + tx.send("NO_ROUTE") + tx.send("NO_ROUTE") + tx.send("NO_ROUTE") + tx.send("NO_ROUTE") + tx.send("NO_ROUTE") + + now_broker = agent.getBroker() + + self.assertEqual(now_broker.discardsNoRoute - start_broker.discardsNoRoute, 5, "Expect 5 no-routes") + + sess.close() + + + def test_abandoned_alt(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("abandon_alt;{create:always,delete:always,node:{x-declare:{alternate-exchange:'amq.fanout'}}}") + rx = sess.receiver("abandon_alt") + rx.capacity = 2 + + tx.send("ABANDON_ALT") + tx.send("ABANDON_ALT") + tx.send("ABANDON_ALT") + tx.send("ABANDON_ALT") + tx.send("ABANDON_ALT") + + rx.fetch() + + sess.close() + now_broker = agent.getBroker() + self.assertEqual(now_broker.abandonedViaAlt - start_broker.abandonedViaAlt, 5, "Expect 5 abandonedViaAlt") + self.assertEqual(now_broker.abandoned - start_broker.abandoned, 0, "Expect 0 abandoned") + + + def test_discards_ttl(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("discards_ttl;{create:always,delete:always}") + msg = Message("TTL") + msg.ttl = 1 + + tx.send(msg) + tx.send(msg) + tx.send(msg) + tx.send(msg) + tx.send(msg) + tx.send(msg) + + sleep(2) + + rx = sess.receiver("discards_ttl") + try: + rx.fetch(0) + except: + pass + + now_broker = agent.getBroker() + queue = agent.getQueue("discards_ttl") + + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.discardsTtl, 6, "expect 6 TTL discards on queue") + self.assertEqual(now_broker.discardsTtl - start_broker.discardsTtl, 6, "expect 6 TTL discards on broker") + self.assertEqual(queue.msgTotalDequeues, 6, "expect 6 total dequeues on queue") + + sess.close() + + + def test_discards_limit_overflow(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("discards_limit;{create:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0}}}}") + tx.send("LIMIT") + tx.send("LIMIT") + tx.send("LIMIT") + try: + tx.send("LIMIT") + self.fail("expected to fail sending 4th message") + except: + pass + + now_broker = agent.getBroker() + queue = agent.getQueue("discards_limit") + + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.discardsOverflow, 1, "expect 1 overflow discard on queue") + self.assertEqual(now_broker.discardsOverflow - start_broker.discardsOverflow, 1, "expect 1 overflow discard on broker") + + ## + ## Shut down and restart the connection to clear the error condition. + ## + try: + self.conn.close() + except: + pass + self.conn = self.setup_connection() + + ## + ## Re-create the session to delete the queue. + ## + sess = self.setup_session() + tx = sess.sender("discards_limit;{create:always,delete:always}") + sess.close() + + + def test_discards_ring_overflow(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("discards_ring;{create:always,delete:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0,'qpid.policy_type':ring}}}}") + + tx.send("RING") + tx.send("RING") + tx.send("RING") + tx.send("RING") + tx.send("RING") + + now_broker = agent.getBroker() + queue = agent.getQueue("discards_ring") + + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.discardsRing, 2, "expect 2 ring discards on queue") + self.assertEqual(now_broker.discardsRing - start_broker.discardsRing, 2, "expect 2 ring discards on broker") + self.assertEqual(queue.msgTotalDequeues, 2, "expect 2 total dequeues on queue") + + sess.close() + + + def test_discards_lvq_replace(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("discards_lvq;{create:always,delete:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0,'qpid.last_value_queue_key':key}}}}") + msgA = Message("LVQ_A") + msgA.properties['key'] = 'AAA' + msgB = Message("LVQ_B") + msgB.properties['key'] = 'BBB' + + tx.send(msgA) + tx.send(msgB) + tx.send(msgA) + tx.send(msgA) + tx.send(msgB) + + now_broker = agent.getBroker() + queue = agent.getQueue("discards_lvq") + + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.discardsLvq, 3, "expect 3 lvq discards on queue") + self.assertEqual(now_broker.discardsLvq - start_broker.discardsLvq, 3, "expect 3 lvq discards on broker") + self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue") + + sess.close() + + + def test_discards_reject(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("discards_reject;{create:always,delete:always}") + tx.send("REJECT") + tx.send("REJECT") + tx.send("REJECT") + + rx = sess.receiver("discards_reject") + m = rx.fetch() + sess.acknowledge() + m1 = rx.fetch() + m2 = rx.fetch() + sess.acknowledge(m1, Disposition(REJECTED)) + sess.acknowledge(m2, Disposition(REJECTED)) + + now_broker = agent.getBroker() + queue = agent.getQueue("discards_reject") + + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.discardsSubscriber, 2, "expect 2 reject discards on queue") + self.assertEqual(now_broker.discardsSubscriber - start_broker.discardsSubscriber, 2, "expect 2 reject discards on broker") + self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue") + + sess.close() + + + def test_message_release(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("message_release;{create:always,delete:always}") + tx.send("RELEASE") + tx.send("RELEASE") + tx.send("RELEASE") + tx.send("RELEASE") + tx.send("RELEASE") + + rx = sess.receiver("message_release") + m1 = rx.fetch() + m2 = rx.fetch() + sess.acknowledge(m1, Disposition(RELEASED)) + sess.acknowledge(m2, Disposition(RELEASED)) + + now_broker = agent.getBroker() + queue = agent.getQueue("message_release") + + self.failUnless(queue, "expected a valid queue object") + self.assertEqual(queue.acquires, 2, "expect 2 acquires on queue") + self.failUnless(now_broker.acquires - start_broker.acquires >= 2, "expect at least 2 acquires on broker") + self.assertEqual(queue.msgTotalDequeues, 0, "expect 0 total dequeues on queue") + + self.assertEqual(queue.releases, 2, "expect 2 releases on queue") + self.failUnless(now_broker.releases - start_broker.releases >= 2, "expect at least 2 releases on broker") + + sess.close() + + + def test_discards_purge(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("discards_purge;{create:always,delete:always}") + tx.send("PURGE") + tx.send("PURGE") + tx.send("PURGE") + tx.send("PURGE") + tx.send("PURGE") + + queue = agent.getQueue("discards_purge") + self.failUnless(queue, "expected a valid queue object") + + queue.purge(3) + queue.update() + + now_broker = agent.getBroker() + self.assertEqual(queue.discardsPurge, 3, "expect 3 purge discards on queue") + self.assertEqual(now_broker.discardsPurge - start_broker.discardsPurge, 3, "expect 3 purge discards on broker") + self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue") + + sess.close() + + + def test_reroutes(self): + agent = self.setup_access() + start_broker = agent.getBroker() + + sess = self.setup_session() + tx = sess.sender("reroute;{create:always,delete:always}") + tx.send("REROUTE") + tx.send("REROUTE") + tx.send("REROUTE") + tx.send("REROUTE") + tx.send("REROUTE") + tx.send("REROUTE") + tx.send("REROUTE") + tx.send("REROUTE") + + queue = agent.getQueue("reroute") + self.failUnless(queue, "expected a valid queue object") + + queue.reroute(5, False, 'amq.fanout') + queue.update() + + now_broker = agent.getBroker() + self.assertEqual(queue.reroutes, 5, "expect 5 reroutes on queue") + self.assertEqual(now_broker.reroutes - start_broker.reroutes, 5, "expect 5 reroutes on broker") + self.assertEqual(queue.msgTotalDequeues, 5, "expect 5 total dequeues on queue") + + sess.close() + + diff --git a/tools/setup.py b/tools/setup.py index feae4bb1bd..b04bb65c87 100755 --- a/tools/setup.py +++ b/tools/setup.py @@ -23,15 +23,16 @@ setup(name="qpid-tools", version="0.15", author="Apache Qpid", author_email="dev@qpid.apache.org", - scripts=["src/py/qpid-cluster", - "src/py/qpid-cluster-store", - "src/py/qpid-config", - "src/py/qpid-printevents", - "src/py/qpid-queue-stats", - "src/py/qpid-route", - "src/py/qpid-stat", - "src/py/qpid-tool", - "src/py/qmf-tool"], + packages=["qpidtoollibs"], + scripts=["qpid-cluster", + "qpid-cluster-store", + "qpid-config", + "qpid-printevents", + "qpid-queue-stats", + "qpid-route", + "qpid-stat", + "qpid-tool", + "qmf-tool"], url="http://qpid.apache.org/", license="Apache Software License", description="Diagnostic and management tools for Apache Qpid brokers.") diff --git a/tools/src/py/qpid-stat b/tools/src/py/qpid-stat index a7272da3f1..bb094554e6 100755 --- a/tools/src/py/qpid-stat +++ b/tools/src/py/qpid-stat @@ -21,13 +21,18 @@ import os from optparse import OptionParser, OptionGroup -from time import sleep ### debug import sys import locale import socket import re -from qmf.console import Session, Console -from qpid.disp import Display, Header, Sorter +from qpid.messaging import Connection + +home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools")) +sys.path.append(os.path.join(home, "python")) + +from qpidtoollibs.broker import BrokerAgent +from qpidtoollibs.disp import Display, Header, Sorter + class Config: def __init__(self): @@ -37,7 +42,7 @@ class Config: self._limit = 50 self._increasing = False self._sortcol = None - self._cluster_detail = False + self._details = None self._sasl_mechanism = None config = Config() @@ -56,24 +61,16 @@ def OptionsAndArguments(argv): parser.add_option_group(group1) group2 = OptionGroup(parser, "Display Options") - group2.add_option("-b", "--broker", help="Show Brokers", - action="store_const", const="b", dest="show") - group2.add_option("-c", "--connections", help="Show Connections", - action="store_const", const="c", dest="show") - group2.add_option("-e", "--exchanges", help="Show Exchanges", - action="store_const", const="e", dest="show") - group2.add_option("-q", "--queues", help="Show Queues", - action="store_const", const="q", dest="show") - group2.add_option("-u", "--subscriptions", help="Show Subscriptions", - action="store_const", const="u", dest="show") - group2.add_option("-S", "--sort-by", metavar="<colname>", - help="Sort by column name") - group2.add_option("-I", "--increasing", action="store_true", default=False, - help="Sort by increasing value (default = decreasing)") - group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>", - help="Limit output to n rows") - group2.add_option("-C", "--cluster", action="store_true", default=False, - help="Display per-broker cluster detail.") + group2.add_option("-b", "--broker", help="Show Brokers", action="store_const", const="b", dest="show") + group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show") + group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show") + group2.add_option("-q", "--queues", help="Show Queues", action="store_const", const="q", dest="show") + group2.add_option("-u", "--subscriptions", help="Show Subscriptions", action="store_const", const="u", dest="show") + group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show") + group2.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name") + group2.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)") + group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>", help="Limit output to n rows") + group2.add_option("-D", "--details", action="store", metavar="<name>", dest="detail", default=None, help="Display details on a single object.") parser.add_option_group(group2) opts, args = parser.parse_args(args=argv) @@ -86,8 +83,8 @@ def OptionsAndArguments(argv): config._connTimeout = opts.timeout config._increasing = opts.increasing config._limit = opts.limit - config._cluster_detail = opts.cluster config._sasl_mechanism = opts.sasl_mechanism + config._detail = opts.detail if args: config._host = args[0] @@ -119,86 +116,26 @@ class IpAddr: bestAddr = addrPort return bestAddr -class Broker(object): - def __init__(self, qmf, broker): - self.broker = broker - - agents = qmf.getAgents() - for a in agents: - if a.getAgentBank() == '0': - self.brokerAgent = a - - bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0] - self.currentTime = bobj.getTimestamps()[0] - try: - self.uptime = bobj.uptime - except: - self.uptime = 0 - self.connections = {} - self.sessions = {} - self.exchanges = {} - self.queues = {} - self.subscriptions = {} - package = "org.apache.qpid.broker" - - list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent) - for conn in list: - if not conn.shadow: - self.connections[conn.getObjectId()] = conn - - list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent) - for sess in list: - if sess.connectionRef in self.connections: - self.sessions[sess.getObjectId()] = sess - - list = qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent) - for exchange in list: - self.exchanges[exchange.getObjectId()] = exchange - - list = qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent) - for queue in list: - self.queues[queue.getObjectId()] = queue - - list = qmf.getObjects(_class="subscription", _package=package, _agent=self.brokerAgent) - for subscription in list: - self.subscriptions[subscription.getObjectId()] = subscription - - def getName(self): - return self.broker.getUrl() - - def getCurrentTime(self): - return self.currentTime - - def getUptime(self): - return self.uptime - -class BrokerManager(Console): +class BrokerManager: def __init__(self): - self.brokerName = None - self.qmf = None - self.broker = None - self.brokers = [] - self.cluster = None + self.brokerName = None + self.connections = [] + self.brokers = [] + self.cluster = None def SetBroker(self, brokerUrl, mechanism): self.url = brokerUrl - self.qmf = Session() - self.mechanism = mechanism - self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, mechanism) - agents = self.qmf.getAgents() - for a in agents: - if a.getAgentBank() == '0': - self.brokerAgent = a + self.connections.append(Connection(self.url, sasl_mechanism=mechanism)) + self.connections[0].open() + self.brokers.append(BrokerAgent(self.connections[0])) def Disconnect(self): """ Release any allocated brokers. Ignore any failures as the tool is shutting down. """ try: - if self.broker: - self.qmf.delBroker(self.broker) - else: - for b in self.brokers: self.qmf.delBroker(b.broker) + for conn in self.connections: + conn.close() except: pass @@ -238,62 +175,63 @@ class BrokerManager(Console): hosts.append(bestUrl) return hosts - def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None): - if len(subs) == 0: - return - this = subs[0] - remaining = subs[1:] - newindent = indent + " " - if this == 'b': - pass - elif this == 'c': - if broker: - for oid in broker.connections: - iconn = broker.connections[oid] - self.printConnSub(indent, broker.getName(), iconn) - self.displaySubs(remaining, newindent, broker=broker, conn=iconn, - sess=sess, exchange=exchange, queue=queue) - elif this == 's': - pass - elif this == 'e': - pass - elif this == 'q': - pass - print - def displayBroker(self, subs): disp = Display(prefix=" ") heads = [] - heads.append(Header('broker')) - heads.append(Header('cluster')) heads.append(Header('uptime', Header.DURATION)) - heads.append(Header('conn', Header.KMG)) - heads.append(Header('sess', Header.KMG)) - heads.append(Header('exch', Header.KMG)) - heads.append(Header('queue', Header.KMG)) + heads.append(Header('connections', Header.COMMAS)) + heads.append(Header('sessions', Header.COMMAS)) + heads.append(Header('exchanges', Header.COMMAS)) + heads.append(Header('queues', Header.COMMAS)) rows = [] - for broker in self.brokers: - if self.cluster: - ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status) - else: - ctext = "<standalone>" - row = (broker.getName(), ctext, broker.getUptime(), - len(broker.connections), len(broker.sessions), - len(broker.exchanges), len(broker.queues)) - rows.append(row) - title = "Brokers" - if config._sortcol: - sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) - dispRows = sorter.getSorted() - else: - dispRows = rows - disp.formattedTable(title, heads, dispRows) + broker = self.brokers[0].getBroker() + connections = self.getConnectionMap() + sessions = self.getSessionMap() + exchanges = self.getExchangeMap() + queues = self.getQueueMap() + row = (broker.getUpdateTime() - broker.getCreateTime(), + len(connections), len(sessions), + len(exchanges), len(queues)) + rows.append(row) + disp.formattedTable('Broker Summary:', heads, rows) + + if 'queueCount' not in broker.values: + return + + print + heads = [] + heads.append(Header('Statistic')) + heads.append(Header('Messages', Header.COMMAS)) + heads.append(Header('Bytes', Header.COMMAS)) + rows = [] + rows.append(['queue-depth', broker.msgDepth, broker.byteDepth]) + rows.append(['total-enqueues', broker.msgTotalEnqueues, broker.byteTotalEnqueues]) + rows.append(['total-dequeues', broker.msgTotalDequeues, broker.byteTotalDequeues]) + rows.append(['persistent-enqueues', broker.msgPersistEnqueues, broker.bytePersistEnqueues]) + rows.append(['persistent-dequeues', broker.msgPersistDequeues, broker.bytePersistDequeues]) + rows.append(['transactional-enqueues', broker.msgTxnEnqueues, broker.byteTxnEnqueues]) + rows.append(['transactional-dequeues', broker.msgTxnDequeues, broker.byteTxnDequeues]) + rows.append(['flow-to-disk-depth', broker.msgFtdDepth, broker.byteFtdDepth]) + rows.append(['flow-to-disk-enqueues', broker.msgFtdEnqueues, broker.byteFtdEnqueues]) + rows.append(['flow-to-disk-dequeues', broker.msgFtdDequeues, broker.byteFtdDequeues]) + rows.append(['acquires', broker.acquires, None]) + rows.append(['releases', broker.releases, None]) + rows.append(['discards-no-route', broker.discardsNoRoute, None]) + rows.append(['discards-ttl-expired', broker.discardsTtl, None]) + rows.append(['discards-limit-overflow', broker.discardsOverflow, None]) + rows.append(['discards-ring-overflow', broker.discardsRing, None]) + rows.append(['discards-lvq-replace', broker.discardsLvq, None]) + rows.append(['discards-subscriber-reject', broker.discardsSubscriber, None]) + rows.append(['discards-purged', broker.discardsPurge, None]) + rows.append(['reroutes', broker.reroutes, None]) + rows.append(['abandoned', broker.abandoned, None]) + rows.append(['abandoned-via-alt', broker.abandonedViaAlt, None]) + disp.formattedTable('Aggregate Broker Statistics:', heads, rows) + def displayConn(self, subs): disp = Display(prefix=" ") heads = [] - if self.cluster: - heads.append(Header('broker')) heads.append(Header('client-addr')) heads.append(Header('cproc')) heads.append(Header('cpid')) @@ -303,25 +241,20 @@ class BrokerManager(Console): heads.append(Header('msgIn', Header.KMG)) heads.append(Header('msgOut', Header.KMG)) rows = [] - for broker in self.brokers: - for oid in broker.connections: - conn = broker.connections[oid] - row = [] - if self.cluster: - row.append(broker.getName()) - row.append(conn.address) - row.append(conn.remoteProcessName) - row.append(conn.remotePid) - row.append(conn.authIdentity) - row.append(broker.getCurrentTime() - conn.getTimestamps()[1]) - idle = broker.getCurrentTime() - conn.getTimestamps()[0] - row.append(broker.getCurrentTime() - conn.getTimestamps()[0]) - row.append(conn.msgsFromClient) - row.append(conn.msgsToClient) - rows.append(row) + connections = self.brokers[0].getAllConnections() + broker = self.brokers[0].getBroker() + for conn in connections: + row = [] + row.append(conn.address) + row.append(conn.remoteProcessName) + row.append(conn.remotePid) + row.append(conn.authIdentity) + row.append(broker.getUpdateTime() - conn.getCreateTime()) + row.append(broker.getUpdateTime() - conn.getUpdateTime()) + row.append(conn.msgsFromClient) + row.append(conn.msgsToClient) + rows.append(row) title = "Connections" - if self.cluster: - title += " for cluster '%s'" % self.cluster.clusterName if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() @@ -335,8 +268,6 @@ class BrokerManager(Console): def displayExchange(self, subs): disp = Display(prefix=" ") heads = [] - if self.cluster: - heads.append(Header('broker')) heads.append(Header("exchange")) heads.append(Header("type")) heads.append(Header("dur", Header.Y)) @@ -348,26 +279,21 @@ class BrokerManager(Console): heads.append(Header("byteOut", Header.KMG)) heads.append(Header("byteDrop", Header.KMG)) rows = [] - for broker in self.brokers: - for oid in broker.exchanges: - ex = broker.exchanges[oid] - row = [] - if self.cluster: - row.append(broker.getName()) - row.append(ex.name) - row.append(ex.type) - row.append(ex.durable) - row.append(ex.bindingCount) - row.append(ex.msgReceives) - row.append(ex.msgRoutes) - row.append(ex.msgDrops) - row.append(ex.byteReceives) - row.append(ex.byteRoutes) - row.append(ex.byteDrops) - rows.append(row) + exchanges = self.brokers[0].getAllExchanges() + for ex in exchanges: + row = [] + row.append(ex.name) + row.append(ex.type) + row.append(ex.durable) + row.append(ex.bindingCount) + row.append(ex.msgReceives) + row.append(ex.msgRoutes) + row.append(ex.msgDrops) + row.append(ex.byteReceives) + row.append(ex.byteRoutes) + row.append(ex.byteDrops) + rows.append(row) title = "Exchanges" - if self.cluster: - title += " for cluster '%s'" % self.cluster.clusterName if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() @@ -375,11 +301,9 @@ class BrokerManager(Console): dispRows = rows disp.formattedTable(title, heads, dispRows) - def displayQueue(self, subs): + def displayQueues(self, subs): disp = Display(prefix=" ") heads = [] - if self.cluster: - heads.append(Header('broker')) heads.append(Header("queue")) heads.append(Header("dur", Header.Y)) heads.append(Header("autoDel", Header.Y)) @@ -393,28 +317,23 @@ class BrokerManager(Console): heads.append(Header("cons", Header.KMG)) heads.append(Header("bind", Header.KMG)) rows = [] - for broker in self.brokers: - for oid in broker.queues: - q = broker.queues[oid] - row = [] - if self.cluster: - row.append(broker.getName()) - row.append(q.name) - row.append(q.durable) - row.append(q.autoDelete) - row.append(q.exclusive) - row.append(q.msgDepth) - row.append(q.msgTotalEnqueues) - row.append(q.msgTotalDequeues) - row.append(q.byteDepth) - row.append(q.byteTotalEnqueues) - row.append(q.byteTotalDequeues) - row.append(q.consumerCount) - row.append(q.bindingCount) - rows.append(row) + queues = self.brokers[0].getAllQueues() + for q in queues: + row = [] + row.append(q.name) + row.append(q.durable) + row.append(q.autoDelete) + row.append(q.exclusive) + row.append(q.msgDepth) + row.append(q.msgTotalEnqueues) + row.append(q.msgTotalDequeues) + row.append(q.byteDepth) + row.append(q.byteTotalEnqueues) + row.append(q.byteTotalDequeues) + row.append(q.consumerCount) + row.append(q.bindingCount) + rows.append(row) title = "Queues" - if self.cluster: - title += " for cluster '%s'" % self.cluster.clusterName if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() @@ -422,46 +341,46 @@ class BrokerManager(Console): dispRows = rows disp.formattedTable(title, heads, dispRows) + def displayQueue(self, subs): + disp = Display(prefix=" ") + heads = [] + def displaySubscriptions(self, subs): disp = Display(prefix=" ") heads = [] - if self.cluster: - heads.append(Header('broker')) - heads.append(Header("subscription")) + heads.append(Header("subscr")) heads.append(Header("queue")) - heads.append(Header("connection")) - heads.append(Header("processName")) - heads.append(Header("processId")) - heads.append(Header("browsing", Header.Y)) - heads.append(Header("acknowledged", Header.Y)) - heads.append(Header("exclusive", Header.Y)) + heads.append(Header("conn")) + heads.append(Header("procName")) + heads.append(Header("procId")) + heads.append(Header("browse", Header.Y)) + heads.append(Header("acked", Header.Y)) + heads.append(Header("excl", Header.Y)) heads.append(Header("creditMode")) heads.append(Header("delivered", Header.KMG)) rows = [] - for broker in self.brokers: - for oid in broker.subscriptions: - s = broker.subscriptions[oid] - row = [] - try: - if self.cluster: - row.append(broker.getName()) - row.append(s.name) - row.append(self.qmf.getObjects(_objectId=s.queueRef)[0].name) - connectionRef = self.qmf.getObjects(_objectId=s.sessionRef)[0].connectionRef - row.append(self.qmf.getObjects(_objectId=connectionRef)[0].address) - row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remoteProcessName) - row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remotePid) - row.append(s.browsing) - row.append(s.acknowledged) - row.append(s.exclusive) - row.append(s.creditMode) - row.append(s.delivered) - rows.append(row) - except: - pass + subscriptions = self.brokers[0].getAllSubscriptions() + sessions = self.getSessionMap() + connections = self.getConnectionMap() + for s in subscriptions: + row = [] + try: + row.append(s.name) + row.append(s.queueRef) + session = sessions[s.sessionRef] + connection = connections[session.connectionRef] + row.append(connection.address) + row.append(connection.remoteProcessName) + row.append(connection.remotePid) + row.append(s.browsing) + row.append(s.acknowledged) + row.append(s.exclusive) + row.append(s.creditMode) + row.append(s.delivered) + rows.append(row) + except: + pass title = "Subscriptions" - if self.cluster: - title += " for cluster '%s'" % self.cluster.clusterName if config._sortcol: sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing) dispRows = sorter.getSorted() @@ -469,33 +388,58 @@ class BrokerManager(Console): dispRows = rows disp.formattedTable(title, heads, dispRows) + def displayMemory(self, unused): + disp = Display(prefix=" ") + heads = [Header('Statistic'), Header('Value', Header.COMMAS)] + rows = [] + memory = self.brokers[0].getMemory() + for k,v in memory.values.items(): + if k != 'name': + rows.append([k, v]) + disp.formattedTable('Broker Memory Statistics:', heads, rows) + + def getExchangeMap(self): + exchanges = self.brokers[0].getAllExchanges() + emap = {} + for e in exchanges: + emap[e.name] = e + return emap + + def getQueueMap(self): + queues = self.brokers[0].getAllQueues() + qmap = {} + for q in queues: + qmap[q.name] = q + return qmap + + def getSessionMap(self): + sessions = self.brokers[0].getAllSessions() + smap = {} + for s in sessions: + smap[s.name] = s + return smap + + def getConnectionMap(self): + connections = self.brokers[0].getAllConnections() + cmap = {} + for c in connections: + cmap[c.address] = c + return cmap + def displayMain(self, main, subs): if main == 'b': self.displayBroker(subs) elif main == 'c': self.displayConn(subs) elif main == 's': self.displaySession(subs) elif main == 'e': self.displayExchange(subs) - elif main == 'q': self.displayQueue(subs) + elif main == 'q': + if config._detail: + self.displayQueue(subs, config._detail) + else: + self.displayQueues(subs) elif main == 'u': self.displaySubscriptions(subs) + elif main == 'm': self.displayMemory(subs) def display(self): - if config._cluster_detail or config._types[0] == 'b': - # always show cluster detail when dumping broker stats - self._getCluster() - if self.cluster: - memberList = self.cluster.members.split(";") - hostList = self._getHostList(memberList) - self.qmf.delBroker(self.broker) - self.broker = None - if config._host.find("@") > 0: - authString = config._host.split("@")[0] + "@" - else: - authString = "" - for host in hostList: - b = self.qmf.addBroker(authString + host, config._connTimeout) - self.brokers.append(Broker(self.qmf, b)) - else: - self.brokers.append(Broker(self.qmf, self.broker)) - self.displayMain(config._types[0], config._types[1:]) diff --git a/tools/src/py/qpidtoollibs/__init__.py b/tools/src/py/qpidtoollibs/__init__.py new file mode 100644 index 0000000000..31d5a2ef58 --- /dev/null +++ b/tools/src/py/qpidtoollibs/__init__.py @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# diff --git a/tools/src/py/qpidtoollibs/broker.py b/tools/src/py/qpidtoollibs/broker.py new file mode 100644 index 0000000000..366d9b0663 --- /dev/null +++ b/tools/src/py/qpidtoollibs/broker.py @@ -0,0 +1,322 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from qpid.messaging import Message +try: + from uuid import uuid4 +except ImportError: + from qpid.datatypes import uuid4 + +class BrokerAgent(object): + def __init__(self, conn): + self.conn = conn + self.sess = self.conn.session() + self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \ + str(uuid4()) + self.reply_rx = self.sess.receiver(self.reply_to) + self.reply_rx.capacity = 10 + self.tx = self.sess.sender("qmf.default.direct/broker") + self.next_correlator = 1 + + def close(self): + self.sess.close() + + def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10): + props = {'method' : 'request', + 'qmf.opcode' : '_method_request', + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + + content = {'_object_id' : {'_object_name' : addr}, + '_method_name' : method, + '_arguments' : arguments} + + message = Message(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + response = self.reply_rx.fetch(timeout) + self.sess.acknowledge() + if response.properties['qmf.opcode'] == '_exception': + raise Exception("Exception from Agent: %r" % response.content['_values']) + if response.properties['qmf.opcode'] != '_method_response': + raise Exception("bad response: %r" % response.properties) + return response.content['_arguments'] + + def _sendRequest(self, opcode, content): + props = {'method' : 'request', + 'qmf.opcode' : opcode, + 'x-amqp-0-10.app-id' : 'qmf2'} + correlator = str(self.next_correlator) + self.next_correlator += 1 + message = Message(content, reply_to=self.reply_to, correlation_id=correlator, + properties=props, subject="broker") + self.tx.send(message) + return correlator + + def _doClassQuery(self, class_name): + query = {'_what' : 'OBJECT', + '_schema_id' : {'_class_name' : class_name}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + self.sess.acknowledge() + return items + + def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'): + query = {'_what' : 'OBJECT', + '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}} + correlator = self._sendRequest('_query_request', query) + response = self.reply_rx.fetch(10) + if response.properties['qmf.opcode'] != '_query_response': + raise Exception("bad response") + items = [] + done = False + while not done: + for item in response.content: + items.append(item) + if 'partial' in response.properties: + response = self.reply_rx.fetch(10) + else: + done = True + self.sess.acknowledge() + if len(items) == 1: + return items[0] + return None + + def _getAllBrokerObjects(self, cls): + items = self._doClassQuery(cls.__name__.lower()) + objs = [] + for item in items: + objs.append(cls(self, item)) + return objs + + def _getBrokerObject(self, cls, name): + obj = self._doNameQuery(cls.__name__.lower(), name) + if obj: + return cls(self, obj) + return None + + def getCluster(self): + return self._getAllBrokerObjects(Cluster) + + def getBroker(self): + return self._getBrokerObject(Broker, "amqp-broker") + + def getMemory(self): + return self._getAllBrokerObjects(Memory)[0] + + def getAllConnections(self): + return self._getAllBrokerObjects(Connection) + + def getConnection(self, name): + return self._getBrokerObject(Connection, name) + + def getAllSessions(self): + return self._getAllBrokerObjects(Session) + + def getSession(self, name): + return self._getBrokerObject(Session, name) + + def getAllSubscriptions(self): + return self._getAllBrokerObjects(Subscription) + + def getSubscription(self, name): + return self._getBrokerObject(Subscription, name) + + def getAllExchanges(self): + return self._getAllBrokerObjects(Exchange) + + def getExchange(self, name): + return self._getBrokerObject(Exchange, name) + + def getAllQueues(self): + return self._getAllBrokerObjects(Queue) + + def getQueue(self, name): + return self._getBrokerObject(Queue, name) + + def getAllBindings(self): + return self._getAllBrokerObjects(Binding) + + def getBinding(self, exchange=None, queue=None): + pass + + def echo(self, sequence, body): + """Request a response to test the path to the management broker""" + pass + + def connect(self, host, port, durable, authMechanism, username, password, transport): + """Establish a connection to another broker""" + pass + + def queueMoveMessages(self, srcQueue, destQueue, qty): + """Move messages from one queue to another""" + pass + + def setLogLevel(self, level): + """Set the log level""" + pass + + def getLogLevel(self): + """Get the log level""" + pass + + def setTimestampConfig(self, receive): + """Set the message timestamping configuration""" + pass + + def getTimestampConfig(self): + """Get the message timestamping configuration""" + pass + +# def addExchange(self, exchange_type, name, **kwargs): +# pass + +# def delExchange(self, name): +# pass + +# def addQueue(self, name, **kwargs): +# pass + +# def delQueue(self, name): +# pass + +# def bind(self, exchange, queue, key, **kwargs): +# pass + +# def unbind(self, exchange, queue, key, **kwargs): +# pass + + def create(self, _type, name, properties, strict): + """Create an object of the specified type""" + pass + + def delete(self, _type, name, options): + """Delete an object of the specified type""" + pass + + def query(self, _type, name): + """Query the current state of an object""" + return self._getBrokerObject(self, _type, name) + + +class BrokerObject(object): + def __init__(self, broker, content): + self.broker = broker + self.content = content + self.values = content['_values'] + + def __getattr__(self, key): + if key not in self.values: + return None + value = self.values[key] + if value.__class__ == dict and '_object_name' in value: + full_name = value['_object_name'] + colon = full_name.find(':') + if colon > 0: + full_name = full_name[colon+1:] + colon = full_name.find(':') + if colon > 0: + return full_name[colon+1:] + return value + + def getAttributes(self): + return self.values + + def getCreateTime(self): + return self.content['_create_ts'] + + def getDeleteTime(self): + return self.content['_delete_ts'] + + def getUpdateTime(self): + return self.content['_update_ts'] + + def update(self): + """ + Reload the property values from the agent. + """ + refreshed = self.broker._getBrokerObject(self.__class__, self.name) + if refreshed: + self.content = refreshed.content + self.values = self.content['_values'] + else: + raise Exception("No longer exists on the broker") + +class Broker(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Memory(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Connection(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def close(self): + pass + +class Session(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Subscription(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def __repr__(self): + return "subscription name undefined" + +class Exchange(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + +class Binding(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def __repr__(self): + return "Binding key: %s" % self.values['bindingKey'] + +class Queue(BrokerObject): + def __init__(self, broker, values): + BrokerObject.__init__(self, broker, values) + + def purge(self, request): + """Discard all or some messages on a queue""" + self.broker._method("purge", {'request':request}, "org.apache.qpid.broker:queue:%s" % self.name) + + def reroute(self, request, useAltExchange, exchange, filter={}): + """Remove all or some messages on this queue and route them to an exchange""" + self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter}, + "org.apache.qpid.broker:queue:%s" % self.name) + diff --git a/tools/src/py/qpidtoollibs/disp.py b/tools/src/py/qpidtoollibs/disp.py new file mode 100644 index 0000000000..cb7d3da306 --- /dev/null +++ b/tools/src/py/qpidtoollibs/disp.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from time import strftime, gmtime + +class Header: + """ """ + NONE = 1 + KMG = 2 + YN = 3 + Y = 4 + TIME_LONG = 5 + TIME_SHORT = 6 + DURATION = 7 + COMMAS = 8 + + def __init__(self, text, format=NONE): + self.text = text + self.format = format + + def __repr__(self): + return self.text + + def __str__(self): + return self.text + + def formatted(self, value): + try: + if value == None: + return '' + if self.format == Header.NONE: + return value + if self.format == Header.KMG: + return self.num(value) + if self.format == Header.YN: + if value: + return 'Y' + return 'N' + if self.format == Header.Y: + if value: + return 'Y' + return '' + if self.format == Header.TIME_LONG: + return strftime("%c", gmtime(value / 1000000000)) + if self.format == Header.TIME_SHORT: + return strftime("%X", gmtime(value / 1000000000)) + if self.format == Header.DURATION: + if value < 0: value = 0 + sec = value / 1000000000 + min = sec / 60 + hour = min / 60 + day = hour / 24 + result = "" + if day > 0: + result = "%dd " % day + if hour > 0 or result != "": + result += "%dh " % (hour % 24) + if min > 0 or result != "": + result += "%dm " % (min % 60) + result += "%ds" % (sec % 60) + return result + if self.format == Header.COMMAS: + sval = str(value) + result = "" + while True: + if len(sval) == 0: + return result + left = sval[:-3] + right = sval[-3:] + result = right + result + if len(left) > 0: + result = ',' + result + sval = left + except: + return "?" + + def numCell(self, value, tag): + fp = float(value) / 1000. + if fp < 10.0: + return "%1.2f%c" % (fp, tag) + if fp < 100.0: + return "%2.1f%c" % (fp, tag) + return "%4d%c" % (value / 1000, tag) + + def num(self, value): + if value < 1000: + return "%4d" % value + if value < 1000000: + return self.numCell(value, 'k') + value /= 1000 + if value < 1000000: + return self.numCell(value, 'm') + value /= 1000 + return self.numCell(value, 'g') + + +class Display: + """ Display formatting for QPID Management CLI """ + + def __init__(self, spacing=2, prefix=" "): + self.tableSpacing = spacing + self.tablePrefix = prefix + self.timestampFormat = "%X" + + def formattedTable(self, title, heads, rows): + fRows = [] + for row in rows: + fRow = [] + col = 0 + for cell in row: + fRow.append(heads[col].formatted(cell)) + col += 1 + fRows.append(fRow) + headtext = [] + for head in heads: + headtext.append(head.text) + self.table(title, headtext, fRows) + + def table(self, title, heads, rows): + """ Print a table with autosized columns """ + + # Pad the rows to the number of heads + for row in rows: + diff = len(heads) - len(row) + for idx in range(diff): + row.append("") + + print title + if len (rows) == 0: + return + colWidth = [] + col = 0 + line = self.tablePrefix + for head in heads: + width = len (head) + for row in rows: + cellWidth = len (unicode (row[col])) + if cellWidth > width: + width = cellWidth + colWidth.append (width + self.tableSpacing) + line = line + head + if col < len (heads) - 1: + for i in range (colWidth[col] - len (head)): + line = line + " " + col = col + 1 + print line + line = self.tablePrefix + for width in colWidth: + for i in range (width): + line = line + "=" + print line + + for row in rows: + line = self.tablePrefix + col = 0 + for width in colWidth: + line = line + unicode (row[col]) + if col < len (heads) - 1: + for i in range (width - len (unicode (row[col]))): + line = line + " " + col = col + 1 + print line + + def do_setTimeFormat (self, fmt): + """ Select timestamp format """ + if fmt == "long": + self.timestampFormat = "%c" + elif fmt == "short": + self.timestampFormat = "%X" + + def timestamp (self, nsec): + """ Format a nanosecond-since-the-epoch timestamp for printing """ + return strftime (self.timestampFormat, gmtime (nsec / 1000000000)) + + def duration(self, nsec): + if nsec < 0: nsec = 0 + sec = nsec / 1000000000 + min = sec / 60 + hour = min / 60 + day = hour / 24 + result = "" + if day > 0: + result = "%dd " % day + if hour > 0 or result != "": + result += "%dh " % (hour % 24) + if min > 0 or result != "": + result += "%dm " % (min % 60) + result += "%ds" % (sec % 60) + return result + +class Sortable: + """ """ + def __init__(self, row, sortIndex): + self.row = row + self.sortIndex = sortIndex + if sortIndex >= len(row): + raise Exception("sort index exceeds row boundary") + + def __cmp__(self, other): + return cmp(self.row[self.sortIndex], other.row[self.sortIndex]) + + def getRow(self): + return self.row + +class Sorter: + """ """ + def __init__(self, heads, rows, sortCol, limit=0, inc=True): + col = 0 + for head in heads: + if head.text == sortCol: + break + col += 1 + if col == len(heads): + raise Exception("sortCol '%s', not found in headers" % sortCol) + + list = [] + for row in rows: + list.append(Sortable(row, col)) + list.sort() + if not inc: + list.reverse() + count = 0 + self.sorted = [] + for row in list: + self.sorted.append(row.getRow()) + count += 1 + if count == limit: + break + + def getSorted(self): + return self.sorted |
