diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2012-05-04 15:39:19 +0000 |
| commit | 633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch) | |
| tree | 1391da89470593209466df68c0b40b89c14963b1 /cpp/src/qpid/management | |
| parent | c73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff) | |
| download | qpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz | |
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/management')
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 79 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementDirectExchange.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementDirectExchange.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementTopicExchange.cpp | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementTopicExchange.h | 4 |
6 files changed, 65 insertions, 42 deletions
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 741ef442b0..062a530706 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -30,6 +30,7 @@ #include "qpid/log/Statement.h" #include <qpid/broker/Message.h> #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/FieldValue.h" #include "qpid/sys/Time.h" #include "qpid/sys/Thread.h" #include "qpid/broker/ConnectionState.h" @@ -44,14 +45,15 @@ #include <sstream> #include <typeinfo> +namespace qpid { +namespace management { + using boost::intrusive_ptr; using qpid::framing::Uuid; using qpid::types::Variant; using qpid::amqp_0_10::MapCodec; using qpid::amqp_0_10::ListCodec; -using qpid::sys::Mutex; using namespace qpid::framing; -using namespace qpid::management; using namespace qpid::broker; using namespace qpid; using namespace std; @@ -117,7 +119,7 @@ ManagementAgent::RemoteAgent::~RemoteAgent () } ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : - threadPoolSize(1), interval(10), broker(0), timer(0), + threadPoolSize(1), publish(true), interval(10), broker(0), timer(0), startTime(sys::now()), suppressed(false), disallowAllV1Methods(false), vendorNameKey(defaultVendorName), productNameKey(defaultProductName), @@ -164,10 +166,11 @@ ManagementAgent::~ManagementAgent () } } -void ManagementAgent::configure(const string& _dataDir, uint16_t _interval, +void ManagementAgent::configure(const string& _dataDir, bool _publish, uint16_t _interval, qpid::broker::Broker* _broker, int _threads) { dataDir = _dataDir; + publish = _publish; interval = _interval; broker = _broker; threadPoolSize = _threads; @@ -428,16 +431,17 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi } ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) - : TimerTask (sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC), - "ManagementAgent::periodicProcessing"), + : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC), + "ManagementAgent::periodicProcessing"), agent(_agent) {} -ManagementAgent::Periodic::~Periodic () {} +ManagementAgent::Periodic::~Periodic() {} -void ManagementAgent::Periodic::fire () +void ManagementAgent::Periodic::fire() { - agent.timer->add (new Periodic (agent, agent.interval)); - agent.periodicProcessing (); + setupNextFire(); + agent.timer->add(this); + agent.periodicProcessing(); } void ManagementAgent::clientAdded (const string& routingKey) @@ -562,7 +566,7 @@ void ManagementAgent::sendBufferLH(Buffer& buf, DeliverableMessage deliverable (msg); try { - exchange->route(deliverable, routingKey, 0); + exchange->route(deliverable); } catch(exception&) {} } buf.reset(); @@ -639,7 +643,7 @@ void ManagementAgent::sendBufferLH(const string& data, DeliverableMessage deliverable (msg); try { - exchange->route(deliverable, routingKey, 0); + exchange->route(deliverable); } catch(exception&) {} } } @@ -719,11 +723,16 @@ void ManagementAgent::periodicProcessing (void) string routingKey; string sBuf; - uint64_t uptime = sys::Duration(startTime, sys::now()); - static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); - moveNewObjectsLH(); - qpid::sys::MemStat::loadMemInfo(memstat); + + // + // If we're publishing updates, get the latest memory statistics and uptime now + // + if (publish) { + uint64_t uptime = sys::Duration(startTime, sys::now()); + static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); + qpid::sys::MemStat::loadMemInfo(memstat); + } // // Clear the been-here flag on all objects in the map. @@ -747,6 +756,14 @@ void ManagementAgent::periodicProcessing (void) // would incorrectly think the object was deleted. See QPID-2997 // bool objectsDeleted = moveDeletedObjectsLH(); + + // + // If we are not publishing updates, just clear the pending deletes. There's no + // need to tell anybody. + // + if (!publish) + pendingDeletedObjs.clear(); + if (!pendingDeletedObjs.empty()) { // use a temporary copy of the pending deletes so dropping the lock when // the buffer is sent is safe. @@ -867,7 +884,9 @@ void ManagementAgent::periodicProcessing (void) // sendBuffer(). This allows the managementObjects map to be altered during the // sendBuffer() call, so always restart the search after a sendBuffer() call // - while (1) { + // If publish is disabled, don't send any updates. + // + while (publish) { msgBuffer.reset(); Variant::List list_; uint32_t pcount; @@ -1023,10 +1042,9 @@ void ManagementAgent::periodicProcessing (void) if (objectsDeleted) deleteOrphanedAgentsLH(); - // heartbeat generation + // heartbeat generation. Note that heartbeats need to be sent even if publish is disabled. if (qmf1Support) { -#define BUFSIZE 65536 uint32_t contentSize; char msgChars[BUFSIZE]; Buffer msgBuffer(msgChars, BUFSIZE); @@ -1087,7 +1105,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) Variant::List list_; stringstream v1key, v2key; - if (qmf1Support) { + if (publish && qmf1Support) { string sBuf; v1key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); @@ -1096,7 +1114,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) msgBuffer.putRawData(sBuf); } - if (qmf2Support) { + if (publish && qmf2Support) { Variant::Map map_; Variant::Map values; @@ -1121,14 +1139,14 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) // object deleted, ok to drop lock now. - if (qmf1Support) { + if (publish && qmf1Support) { uint32_t contentSize = msgBuffer.getPosition(); msgBuffer.reset(); sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str()); QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str()); } - if (qmf2Support) { + if (publish && qmf2Support) { Variant::Map headers; headers["method"] = "indication"; headers["qmf.opcode"] = "_data_indication"; @@ -1841,6 +1859,12 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe if (className == "memory") qpid::sys::MemStat::loadMemInfo(memstat); + if (className == "broker") { + uint64_t uptime = sys::Duration(startTime, sys::now()); + static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); + } + + // build up a set of all objects to be dumped for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); @@ -1956,6 +1980,11 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co if (className == "memory") qpid::sys::MemStat::loadMemInfo(memstat); + if (className == "broker") { + uint64_t uptime = sys::Duration(startTime, sys::now()); + static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); + } + /* * Unpack the _object_id element of the query if it is present. If it is present, find that one * object and return it. If it is not present, send a class-based result. @@ -2934,9 +2963,6 @@ bool ManagementAgent::moveDeletedObjectsLH() { return !deleteList.empty(); } -namespace qpid { -namespace management { - namespace { QPID_TSS const qpid::broker::ConnectionState* executionContext = 0; } @@ -2951,3 +2977,4 @@ const qpid::broker::ConnectionState* getManagementExecutionContext() } }} + diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h index f68bfe0577..c7e830dcf5 100644 --- a/cpp/src/qpid/management/ManagementAgent.h +++ b/cpp/src/qpid/management/ManagementAgent.h @@ -36,7 +36,6 @@ #include "qpid/sys/MemStat.h" #include "qpid/types/Variant.h" #include <qpid/framing/AMQFrame.h> -#include <qpid/framing/FieldValue.h> #include <qpid/framing/ResizableBuffer.h> #include <memory> #include <string> @@ -72,7 +71,7 @@ public: virtual ~ManagementAgent (); /** Called before plugins are initialized */ - void configure (const std::string& dataDir, uint16_t interval, + void configure (const std::string& dataDir, bool publish, uint16_t interval, qpid::broker::Broker* broker, int threadPoolSize); /** Called after plugins are initialized. */ void pluginsInitialized(); @@ -300,6 +299,7 @@ private: qpid::broker::Exchange::shared_ptr v2Topic; qpid::broker::Exchange::shared_ptr v2Direct; std::string dataDir; + bool publish; uint16_t interval; qpid::broker::Broker* broker; qpid::sys::Timer* timer; diff --git a/cpp/src/qpid/management/ManagementDirectExchange.cpp b/cpp/src/qpid/management/ManagementDirectExchange.cpp index 1d5f8bbd6b..312eacf462 100644 --- a/cpp/src/qpid/management/ManagementDirectExchange.cpp +++ b/cpp/src/qpid/management/ManagementDirectExchange.cpp @@ -40,17 +40,17 @@ ManagementDirectExchange::ManagementDirectExchange(const std::string& _name, DirectExchange(_name, _durable, _args, _parent, b), managementAgent(0) {} -void ManagementDirectExchange::route(Deliverable& msg, - const string& routingKey, - const FieldTable* args) +void ManagementDirectExchange::route(Deliverable& msg) { bool routeIt = true; + const string& routingKey = msg.getMessage().getRoutingKey(); + const FieldTable* args = msg.getMessage().getApplicationHeaders(); if (managementAgent) routeIt = managementAgent->dispatchCommand(msg, routingKey, args, false, qmfVersion); if (routeIt) - DirectExchange::route(msg, routingKey, args); + DirectExchange::route(msg); } void ManagementDirectExchange::setManagmentAgent(ManagementAgent* agent, int qv) diff --git a/cpp/src/qpid/management/ManagementDirectExchange.h b/cpp/src/qpid/management/ManagementDirectExchange.h index 7507179c06..582354d723 100644 --- a/cpp/src/qpid/management/ManagementDirectExchange.h +++ b/cpp/src/qpid/management/ManagementDirectExchange.h @@ -43,9 +43,7 @@ class ManagementDirectExchange : public virtual DirectExchange virtual std::string getType() const { return typeName; } - virtual void route(Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); + virtual void route(Deliverable& msg); void setManagmentAgent(management::ManagementAgent* agent, int qmfVersion); diff --git a/cpp/src/qpid/management/ManagementTopicExchange.cpp b/cpp/src/qpid/management/ManagementTopicExchange.cpp index ee8657646f..587cc660df 100644 --- a/cpp/src/qpid/management/ManagementTopicExchange.cpp +++ b/cpp/src/qpid/management/ManagementTopicExchange.cpp @@ -39,18 +39,18 @@ ManagementTopicExchange::ManagementTopicExchange(const std::string& _name, TopicExchange(_name, _durable, _args, _parent, b), managementAgent(0) {} -void ManagementTopicExchange::route(Deliverable& msg, - const string& routingKey, - const FieldTable* args) +void ManagementTopicExchange::route(Deliverable& msg) { bool routeIt = true; + const string& routingKey = msg.getMessage().getRoutingKey(); + const FieldTable* args = msg.getMessage().getApplicationHeaders(); // Intercept management agent commands if (managementAgent) routeIt = managementAgent->dispatchCommand(msg, routingKey, args, true, qmfVersion); if (routeIt) - TopicExchange::route(msg, routingKey, args); + TopicExchange::route(msg); } bool ManagementTopicExchange::bind(Queue::shared_ptr queue, diff --git a/cpp/src/qpid/management/ManagementTopicExchange.h b/cpp/src/qpid/management/ManagementTopicExchange.h index 232300265e..eff01a8552 100644 --- a/cpp/src/qpid/management/ManagementTopicExchange.h +++ b/cpp/src/qpid/management/ManagementTopicExchange.h @@ -43,9 +43,7 @@ class ManagementTopicExchange : public virtual TopicExchange virtual std::string getType() const { return typeName; } - virtual void route(Deliverable& msg, - const std::string& routingKey, - const qpid::framing::FieldTable* args); + virtual void route(Deliverable& msg); virtual bool bind(Queue::shared_ptr queue, const std::string& routingKey, |
