summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2012-02-29 12:27:58 +0000
committerTed Ross <tross@apache.org>2012-02-29 12:27:58 +0000
commitc92e26782b48a898d68721a2b66d5cb384cd6129 (patch)
treea551b1eac930537acd3bd1a74edf94e6cfa153d9 /qpid/cpp/src
parent9c6b3d24943f2edb7c5f8ab351b9429804801d0a (diff)
downloadqpid-python-c92e26782b48a898d68721a2b66d5cb384cd6129.tar.gz
QPID-3871 - Added --mgmt-publish switch to the C++ broker
Also in this commit: - Fixed a problem with an unused argument in qpid-stat - Cleaned up the recurring timer code in ManagementAgent git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1295075 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.cpp5
-rw-r--r--qpid/cpp/src/qpid/broker/Broker.h1
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp65
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h3
-rwxr-xr-xqpid/cpp/src/tests/run_cli_tests4
5 files changed, 55 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp
index 221c31583b..02111b4387 100644
--- a/qpid/cpp/src/qpid/broker/Broker.cpp
+++ b/qpid/cpp/src/qpid/broker/Broker.cpp
@@ -111,6 +111,7 @@ Broker::Options::Options(const std::string& name) :
maxConnections(500),
connectionBacklog(10),
enableMgmt(1),
+ mgmtPublish(1),
mgmtPubInterval(10),
queueCleanInterval(60*10),//10 minutes
auth(SaslAuthenticator::available()),
@@ -148,6 +149,7 @@ Broker::Options::Options(const std::string& name) :
("max-connections", optValue(maxConnections, "N"), "Sets the maximum allowed connections")
("connection-backlog", optValue(connectionBacklog, "N"), "Sets the connection backlog limit for the server socket")
("mgmt-enable,m", optValue(enableMgmt,"yes|no"), "Enable Management")
+ ("mgmt-publish", optValue(mgmtPublish,"yes|no"), "Enable Publish of Management Data ('no' implies query-only)")
("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2")
("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1")
// FIXME aconway 2012-02-13: consistent treatment of values in SECONDS
@@ -213,7 +215,7 @@ Broker::Broker(const Broker::Options& conf) :
try {
if (conf.enableMgmt) {
QPID_LOG(info, "Management enabled");
- managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(),
+ managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), conf.mgmtPublish,
conf.mgmtPubInterval, this, conf.workerThreads + 3);
managementAgent->setName("apache.org", "qpidd");
_qmf::Package packageInitializer(managementAgent.get());
@@ -228,6 +230,7 @@ Broker::Broker(const Broker::Options& conf) :
mgmtObject->set_maxConns(conf.maxConnections);
mgmtObject->set_connBacklog(conf.connectionBacklog);
mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval);
+ mgmtObject->set_mgmtPublish(conf.mgmtPublish);
mgmtObject->set_version(qpid::version);
if (dataDir.isEnabled())
mgmtObject->set_dataDir(dataDir.getPath());
diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h
index a812c28b80..543d42e002 100644
--- a/qpid/cpp/src/qpid/broker/Broker.h
+++ b/qpid/cpp/src/qpid/broker/Broker.h
@@ -105,6 +105,7 @@ public:
int maxConnections;
int connectionBacklog;
bool enableMgmt;
+ bool mgmtPublish;
uint16_t mgmtPubInterval;
uint16_t queueCleanInterval;
bool auth;
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 741ef442b0..372393886d 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -117,7 +117,7 @@ ManagementAgent::RemoteAgent::~RemoteAgent ()
}
ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
- threadPoolSize(1), interval(10), broker(0), timer(0),
+ threadPoolSize(1), publish(true), interval(10), broker(0), timer(0),
startTime(sys::now()),
suppressed(false), disallowAllV1Methods(false),
vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
@@ -164,10 +164,11 @@ ManagementAgent::~ManagementAgent ()
}
}
-void ManagementAgent::configure(const string& _dataDir, uint16_t _interval,
+void ManagementAgent::configure(const string& _dataDir, bool _publish, uint16_t _interval,
qpid::broker::Broker* _broker, int _threads)
{
dataDir = _dataDir;
+ publish = _publish;
interval = _interval;
broker = _broker;
threadPoolSize = _threads;
@@ -428,16 +429,17 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
}
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
- : TimerTask (sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC),
- "ManagementAgent::periodicProcessing"),
+ : TimerTask(sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC),
+ "ManagementAgent::periodicProcessing"),
agent(_agent) {}
-ManagementAgent::Periodic::~Periodic () {}
+ManagementAgent::Periodic::~Periodic() {}
-void ManagementAgent::Periodic::fire ()
+void ManagementAgent::Periodic::fire()
{
- agent.timer->add (new Periodic (agent, agent.interval));
- agent.periodicProcessing ();
+ setupNextFire();
+ agent.timer->add(this);
+ agent.periodicProcessing();
}
void ManagementAgent::clientAdded (const string& routingKey)
@@ -719,11 +721,16 @@ void ManagementAgent::periodicProcessing (void)
string routingKey;
string sBuf;
- uint64_t uptime = sys::Duration(startTime, sys::now());
- static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
-
moveNewObjectsLH();
- qpid::sys::MemStat::loadMemInfo(memstat);
+
+ //
+ // If we're publishing updates, get the latest memory statistics and uptime now
+ //
+ if (publish) {
+ uint64_t uptime = sys::Duration(startTime, sys::now());
+ static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
+ qpid::sys::MemStat::loadMemInfo(memstat);
+ }
//
// Clear the been-here flag on all objects in the map.
@@ -747,6 +754,14 @@ void ManagementAgent::periodicProcessing (void)
// would incorrectly think the object was deleted. See QPID-2997
//
bool objectsDeleted = moveDeletedObjectsLH();
+
+ //
+ // If we are not publishing updates, just clear the pending deletes. There's no
+ // need to tell anybody.
+ //
+ if (!publish)
+ pendingDeletedObjs.clear();
+
if (!pendingDeletedObjs.empty()) {
// use a temporary copy of the pending deletes so dropping the lock when
// the buffer is sent is safe.
@@ -867,7 +882,9 @@ void ManagementAgent::periodicProcessing (void)
// sendBuffer(). This allows the managementObjects map to be altered during the
// sendBuffer() call, so always restart the search after a sendBuffer() call
//
- while (1) {
+ // If publish is disabled, don't send any updates.
+ //
+ while (publish) {
msgBuffer.reset();
Variant::List list_;
uint32_t pcount;
@@ -1023,10 +1040,9 @@ void ManagementAgent::periodicProcessing (void)
if (objectsDeleted) deleteOrphanedAgentsLH();
- // heartbeat generation
+ // heartbeat generation. Note that heartbeats need to be sent even if publish is disabled.
if (qmf1Support) {
-#define BUFSIZE 65536
uint32_t contentSize;
char msgChars[BUFSIZE];
Buffer msgBuffer(msgChars, BUFSIZE);
@@ -1087,7 +1103,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
Variant::List list_;
stringstream v1key, v2key;
- if (qmf1Support) {
+ if (publish && qmf1Support) {
string sBuf;
v1key << "console.obj.1.0." << object->getPackageName() << "." << object->getClassName();
@@ -1096,7 +1112,7 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
msgBuffer.putRawData(sBuf);
}
- if (qmf2Support) {
+ if (publish && qmf2Support) {
Variant::Map map_;
Variant::Map values;
@@ -1121,14 +1137,14 @@ void ManagementAgent::deleteObjectNowLH(const ObjectId& oid)
// object deleted, ok to drop lock now.
- if (qmf1Support) {
+ if (publish && qmf1Support) {
uint32_t contentSize = msgBuffer.getPosition();
msgBuffer.reset();
sendBufferLH(msgBuffer, contentSize, mExchange, v1key.str());
QPID_LOG(debug, "SEND Immediate(delete) ContentInd to=" << v1key.str());
}
- if (qmf2Support) {
+ if (publish && qmf2Support) {
Variant::Map headers;
headers["method"] = "indication";
headers["qmf.opcode"] = "_data_indication";
@@ -1841,6 +1857,12 @@ void ManagementAgent::handleGetQueryLH(Buffer& inBuffer, const string& replyToKe
if (className == "memory")
qpid::sys::MemStat::loadMemInfo(memstat);
+ if (className == "broker") {
+ uint64_t uptime = sys::Duration(startTime, sys::now());
+ static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
+ }
+
+
// build up a set of all objects to be dumped
for (ManagementObjectMap::iterator iter = managementObjects.begin();
iter != managementObjects.end();
@@ -1956,6 +1978,11 @@ void ManagementAgent::handleGetQueryLH(const string& body, const string& rte, co
if (className == "memory")
qpid::sys::MemStat::loadMemInfo(memstat);
+ if (className == "broker") {
+ uint64_t uptime = sys::Duration(startTime, sys::now());
+ static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
+ }
+
/*
* Unpack the _object_id element of the query if it is present. If it is present, find that one
* object and return it. If it is not present, send a class-based result.
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index f68bfe0577..f01c66b4af 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -72,7 +72,7 @@ public:
virtual ~ManagementAgent ();
/** Called before plugins are initialized */
- void configure (const std::string& dataDir, uint16_t interval,
+ void configure (const std::string& dataDir, bool publish, uint16_t interval,
qpid::broker::Broker* broker, int threadPoolSize);
/** Called after plugins are initialized. */
void pluginsInitialized();
@@ -300,6 +300,7 @@ private:
qpid::broker::Exchange::shared_ptr v2Topic;
qpid::broker::Exchange::shared_ptr v2Direct;
std::string dataDir;
+ bool publish;
uint16_t interval;
qpid::broker::Broker* broker;
qpid::sys::Timer* timer;
diff --git a/qpid/cpp/src/tests/run_cli_tests b/qpid/cpp/src/tests/run_cli_tests
index ec5c71b646..ea44437c0d 100755
--- a/qpid/cpp/src/tests/run_cli_tests
+++ b/qpid/cpp/src/tests/run_cli_tests
@@ -56,9 +56,9 @@ start_brokers() {
targs="--ignore=*xml*"
fi
- ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no $xargs > qpidd.port
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --mgmt-publish no --auth no $xargs > qpidd.port
LOCAL_PORT=`cat qpidd.port`
- ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no $xargs > qpidd.port
+ ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --mgmt-publish no --auth no $xargs > qpidd.port
REMOTE_PORT=`cat qpidd.port`
}