summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/include/qpid/sys/MemStat.h38
-rw-r--r--cpp/src/CMakeLists.txt2
-rw-r--r--cpp/src/Makefile.am2
-rw-r--r--cpp/src/qpid/broker/Broker.cpp1
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp1
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp10
-rw-r--r--cpp/src/qpid/broker/Exchange.h2
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp2
-rw-r--r--cpp/src/qpid/broker/Message.cpp1
-rw-r--r--cpp/src/qpid/broker/Queue.cpp146
-rw-r--r--cpp/src/qpid/broker/Queue.h16
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.cpp8
-rw-r--r--cpp/src/qpid/broker/QueuePolicy.h6
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp12
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h7
-rw-r--r--cpp/src/qpid/sys/posix/MemStat.cpp38
-rw-r--r--cpp/src/qpid/sys/windows/MemStat.cpp29
-rw-r--r--java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java288
-rw-r--r--java/broker/src/xsl/qmf.xsl20
-rw-r--r--specs/management-schema.xml75
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/__init__.py1
-rw-r--r--tests/src/py/qpid_tests/broker_0_10/stats.py489
-rwxr-xr-xtools/setup.py19
-rwxr-xr-xtools/src/py/qpid-stat452
-rw-r--r--tools/src/py/qpidtoollibs/__init__.py18
-rw-r--r--tools/src/py/qpidtoollibs/broker.py322
-rw-r--r--tools/src/py/qpidtoollibs/disp.py249
27 files changed, 1976 insertions, 278 deletions
diff --git a/cpp/include/qpid/sys/MemStat.h b/cpp/include/qpid/sys/MemStat.h
new file mode 100644
index 0000000000..d855786cd5
--- /dev/null
+++ b/cpp/include/qpid/sys/MemStat.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef sys_MemStat
+#define sys_MemStat
+
+#include "qpid/CommonImportExport.h"
+#include "qmf/org/apache/qpid/broker/Memory.h"
+
+namespace qpid {
+namespace sys {
+
+ class QPID_COMMON_CLASS_EXTERN MemStat {
+ public:
+ QPID_COMMON_EXTERN static void loadMemInfo(qmf::org::apache::qpid::broker::Memory* object);
+ };
+
+}}
+
+#endif
+
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index 41d3cec1f6..1a84f5e79a 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -669,6 +669,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
qpid/sys/windows/SystemInfo.cpp
qpid/sys/windows/Thread.cpp
qpid/sys/windows/Time.cpp
+ qpid/sys/windows/MemStat.cpp
qpid/client/windows/SaslFactory.cpp
${sslcommon_windows_SOURCES}
)
@@ -740,6 +741,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows)
qpid/sys/posix/FileSysDir.cpp
qpid/sys/posix/IOHandle.cpp
qpid/sys/posix/LockFile.cpp
+ qpid/sys/posix/MemStat.cpp
qpid/sys/posix/Mutex.cpp
qpid/sys/posix/PipeHandle.cpp
qpid/sys/posix/PollableCondition.cpp
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index fb26251da0..9533e37565 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -53,6 +53,7 @@ windows_dist = \
../include/qpid/sys/windows/Time.h \
qpid/sys/windows/uuid.cpp \
qpid/sys/windows/uuid.h \
+ qpid/sys/windows/MemStat.cpp \
windows/QpiddBroker.cpp \
windows/SCM.h \
windows/SCM.cpp \
@@ -163,6 +164,7 @@ libqpidcommon_la_SOURCES += \
qpid/sys/posix/Time.cpp \
qpid/sys/posix/Thread.cpp \
qpid/sys/posix/Shlib.cpp \
+ qpid/sys/posix/MemStat.cpp \
qpid/sys/posix/Mutex.cpp \
qpid/sys/posix/Fork.cpp \
qpid/sys/posix/StrError.cpp \
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 89532ae256..ff6da087c3 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -194,6 +194,7 @@ Broker::Broker(const Broker::Options& conf) :
conf.replayFlushLimit*1024, // convert kb to bytes.
conf.replayHardLimit*1024),
*this),
+ mgmtObject(0),
queueCleaner(queues, &timer),
queueEvents(poller,!conf.asyncQueueEvents),
recovery(true),
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 0b8fe95d5e..adc145dc84 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -142,6 +142,7 @@ void DeliveryRecord::reject()
//just drop it
QPID_LOG(info, "Dropping rejected message from " << queue->getName());
}
+ queue->countRejected();
dequeue();
setEnded();
}
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 5d763bf0da..ecaa492903 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -142,6 +142,8 @@ void Exchange::doRoute(Deliverable& msg, ConstBindingList b)
//QPID_LOG(warning, "Exchange " << getName() << " could not route message; no matching binding found");
mgmtExchange->inc_msgDrops ();
mgmtExchange->inc_byteDrops (msg.contentSize ());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsNoRoute();
}
else
{
@@ -161,7 +163,7 @@ void Exchange::routeIVE(){
Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
name(_name), durable(false), persistenceId(0), sequence(false),
- sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false)
+ sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false)
{
if (parent != 0 && broker != 0)
{
@@ -172,6 +174,8 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
mgmtExchange->set_durable(durable);
mgmtExchange->set_autoDelete(false);
agent->addObject(mgmtExchange, 0, durable);
+ if (broker)
+ brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
}
}
}
@@ -179,7 +183,7 @@ Exchange::Exchange (const string& _name, Manageable* parent, Broker* b) :
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent, Broker* b)
: name(_name), durable(_durable), alternateUsers(0), persistenceId(0),
- args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), broker(b), destroyed(false)
+ args(_args), sequence(false), sequenceNo(0), ive(false), mgmtExchange(0), brokerMgmtObject(0), broker(b), destroyed(false)
{
if (parent != 0 && broker != 0)
{
@@ -191,6 +195,8 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
mgmtExchange->set_autoDelete(false);
mgmtExchange->set_arguments(ManagementAgent::toMap(args));
agent->addObject(mgmtExchange, 0, durable);
+ if (broker)
+ brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
}
}
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index b12af9a1dd..9179dd5c7c 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -32,6 +32,7 @@
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Exchange.h"
#include "qmf/org/apache/qpid/broker/Binding.h"
+#include "qmf/org/apache/qpid/broker/Broker.h"
namespace qpid {
namespace broker {
@@ -158,6 +159,7 @@ protected:
};
qmf::org::apache::qpid::broker::Exchange* mgmtExchange;
+ qmf::org::apache::qpid::broker::Broker* brokerMgmtObject;
public:
typedef boost::shared_ptr<Exchange> shared_ptr;
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 4bda70d313..142c23f276 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -200,6 +200,8 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons
mgmtExchange->inc_byteReceives(msg.contentSize());
mgmtExchange->inc_msgDrops();
mgmtExchange->inc_byteDrops(msg.contentSize());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsNoRoute();
}
return;
}
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index d13109dad1..ae4503328a 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -270,6 +270,7 @@ void Message::sendContent(const Queue& queue, framing::FrameHandler& out, uint16
morecontent = getContentFrame(queue, frame, maxContentSize, offset);
out.handle(frame);
}
+ queue.countLoadedFromDisk(contentSize());
} else {
Count c;
frames.map_if(c, TypeFilter<CONTENT_BODY>());
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index dd23760922..969d510e26 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -109,6 +109,7 @@ Queue::Queue(const string& _name, bool _autodelete,
persistenceId(0),
policyExceeded(false),
mgmtObject(0),
+ brokerMgmtObject(0),
eventMode(0),
insertSeqNo(0),
broker(b),
@@ -123,14 +124,20 @@ Queue::Queue(const string& _name, bool _autodelete,
if (agent != 0) {
mgmtObject = new _qmf::Queue(agent, this, parent, _name, _store != 0, _autodelete, _owner != 0);
agent->addObject(mgmtObject, 0, store != 0);
+ brokerMgmtObject = (qmf::org::apache::qpid::broker::Broker*) broker->GetManagementObject();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_queueCount();
}
}
}
Queue::~Queue()
{
- if (mgmtObject != 0)
+ if (mgmtObject != 0) {
mgmtObject->resourceDestroy();
+ if (brokerMgmtObject)
+ brokerMgmtObject->dec_queueCount();
+ }
}
bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
@@ -204,6 +211,10 @@ void Queue::process(boost::intrusive_ptr<Message>& msg){
if (mgmtObject != 0){
mgmtObject->inc_msgTxnEnqueues ();
mgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_msgTxnEnqueues ();
+ brokerMgmtObject->inc_byteTxnEnqueues (msg->contentSize ());
+ }
}
}
@@ -221,7 +232,13 @@ void Queue::requeue(const QueuedMessage& msg){
if (alternateExchange.get()) {
DeliverableMessage dmsg(msg.payload);
alternateExchange->routeWithAlternate(dmsg);
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandonedViaAlt();
+ } else {
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandoned();
}
+ mgntDeqStats(msg.payload);
} else {
messages->reinsert(msg);
listeners.populate(copy);
@@ -234,8 +251,8 @@ void Queue::requeue(const QueuedMessage& msg){
enqueue(0, payload);
}
}
+ observeRequeue(msg, locker);
}
- observeRequeue(msg, locker);
}
copy.notify();
}
@@ -323,6 +340,11 @@ Queue::ConsumeCode Queue::consumeNextMessage(QueuedMessage& m, Consumer::shared_
c->setPosition(msg.position);
acquire( msg.position, msg, locker);
dequeue( 0, msg );
+ if (mgmtObject) {
+ mgmtObject->inc_discardsTtl();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsTtl();
+ }
continue;
}
@@ -504,6 +526,15 @@ void Queue::purgeExpired(qpid::sys::Duration lapse)
messages->removeIf(boost::bind(&collect_if_expired, boost::ref(expired), _1));
}
+ //
+ // Report the count of discarded-by-ttl messages
+ //
+ if (mgmtObject && !expired.empty()) {
+ mgmtObject->inc_discardsTtl(expired.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsTtl(expired.size());
+ }
+
for (std::deque<QueuedMessage>::const_iterator i = expired.begin();
i != expired.end(); ++i) {
{
@@ -638,6 +669,19 @@ uint32_t Queue::purge(const uint32_t purge_request, boost::shared_ptr<Exchange>
Mutex::ScopedLock locker(messageLock);
messages->removeIf( boost::bind<bool>(boost::ref(c), _1) );
+
+ if (mgmtObject && !c.matches.empty()) {
+ if (dest.get()) {
+ mgmtObject->inc_reroutes(c.matches.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_reroutes(c.matches.size());
+ } else {
+ mgmtObject->inc_discardsPurge(c.matches.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsPurge(c.matches.size());
+ }
+ }
+
for (std::deque<QueuedMessage>::iterator qmsg = c.matches.begin();
qmsg != c.matches.end(); ++qmsg) {
// Update observers and message state:
@@ -710,8 +754,14 @@ void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){
if (insertSeqNo) msg->insertCustomProperty(seqNoKey, sequence);
dequeueRequired = messages->push(qm, removed);
- if (dequeueRequired)
+ if (dequeueRequired) {
observeAcquire(removed, locker);
+ if (mgmtObject) {
+ mgmtObject->inc_discardsLvq();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsLvq();
+ }
+ }
listeners.populate(copy);
observeEnqueue(qm, locker);
}
@@ -799,10 +849,30 @@ bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message>& msg
std::deque<QueuedMessage> dequeues;
{
Mutex::ScopedLock locker(messageLock);
- policy->tryEnqueue(msg);
+ try {
+ policy->tryEnqueue(msg);
+ } catch(ResourceLimitExceededException&) {
+ if (mgmtObject) {
+ mgmtObject->inc_discardsOverflow();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsOverflow();
+ }
+ throw;
+ }
policy->getPendingDequeues(dequeues);
}
//depending on policy, may have some dequeues that need to performed without holding the lock
+
+ //
+ // Count the dequeues as ring-discards. We know that these aren't rejects because
+ // policy->tryEnqueue would have thrown an exception.
+ //
+ if (mgmtObject && !dequeues.empty()) {
+ mgmtObject->inc_discardsRing(dequeues.size());
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsRing(dequeues.size());
+ }
+
for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1));
}
@@ -871,6 +941,10 @@ void Queue::dequeueCommitted(const QueuedMessage& msg)
if (mgmtObject != 0) {
mgmtObject->inc_msgTxnDequeues();
mgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_msgTxnDequeues();
+ brokerMgmtObject->inc_byteTxnDequeues(msg.payload->contentSize());
+ }
}
}
@@ -893,8 +967,8 @@ void Queue::popAndDequeue(const Mutex::ScopedLock& held)
*/
void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
- if (policy.get()) policy->dequeued(msg);
mgntDeqStats(msg.payload);
+ if (policy.get()) policy->dequeued(msg);
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->dequeued(msg);
@@ -909,6 +983,12 @@ void Queue::observeDequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
*/
void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
+ if (mgmtObject) {
+ mgmtObject->inc_acquires();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_acquires();
+ }
+
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->acquired(msg);
@@ -923,6 +1003,12 @@ void Queue::observeAcquire(const QueuedMessage& msg, const Mutex::ScopedLock&)
*/
void Queue::observeRequeue(const QueuedMessage& msg, const Mutex::ScopedLock&)
{
+ if (mgmtObject) {
+ mgmtObject->inc_releases();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_releases();
+ }
+
for (Observers::const_iterator i = observers.begin(); i != observers.end(); ++i) {
try{
(*i)->requeued(msg);
@@ -1079,14 +1165,22 @@ void Queue::configureImpl(const FieldTable& _settings)
void Queue::destroyed()
{
unbind(broker->getExchanges());
- if (alternateExchange.get()) {
+ {
Mutex::ScopedLock locker(messageLock);
while(!messages->empty()){
DeliverableMessage msg(messages->front().payload);
- alternateExchange->routeWithAlternate(msg);
+ if (alternateExchange.get()) {
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandonedViaAlt();
+ alternateExchange->routeWithAlternate(msg);
+ } else {
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_abandoned();
+ }
popAndDequeue(locker);
}
- alternateExchange->decAlternateUsers();
+ if (alternateExchange.get())
+ alternateExchange->decAlternateUsers();
}
if (store) {
@@ -1124,6 +1218,8 @@ void Queue::unbind(ExchangeRegistry& exchanges)
void Queue::setPolicy(std::auto_ptr<QueuePolicy> _policy)
{
policy = _policy;
+ if (policy.get())
+ policy->setQueue(this);
}
const QueuePolicy* Queue::getPolicy()
@@ -1291,6 +1387,40 @@ void Queue::setExternalQueueStore(ExternalQueueStore* inst) {
}
}
+void Queue::countRejected() const
+{
+ if (mgmtObject) {
+ mgmtObject->inc_discardsSubscriber();
+ if (brokerMgmtObject)
+ brokerMgmtObject->inc_discardsSubscriber();
+ }
+}
+
+void Queue::countFlowedToDisk(uint64_t size) const
+{
+ if (mgmtObject) {
+ mgmtObject->inc_msgFtdEnqueues();
+ mgmtObject->inc_byteFtdEnqueues(size);
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_msgFtdEnqueues();
+ brokerMgmtObject->inc_byteFtdEnqueues(size);
+ }
+ }
+}
+
+void Queue::countLoadedFromDisk(uint64_t size) const
+{
+ if (mgmtObject) {
+ mgmtObject->inc_msgFtdDequeues();
+ mgmtObject->inc_byteFtdDequeues(size);
+ if (brokerMgmtObject) {
+ brokerMgmtObject->inc_msgFtdDequeues();
+ brokerMgmtObject->inc_byteFtdDequeues(size);
+ }
+ }
+}
+
+
ManagementObject* Queue::GetManagementObject (void) const
{
return (ManagementObject*) mgmtObject;
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 59ae41e768..5eca1e9b0c 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -39,6 +39,7 @@
#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
+#include "qmf/org/apache/qpid/broker/Broker.h"
#include "qpid/framing/amqp_types.h"
#include <boost/shared_ptr.hpp>
@@ -92,7 +93,6 @@ class Queue : public boost::enable_shared_from_this<Queue>,
typedef std::set< boost::shared_ptr<QueueObserver> > Observers;
enum ConsumeCode {NO_MESSAGES=0, CANT_CONSUME=1, CONSUMED=2};
-
const std::string name;
const bool autodelete;
MessageStore* store;
@@ -119,6 +119,7 @@ class Queue : public boost::enable_shared_from_this<Queue>,
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue* mgmtObject;
+ qmf::org::apache::qpid::broker::Broker* brokerMgmtObject;
sys::AtomicValue<uint32_t> dequeueSincePurge; // Count dequeues since last purge.
int eventMode;
Observers observers;
@@ -165,9 +166,13 @@ class Queue : public boost::enable_shared_from_this<Queue>,
if (mgmtObject != 0) {
mgmtObject->inc_msgTotalEnqueues ();
mgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
+ brokerMgmtObject->inc_msgTotalEnqueues ();
+ brokerMgmtObject->inc_byteTotalEnqueues (msg->contentSize ());
if (msg->isPersistent ()) {
mgmtObject->inc_msgPersistEnqueues ();
mgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
+ brokerMgmtObject->inc_msgPersistEnqueues ();
+ brokerMgmtObject->inc_bytePersistEnqueues (msg->contentSize ());
}
}
}
@@ -176,9 +181,13 @@ class Queue : public boost::enable_shared_from_this<Queue>,
if (mgmtObject != 0){
mgmtObject->inc_msgTotalDequeues ();
mgmtObject->inc_byteTotalDequeues (msg->contentSize());
+ brokerMgmtObject->inc_msgTotalDequeues ();
+ brokerMgmtObject->inc_byteTotalDequeues (msg->contentSize());
if (msg->isPersistent ()){
mgmtObject->inc_msgPersistDequeues ();
mgmtObject->inc_bytePersistDequeues (msg->contentSize());
+ brokerMgmtObject->inc_msgPersistDequeues ();
+ brokerMgmtObject->inc_bytePersistDequeues (msg->contentSize());
}
}
}
@@ -355,6 +364,11 @@ class Queue : public boost::enable_shared_from_this<Queue>,
virtual void setExternalQueueStore(ExternalQueueStore* inst);
+ // Increment the rejected-by-consumer counter.
+ void countRejected() const;
+ void countFlowedToDisk(uint64_t size) const;
+ void countLoadedFromDisk(uint64_t size) const;
+
// Manageable entry points
management::ManagementObject* GetManagementObject (void) const;
management::Manageable::status_t
diff --git a/cpp/src/qpid/broker/QueuePolicy.cpp b/cpp/src/qpid/broker/QueuePolicy.cpp
index dafcf92a63..d5b4c1ae86 100644
--- a/cpp/src/qpid/broker/QueuePolicy.cpp
+++ b/cpp/src/qpid/broker/QueuePolicy.cpp
@@ -31,7 +31,7 @@ using namespace qpid::broker;
using namespace qpid::framing;
QueuePolicy::QueuePolicy(const std::string& _name, uint32_t _maxCount, uint64_t _maxSize, const std::string& _type) :
- maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), name(_name) {
+ maxCount(_maxCount), maxSize(_maxSize), type(_type), count(0), size(0), policyExceeded(false), queue(0), name(_name) {
QPID_LOG(info, "Queue \"" << name << "\": Policy created: type=" << type << "; maxCount=" << maxCount << "; maxSize=" << maxSize);
}
@@ -204,7 +204,11 @@ FlowToDiskPolicy::FlowToDiskPolicy(const std::string& _name, uint32_t _maxCount,
bool FlowToDiskPolicy::checkLimit(boost::intrusive_ptr<Message> m)
{
- if (!QueuePolicy::checkLimit(m)) m->requestContentRelease();
+ if (!QueuePolicy::checkLimit(m)) {
+ m->requestContentRelease();
+ if (queue)
+ queue->countFlowedToDisk(m->contentSize());
+ }
return true;
}
diff --git a/cpp/src/qpid/broker/QueuePolicy.h b/cpp/src/qpid/broker/QueuePolicy.h
index ec7f846704..f23b709f18 100644
--- a/cpp/src/qpid/broker/QueuePolicy.h
+++ b/cpp/src/qpid/broker/QueuePolicy.h
@@ -33,6 +33,8 @@
namespace qpid {
namespace broker {
+class Queue;
+
class QueuePolicy
{
static uint64_t defaultMaxSize;
@@ -44,8 +46,8 @@ class QueuePolicy
uint64_t size;
bool policyExceeded;
-
protected:
+ Queue* queue;
uint64_t getCurrentQueueSize() const { return size; }
public:
@@ -72,6 +74,8 @@ class QueuePolicy
void decode ( framing::Buffer& buffer );
uint32_t encodedSize() const;
virtual void getPendingDequeues(Messages& result);
+ std::string getType() const { return type; }
+ void setQueue(Queue* q) { queue = q; }
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, const qpid::framing::FieldTable& settings);
static QPID_BROKER_EXTERN std::auto_ptr<QueuePolicy> createQueuePolicy(const std::string& name, uint32_t maxCount, uint64_t maxSize, const std::string& type = REJECT);
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index ff5271f83b..741ef442b0 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -122,7 +122,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
suppressed(false), disallowAllV1Methods(false),
vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
qmf1Support(qmfV1), qmf2Support(qmfV2), maxReplyObjs(100),
- msgBuffer(MA_BUFFER_SIZE)
+ msgBuffer(MA_BUFFER_SIZE), memstat(0)
{
nextObjectId = 1;
brokerBank = 1;
@@ -132,6 +132,9 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
clientWasAdded = false;
attrMap["_vendor"] = defaultVendorName;
attrMap["_product"] = defaultProductName;
+
+ memstat = new qmf::org::apache::qpid::broker::Memory(this, 0, "amqp-broker");
+ addObject(memstat, "amqp-broker");
}
ManagementAgent::~ManagementAgent ()
@@ -720,6 +723,7 @@ void ManagementAgent::periodicProcessing (void)
static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
moveNewObjectsLH();
+ qpid::sys::MemStat::loadMemInfo(memstat);
//
// Clear the been-here flag on all objects in the map.
@@ -1834,6 +1838,9 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
string className (value->get<string>());
std::list<ObjectId>matches;
+ if (className == "memory")
+ qpid::sys::MemStat::loadMemInfo(memstat);
+
// build up a set of all objects to be dumped
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
@@ -1946,6 +1953,8 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
packageName = s_iter->second.asString();
}
+ if (className == "memory")
+ qpid::sys::MemStat::loadMemInfo(memstat);
/*
* Unpack the _object_id element of the query if it is present. If it is present, find that one
@@ -1968,6 +1977,7 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
Variant::Map values;
Variant::Map oidMap;
+ object->writeTimestamps(map_);
object->mapEncodeValues(values, true, true); // write both stats and properties
objId.mapEncode(oidMap);
map_["_values"] = values;
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index c21f384433..f68bfe0577 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -32,6 +32,8 @@
#include "qpid/management/ManagementEvent.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Agent.h"
+#include "qmf/org/apache/qpid/broker/Memory.h"
+#include "qpid/sys/MemStat.h"
#include "qpid/types/Variant.h"
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/FieldValue.h>
@@ -343,6 +345,11 @@ private:
char eventBuffer[MA_BUFFER_SIZE];
framing::ResizableBuffer msgBuffer;
+ //
+ // Memory statistics object
+ //
+ qmf::org::apache::qpid::broker::Memory *memstat;
+
void writeData ();
void periodicProcessing (void);
void deleteObjectNowLH(const ObjectId& oid);
diff --git a/cpp/src/qpid/sys/posix/MemStat.cpp b/cpp/src/qpid/sys/posix/MemStat.cpp
new file mode 100644
index 0000000000..72c53e5886
--- /dev/null
+++ b/cpp/src/qpid/sys/posix/MemStat.cpp
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/MemStat.h"
+#include <malloc.h>
+
+void qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory* object)
+{
+ struct mallinfo info(mallinfo());
+
+ object->set_malloc_arena(info.arena);
+ object->set_malloc_ordblks(info.ordblks);
+ object->set_malloc_hblks(info.hblks);
+ object->set_malloc_hblkhd(info.hblkhd);
+ object->set_malloc_uordblks(info.uordblks);
+ object->set_malloc_fordblks(info.fordblks);
+ object->set_malloc_keepcost(info.keepcost);
+}
+
+
diff --git a/cpp/src/qpid/sys/windows/MemStat.cpp b/cpp/src/qpid/sys/windows/MemStat.cpp
new file mode 100644
index 0000000000..4ad73933ad
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/MemStat.cpp
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/MemStat.h"
+
+qpid::sys::MemStat::loadMemInfo(qmf::org::apache::qpid::broker::Memory*)
+{
+ // TODO: Add Windows-specific memory stats to the object and load them here.
+}
+
+
diff --git a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
index 27345f0a88..4367a04da9 100644
--- a/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
+++ b/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
@@ -678,6 +678,204 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
return (System.currentTimeMillis() - _obj.getCreateTime()) * 1000000L;
}
+ public Long getQueueCount()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getMsgTotalEnqueues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getMsgTotalDequeues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getByteTotalEnqueues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getByteTotalDequeues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getMsgDepth()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getByteDepth()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getMsgPersistEnqueues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getMsgPersistDequeues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getBytePersistEnqueues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getBytePersistDequeues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getMsgTxnEnqueues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getMsgTxnDequeues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getByteTxnEnqueues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getByteTxnDequeues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getMsgFtdEnqueues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getMsgFtdDequeues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getByteFtdEnqueues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getByteFtdDequeues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getMsgFtdDepth()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getByteFtdDepth()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getReleases()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getAcquires()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getDiscardsNoRoute()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getDiscardsTtl()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getDiscardsRing()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getDiscardsLvq()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getDiscardsOverflow()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getDiscardsSubscriber()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getDiscardsPurge()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getReroutes()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getAbandoned()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getAbandonedViaAlt()
+ {
+ // TODO
+ return 0L;
+ }
+
public BrokerSchema.BrokerClass.EchoMethodResponseCommand echo(final BrokerSchema.BrokerClass.EchoMethodResponseCommandFactory factory,
final Long sequence,
final String body)
@@ -1064,6 +1262,96 @@ public class QMFService implements ConfigStore.ConfigEventListener, Closeable
return _obj.getPersistentByteDequeues();
}
+ public Long getMsgFtdEnqueues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getMsgFtdDequeues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getByteFtdEnqueues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getByteFtdDequeues()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getMsgFtdDepth()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getByteFtdDepth()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getReleases()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getAcquires()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getDiscardsTtl()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getDiscardsRing()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getDiscardsLvq()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getDiscardsOverflow()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getDiscardsSubscriber()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getDiscardsPurge()
+ {
+ // TODO
+ return 0L;
+ }
+
+ public Long getReroutes()
+ {
+ // TODO
+ return 0L;
+ }
+
public Long getConsumerCount()
{
return (long) _obj.getConsumerCount();
diff --git a/java/broker/src/xsl/qmf.xsl b/java/broker/src/xsl/qmf.xsl
index 1e98c97466..c45d1e419e 100644
--- a/java/broker/src/xsl/qmf.xsl
+++ b/java/broker/src/xsl/qmf.xsl
@@ -794,9 +794,14 @@ public class <xsl:value-of select="$ClassName"/> extends QMFPackage
<xsl:when test="$type='objId'">OBJECTREFERENCE</xsl:when>
<xsl:when test="$type='sstr'">STR8</xsl:when>
<xsl:when test="$type='lstr'">STR16</xsl:when>
+<xsl:when test="$type='uint8'">UINT8</xsl:when>
<xsl:when test="$type='uint16'">UINT16</xsl:when>
<xsl:when test="$type='uint32'">UINT32</xsl:when>
<xsl:when test="$type='uint64'">UINT64</xsl:when>
+<xsl:when test="$type='int8'">INT8</xsl:when>
+<xsl:when test="$type='int16'">INT16</xsl:when>
+<xsl:when test="$type='int32'">INT32</xsl:when>
+<xsl:when test="$type='int64'">INT64</xsl:when>
<xsl:when test="$type='uuid'">UUID</xsl:when>
<xsl:when test="$type='deltaTime'">DELTATIME</xsl:when>
<xsl:when test="$type='count32'">UINT32</xsl:when>
@@ -813,9 +818,14 @@ public class <xsl:value-of select="$ClassName"/> extends QMFPackage
<xsl:when test="$type='objId'"><xsl:value-of select="$referenceType"/>Object</xsl:when>
<xsl:when test="$type='sstr'">String</xsl:when>
<xsl:when test="$type='lstr'">String</xsl:when>
+ <xsl:when test="$type='uint8'">Integer</xsl:when>
<xsl:when test="$type='uint16'">Integer</xsl:when>
<xsl:when test="$type='uint32'">Long</xsl:when>
<xsl:when test="$type='uint64'">Long</xsl:when>
+ <xsl:when test="$type='int8'">Integer</xsl:when>
+ <xsl:when test="$type='int16'">Integer</xsl:when>
+ <xsl:when test="$type='int32'">Long</xsl:when>
+ <xsl:when test="$type='int64'">Long</xsl:when>
<xsl:when test="$type='uuid'">UUID</xsl:when>
<xsl:when test="$type='deltaTime'">Long</xsl:when>
<xsl:when test="$type='count32'">Long</xsl:when>
@@ -831,9 +841,14 @@ public class <xsl:value-of select="$ClassName"/> extends QMFPackage
<xsl:when test="$type='objId'">writeBin128( <xsl:value-of select="$var"/>.getId() )</xsl:when>
<xsl:when test="$type='sstr'">writeStr8( <xsl:value-of select="$var"/> )</xsl:when>
<xsl:when test="$type='lstr'">writeStr16( <xsl:value-of select="$var"/> )</xsl:when>
+ <xsl:when test="$type='uint8'">writeUint8( <xsl:value-of select="$var"/> )</xsl:when>
<xsl:when test="$type='uint16'">writeUint16( <xsl:value-of select="$var"/> )</xsl:when>
<xsl:when test="$type='uint32'">writeUint32( <xsl:value-of select="$var"/> )</xsl:when>
<xsl:when test="$type='uint64'">writeUint64( <xsl:value-of select="$var"/> )</xsl:when>
+ <xsl:when test="$type='int8'">writeInt8( <xsl:value-of select="$var"/> )</xsl:when>
+ <xsl:when test="$type='int16'">writeInt16( <xsl:value-of select="$var"/> )</xsl:when>
+ <xsl:when test="$type='int32'">writeInt32( <xsl:value-of select="$var"/> )</xsl:when>
+ <xsl:when test="$type='int64'">writeInt64( <xsl:value-of select="$var"/> )</xsl:when>
<xsl:when test="$type='uuid'">writeUuid( <xsl:value-of select="$var"/> )</xsl:when>
<xsl:when test="$type='deltaTime'">writeUint64( <xsl:value-of select="$var"/> )</xsl:when>
<xsl:when test="$type='count32'">writeUint32( <xsl:value-of select="$var"/> )</xsl:when>
@@ -849,9 +864,14 @@ public class <xsl:value-of select="$ClassName"/> extends QMFPackage
<xsl:when test="$type='objId'">readBin128()</xsl:when>
<xsl:when test="$type='sstr'">readStr8()</xsl:when>
<xsl:when test="$type='lstr'">readStr16()</xsl:when>
+ <xsl:when test="$type='uint8'">readUint8()</xsl:when>
<xsl:when test="$type='uint16'">readUint16()</xsl:when>
<xsl:when test="$type='uint32'">readUint32()</xsl:when>
<xsl:when test="$type='uint64'">readUint64()</xsl:when>
+ <xsl:when test="$type='int8'">readInt8()</xsl:when>
+ <xsl:when test="$type='int16'">readInt16()</xsl:when>
+ <xsl:when test="$type='int32'">readInt32()</xsl:when>
+ <xsl:when test="$type='int64'">readInt64()</xsl:when>
<xsl:when test="$type='uuid'">readUuid()</xsl:when>
<xsl:when test="$type='deltaTime'">readUint64()</xsl:when>
<xsl:when test="$type='count32'">readUint32()</xsl:when>
diff --git a/specs/management-schema.xml b/specs/management-schema.xml
index 9e2a644c2a..8e3e798af6 100644
--- a/specs/management-schema.xml
+++ b/specs/management-schema.xml
@@ -57,6 +57,22 @@
<!--
===============================================================
+ Memory
+ ===============================================================
+ -->
+ <class name="Memory">
+ <property name="name" type="sstr" access="RC" index="y" desc="Index for the broker at this agent"/>
+ <property name="malloc_arena" type="uint64" access="RO" optional="y" desc="Total size of memory allocated with `sbrk' by `malloc', in bytes"/>
+ <property name="malloc_ordblks" type="uint64" access="RO" optional="y" desc="The number of chunks not in use"/>
+ <property name="malloc_hblks" type="uint64" access="RO" optional="y" desc="Total number of chunks allocated with `mmap'"/>
+ <property name="malloc_hblkhd" type="uint64" access="RO" optional="y" desc="Total size of memory allocated with `mmap', in bytes"/>
+ <property name="malloc_uordblks" type="uint64" access="RO" optional="y" desc="Total size of memory occupied by chunks handed out by `malloc'"/>
+ <property name="malloc_fordblks" type="uint64" access="RO" optional="y" desc="Total size of memory occupied by free (not in use) chunks"/>
+ <property name="malloc_keepcost" type="uint64" access="RO" optional="y" desc="The size of the top-most releasable chunk that normally borders the end of the heap"/>
+ </class>
+
+ <!--
+ ===============================================================
Broker
===============================================================
-->
@@ -73,6 +89,40 @@
<property name="dataDir" type="lstr" access="RO" optional="y" desc="Persistent configuration storage location"/>
<statistic name="uptime" type="deltaTime"/>
+ <statistic name="queueCount" type="count64" unit="queue" desc="Number of queues in the broker"/>
+ <statistic name="msgTotalEnqueues" type="count64" unit="message" desc="Total messages enqueued to broker"/>
+ <statistic name="msgTotalDequeues" type="count64" unit="message" desc="Total messages dequeued from broker"/>
+ <statistic name="byteTotalEnqueues" type="count64" unit="octet" desc="Total bytes enqueued to broker"/>
+ <statistic name="byteTotalDequeues" type="count64" unit="octet" desc="Total bytes dequeued from broker"/>
+ <statistic name="msgDepth" type="count64" unit="message" desc="Current number of messages on queues in broker" assign="msgTotalEnqueues - msgTotalDequeues"/>
+ <statistic name="byteDepth" type="count64" unit="octet" desc="Current number of bytes on queues in broker" assign="byteTotalEnqueues - byteTotalDequeues"/>
+ <statistic name="msgPersistEnqueues" type="count64" unit="message" desc="Total persistent messages enqueued to broker"/>
+ <statistic name="msgPersistDequeues" type="count64" unit="message" desc="Total persistent messages dequeued from broker"/>
+ <statistic name="bytePersistEnqueues" type="count64" unit="octet" desc="Total persistent bytes enqueued to broker"/>
+ <statistic name="bytePersistDequeues" type="count64" unit="octet" desc="Total persistent bytes dequeued from broker"/>
+ <statistic name="msgTxnEnqueues" type="count64" unit="message" desc="Total transactional messages enqueued to broker"/>
+ <statistic name="msgTxnDequeues" type="count64" unit="message" desc="Total transactional messages dequeued from broker"/>
+ <statistic name="byteTxnEnqueues" type="count64" unit="octet" desc="Total transactional bytes enqueued to broker"/>
+ <statistic name="byteTxnDequeues" type="count64" unit="octet" desc="Total transactional bytes dequeued from broker"/>
+ <statistic name="msgFtdEnqueues" type="count64" unit="message" desc="Total message bodies released from memory and flowed-to-disk on broker"/>
+ <statistic name="msgFtdDequeues" type="count64" unit="message" desc="Total message bodies dequeued from the broker having been flowed-to-disk"/>
+ <statistic name="byteFtdEnqueues" type="count64" unit="octet" desc="Total bytes released from memory and flowed-to-disk on broker"/>
+ <statistic name="byteFtdDequeues" type="count64" unit="octet" desc="Total bytes dequeued from the broker having been flowed-to-disk"/>
+ <statistic name="msgFtdDepth" type="count64" unit="message" desc="Current number of messages flowed-to-disk" assign="msgFtdEnqueues - msgFtdDequeues"/>
+ <statistic name="byteFtdDepth" type="count64" unit="octet" desc="Current number of bytes flowed-to-disk" assign="byteFtdEnqueues - byteFtdDequeues"/>
+ <statistic name="releases" type="count64" unit="message" desc="Acquired messages reinserted into the queue"/>
+ <statistic name="acquires" type="count64" unit="message" desc="Messages acquired from the queue"/>
+ <statistic name="discardsNoRoute" type="count64" unit="message" desc="Messages discarded due to no-route from exchange"/>
+ <statistic name="discardsTtl" type="count64" unit="message" desc="Messages discarded due to TTL expiration"/>
+ <statistic name="discardsRing" type="count64" unit="message" desc="Messages discarded due to ring-queue overflow"/>
+ <statistic name="discardsLvq" type="count64" unit="message" desc="Messages discarded due to LVQ insert"/>
+ <statistic name="discardsOverflow" type="count64" unit="message" desc="Messages discarded due to reject-policy overflow"/>
+ <statistic name="discardsSubscriber" type="count64" unit="message" desc="Messages discarded due to subscriber reject"/>
+ <statistic name="discardsPurge" type="count64" unit="message" desc="Messages discarded due to management purge"/>
+ <statistic name="reroutes" type="count64" unit="message" desc="Messages dequeued to management re-route"/>
+ <statistic name="abandoned" type="count64" unit="message" desc="Messages left in a deleted queue"/>
+ <statistic name="abandonedViaAlt" type="count64" unit="message" desc="Messages routed to alternate exchange from a deleted queue"/>
+
<method name="echo" desc="Request a response to test the path to the management broker">
<arg name="sequence" dir="IO" type="uint32" default="0"/>
<arg name="body" dir="IO" type="lstr" default=""/>
@@ -187,6 +237,31 @@
<statistic name="byteTxnDequeues" type="count64" unit="octet" desc="Transactional messages dequeued"/>
<statistic name="bytePersistEnqueues" type="count64" unit="octet" desc="Persistent messages enqueued"/>
<statistic name="bytePersistDequeues" type="count64" unit="octet" desc="Persistent messages dequeued"/>
+
+ <!-- Flow-to-disk Statistics -->
+
+ <statistic name="msgFtdEnqueues" type="count64" unit="message" desc="Total message bodies released from memory and flowed-to-disk on broker"/>
+ <statistic name="msgFtdDequeues" type="count64" unit="message" desc="Total message bodies dequeued from the broker having been flowed-to-disk"/>
+ <statistic name="byteFtdEnqueues" type="count64" unit="octet" desc="Total bytes released from memory and flowed-to-disk on broker"/>
+ <statistic name="byteFtdDequeues" type="count64" unit="octet" desc="Total bytes dequeued from the broker having been flowed-to-disk"/>
+ <statistic name="msgFtdDepth" type="count64" unit="message" desc="Current number of messages flowed-to-disk" assign="msgFtdEnqueues - msgFtdDequeues"/>
+ <statistic name="byteFtdDepth" type="count64" unit="octet" desc="Current number of bytes flowed-to-disk" assign="byteFtdEnqueues - byteFtdDequeues"/>
+
+ <!-- Acquire and Release Statistics - These do not affect msgDepth since msgDepth includes acquired-but-not-completed messages. -->
+
+ <statistic name="releases" type="count64" unit="message" desc="Acquired messages reinserted into the queue"/>
+ <statistic name="acquires" type="count64" unit="message" desc="Messages acquired from the queue"/>
+
+ <!-- Dequeue Details - all of these are included in msgTotalDequeues -->
+
+ <statistic name="discardsTtl" type="count64" unit="message" desc="Messages discarded due to TTL expiration"/>
+ <statistic name="discardsRing" type="count64" unit="message" desc="Messages discarded due to ring-queue overflow"/>
+ <statistic name="discardsLvq" type="count64" unit="message" desc="Messages discarded due to LVQ insert"/>
+ <statistic name="discardsOverflow" type="count64" unit="message" desc="Messages discarded due to reject-policy overflow"/>
+ <statistic name="discardsSubscriber" type="count64" unit="message" desc="Messages discarded due to subscriber reject"/>
+ <statistic name="discardsPurge" type="count64" unit="message" desc="Messages discarded due to management purge"/>
+ <statistic name="reroutes" type="count64" unit="message" desc="Messages dequeued to management re-route"/>
+
<statistic name="consumerCount" type="hilo32" unit="consumer" desc="Current consumers on queue"/>
<statistic name="bindingCount" type="hilo32" unit="binding" desc="Current bindings"/>
<statistic name="unackedMessages" type="hilo32" unit="message" desc="Messages consumed but not yet acked"/>
diff --git a/tests/src/py/qpid_tests/broker_0_10/__init__.py b/tests/src/py/qpid_tests/broker_0_10/__init__.py
index 5b1964ae98..107b34c82b 100644
--- a/tests/src/py/qpid_tests/broker_0_10/__init__.py
+++ b/tests/src/py/qpid_tests/broker_0_10/__init__.py
@@ -35,3 +35,4 @@ from threshold import *
from extensions import *
from msg_groups import *
from new_api import *
+from stats import *
diff --git a/tests/src/py/qpid_tests/broker_0_10/stats.py b/tests/src/py/qpid_tests/broker_0_10/stats.py
new file mode 100644
index 0000000000..f1f809cf2f
--- /dev/null
+++ b/tests/src/py/qpid_tests/broker_0_10/stats.py
@@ -0,0 +1,489 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import *
+from qpid.tests.messaging import Base
+import qmf.console
+from time import sleep
+import os
+import sys
+
+home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("../../../../tools/src/py"))
+sys.path.append(os.path.join(home, "python"))
+
+from qpidtoollibs.broker import BrokerAgent
+
+#
+# Tests the Broker's statistics reporting
+#
+
+class BrokerStatsTests(Base):
+ """
+ Tests of the broker's statistics
+ """
+
+ def assertEqual(self, left, right, text=None):
+ if not left == right:
+ print "assertEqual failure: %r != %r" % (left, right)
+ if text:
+ print " %r" % text
+ assert None
+
+ def failUnless(self, value, text=None):
+ if value:
+ return
+ print "failUnless failure",
+ if text:
+ print ": %r" % text
+ else:
+ print
+ assert None
+
+ def fail(self, text=None):
+ if text:
+ print "Fail: %r" % text
+ assert None
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self, tx=False):
+ return self.conn.session(transactional=tx)
+
+ def setup_access(self):
+ return BrokerAgent(self.conn)
+
+
+ def test_enqueues_dequeues(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("enqueue_test;{create:always,delete:always}")
+ rx = sess.receiver("enqueue_test")
+
+ queue = agent.getQueue("enqueue_test")
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.msgTotalEnqueues, 0, "msgTotalEnqueues")
+ self.assertEqual(queue.byteTotalEnqueues, 0, "byteTotalEnqueues")
+ self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues")
+ self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues")
+ self.assertEqual(queue.msgDepth, 0, "msgDepth")
+ self.assertEqual(queue.byteDepth, 0, "byteDepth")
+
+ tx.send("0123456789")
+ tx.send("01234567890123456789")
+ tx.send("012345678901234567890123456789")
+ tx.send("0123456789012345678901234567890123456789")
+
+ queue.update()
+ self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues")
+ self.assertEqual(queue.byteTotalEnqueues, 100, "byteTotalEnqueues")
+ self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues")
+ self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues")
+ self.assertEqual(queue.msgDepth, 4, "msgDepth")
+ self.assertEqual(queue.byteDepth, 100, "byteDepth")
+
+ now_broker = agent.getBroker()
+ self.failUnless((now_broker.msgTotalEnqueues - start_broker.msgTotalEnqueues) >= 4, "broker msgTotalEnqueues")
+ self.failUnless((now_broker.byteTotalEnqueues - start_broker.byteTotalEnqueues) >= 100, "broker byteTotalEnqueues")
+
+ m = rx.fetch()
+ m = rx.fetch()
+ sess.acknowledge()
+
+ queue.update()
+ self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues")
+ self.assertEqual(queue.byteTotalEnqueues, 100, "byteTotalEnqueues")
+ self.assertEqual(queue.msgTotalDequeues, 2, "msgTotalDequeues")
+ self.assertEqual(queue.byteTotalDequeues, 30, "byteTotalDequeues")
+ self.assertEqual(queue.msgDepth, 2, "msgDepth")
+ self.assertEqual(queue.byteDepth, 70, "byteDepth")
+
+ now_broker = agent.getBroker()
+ self.failUnless((now_broker.msgTotalDequeues - start_broker.msgTotalDequeues) >= 2, "broker msgTotalDequeues")
+ self.failUnless((now_broker.byteTotalDequeues - start_broker.byteTotalDequeues) >= 30, "broker byteTotalDequeues")
+
+ sess.close()
+
+ now_broker = agent.getBroker()
+ self.assertEqual(now_broker.abandoned - start_broker.abandoned, 2, "expect 2 abandoned messages")
+ self.assertEqual(now_broker.msgDepth, start_broker.msgDepth, "expect broker message depth to be unchanged")
+ self.assertEqual(now_broker.byteDepth, start_broker.byteDepth, "expect broker byte depth to be unchanged")
+
+
+ def test_transactional_enqueues_dequeues(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session(True)
+ tx = sess.sender("tx_enqueue_test;{create:always,delete:always}")
+
+ tx.send("0123456789")
+ tx.send("0123456789")
+ tx.send("0123456789")
+ tx.send("0123456789")
+
+ queue = agent.getQueue("tx_enqueue_test")
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.msgTotalEnqueues, 0, "msgTotalEnqueues pre-tx-commit")
+ self.assertEqual(queue.byteTotalEnqueues, 0, "byteTotalEnqueues pre-tx-commit")
+ self.assertEqual(queue.msgTxnEnqueues, 0, "msgTxnEnqueues pre-tx-commit")
+ self.assertEqual(queue.byteTxnEnqueues, 0, "byteTxnEnqueues pre-tx-commit")
+ self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues pre-tx-commit")
+ self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues pre-tx-commit")
+ self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues pre-tx-commit")
+ self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues pre-tx-commit")
+
+ sess.commit()
+ queue.update()
+ self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues post-tx-commit")
+ self.assertEqual(queue.byteTotalEnqueues, 40, "byteTotalEnqueues post-tx-commit")
+ self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues post-tx-commit")
+ self.assertEqual(queue.byteTxnEnqueues, 40, "byteTxnEnqueues post-tx-commit")
+ self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues post-tx-commit")
+ self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues post-tx-commit")
+ self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues post-tx-commit")
+ self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues post-tx-commit")
+
+ sess2 = self.setup_session(True)
+ rx = sess2.receiver("tx_enqueue_test")
+
+ m = rx.fetch()
+ m = rx.fetch()
+ m = rx.fetch()
+ m = rx.fetch()
+
+ queue.update()
+ self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues pre-rx-commit")
+ self.assertEqual(queue.byteTotalEnqueues, 40, "byteTotalEnqueues pre-rx-commit")
+ self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues pre-rx-commit")
+ self.assertEqual(queue.byteTxnEnqueues, 40, "byteTxnEnqueues pre-rx-commit")
+ self.assertEqual(queue.msgTotalDequeues, 0, "msgTotalDequeues pre-rx-commit")
+ self.assertEqual(queue.byteTotalDequeues, 0, "byteTotalDequeues pre-rx-commit")
+ self.assertEqual(queue.msgTxnDequeues, 0, "msgTxnDequeues pre-rx-commit")
+ self.assertEqual(queue.byteTxnDequeues, 0, "byteTxnDequeues pre-rx-commit")
+
+ sess2.acknowledge()
+ sess2.commit()
+
+ queue.update()
+ self.assertEqual(queue.msgTotalEnqueues, 4, "msgTotalEnqueues post-rx-commit")
+ self.assertEqual(queue.byteTotalEnqueues, 40, "byteTotalEnqueues post-rx-commit")
+ self.assertEqual(queue.msgTxnEnqueues, 4, "msgTxnEnqueues post-rx-commit")
+ self.assertEqual(queue.byteTxnEnqueues, 40, "byteTxnEnqueues post-rx-commit")
+ self.assertEqual(queue.msgTotalDequeues, 4, "msgTotalDequeues post-rx-commit")
+ self.assertEqual(queue.byteTotalDequeues, 40, "byteTotalDequeues post-rx-commit")
+ self.assertEqual(queue.msgTxnDequeues, 4, "msgTxnDequeues post-rx-commit")
+ self.assertEqual(queue.byteTxnDequeues, 40, "byteTxnDequeues post-rx-commit")
+
+ sess.close()
+ sess2.close()
+
+ now_broker = agent.getBroker()
+ self.assertEqual(now_broker.msgTxnEnqueues - start_broker.msgTxnEnqueues, 4, "broker msgTxnEnqueues")
+ self.assertEqual(now_broker.byteTxnEnqueues - start_broker.byteTxnEnqueues, 40, "broker byteTxnEnqueues")
+ self.assertEqual(now_broker.msgTxnDequeues - start_broker.msgTxnDequeues, 4, "broker msgTxnDequeues")
+ self.assertEqual(now_broker.byteTxnDequeues - start_broker.byteTxnDequeues, 40, "broker byteTxnDequeues")
+
+
+ def test_discards_no_route(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("amq.topic/non.existing.key")
+ tx.send("NO_ROUTE")
+ tx.send("NO_ROUTE")
+ tx.send("NO_ROUTE")
+ tx.send("NO_ROUTE")
+ tx.send("NO_ROUTE")
+
+ now_broker = agent.getBroker()
+
+ self.assertEqual(now_broker.discardsNoRoute - start_broker.discardsNoRoute, 5, "Expect 5 no-routes")
+
+ sess.close()
+
+
+ def test_abandoned_alt(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("abandon_alt;{create:always,delete:always,node:{x-declare:{alternate-exchange:'amq.fanout'}}}")
+ rx = sess.receiver("abandon_alt")
+ rx.capacity = 2
+
+ tx.send("ABANDON_ALT")
+ tx.send("ABANDON_ALT")
+ tx.send("ABANDON_ALT")
+ tx.send("ABANDON_ALT")
+ tx.send("ABANDON_ALT")
+
+ rx.fetch()
+
+ sess.close()
+ now_broker = agent.getBroker()
+ self.assertEqual(now_broker.abandonedViaAlt - start_broker.abandonedViaAlt, 5, "Expect 5 abandonedViaAlt")
+ self.assertEqual(now_broker.abandoned - start_broker.abandoned, 0, "Expect 0 abandoned")
+
+
+ def test_discards_ttl(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("discards_ttl;{create:always,delete:always}")
+ msg = Message("TTL")
+ msg.ttl = 1
+
+ tx.send(msg)
+ tx.send(msg)
+ tx.send(msg)
+ tx.send(msg)
+ tx.send(msg)
+ tx.send(msg)
+
+ sleep(2)
+
+ rx = sess.receiver("discards_ttl")
+ try:
+ rx.fetch(0)
+ except:
+ pass
+
+ now_broker = agent.getBroker()
+ queue = agent.getQueue("discards_ttl")
+
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.discardsTtl, 6, "expect 6 TTL discards on queue")
+ self.assertEqual(now_broker.discardsTtl - start_broker.discardsTtl, 6, "expect 6 TTL discards on broker")
+ self.assertEqual(queue.msgTotalDequeues, 6, "expect 6 total dequeues on queue")
+
+ sess.close()
+
+
+ def test_discards_limit_overflow(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("discards_limit;{create:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0}}}}")
+ tx.send("LIMIT")
+ tx.send("LIMIT")
+ tx.send("LIMIT")
+ try:
+ tx.send("LIMIT")
+ self.fail("expected to fail sending 4th message")
+ except:
+ pass
+
+ now_broker = agent.getBroker()
+ queue = agent.getQueue("discards_limit")
+
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.discardsOverflow, 1, "expect 1 overflow discard on queue")
+ self.assertEqual(now_broker.discardsOverflow - start_broker.discardsOverflow, 1, "expect 1 overflow discard on broker")
+
+ ##
+ ## Shut down and restart the connection to clear the error condition.
+ ##
+ try:
+ self.conn.close()
+ except:
+ pass
+ self.conn = self.setup_connection()
+
+ ##
+ ## Re-create the session to delete the queue.
+ ##
+ sess = self.setup_session()
+ tx = sess.sender("discards_limit;{create:always,delete:always}")
+ sess.close()
+
+
+ def test_discards_ring_overflow(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("discards_ring;{create:always,delete:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0,'qpid.policy_type':ring}}}}")
+
+ tx.send("RING")
+ tx.send("RING")
+ tx.send("RING")
+ tx.send("RING")
+ tx.send("RING")
+
+ now_broker = agent.getBroker()
+ queue = agent.getQueue("discards_ring")
+
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.discardsRing, 2, "expect 2 ring discards on queue")
+ self.assertEqual(now_broker.discardsRing - start_broker.discardsRing, 2, "expect 2 ring discards on broker")
+ self.assertEqual(queue.msgTotalDequeues, 2, "expect 2 total dequeues on queue")
+
+ sess.close()
+
+
+ def test_discards_lvq_replace(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("discards_lvq;{create:always,delete:always,node:{x-declare:{arguments:{'qpid.max_count':3,'qpid.flow_stop_count':0,'qpid.last_value_queue_key':key}}}}")
+ msgA = Message("LVQ_A")
+ msgA.properties['key'] = 'AAA'
+ msgB = Message("LVQ_B")
+ msgB.properties['key'] = 'BBB'
+
+ tx.send(msgA)
+ tx.send(msgB)
+ tx.send(msgA)
+ tx.send(msgA)
+ tx.send(msgB)
+
+ now_broker = agent.getBroker()
+ queue = agent.getQueue("discards_lvq")
+
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.discardsLvq, 3, "expect 3 lvq discards on queue")
+ self.assertEqual(now_broker.discardsLvq - start_broker.discardsLvq, 3, "expect 3 lvq discards on broker")
+ self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue")
+
+ sess.close()
+
+
+ def test_discards_reject(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("discards_reject;{create:always,delete:always}")
+ tx.send("REJECT")
+ tx.send("REJECT")
+ tx.send("REJECT")
+
+ rx = sess.receiver("discards_reject")
+ m = rx.fetch()
+ sess.acknowledge()
+ m1 = rx.fetch()
+ m2 = rx.fetch()
+ sess.acknowledge(m1, Disposition(REJECTED))
+ sess.acknowledge(m2, Disposition(REJECTED))
+
+ now_broker = agent.getBroker()
+ queue = agent.getQueue("discards_reject")
+
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.discardsSubscriber, 2, "expect 2 reject discards on queue")
+ self.assertEqual(now_broker.discardsSubscriber - start_broker.discardsSubscriber, 2, "expect 2 reject discards on broker")
+ self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue")
+
+ sess.close()
+
+
+ def test_message_release(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("message_release;{create:always,delete:always}")
+ tx.send("RELEASE")
+ tx.send("RELEASE")
+ tx.send("RELEASE")
+ tx.send("RELEASE")
+ tx.send("RELEASE")
+
+ rx = sess.receiver("message_release")
+ m1 = rx.fetch()
+ m2 = rx.fetch()
+ sess.acknowledge(m1, Disposition(RELEASED))
+ sess.acknowledge(m2, Disposition(RELEASED))
+
+ now_broker = agent.getBroker()
+ queue = agent.getQueue("message_release")
+
+ self.failUnless(queue, "expected a valid queue object")
+ self.assertEqual(queue.acquires, 2, "expect 2 acquires on queue")
+ self.failUnless(now_broker.acquires - start_broker.acquires >= 2, "expect at least 2 acquires on broker")
+ self.assertEqual(queue.msgTotalDequeues, 0, "expect 0 total dequeues on queue")
+
+ self.assertEqual(queue.releases, 2, "expect 2 releases on queue")
+ self.failUnless(now_broker.releases - start_broker.releases >= 2, "expect at least 2 releases on broker")
+
+ sess.close()
+
+
+ def test_discards_purge(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("discards_purge;{create:always,delete:always}")
+ tx.send("PURGE")
+ tx.send("PURGE")
+ tx.send("PURGE")
+ tx.send("PURGE")
+ tx.send("PURGE")
+
+ queue = agent.getQueue("discards_purge")
+ self.failUnless(queue, "expected a valid queue object")
+
+ queue.purge(3)
+ queue.update()
+
+ now_broker = agent.getBroker()
+ self.assertEqual(queue.discardsPurge, 3, "expect 3 purge discards on queue")
+ self.assertEqual(now_broker.discardsPurge - start_broker.discardsPurge, 3, "expect 3 purge discards on broker")
+ self.assertEqual(queue.msgTotalDequeues, 3, "expect 3 total dequeues on queue")
+
+ sess.close()
+
+
+ def test_reroutes(self):
+ agent = self.setup_access()
+ start_broker = agent.getBroker()
+
+ sess = self.setup_session()
+ tx = sess.sender("reroute;{create:always,delete:always}")
+ tx.send("REROUTE")
+ tx.send("REROUTE")
+ tx.send("REROUTE")
+ tx.send("REROUTE")
+ tx.send("REROUTE")
+ tx.send("REROUTE")
+ tx.send("REROUTE")
+ tx.send("REROUTE")
+
+ queue = agent.getQueue("reroute")
+ self.failUnless(queue, "expected a valid queue object")
+
+ queue.reroute(5, False, 'amq.fanout')
+ queue.update()
+
+ now_broker = agent.getBroker()
+ self.assertEqual(queue.reroutes, 5, "expect 5 reroutes on queue")
+ self.assertEqual(now_broker.reroutes - start_broker.reroutes, 5, "expect 5 reroutes on broker")
+ self.assertEqual(queue.msgTotalDequeues, 5, "expect 5 total dequeues on queue")
+
+ sess.close()
+
+
diff --git a/tools/setup.py b/tools/setup.py
index feae4bb1bd..b04bb65c87 100755
--- a/tools/setup.py
+++ b/tools/setup.py
@@ -23,15 +23,16 @@ setup(name="qpid-tools",
version="0.15",
author="Apache Qpid",
author_email="dev@qpid.apache.org",
- scripts=["src/py/qpid-cluster",
- "src/py/qpid-cluster-store",
- "src/py/qpid-config",
- "src/py/qpid-printevents",
- "src/py/qpid-queue-stats",
- "src/py/qpid-route",
- "src/py/qpid-stat",
- "src/py/qpid-tool",
- "src/py/qmf-tool"],
+ packages=["qpidtoollibs"],
+ scripts=["qpid-cluster",
+ "qpid-cluster-store",
+ "qpid-config",
+ "qpid-printevents",
+ "qpid-queue-stats",
+ "qpid-route",
+ "qpid-stat",
+ "qpid-tool",
+ "qmf-tool"],
url="http://qpid.apache.org/",
license="Apache Software License",
description="Diagnostic and management tools for Apache Qpid brokers.")
diff --git a/tools/src/py/qpid-stat b/tools/src/py/qpid-stat
index a7272da3f1..bb094554e6 100755
--- a/tools/src/py/qpid-stat
+++ b/tools/src/py/qpid-stat
@@ -21,13 +21,18 @@
import os
from optparse import OptionParser, OptionGroup
-from time import sleep ### debug
import sys
import locale
import socket
import re
-from qmf.console import Session, Console
-from qpid.disp import Display, Header, Sorter
+from qpid.messaging import Connection
+
+home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
+sys.path.append(os.path.join(home, "python"))
+
+from qpidtoollibs.broker import BrokerAgent
+from qpidtoollibs.disp import Display, Header, Sorter
+
class Config:
def __init__(self):
@@ -37,7 +42,7 @@ class Config:
self._limit = 50
self._increasing = False
self._sortcol = None
- self._cluster_detail = False
+ self._details = None
self._sasl_mechanism = None
config = Config()
@@ -56,24 +61,16 @@ def OptionsAndArguments(argv):
parser.add_option_group(group1)
group2 = OptionGroup(parser, "Display Options")
- group2.add_option("-b", "--broker", help="Show Brokers",
- action="store_const", const="b", dest="show")
- group2.add_option("-c", "--connections", help="Show Connections",
- action="store_const", const="c", dest="show")
- group2.add_option("-e", "--exchanges", help="Show Exchanges",
- action="store_const", const="e", dest="show")
- group2.add_option("-q", "--queues", help="Show Queues",
- action="store_const", const="q", dest="show")
- group2.add_option("-u", "--subscriptions", help="Show Subscriptions",
- action="store_const", const="u", dest="show")
- group2.add_option("-S", "--sort-by", metavar="<colname>",
- help="Sort by column name")
- group2.add_option("-I", "--increasing", action="store_true", default=False,
- help="Sort by increasing value (default = decreasing)")
- group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>",
- help="Limit output to n rows")
- group2.add_option("-C", "--cluster", action="store_true", default=False,
- help="Display per-broker cluster detail.")
+ group2.add_option("-b", "--broker", help="Show Brokers", action="store_const", const="b", dest="show")
+ group2.add_option("-c", "--connections", help="Show Connections", action="store_const", const="c", dest="show")
+ group2.add_option("-e", "--exchanges", help="Show Exchanges", action="store_const", const="e", dest="show")
+ group2.add_option("-q", "--queues", help="Show Queues", action="store_const", const="q", dest="show")
+ group2.add_option("-u", "--subscriptions", help="Show Subscriptions", action="store_const", const="u", dest="show")
+ group2.add_option("-m", "--memory", help="Show Broker Memory Stats", action="store_const", const="m", dest="show")
+ group2.add_option("-S", "--sort-by", metavar="<colname>", help="Sort by column name")
+ group2.add_option("-I", "--increasing", action="store_true", default=False, help="Sort by increasing value (default = decreasing)")
+ group2.add_option("-L", "--limit", type="int", default=50, metavar="<n>", help="Limit output to n rows")
+ group2.add_option("-D", "--details", action="store", metavar="<name>", dest="detail", default=None, help="Display details on a single object.")
parser.add_option_group(group2)
opts, args = parser.parse_args(args=argv)
@@ -86,8 +83,8 @@ def OptionsAndArguments(argv):
config._connTimeout = opts.timeout
config._increasing = opts.increasing
config._limit = opts.limit
- config._cluster_detail = opts.cluster
config._sasl_mechanism = opts.sasl_mechanism
+ config._detail = opts.detail
if args:
config._host = args[0]
@@ -119,86 +116,26 @@ class IpAddr:
bestAddr = addrPort
return bestAddr
-class Broker(object):
- def __init__(self, qmf, broker):
- self.broker = broker
-
- agents = qmf.getAgents()
- for a in agents:
- if a.getAgentBank() == '0':
- self.brokerAgent = a
-
- bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0]
- self.currentTime = bobj.getTimestamps()[0]
- try:
- self.uptime = bobj.uptime
- except:
- self.uptime = 0
- self.connections = {}
- self.sessions = {}
- self.exchanges = {}
- self.queues = {}
- self.subscriptions = {}
- package = "org.apache.qpid.broker"
-
- list = qmf.getObjects(_class="connection", _package=package, _agent=self.brokerAgent)
- for conn in list:
- if not conn.shadow:
- self.connections[conn.getObjectId()] = conn
-
- list = qmf.getObjects(_class="session", _package=package, _agent=self.brokerAgent)
- for sess in list:
- if sess.connectionRef in self.connections:
- self.sessions[sess.getObjectId()] = sess
-
- list = qmf.getObjects(_class="exchange", _package=package, _agent=self.brokerAgent)
- for exchange in list:
- self.exchanges[exchange.getObjectId()] = exchange
-
- list = qmf.getObjects(_class="queue", _package=package, _agent=self.brokerAgent)
- for queue in list:
- self.queues[queue.getObjectId()] = queue
-
- list = qmf.getObjects(_class="subscription", _package=package, _agent=self.brokerAgent)
- for subscription in list:
- self.subscriptions[subscription.getObjectId()] = subscription
-
- def getName(self):
- return self.broker.getUrl()
-
- def getCurrentTime(self):
- return self.currentTime
-
- def getUptime(self):
- return self.uptime
-
-class BrokerManager(Console):
+class BrokerManager:
def __init__(self):
- self.brokerName = None
- self.qmf = None
- self.broker = None
- self.brokers = []
- self.cluster = None
+ self.brokerName = None
+ self.connections = []
+ self.brokers = []
+ self.cluster = None
def SetBroker(self, brokerUrl, mechanism):
self.url = brokerUrl
- self.qmf = Session()
- self.mechanism = mechanism
- self.broker = self.qmf.addBroker(brokerUrl, config._connTimeout, mechanism)
- agents = self.qmf.getAgents()
- for a in agents:
- if a.getAgentBank() == '0':
- self.brokerAgent = a
+ self.connections.append(Connection(self.url, sasl_mechanism=mechanism))
+ self.connections[0].open()
+ self.brokers.append(BrokerAgent(self.connections[0]))
def Disconnect(self):
""" Release any allocated brokers. Ignore any failures as the tool is
shutting down.
"""
try:
- if self.broker:
- self.qmf.delBroker(self.broker)
- else:
- for b in self.brokers: self.qmf.delBroker(b.broker)
+ for conn in self.connections:
+ conn.close()
except:
pass
@@ -238,62 +175,63 @@ class BrokerManager(Console):
hosts.append(bestUrl)
return hosts
- def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None):
- if len(subs) == 0:
- return
- this = subs[0]
- remaining = subs[1:]
- newindent = indent + " "
- if this == 'b':
- pass
- elif this == 'c':
- if broker:
- for oid in broker.connections:
- iconn = broker.connections[oid]
- self.printConnSub(indent, broker.getName(), iconn)
- self.displaySubs(remaining, newindent, broker=broker, conn=iconn,
- sess=sess, exchange=exchange, queue=queue)
- elif this == 's':
- pass
- elif this == 'e':
- pass
- elif this == 'q':
- pass
- print
-
def displayBroker(self, subs):
disp = Display(prefix=" ")
heads = []
- heads.append(Header('broker'))
- heads.append(Header('cluster'))
heads.append(Header('uptime', Header.DURATION))
- heads.append(Header('conn', Header.KMG))
- heads.append(Header('sess', Header.KMG))
- heads.append(Header('exch', Header.KMG))
- heads.append(Header('queue', Header.KMG))
+ heads.append(Header('connections', Header.COMMAS))
+ heads.append(Header('sessions', Header.COMMAS))
+ heads.append(Header('exchanges', Header.COMMAS))
+ heads.append(Header('queues', Header.COMMAS))
rows = []
- for broker in self.brokers:
- if self.cluster:
- ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status)
- else:
- ctext = "<standalone>"
- row = (broker.getName(), ctext, broker.getUptime(),
- len(broker.connections), len(broker.sessions),
- len(broker.exchanges), len(broker.queues))
- rows.append(row)
- title = "Brokers"
- if config._sortcol:
- sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
- dispRows = sorter.getSorted()
- else:
- dispRows = rows
- disp.formattedTable(title, heads, dispRows)
+ broker = self.brokers[0].getBroker()
+ connections = self.getConnectionMap()
+ sessions = self.getSessionMap()
+ exchanges = self.getExchangeMap()
+ queues = self.getQueueMap()
+ row = (broker.getUpdateTime() - broker.getCreateTime(),
+ len(connections), len(sessions),
+ len(exchanges), len(queues))
+ rows.append(row)
+ disp.formattedTable('Broker Summary:', heads, rows)
+
+ if 'queueCount' not in broker.values:
+ return
+
+ print
+ heads = []
+ heads.append(Header('Statistic'))
+ heads.append(Header('Messages', Header.COMMAS))
+ heads.append(Header('Bytes', Header.COMMAS))
+ rows = []
+ rows.append(['queue-depth', broker.msgDepth, broker.byteDepth])
+ rows.append(['total-enqueues', broker.msgTotalEnqueues, broker.byteTotalEnqueues])
+ rows.append(['total-dequeues', broker.msgTotalDequeues, broker.byteTotalDequeues])
+ rows.append(['persistent-enqueues', broker.msgPersistEnqueues, broker.bytePersistEnqueues])
+ rows.append(['persistent-dequeues', broker.msgPersistDequeues, broker.bytePersistDequeues])
+ rows.append(['transactional-enqueues', broker.msgTxnEnqueues, broker.byteTxnEnqueues])
+ rows.append(['transactional-dequeues', broker.msgTxnDequeues, broker.byteTxnDequeues])
+ rows.append(['flow-to-disk-depth', broker.msgFtdDepth, broker.byteFtdDepth])
+ rows.append(['flow-to-disk-enqueues', broker.msgFtdEnqueues, broker.byteFtdEnqueues])
+ rows.append(['flow-to-disk-dequeues', broker.msgFtdDequeues, broker.byteFtdDequeues])
+ rows.append(['acquires', broker.acquires, None])
+ rows.append(['releases', broker.releases, None])
+ rows.append(['discards-no-route', broker.discardsNoRoute, None])
+ rows.append(['discards-ttl-expired', broker.discardsTtl, None])
+ rows.append(['discards-limit-overflow', broker.discardsOverflow, None])
+ rows.append(['discards-ring-overflow', broker.discardsRing, None])
+ rows.append(['discards-lvq-replace', broker.discardsLvq, None])
+ rows.append(['discards-subscriber-reject', broker.discardsSubscriber, None])
+ rows.append(['discards-purged', broker.discardsPurge, None])
+ rows.append(['reroutes', broker.reroutes, None])
+ rows.append(['abandoned', broker.abandoned, None])
+ rows.append(['abandoned-via-alt', broker.abandonedViaAlt, None])
+ disp.formattedTable('Aggregate Broker Statistics:', heads, rows)
+
def displayConn(self, subs):
disp = Display(prefix=" ")
heads = []
- if self.cluster:
- heads.append(Header('broker'))
heads.append(Header('client-addr'))
heads.append(Header('cproc'))
heads.append(Header('cpid'))
@@ -303,25 +241,20 @@ class BrokerManager(Console):
heads.append(Header('msgIn', Header.KMG))
heads.append(Header('msgOut', Header.KMG))
rows = []
- for broker in self.brokers:
- for oid in broker.connections:
- conn = broker.connections[oid]
- row = []
- if self.cluster:
- row.append(broker.getName())
- row.append(conn.address)
- row.append(conn.remoteProcessName)
- row.append(conn.remotePid)
- row.append(conn.authIdentity)
- row.append(broker.getCurrentTime() - conn.getTimestamps()[1])
- idle = broker.getCurrentTime() - conn.getTimestamps()[0]
- row.append(broker.getCurrentTime() - conn.getTimestamps()[0])
- row.append(conn.msgsFromClient)
- row.append(conn.msgsToClient)
- rows.append(row)
+ connections = self.brokers[0].getAllConnections()
+ broker = self.brokers[0].getBroker()
+ for conn in connections:
+ row = []
+ row.append(conn.address)
+ row.append(conn.remoteProcessName)
+ row.append(conn.remotePid)
+ row.append(conn.authIdentity)
+ row.append(broker.getUpdateTime() - conn.getCreateTime())
+ row.append(broker.getUpdateTime() - conn.getUpdateTime())
+ row.append(conn.msgsFromClient)
+ row.append(conn.msgsToClient)
+ rows.append(row)
title = "Connections"
- if self.cluster:
- title += " for cluster '%s'" % self.cluster.clusterName
if config._sortcol:
sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
dispRows = sorter.getSorted()
@@ -335,8 +268,6 @@ class BrokerManager(Console):
def displayExchange(self, subs):
disp = Display(prefix=" ")
heads = []
- if self.cluster:
- heads.append(Header('broker'))
heads.append(Header("exchange"))
heads.append(Header("type"))
heads.append(Header("dur", Header.Y))
@@ -348,26 +279,21 @@ class BrokerManager(Console):
heads.append(Header("byteOut", Header.KMG))
heads.append(Header("byteDrop", Header.KMG))
rows = []
- for broker in self.brokers:
- for oid in broker.exchanges:
- ex = broker.exchanges[oid]
- row = []
- if self.cluster:
- row.append(broker.getName())
- row.append(ex.name)
- row.append(ex.type)
- row.append(ex.durable)
- row.append(ex.bindingCount)
- row.append(ex.msgReceives)
- row.append(ex.msgRoutes)
- row.append(ex.msgDrops)
- row.append(ex.byteReceives)
- row.append(ex.byteRoutes)
- row.append(ex.byteDrops)
- rows.append(row)
+ exchanges = self.brokers[0].getAllExchanges()
+ for ex in exchanges:
+ row = []
+ row.append(ex.name)
+ row.append(ex.type)
+ row.append(ex.durable)
+ row.append(ex.bindingCount)
+ row.append(ex.msgReceives)
+ row.append(ex.msgRoutes)
+ row.append(ex.msgDrops)
+ row.append(ex.byteReceives)
+ row.append(ex.byteRoutes)
+ row.append(ex.byteDrops)
+ rows.append(row)
title = "Exchanges"
- if self.cluster:
- title += " for cluster '%s'" % self.cluster.clusterName
if config._sortcol:
sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
dispRows = sorter.getSorted()
@@ -375,11 +301,9 @@ class BrokerManager(Console):
dispRows = rows
disp.formattedTable(title, heads, dispRows)
- def displayQueue(self, subs):
+ def displayQueues(self, subs):
disp = Display(prefix=" ")
heads = []
- if self.cluster:
- heads.append(Header('broker'))
heads.append(Header("queue"))
heads.append(Header("dur", Header.Y))
heads.append(Header("autoDel", Header.Y))
@@ -393,28 +317,23 @@ class BrokerManager(Console):
heads.append(Header("cons", Header.KMG))
heads.append(Header("bind", Header.KMG))
rows = []
- for broker in self.brokers:
- for oid in broker.queues:
- q = broker.queues[oid]
- row = []
- if self.cluster:
- row.append(broker.getName())
- row.append(q.name)
- row.append(q.durable)
- row.append(q.autoDelete)
- row.append(q.exclusive)
- row.append(q.msgDepth)
- row.append(q.msgTotalEnqueues)
- row.append(q.msgTotalDequeues)
- row.append(q.byteDepth)
- row.append(q.byteTotalEnqueues)
- row.append(q.byteTotalDequeues)
- row.append(q.consumerCount)
- row.append(q.bindingCount)
- rows.append(row)
+ queues = self.brokers[0].getAllQueues()
+ for q in queues:
+ row = []
+ row.append(q.name)
+ row.append(q.durable)
+ row.append(q.autoDelete)
+ row.append(q.exclusive)
+ row.append(q.msgDepth)
+ row.append(q.msgTotalEnqueues)
+ row.append(q.msgTotalDequeues)
+ row.append(q.byteDepth)
+ row.append(q.byteTotalEnqueues)
+ row.append(q.byteTotalDequeues)
+ row.append(q.consumerCount)
+ row.append(q.bindingCount)
+ rows.append(row)
title = "Queues"
- if self.cluster:
- title += " for cluster '%s'" % self.cluster.clusterName
if config._sortcol:
sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
dispRows = sorter.getSorted()
@@ -422,46 +341,46 @@ class BrokerManager(Console):
dispRows = rows
disp.formattedTable(title, heads, dispRows)
+ def displayQueue(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+
def displaySubscriptions(self, subs):
disp = Display(prefix=" ")
heads = []
- if self.cluster:
- heads.append(Header('broker'))
- heads.append(Header("subscription"))
+ heads.append(Header("subscr"))
heads.append(Header("queue"))
- heads.append(Header("connection"))
- heads.append(Header("processName"))
- heads.append(Header("processId"))
- heads.append(Header("browsing", Header.Y))
- heads.append(Header("acknowledged", Header.Y))
- heads.append(Header("exclusive", Header.Y))
+ heads.append(Header("conn"))
+ heads.append(Header("procName"))
+ heads.append(Header("procId"))
+ heads.append(Header("browse", Header.Y))
+ heads.append(Header("acked", Header.Y))
+ heads.append(Header("excl", Header.Y))
heads.append(Header("creditMode"))
heads.append(Header("delivered", Header.KMG))
rows = []
- for broker in self.brokers:
- for oid in broker.subscriptions:
- s = broker.subscriptions[oid]
- row = []
- try:
- if self.cluster:
- row.append(broker.getName())
- row.append(s.name)
- row.append(self.qmf.getObjects(_objectId=s.queueRef)[0].name)
- connectionRef = self.qmf.getObjects(_objectId=s.sessionRef)[0].connectionRef
- row.append(self.qmf.getObjects(_objectId=connectionRef)[0].address)
- row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remoteProcessName)
- row.append(self.qmf.getObjects(_objectId=connectionRef)[0].remotePid)
- row.append(s.browsing)
- row.append(s.acknowledged)
- row.append(s.exclusive)
- row.append(s.creditMode)
- row.append(s.delivered)
- rows.append(row)
- except:
- pass
+ subscriptions = self.brokers[0].getAllSubscriptions()
+ sessions = self.getSessionMap()
+ connections = self.getConnectionMap()
+ for s in subscriptions:
+ row = []
+ try:
+ row.append(s.name)
+ row.append(s.queueRef)
+ session = sessions[s.sessionRef]
+ connection = connections[session.connectionRef]
+ row.append(connection.address)
+ row.append(connection.remoteProcessName)
+ row.append(connection.remotePid)
+ row.append(s.browsing)
+ row.append(s.acknowledged)
+ row.append(s.exclusive)
+ row.append(s.creditMode)
+ row.append(s.delivered)
+ rows.append(row)
+ except:
+ pass
title = "Subscriptions"
- if self.cluster:
- title += " for cluster '%s'" % self.cluster.clusterName
if config._sortcol:
sorter = Sorter(heads, rows, config._sortcol, config._limit, config._increasing)
dispRows = sorter.getSorted()
@@ -469,33 +388,58 @@ class BrokerManager(Console):
dispRows = rows
disp.formattedTable(title, heads, dispRows)
+ def displayMemory(self, unused):
+ disp = Display(prefix=" ")
+ heads = [Header('Statistic'), Header('Value', Header.COMMAS)]
+ rows = []
+ memory = self.brokers[0].getMemory()
+ for k,v in memory.values.items():
+ if k != 'name':
+ rows.append([k, v])
+ disp.formattedTable('Broker Memory Statistics:', heads, rows)
+
+ def getExchangeMap(self):
+ exchanges = self.brokers[0].getAllExchanges()
+ emap = {}
+ for e in exchanges:
+ emap[e.name] = e
+ return emap
+
+ def getQueueMap(self):
+ queues = self.brokers[0].getAllQueues()
+ qmap = {}
+ for q in queues:
+ qmap[q.name] = q
+ return qmap
+
+ def getSessionMap(self):
+ sessions = self.brokers[0].getAllSessions()
+ smap = {}
+ for s in sessions:
+ smap[s.name] = s
+ return smap
+
+ def getConnectionMap(self):
+ connections = self.brokers[0].getAllConnections()
+ cmap = {}
+ for c in connections:
+ cmap[c.address] = c
+ return cmap
+
def displayMain(self, main, subs):
if main == 'b': self.displayBroker(subs)
elif main == 'c': self.displayConn(subs)
elif main == 's': self.displaySession(subs)
elif main == 'e': self.displayExchange(subs)
- elif main == 'q': self.displayQueue(subs)
+ elif main == 'q':
+ if config._detail:
+ self.displayQueue(subs, config._detail)
+ else:
+ self.displayQueues(subs)
elif main == 'u': self.displaySubscriptions(subs)
+ elif main == 'm': self.displayMemory(subs)
def display(self):
- if config._cluster_detail or config._types[0] == 'b':
- # always show cluster detail when dumping broker stats
- self._getCluster()
- if self.cluster:
- memberList = self.cluster.members.split(";")
- hostList = self._getHostList(memberList)
- self.qmf.delBroker(self.broker)
- self.broker = None
- if config._host.find("@") > 0:
- authString = config._host.split("@")[0] + "@"
- else:
- authString = ""
- for host in hostList:
- b = self.qmf.addBroker(authString + host, config._connTimeout)
- self.brokers.append(Broker(self.qmf, b))
- else:
- self.brokers.append(Broker(self.qmf, self.broker))
-
self.displayMain(config._types[0], config._types[1:])
diff --git a/tools/src/py/qpidtoollibs/__init__.py b/tools/src/py/qpidtoollibs/__init__.py
new file mode 100644
index 0000000000..31d5a2ef58
--- /dev/null
+++ b/tools/src/py/qpidtoollibs/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
diff --git a/tools/src/py/qpidtoollibs/broker.py b/tools/src/py/qpidtoollibs/broker.py
new file mode 100644
index 0000000000..366d9b0663
--- /dev/null
+++ b/tools/src/py/qpidtoollibs/broker.py
@@ -0,0 +1,322 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.messaging import Message
+try:
+ from uuid import uuid4
+except ImportError:
+ from qpid.datatypes import uuid4
+
+class BrokerAgent(object):
+ def __init__(self, conn):
+ self.conn = conn
+ self.sess = self.conn.session()
+ self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}, link:{x-declare:{auto-delete:True,exclusive:True}}}" % \
+ str(uuid4())
+ self.reply_rx = self.sess.receiver(self.reply_to)
+ self.reply_rx.capacity = 10
+ self.tx = self.sess.sender("qmf.default.direct/broker")
+ self.next_correlator = 1
+
+ def close(self):
+ self.sess.close()
+
+ def _method(self, method, arguments, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10):
+ props = {'method' : 'request',
+ 'qmf.opcode' : '_method_request',
+ 'x-amqp-0-10.app-id' : 'qmf2'}
+ correlator = str(self.next_correlator)
+ self.next_correlator += 1
+
+ content = {'_object_id' : {'_object_name' : addr},
+ '_method_name' : method,
+ '_arguments' : arguments}
+
+ message = Message(content, reply_to=self.reply_to, correlation_id=correlator,
+ properties=props, subject="broker")
+ self.tx.send(message)
+ response = self.reply_rx.fetch(timeout)
+ self.sess.acknowledge()
+ if response.properties['qmf.opcode'] == '_exception':
+ raise Exception("Exception from Agent: %r" % response.content['_values'])
+ if response.properties['qmf.opcode'] != '_method_response':
+ raise Exception("bad response: %r" % response.properties)
+ return response.content['_arguments']
+
+ def _sendRequest(self, opcode, content):
+ props = {'method' : 'request',
+ 'qmf.opcode' : opcode,
+ 'x-amqp-0-10.app-id' : 'qmf2'}
+ correlator = str(self.next_correlator)
+ self.next_correlator += 1
+ message = Message(content, reply_to=self.reply_to, correlation_id=correlator,
+ properties=props, subject="broker")
+ self.tx.send(message)
+ return correlator
+
+ def _doClassQuery(self, class_name):
+ query = {'_what' : 'OBJECT',
+ '_schema_id' : {'_class_name' : class_name}}
+ correlator = self._sendRequest('_query_request', query)
+ response = self.reply_rx.fetch(10)
+ if response.properties['qmf.opcode'] != '_query_response':
+ raise Exception("bad response")
+ items = []
+ done = False
+ while not done:
+ for item in response.content:
+ items.append(item)
+ if 'partial' in response.properties:
+ response = self.reply_rx.fetch(10)
+ else:
+ done = True
+ self.sess.acknowledge()
+ return items
+
+ def _doNameQuery(self, class_name, object_name, package_name='org.apache.qpid.broker'):
+ query = {'_what' : 'OBJECT',
+ '_object_id' : {'_object_name' : "%s:%s:%s" % (package_name, class_name, object_name)}}
+ correlator = self._sendRequest('_query_request', query)
+ response = self.reply_rx.fetch(10)
+ if response.properties['qmf.opcode'] != '_query_response':
+ raise Exception("bad response")
+ items = []
+ done = False
+ while not done:
+ for item in response.content:
+ items.append(item)
+ if 'partial' in response.properties:
+ response = self.reply_rx.fetch(10)
+ else:
+ done = True
+ self.sess.acknowledge()
+ if len(items) == 1:
+ return items[0]
+ return None
+
+ def _getAllBrokerObjects(self, cls):
+ items = self._doClassQuery(cls.__name__.lower())
+ objs = []
+ for item in items:
+ objs.append(cls(self, item))
+ return objs
+
+ def _getBrokerObject(self, cls, name):
+ obj = self._doNameQuery(cls.__name__.lower(), name)
+ if obj:
+ return cls(self, obj)
+ return None
+
+ def getCluster(self):
+ return self._getAllBrokerObjects(Cluster)
+
+ def getBroker(self):
+ return self._getBrokerObject(Broker, "amqp-broker")
+
+ def getMemory(self):
+ return self._getAllBrokerObjects(Memory)[0]
+
+ def getAllConnections(self):
+ return self._getAllBrokerObjects(Connection)
+
+ def getConnection(self, name):
+ return self._getBrokerObject(Connection, name)
+
+ def getAllSessions(self):
+ return self._getAllBrokerObjects(Session)
+
+ def getSession(self, name):
+ return self._getBrokerObject(Session, name)
+
+ def getAllSubscriptions(self):
+ return self._getAllBrokerObjects(Subscription)
+
+ def getSubscription(self, name):
+ return self._getBrokerObject(Subscription, name)
+
+ def getAllExchanges(self):
+ return self._getAllBrokerObjects(Exchange)
+
+ def getExchange(self, name):
+ return self._getBrokerObject(Exchange, name)
+
+ def getAllQueues(self):
+ return self._getAllBrokerObjects(Queue)
+
+ def getQueue(self, name):
+ return self._getBrokerObject(Queue, name)
+
+ def getAllBindings(self):
+ return self._getAllBrokerObjects(Binding)
+
+ def getBinding(self, exchange=None, queue=None):
+ pass
+
+ def echo(self, sequence, body):
+ """Request a response to test the path to the management broker"""
+ pass
+
+ def connect(self, host, port, durable, authMechanism, username, password, transport):
+ """Establish a connection to another broker"""
+ pass
+
+ def queueMoveMessages(self, srcQueue, destQueue, qty):
+ """Move messages from one queue to another"""
+ pass
+
+ def setLogLevel(self, level):
+ """Set the log level"""
+ pass
+
+ def getLogLevel(self):
+ """Get the log level"""
+ pass
+
+ def setTimestampConfig(self, receive):
+ """Set the message timestamping configuration"""
+ pass
+
+ def getTimestampConfig(self):
+ """Get the message timestamping configuration"""
+ pass
+
+# def addExchange(self, exchange_type, name, **kwargs):
+# pass
+
+# def delExchange(self, name):
+# pass
+
+# def addQueue(self, name, **kwargs):
+# pass
+
+# def delQueue(self, name):
+# pass
+
+# def bind(self, exchange, queue, key, **kwargs):
+# pass
+
+# def unbind(self, exchange, queue, key, **kwargs):
+# pass
+
+ def create(self, _type, name, properties, strict):
+ """Create an object of the specified type"""
+ pass
+
+ def delete(self, _type, name, options):
+ """Delete an object of the specified type"""
+ pass
+
+ def query(self, _type, name):
+ """Query the current state of an object"""
+ return self._getBrokerObject(self, _type, name)
+
+
+class BrokerObject(object):
+ def __init__(self, broker, content):
+ self.broker = broker
+ self.content = content
+ self.values = content['_values']
+
+ def __getattr__(self, key):
+ if key not in self.values:
+ return None
+ value = self.values[key]
+ if value.__class__ == dict and '_object_name' in value:
+ full_name = value['_object_name']
+ colon = full_name.find(':')
+ if colon > 0:
+ full_name = full_name[colon+1:]
+ colon = full_name.find(':')
+ if colon > 0:
+ return full_name[colon+1:]
+ return value
+
+ def getAttributes(self):
+ return self.values
+
+ def getCreateTime(self):
+ return self.content['_create_ts']
+
+ def getDeleteTime(self):
+ return self.content['_delete_ts']
+
+ def getUpdateTime(self):
+ return self.content['_update_ts']
+
+ def update(self):
+ """
+ Reload the property values from the agent.
+ """
+ refreshed = self.broker._getBrokerObject(self.__class__, self.name)
+ if refreshed:
+ self.content = refreshed.content
+ self.values = self.content['_values']
+ else:
+ raise Exception("No longer exists on the broker")
+
+class Broker(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+class Memory(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+class Connection(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+ def close(self):
+ pass
+
+class Session(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+class Subscription(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+ def __repr__(self):
+ return "subscription name undefined"
+
+class Exchange(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+class Binding(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+ def __repr__(self):
+ return "Binding key: %s" % self.values['bindingKey']
+
+class Queue(BrokerObject):
+ def __init__(self, broker, values):
+ BrokerObject.__init__(self, broker, values)
+
+ def purge(self, request):
+ """Discard all or some messages on a queue"""
+ self.broker._method("purge", {'request':request}, "org.apache.qpid.broker:queue:%s" % self.name)
+
+ def reroute(self, request, useAltExchange, exchange, filter={}):
+ """Remove all or some messages on this queue and route them to an exchange"""
+ self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter},
+ "org.apache.qpid.broker:queue:%s" % self.name)
+
diff --git a/tools/src/py/qpidtoollibs/disp.py b/tools/src/py/qpidtoollibs/disp.py
new file mode 100644
index 0000000000..cb7d3da306
--- /dev/null
+++ b/tools/src/py/qpidtoollibs/disp.py
@@ -0,0 +1,249 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from time import strftime, gmtime
+
+class Header:
+ """ """
+ NONE = 1
+ KMG = 2
+ YN = 3
+ Y = 4
+ TIME_LONG = 5
+ TIME_SHORT = 6
+ DURATION = 7
+ COMMAS = 8
+
+ def __init__(self, text, format=NONE):
+ self.text = text
+ self.format = format
+
+ def __repr__(self):
+ return self.text
+
+ def __str__(self):
+ return self.text
+
+ def formatted(self, value):
+ try:
+ if value == None:
+ return ''
+ if self.format == Header.NONE:
+ return value
+ if self.format == Header.KMG:
+ return self.num(value)
+ if self.format == Header.YN:
+ if value:
+ return 'Y'
+ return 'N'
+ if self.format == Header.Y:
+ if value:
+ return 'Y'
+ return ''
+ if self.format == Header.TIME_LONG:
+ return strftime("%c", gmtime(value / 1000000000))
+ if self.format == Header.TIME_SHORT:
+ return strftime("%X", gmtime(value / 1000000000))
+ if self.format == Header.DURATION:
+ if value < 0: value = 0
+ sec = value / 1000000000
+ min = sec / 60
+ hour = min / 60
+ day = hour / 24
+ result = ""
+ if day > 0:
+ result = "%dd " % day
+ if hour > 0 or result != "":
+ result += "%dh " % (hour % 24)
+ if min > 0 or result != "":
+ result += "%dm " % (min % 60)
+ result += "%ds" % (sec % 60)
+ return result
+ if self.format == Header.COMMAS:
+ sval = str(value)
+ result = ""
+ while True:
+ if len(sval) == 0:
+ return result
+ left = sval[:-3]
+ right = sval[-3:]
+ result = right + result
+ if len(left) > 0:
+ result = ',' + result
+ sval = left
+ except:
+ return "?"
+
+ def numCell(self, value, tag):
+ fp = float(value) / 1000.
+ if fp < 10.0:
+ return "%1.2f%c" % (fp, tag)
+ if fp < 100.0:
+ return "%2.1f%c" % (fp, tag)
+ return "%4d%c" % (value / 1000, tag)
+
+ def num(self, value):
+ if value < 1000:
+ return "%4d" % value
+ if value < 1000000:
+ return self.numCell(value, 'k')
+ value /= 1000
+ if value < 1000000:
+ return self.numCell(value, 'm')
+ value /= 1000
+ return self.numCell(value, 'g')
+
+
+class Display:
+ """ Display formatting for QPID Management CLI """
+
+ def __init__(self, spacing=2, prefix=" "):
+ self.tableSpacing = spacing
+ self.tablePrefix = prefix
+ self.timestampFormat = "%X"
+
+ def formattedTable(self, title, heads, rows):
+ fRows = []
+ for row in rows:
+ fRow = []
+ col = 0
+ for cell in row:
+ fRow.append(heads[col].formatted(cell))
+ col += 1
+ fRows.append(fRow)
+ headtext = []
+ for head in heads:
+ headtext.append(head.text)
+ self.table(title, headtext, fRows)
+
+ def table(self, title, heads, rows):
+ """ Print a table with autosized columns """
+
+ # Pad the rows to the number of heads
+ for row in rows:
+ diff = len(heads) - len(row)
+ for idx in range(diff):
+ row.append("")
+
+ print title
+ if len (rows) == 0:
+ return
+ colWidth = []
+ col = 0
+ line = self.tablePrefix
+ for head in heads:
+ width = len (head)
+ for row in rows:
+ cellWidth = len (unicode (row[col]))
+ if cellWidth > width:
+ width = cellWidth
+ colWidth.append (width + self.tableSpacing)
+ line = line + head
+ if col < len (heads) - 1:
+ for i in range (colWidth[col] - len (head)):
+ line = line + " "
+ col = col + 1
+ print line
+ line = self.tablePrefix
+ for width in colWidth:
+ for i in range (width):
+ line = line + "="
+ print line
+
+ for row in rows:
+ line = self.tablePrefix
+ col = 0
+ for width in colWidth:
+ line = line + unicode (row[col])
+ if col < len (heads) - 1:
+ for i in range (width - len (unicode (row[col]))):
+ line = line + " "
+ col = col + 1
+ print line
+
+ def do_setTimeFormat (self, fmt):
+ """ Select timestamp format """
+ if fmt == "long":
+ self.timestampFormat = "%c"
+ elif fmt == "short":
+ self.timestampFormat = "%X"
+
+ def timestamp (self, nsec):
+ """ Format a nanosecond-since-the-epoch timestamp for printing """
+ return strftime (self.timestampFormat, gmtime (nsec / 1000000000))
+
+ def duration(self, nsec):
+ if nsec < 0: nsec = 0
+ sec = nsec / 1000000000
+ min = sec / 60
+ hour = min / 60
+ day = hour / 24
+ result = ""
+ if day > 0:
+ result = "%dd " % day
+ if hour > 0 or result != "":
+ result += "%dh " % (hour % 24)
+ if min > 0 or result != "":
+ result += "%dm " % (min % 60)
+ result += "%ds" % (sec % 60)
+ return result
+
+class Sortable:
+ """ """
+ def __init__(self, row, sortIndex):
+ self.row = row
+ self.sortIndex = sortIndex
+ if sortIndex >= len(row):
+ raise Exception("sort index exceeds row boundary")
+
+ def __cmp__(self, other):
+ return cmp(self.row[self.sortIndex], other.row[self.sortIndex])
+
+ def getRow(self):
+ return self.row
+
+class Sorter:
+ """ """
+ def __init__(self, heads, rows, sortCol, limit=0, inc=True):
+ col = 0
+ for head in heads:
+ if head.text == sortCol:
+ break
+ col += 1
+ if col == len(heads):
+ raise Exception("sortCol '%s', not found in headers" % sortCol)
+
+ list = []
+ for row in rows:
+ list.append(Sortable(row, col))
+ list.sort()
+ if not inc:
+ list.reverse()
+ count = 0
+ self.sorted = []
+ for row in list:
+ self.sorted.append(row.getRow())
+ count += 1
+ if count == limit:
+ break
+
+ def getSorted(self):
+ return self.sorted