diff options
| author | Ted Ross <tross@apache.org> | 2012-02-29 12:27:58 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2012-02-29 12:27:58 +0000 |
| commit | c92e26782b48a898d68721a2b66d5cb384cd6129 (patch) | |
| tree | a551b1eac930537acd3bd1a74edf94e6cfa153d9 /qpid/cpp/src | |
| parent | 9c6b3d24943f2edb7c5f8ab351b9429804801d0a (diff) | |
| download | qpid-python-c92e26782b48a898d68721a2b66d5cb384cd6129.tar.gz | |
QPID-3871 - Added --mgmt-publish switch to the C++ broker
Also in this commit:
- Fixed a problem with an unused argument in qpid-stat
- Cleaned up the recurring timer code in ManagementAgent
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1295075 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 65 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 3 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/run_cli_tests | 4 |
5 files changed, 55 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index 221c31583b..02111b4387 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -111,6 +111,7 @@ Broker::Options::Options(const std::string& name) : maxConnections(500), connectionBacklog(10), enableMgmt(1), + mgmtPublish(1), mgmtPubInterval(10), queueCleanInterval(60*10),//10 minutes auth(SaslAuthenticator::available()), @@ -148,6 +149,7 @@ Broker::Options::Options(const std::string& name) : ("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections") ("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket") ("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management") + ("mgmt-publish", optValue(mgmtPublish,"yes|no"), "Enable Publish of Management Data ('no' implies query-only)") ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2") ("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1") // FIXME aconway 2012-02-13: consistent treatment of values in SECONDS @@ -213,7 +215,7 @@ Broker::Broker(const Broker::Options& conf) : try { if (conf.enableMgmt) { QPID_LOG(info, "Management enabled"); - managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), + managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), conf.mgmtPublish, conf.mgmtPubInterval, this, conf.workerThreads + 3); managementAgent->setName("apache.org", "qpidd"); _qmf::Package packageInitializer(managementAgent.get()); @@ -228,6 +230,7 @@ Broker::Broker(const Broker::Options& conf) : mgmtObject->set_maxConns(conf.maxConnections); mgmtObject->set_connBacklog(conf.connectionBacklog); mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval); + mgmtObject->set_mgmtPublish(conf.mgmtPublish); mgmtObject->set_version(qpid::version); if (dataDir.isEnabled()) mgmtObject->set_dataDir(dataDir.getPath()); diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index a812c28b80..543d42e002 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -105,6 +105,7 @@ public: int maxConnections; int connectionBacklog; bool enableMgmt; + bool mgmtPublish; uint16_t mgmtPubInterval; uint16_t queueCleanInterval; bool auth; diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 741ef442b0..372393886d 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -117,7 +117,7 @@ ManagementAgent::RemoteAgent::~RemoteAgent () } ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : - threadPoolSize(1), interval(10), broker(0), timer(0), + threadPoolSize(1), publish(true), interval(10), broker(0), timer(0), startTime(sys::now()), suppressed(false), disallowAllV1Methods(false), vendorNameKey(defaultVendorName), productNameKey(defaultProductName), @@ -164,10 +164,11 @@ ManagementAgent::~ManagementAgent () } } -void ManagementAgent::configure(const string& _dataDir, uint16_t _interval, +void ManagementAgent::configure(const string& _dataDir, bool _publish, uint16_t _interval, qpid::broker::Broker* _broker, int _threads) { dataDir = _dataDir; + publish = _publish; interval = _interval; broker = _broker; threadPoolSize = _threads; @@ -428,16 +429,17 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi } ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) - : TimerTask (sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC), - "ManagementAgent::periodicProcessing"), + : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC), + "ManagementAgent::periodicProcessing"), agent(_agent) {} -ManagementAgent::Periodic::~Periodic () {} +ManagementAgent::Periodic::~Periodic() {} -void ManagementAgent::Periodic::fire () +void ManagementAgent::Periodic::fire() { - agent.timer->add (new Periodic (agent, agent.interval)); - agent.periodicProcessing (); + setupNextFire(); + agent.timer->add(this); + agent.periodicProcessing(); } void ManagementAgent::clientAdded (const string& routingKey) @@ -719,11 +721,16 @@ void ManagementAgent::periodicProcessing (void) string routingKey; string sBuf; - uint64_t uptime = sys::Duration(startTime, sys::now()); - static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); - moveNewObjectsLH(); - qpid::sys::MemStat::loadMemInfo(memstat); + + // + // If we're publishing updates, get the latest memory statistics and uptime now + // + if (publish) { + uint64_t uptime = sys::Duration(startTime, sys::now()); + static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); + qpid::sys::MemStat::loadMemInfo(memstat); + } // // Clear the been-here flag on all objects in the map. @@ -747,6 +754,14 @@ void ManagementAgent::periodicProcessing (void) // would incorrectly think the object was deleted. See QPID-2997 // bool objectsDeleted = moveDeletedObjectsLH(); + + // + // If we are not publishing updates, just clear the pending deletes. There's no + // need to tell anybody. + // + if (!publish) + pendingDeletedObjs.clear(); + if (!pendingDeletedObjs.empty()) { // use a temporary copy of the pending deletes so dropping the lock when // the buffer is sent is safe. @@ -867,7 +882,9 @@ void ManagementAgent::periodicProcessing (void) // sendBuffer(). This allows the managementObjects map to be altered during the // sendBuffer() call, so always restart the search after a sendBuffer() call // - while (1) { + // If publish is disabled, don't send any updates. + // + while (publish) { msgBuffer.reset(); Variant::List list_; uint32_t pcount; @@ -1023,10 +1040,9 @@ void ManagementAgent::periodicProcessing (void) if (objectsDeleted) deleteOrphanedAgentsLH(); - // heartbeat generation + // heartbeat generation. Note that heartbeats need to be sent even if publish is disabled. if (qmf1Support) { -#define BUFSIZE 65536 uint32_t contentSize; char msgChars[BUFSIZE]; Buffer msgBuffer(msgChars, BUFSIZE); @@ -1087,7 +1103,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) Variant::List list_; stringstream v1key, v2key; - if (qmf1Support) { + if (publish && qmf1Support) { string sBuf; v1key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName(); @@ -1096,7 +1112,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) msgBuffer.putRawData(sBuf); } - if (qmf2Support) { + if (publish && qmf2Support) { Variant::Map map_; Variant::Map values; @@ -1121,14 +1137,14 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid) // object deleted, ok to drop lock now. - if (qmf1Support) { + if (publish && qmf1Support) { uint32_t contentSize = msgBuffer.getPosition(); msgBuffer.reset(); sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str()); QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str()); } - if (qmf2Support) { + if (publish && qmf2Support) { Variant::Map headers; headers["method"] = "indication"; headers["qmf.opcode"] = "_data_indication"; @@ -1841,6 +1857,12 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe if (className == "memory") qpid::sys::MemStat::loadMemInfo(memstat); + if (className == "broker") { + uint64_t uptime = sys::Duration(startTime, sys::now()); + static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); + } + + // build up a set of all objects to be dumped for (ManagementObjectMap::iterator iter = managementObjects.begin(); iter != managementObjects.end(); @@ -1956,6 +1978,11 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co if (className == "memory") qpid::sys::MemStat::loadMemInfo(memstat); + if (className == "broker") { + uint64_t uptime = sys::Duration(startTime, sys::now()); + static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); + } + /* * Unpack the _object_id element of the query if it is present. If it is present, find that one * object and return it. If it is not present, send a class-based result. diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index f68bfe0577..f01c66b4af 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -72,7 +72,7 @@ public: virtual ~ManagementAgent (); /** Called before plugins are initialized */ - void configure (const std::string& dataDir, uint16_t interval, + void configure (const std::string& dataDir, bool publish, uint16_t interval, qpid::broker::Broker* broker, int threadPoolSize); /** Called after plugins are initialized. */ void pluginsInitialized(); @@ -300,6 +300,7 @@ private: qpid::broker::Exchange::shared_ptr v2Topic; qpid::broker::Exchange::shared_ptr v2Direct; std::string dataDir; + bool publish; uint16_t interval; qpid::broker::Broker* broker; qpid::sys::Timer* timer; diff --git a/qpid/cpp/src/tests/run_cli_tests b/qpid/cpp/src/tests/run_cli_tests index ec5c71b646..ea44437c0d 100755 --- a/qpid/cpp/src/tests/run_cli_tests +++ b/qpid/cpp/src/tests/run_cli_tests @@ -56,9 +56,9 @@ start_brokers() { targs="--ignore=*xml*" fi - ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no $xargs > qpidd.port + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --mgmt-publish no --auth no $xargs > qpidd.port LOCAL_PORT=`cat qpidd.port` - ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no $xargs > qpidd.port + ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --mgmt-publish no --auth no $xargs > qpidd.port REMOTE_PORT=`cat qpidd.port` } |
