summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/management/ManagementAgent.h
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/management/ManagementAgent.h')
-rw-r--r--cpp/src/qpid/management/ManagementAgent.h183
1 files changed, 68 insertions, 115 deletions
diff --git a/cpp/src/qpid/management/ManagementAgent.h b/cpp/src/qpid/management/ManagementAgent.h
index c7e830dcf5..6de5d1d719 100644
--- a/cpp/src/qpid/management/ManagementAgent.h
+++ b/cpp/src/qpid/management/ManagementAgent.h
@@ -26,7 +26,6 @@
#include "qpid/broker/Exchange.h"
#include "qpid/framing/Uuid.h"
#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Timer.h"
#include "qpid/broker/ConnectionToken.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/management/ManagementEvent.h"
@@ -34,9 +33,11 @@
#include "qmf/org/apache/qpid/broker/Agent.h"
#include "qmf/org/apache/qpid/broker/Memory.h"
#include "qpid/sys/MemStat.h"
+#include "qpid/sys/PollableQueue.h"
#include "qpid/types/Variant.h"
#include <qpid/framing/AMQFrame.h>
#include <qpid/framing/ResizableBuffer.h>
+#include <boost/shared_ptr.hpp>
#include <memory>
#include <string>
#include <map>
@@ -45,6 +46,9 @@ namespace qpid {
namespace broker {
class ConnectionState;
}
+namespace sys {
+class Timer;
+}
namespace management {
class ManagementAgent
@@ -73,11 +77,6 @@ public:
/** Called before plugins are initialized */
void configure (const std::string& dataDir, bool publish, uint16_t interval,
qpid::broker::Broker* broker, int threadPoolSize);
- /** Called after plugins are initialized. */
- void pluginsInitialized();
-
- /** Called by cluster to suppress management output during update. */
- void suppress(bool s) { suppressed = s; }
void setName(const std::string& vendor,
const std::string& product,
@@ -100,18 +99,16 @@ public:
const std::string& eventName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall);
- QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
- uint64_t persistId = 0,
- bool persistent = false);
- QPID_BROKER_EXTERN ObjectId addObject (ManagementObject* object,
- const std::string& key,
- bool persistent = false);
+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object,
+ uint64_t persistId = 0,
+ bool persistent = false);
+ QPID_BROKER_EXTERN ObjectId addObject (ManagementObject::shared_ptr object,
+ const std::string& key,
+ bool persistent = false);
QPID_BROKER_EXTERN void raiseEvent(const ManagementEvent& event,
severity_t severity = SEV_DEFAULT);
QPID_BROKER_EXTERN void clientAdded (const std::string& routingKey);
- QPID_BROKER_EXTERN void clusterUpdate();
-
bool dispatchCommand (qpid::broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args,
@@ -121,25 +118,6 @@ public:
/** Disallow a method. Attempts to call it will receive an exception with message. */
void disallow(const std::string& className, const std::string& methodName, const std::string& message);
- /** Disallow all QMFv1 methods (used in clustered brokers). */
- void disallowV1Methods() { disallowAllV1Methods = true; }
-
- /** Serialize my schemas as a binary blob into schemaOut */
- void exportSchemas(std::string& schemaOut);
-
- /** Serialize my remote-agent map as a binary blob into agentsOut */
- void exportAgents(std::string& agentsOut);
-
- /** Decode a serialized schemas and add to my schema cache */
- void importSchemas(framing::Buffer& inBuf);
-
- /** Decode a serialized agent map */
- void importAgents(framing::Buffer& inBuf);
-
- // these are in support of the managementSetup-state stuff, for synch'ing clustered brokers
- uint64_t getNextObjectId(void) { return nextObjectId; }
- void setNextObjectId(uint64_t o) { nextObjectId = o; }
-
uint16_t getBootSequence(void) { return bootSequence; }
void setBootSequence(uint16_t b) { bootSequence = b; writeData(); }
@@ -148,20 +126,11 @@ public:
static types::Variant::Map toMap(const framing::FieldTable& from);
- // For Clustering: management objects that have been marked as
- // "deleted", but are waiting for their last published object
- // update are not visible to the cluster replication code. These
- // interfaces allow clustering to gather up all the management
- // objects that are deleted in order to allow all clustered
- // brokers to publish the same set of deleted objects.
-
class DeletedObject {
public:
typedef boost::shared_ptr<DeletedObject> shared_ptr;
- DeletedObject(ManagementObject *, bool v1, bool v2);
- DeletedObject( const std::string &encoded );
+ DeletedObject(ManagementObject::shared_ptr, bool v1, bool v2);
~DeletedObject() {};
- void encode( std::string& toBuffer );
const std::string getKey() const {
// used to batch up objects of the same class type
return std::string(packageName + std::string(":") + className);
@@ -181,22 +150,7 @@ public:
typedef std::vector<DeletedObject::shared_ptr> DeletedObjectList;
- /** returns a snapshot of all currently deleted management objects. */
- void exportDeletedObjects( DeletedObjectList& outList );
-
- /** Import a list of deleted objects to send on next publish interval. */
- void importDeletedObjects( const DeletedObjectList& inList );
-
private:
- struct Periodic : public qpid::sys::TimerTask
- {
- ManagementAgent& agent;
-
- Periodic (ManagementAgent& agent, uint32_t seconds);
- virtual ~Periodic ();
- void fire ();
- };
-
// Storage for tracking remote management agents, attached via the client
// management agent API.
//
@@ -207,9 +161,9 @@ private:
uint32_t agentBank;
std::string routingKey;
ObjectId connectionRef;
- qmf::org::apache::qpid::broker::Agent* mgmtObject;
- RemoteAgent(ManagementAgent& _agent) : agent(_agent), mgmtObject(0) {}
- ManagementObject* GetManagementObject (void) const { return mgmtObject; }
+ qmf::org::apache::qpid::broker::Agent::shared_ptr mgmtObject;
+ RemoteAgent(ManagementAgent& _agent) : agent(_agent) {}
+ ManagementObject::shared_ptr GetManagementObject (void) const { return mgmtObject; }
virtual ~RemoteAgent ();
void mapEncode(qpid::types::Variant::Map& _map) const;
@@ -276,7 +230,7 @@ private:
PackageMap packages;
//
- // Protected by userLock
+ // Protected by objectLock
//
ManagementObjectMap managementObjects;
@@ -288,11 +242,11 @@ private:
framing::Uuid uuid;
//
- // Lock hierarchy: If a thread needs to take both addLock and userLock,
- // it MUST take userLock first, then addLock.
+ // Lock ordering: userLock -> addLock -> objectLock
//
sys::Mutex userLock;
sys::Mutex addLock;
+ sys::Mutex objectLock;
qpid::broker::Exchange::shared_ptr mExchange;
qpid::broker::Exchange::shared_ptr dExchange;
@@ -335,53 +289,51 @@ private:
// list of objects that have been deleted, but have yet to be published
// one final time.
// Indexed by a string composed of the object's package and class name.
- // Protected by userLock.
+ // Protected by objectLock.
typedef std::map<std::string, DeletedObjectList> PendingDeletedObjsMap;
PendingDeletedObjsMap pendingDeletedObjs;
-# define MA_BUFFER_SIZE 65536
- char inputBuffer[MA_BUFFER_SIZE];
- char outputBuffer[MA_BUFFER_SIZE];
- char eventBuffer[MA_BUFFER_SIZE];
- framing::ResizableBuffer msgBuffer;
+ // Pollable queue to serialize event messages
+ typedef std::pair<boost::shared_ptr<broker::Exchange>,
+ broker::Message> ExchangeAndMessage;
+ typedef sys::PollableQueue<ExchangeAndMessage> EventQueue;
//
// Memory statistics object
//
- qmf::org::apache::qpid::broker::Memory *memstat;
+ qmf::org::apache::qpid::broker::Memory::shared_ptr memstat;
void writeData ();
void periodicProcessing (void);
- void deleteObjectNowLH(const ObjectId& oid);
+ void deleteObjectNow(const ObjectId& oid);
void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void sendBufferLH(framing::Buffer& buf,
- uint32_t length,
- qpid::broker::Exchange::shared_ptr exchange,
- const std::string& routingKey);
- void sendBufferLH(framing::Buffer& buf,
- uint32_t length,
- const std::string& exchange,
- const std::string& routingKey);
- void sendBufferLH(const std::string& data,
- const std::string& cid,
- const qpid::types::Variant::Map& headers,
- const std::string& content_type,
- qpid::broker::Exchange::shared_ptr exchange,
- const std::string& routingKey,
- uint64_t ttl_msec = 0);
- void sendBufferLH(const std::string& data,
- const std::string& cid,
- const qpid::types::Variant::Map& headers,
- const std::string& content_type,
- const std::string& exchange,
- const std::string& routingKey,
- uint64_t ttl_msec = 0);
- void moveNewObjectsLH();
- bool moveDeletedObjectsLH();
-
- bool authorizeAgentMessageLH(qpid::broker::Message& msg);
- void dispatchAgentCommandLH(qpid::broker::Message& msg, bool viaLocal=false);
+ EventQueue::Batch::const_iterator sendEvents(const EventQueue::Batch& batch);
+ void sendBuffer(framing::Buffer& buf,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey);
+ void sendBuffer(framing::Buffer& buf,
+ const std::string& exchange,
+ const std::string& routingKey);
+ void sendBuffer(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map& headers,
+ const std::string& content_type,
+ qpid::broker::Exchange::shared_ptr exchange,
+ const std::string& routingKey,
+ uint64_t ttl_msec = 0);
+ void sendBuffer(const std::string& data,
+ const std::string& cid,
+ const qpid::types::Variant::Map& headers,
+ const std::string& content_type,
+ const std::string& exchange,
+ const std::string& routingKey,
+ uint64_t ttl_msec = 0);
+ void moveNewObjects();
+ bool moveDeletedObjects();
+
+ bool authorizeAgentMessage(qpid::broker::Message& msg);
+ void dispatchAgentCommand(qpid::broker::Message& msg, bool viaLocal=false);
PackageMap::iterator findOrAddPackageLH(std::string name);
void addClassLH(uint8_t kind,
@@ -399,22 +351,22 @@ private:
uint32_t allocateNewBank ();
uint32_t assignBankLH (uint32_t requestedPrefix);
void deleteOrphanedAgentsLH();
- void sendCommandCompleteLH(const std::string& replyToKey, uint32_t sequence,
- uint32_t code = 0, const std::string& text = "OK");
- void sendExceptionLH(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
- void handleBrokerRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handlePackageQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handlePackageIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleClassQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleClassIndLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleSchemaRequestLH (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
- void handleSchemaResponseLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleAttachRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
- void handleGetQueryLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
- void handleMethodRequestLH (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
- void handleGetQueryLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal);
- void handleMethodRequestLH (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
- void handleLocateRequestLH (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
+ void sendCommandComplete(const std::string& replyToKey, uint32_t sequence,
+ uint32_t code = 0, const std::string& text = "OK");
+ void sendException(const std::string& rte, const std::string& rtk, const std::string& cid, const std::string& text, uint32_t code=1, bool viaLocal=false);
+ void handleBrokerRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handlePackageQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handlePackageInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleClassQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleClassInd (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleSchemaRequest (framing::Buffer& inBuffer, const std::string& replyToEx, const std::string& replyToKey, uint32_t sequence);
+ void handleSchemaResponse (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleAttachRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
+ void handleGetQuery (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence);
+ void handleMethodRequest (framing::Buffer& inBuffer, const std::string& replyToKey, uint32_t sequence, const qpid::broker::ConnectionToken* connToken);
+ void handleGetQuery (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, bool viaLocal);
+ void handleMethodRequest (const std::string& body, const std::string& replyToEx, const std::string& replyToKey, const std::string& cid, const qpid::broker::ConnectionToken* connToken, bool viaLocal);
+ void handleLocateRequest (const std::string& body, const std::string& replyToEx, const std::string &replyToKey, const std::string& cid);
size_t validateSchema(framing::Buffer&, uint8_t kind);
@@ -424,6 +376,7 @@ private:
std::string summarizeAgents();
void debugSnapshot(const char* title);
+ std::auto_ptr<EventQueue> sendQueue;
};
void setManagementExecutionContext(const qpid::broker::ConnectionState*);