summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
committerKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
commit633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch)
tree1391da89470593209466df68c0b40b89c14963b1 /cpp/src/qpid/management
parentc73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff)
downloadqpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp79
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h4
-rw-r--r--cpp/src/qpid/management/ManagementDirectExchange.cpp8
-rw-r--r--cpp/src/qpid/management/ManagementDirectExchange.h4
-rw-r--r--cpp/src/qpid/management/ManagementTopicExchange.cpp8
-rw-r--r--cpp/src/qpid/management/ManagementTopicExchange.h4
6 files changed, 65 insertions, 42 deletions
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 741ef442b0..062a530706 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -30,6 +30,7 @@
#include "qpid/log/Statement.h"
#include <qpid/broker/Message.h>
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/FieldValue.h"
#include "qpid/sys/Time.h"
#include "qpid/sys/Thread.h"
#include "qpid/broker/ConnectionState.h"
@@ -44,14 +45,15 @@
#include <sstream>
#include <typeinfo>
+namespace qpid {
+namespace management {
+
using boost::intrusive_ptr;
using qpid::framing::Uuid;
using qpid::types::Variant;
using qpid::amqp_0_10::MapCodec;
using qpid::amqp_0_10::ListCodec;
-using qpid::sys::Mutex;
using namespace qpid::framing;
-using namespace qpid::management;
using namespace qpid::broker;
using namespace qpid;
using namespace std;
@@ -117,7 +119,7 @@ ManagementAgent::RemoteAgent::~RemoteAgent ()
}
ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
- threadPoolSize(1), interval(10), broker(0), timer(0),
+ threadPoolSize(1), publish(true), interval(10), broker(0), timer(0),
startTime(sys::now()),
suppressed(false), disallowAllV1Methods(false),
vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
@@ -164,10 +166,11 @@ ManagementAgent::~ManagementAgent ()
}
}
-void ManagementAgent::configure(const string& _dataDir, uint16_t _interval,
+void ManagementAgent::configure(const string& _dataDir, bool _publish, uint16_t _interval,
qpid::broker::Broker* _broker, int _threads)
{
dataDir = _dataDir;
+ publish = _publish;
interval = _interval;
broker = _broker;
threadPoolSize = _threads;
@@ -428,16 +431,17 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
}
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
- : TimerTask (sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC),
- "ManagementAgent::periodicProcessing"),
+ : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC),
+ "ManagementAgent::periodicProcessing"),
agent(_agent) {}
-ManagementAgent::Periodic::~Periodic () {}
+ManagementAgent::Periodic::~Periodic() {}
-void ManagementAgent::Periodic::fire ()
+void ManagementAgent::Periodic::fire()
{
- agent.timer->add (new Periodic (agent, agent.interval));
- agent.periodicProcessing ();
+ setupNextFire();
+ agent.timer->add(this);
+ agent.periodicProcessing();
}
void ManagementAgent::clientAdded (const string& routingKey)
@@ -562,7 +566,7 @@ void ManagementAgent::sendBufferLH(Buffer& buf,
DeliverableMessage deliverable (msg);
try {
- exchange->route(deliverable, routingKey, 0);
+ exchange->route(deliverable);
} catch(exception&) {}
}
buf.reset();
@@ -639,7 +643,7 @@ void ManagementAgent::sendBufferLH(const string& data,
DeliverableMessage deliverable (msg);
try {
- exchange->route(deliverable, routingKey, 0);
+ exchange->route(deliverable);
} catch(exception&) {}
}
}
@@ -719,11 +723,16 @@ void ManagementAgent::periodicProcessing (void)
string routingKey;
string sBuf;
- uint64_t uptime = sys::Duration(startTime, sys::now());
- static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
-
moveNewObjectsLH();
- qpid::sys::MemStat::loadMemInfo(memstat);
+
+ //
+ // If we're publishing updates, get the latest memory statistics and uptime now
+ //
+ if (publish) {
+ uint64_t uptime = sys::Duration(startTime, sys::now());
+ static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
+ qpid::sys::MemStat::loadMemInfo(memstat);
+ }
//
// Clear the been-here flag on all objects in the map.
@@ -747,6 +756,14 @@ void ManagementAgent::periodicProcessing (void)
// would incorrectly think the object was deleted. See QPID-2997
//
bool objectsDeleted = moveDeletedObjectsLH();
+
+ //
+ // If we are not publishing updates, just clear the pending deletes. There's no
+ // need to tell anybody.
+ //
+ if (!publish)
+ pendingDeletedObjs.clear();
+
if (!pendingDeletedObjs.empty()) {
// use a temporary copy of the pending deletes so dropping the lock when
// the buffer is sent is safe.
@@ -867,7 +884,9 @@ void ManagementAgent::periodicProcessing (void)
// sendBuffer(). This allows the managementObjects map to be altered during the
// sendBuffer() call, so always restart the search after a sendBuffer() call
//
- while (1) {
+ // If publish is disabled, don't send any updates.
+ //
+ while (publish) {
msgBuffer.reset();
Variant::List list_;
uint32_t pcount;
@@ -1023,10 +1042,9 @@ void ManagementAgent::periodicProcessing (void)
if (objectsDeleted) deleteOrphanedAgentsLH();
- // heartbeat generation
+ // heartbeat generation. Note that heartbeats need to be sent even if publish is disabled.
if (qmf1Support) {
-#define BUFSIZE 65536
uint32_t contentSize;
char msgChars[BUFSIZE];
Buffer msgBuffer(msgChars, BUFSIZE);
@@ -1087,7 +1105,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
Variant::List list_;
stringstream v1key, v2key;
- if (qmf1Support) {
+ if (publish && qmf1Support) {
string sBuf;
v1key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
@@ -1096,7 +1114,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
msgBuffer.putRawData(sBuf);
}
- if (qmf2Support) {
+ if (publish && qmf2Support) {
Variant::Map map_;
Variant::Map values;
@@ -1121,14 +1139,14 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
// object deleted, ok to drop lock now.
- if (qmf1Support) {
+ if (publish && qmf1Support) {
uint32_t contentSize = msgBuffer.getPosition();
msgBuffer.reset();
sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str());
QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str());
}
- if (qmf2Support) {
+ if (publish && qmf2Support) {
Variant::Map headers;
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
@@ -1841,6 +1859,12 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
if (className == "memory")
qpid::sys::MemStat::loadMemInfo(memstat);
+ if (className == "broker") {
+ uint64_t uptime = sys::Duration(startTime, sys::now());
+ static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
+ }
+
+
// build up a set of all objects to be dumped
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
@@ -1956,6 +1980,11 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
if (className == "memory")
qpid::sys::MemStat::loadMemInfo(memstat);
+ if (className == "broker") {
+ uint64_t uptime = sys::Duration(startTime, sys::now());
+ static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
+ }
+
/*
* Unpack the _object_id element of the query if it is present. If it is present, find that one
* object and return it. If it is not present, send a class-based result.
@@ -2934,9 +2963,6 @@ bool ManagementAgent::moveDeletedObjectsLH() {
return !deleteList.empty();
}
-namespace qpid {
-namespace management {
-
namespace {
QPID_TSS const qpid::broker::ConnectionState* executionContext = 0;
}
@@ -2951,3 +2977,4 @@ const qpid::broker::ConnectionState* getManagementExecutionContext()
}
}}
+
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index f68bfe0577..c7e830dcf5 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -36,7 +36,6 @@
#include "qpid/sys/MemStat.h"
#include "qpid/types/Variant.h"
#include <qpid/framing/AMQFrame.h>
-#include <qpid/framing/FieldValue.h>
#include <qpid/framing/ResizableBuffer.h>
#include <memory>
#include <string>
@@ -72,7 +71,7 @@ public:
virtual ~ManagementAgent ();
/** Called before plugins are initialized */
- void configure (const std::string& dataDir, uint16_t interval,
+ void configure (const std::string& dataDir, bool publish, uint16_t interval,
qpid::broker::Broker* broker, int threadPoolSize);
/** Called after plugins are initialized. */
void pluginsInitialized();
@@ -300,6 +299,7 @@ private:
qpid::broker::Exchange::shared_ptr v2Topic;
qpid::broker::Exchange::shared_ptr v2Direct;
std::string dataDir;
+ bool publish;
uint16_t interval;
qpid::broker::Broker* broker;
qpid::sys::Timer* timer;
diff --git a/cpp/src/qpid/management/ManagementDirectExchange.cpp b/cpp/src/qpid/management/ManagementDirectExchange.cpp
index 1d5f8bbd6b..312eacf462 100644
--- a/cpp/src/qpid/management/ManagementDirectExchange.cpp
+++ b/cpp/src/qpid/management/ManagementDirectExchange.cpp
@@ -40,17 +40,17 @@ ManagementDirectExchange::ManagementDirectExchange(const std::string& _name,
DirectExchange(_name, _durable, _args, _parent, b),
managementAgent(0) {}
-void ManagementDirectExchange::route(Deliverable& msg,
- const string& routingKey,
- const FieldTable* args)
+void ManagementDirectExchange::route(Deliverable& msg)
{
bool routeIt = true;
+ const string& routingKey = msg.getMessage().getRoutingKey();
+ const FieldTable* args = msg.getMessage().getApplicationHeaders();
if (managementAgent)
routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false, qmfVersion);
if (routeIt)
- DirectExchange::route(msg, routingKey, args);
+ DirectExchange::route(msg);
}
void ManagementDirectExchange::setManagmentAgent(ManagementAgent* agent, int qv)
diff --git a/cpp/src/qpid/management/ManagementDirectExchange.h b/cpp/src/qpid/management/ManagementDirectExchange.h
index 7507179c06..582354d723 100644
--- a/cpp/src/qpid/management/ManagementDirectExchange.h
+++ b/cpp/src/qpid/management/ManagementDirectExchange.h
@@ -43,9 +43,7 @@ class ManagementDirectExchange : public virtual DirectExchange
virtual std::string getType() const { return typeName; }
- virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg);
void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion);
diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp
index ee8657646f..587cc660df 100644
--- a/cpp/src/qpid/management/ManagementTopicExchange.cpp
+++ b/cpp/src/qpid/management/ManagementTopicExchange.cpp
@@ -39,18 +39,18 @@ ManagementTopicExchange::ManagementTopicExchange(const std::string& _name,
TopicExchange(_name, _durable, _args, _parent, b),
managementAgent(0) {}
-void ManagementTopicExchange::route(Deliverable& msg,
- const string& routingKey,
- const FieldTable* args)
+void ManagementTopicExchange::route(Deliverable& msg)
{
bool routeIt = true;
+ const string& routingKey = msg.getMessage().getRoutingKey();
+ const FieldTable* args = msg.getMessage().getApplicationHeaders();
// Intercept management agent commands
if (managementAgent)
routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true, qmfVersion);
if (routeIt)
- TopicExchange::route(msg, routingKey, args);
+ TopicExchange::route(msg);
}
bool ManagementTopicExchange::bind(Queue::shared_ptr queue,
diff --git a/cpp/src/qpid/management/ManagementTopicExchange.h b/cpp/src/qpid/management/ManagementTopicExchange.h
index 232300265e..eff01a8552 100644
--- a/cpp/src/qpid/management/ManagementTopicExchange.h
+++ b/cpp/src/qpid/management/ManagementTopicExchange.h
@@ -43,9 +43,7 @@ class ManagementTopicExchange : public virtual TopicExchange
virtual std::string getType() const { return typeName; }
- virtual void route(Deliverable& msg,
- const std::string& routingKey,
- const qpid::framing::FieldTable* args);
+ virtual void route(Deliverable& msg);
virtual bool bind(Queue::shared_ptr queue,
const std::string& routingKey,