diff options
| -rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp | 233 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/agent/ManagementAgentImpl.h | 8 | ||||
| -rw-r--r-- | qpid/extras/qmf/src/py/qmf/console.py | 203 |
3 files changed, 302 insertions, 142 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 351e0bfd00..d431e4ca16 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -88,6 +88,8 @@ ManagementAgentImpl::ManagementAgentImpl() : interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0), notifyable(0), inCallback(false), initialized(false), connected(false), useMapMsg(false), lastFailure("never connected"), + topicExchange("qmf.default.topic"), directExchange("qmf.default.direct"), + schemaTimestamp(Duration(EPOCH, now())), clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0), assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), connThreadBody(*this), connThread(connThreadBody), @@ -296,7 +298,8 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(), event.getEventName(), - event.getMd5Sum()); + event.getMd5Sum(), + ManagementItem::CLASS_KIND_EVENT); event.mapEncode(values); map_["_values"] = values; map_["_timestamp"] = uint64_t(Duration(EPOCH, now())); @@ -308,7 +311,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se headers["qmf.agent"] = name_address; MapCodec::encode(map_, content); - connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", key.str()); + connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str()); } uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) @@ -404,7 +407,6 @@ void ManagementAgentImpl::retrieveData() void ManagementAgentImpl::sendHeartbeat() { - static const string addr_exchange("qmf.default.topic"); static const string addr_key_base("agent.ind.heartbeat"); Variant::Map map; @@ -429,13 +431,9 @@ void ManagementAgentImpl::sendHeartbeat() headers["qmf.opcode"] = "_agent_heartbeat_indication"; headers["qmf.agent"] = name_address; - map["_values"] = attrMap; - map["_values"].asMap()["timestamp"] = uint64_t(Duration(EPOCH, now())); - map["_values"].asMap()["heartbeat_interval"] = interval; - map["_values"].asMap()["epoch"] = bootSequence; - + getHeartbeatContent(map); MapCodec::encode(map, content); - connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key.str()); + connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str()); QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address); } @@ -443,8 +441,6 @@ void ManagementAgentImpl::sendHeartbeat() void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid, const string& text, uint32_t code) { - static const string addr_exchange("qmf.default.direct"); - Variant::Map map; Variant::Map headers; Variant::Map values; @@ -459,7 +455,7 @@ void ManagementAgentImpl::sendException(const string& replyToKey, const string& map["_values"] = values; MapCodec::encode(map, content); - connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyToKey); + connThreadBody.sendBuffer(content, cid, headers, directExchange, replyToKey); QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text); } @@ -573,7 +569,7 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& headers["qmf.opcode"] = "_method_response"; QPID_LOG(trace, "SENT MethodResponse map=" << outMap); MapCodec::encode(outMap, content); - connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo); + connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo); } } @@ -590,7 +586,6 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, headers["method"] = "response"; headers["qmf.opcode"] = "_query_response"; - headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; headers["partial"] = Variant(); @@ -614,80 +609,25 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, return; } - if (i->second.asString() != "OBJECT") { - sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported"); - return; - } - - string className; - string packageName; - - /* - * Handle the _schema_id element, if supplied. - */ - i = inMap.find("_schema_id"); - if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { - const Variant::Map& schemaIdMap(i->second.asMap()); - - Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name"); - if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) - className = s_iter->second.asString(); - - s_iter = schemaIdMap.find("_package_name"); - if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) - packageName = s_iter->second.asString(); - } - - /* - * Unpack the _object_id element of the query if it is present. If it is present, find that one - * object and return it. If it is not present, send a class-based result. - */ - i = inMap.find("_object_id"); - if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { - ObjectId objId(i->second.asMap()); - - ManagementObjectMap::iterator iter = managementObjects.find(objId); - if (iter != managementObjects.end()) { - ManagementObject* object = iter->second; - - if (object->getConfigChanged() || object->getInstChanged()) - object->setUpdateTime(); - - object->mapEncodeValues(values, true, true); // write both stats and properties - objId.mapEncode(oidMap); - map_["_values"] = values; - map_["_object_id"] = oidMap; - object->writeTimestamps(map_); - map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), - object->getClassName(), - object->getMd5Sum()); - - list_.push_back(map_); - headers.erase("partial"); - - ListCodec::encode(list_, content); - connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list"); - QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo); - return; - } - } else { - for (ManagementObjectMap::iterator iter = managementObjects.begin(); - iter != managementObjects.end(); - iter++) { - ManagementObject* object = iter->second; - if (object->getClassName() == className && - (packageName.empty() || object->getPackageName() == packageName)) { + if (i->second.asString() == "OBJECT") { + headers["qmf.content"] = "_data"; + /* + * Unpack the _object_id element of the query if it is present. If it is present, find that one + * object and return it. If it is not present, send a class-based result. + */ + i = inMap.find("_object_id"); + if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { + ObjectId objId(i->second.asMap()); - // @todo support multiple object reply per message - values.clear(); - list_.clear(); - oidMap.clear(); + ManagementObjectMap::iterator iter = managementObjects.find(objId); + if (iter != managementObjects.end()) { + ManagementObject* object = iter->second; if (object->getConfigChanged() || object->getInstChanged()) object->setUpdateTime(); object->mapEncodeValues(values, true, true); // write both stats and properties - iter->first.mapEncode(oidMap); + objId.mapEncode(oidMap); map_["_values"] = values; map_["_object_id"] = oidMap; object->writeTimestamps(map_); @@ -695,26 +635,101 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, object->getClassName(), object->getMd5Sum()); list_.push_back(map_); + headers.erase("partial"); ListCodec::encode(list_, content); - connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list"); - QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo); + connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo); + return; + } + } else { // match using schema_id, if supplied + + string className; + string packageName; + + i = inMap.find("_schema_id"); + if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) { + const Variant::Map& schemaIdMap(i->second.asMap()); + + Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) + className = s_iter->second.asString(); + + s_iter = schemaIdMap.find("_package_name"); + if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING) + packageName = s_iter->second.asString(); + + for (ManagementObjectMap::iterator iter = managementObjects.begin(); + iter != managementObjects.end(); + iter++) { + ManagementObject* object = iter->second; + if (object->getClassName() == className && + (packageName.empty() || object->getPackageName() == packageName)) { + + // @todo support multiple object reply per message + values.clear(); + list_.clear(); + oidMap.clear(); + + if (object->getConfigChanged() || object->getInstChanged()) + object->setUpdateTime(); + + object->mapEncodeValues(values, true, true); // write both stats and properties + iter->first.mapEncode(oidMap); + map_["_values"] = values; + map_["_object_id"] = oidMap; + object->writeTimestamps(map_); + map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(), + object->getClassName(), + object->getMd5Sum()); + list_.push_back(map_); + + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo); + } + } } } - } - // Send empty "non-partial" message to indicate CommandComplete - list_.clear(); - headers.erase("partial"); - ListCodec::encode(list_, content); - connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list"); - QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo); + // Send empty "non-partial" message to indicate CommandComplete + list_.clear(); + headers.erase("partial"); + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo); + + } else if (i->second.asString() == "SCHEMA_ID") { + headers["qmf.content"] = "_schema_id"; + /** + * @todo - support for a predicate. For now, send a list of all known schema class keys. + */ + for (PackageMap::iterator pIter = packages.begin(); + pIter != packages.end(); pIter++) { + for (ClassMap::iterator cIter = pIter->second.begin(); + cIter != pIter->second.end(); cIter++) { + + list_.push_back(mapEncodeSchemaId( pIter->first, + cIter->first.name, + cIter->first.hash, + cIter->second.kind )); + } + } + + headers.erase("partial"); + ListCodec::encode(list_, content); + connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list"); + QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << replyTo); + + } else { + // Unknown query target + sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported"); + } } void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo) { QPID_LOG(trace, "RCVD AgentLocateRequest"); - static const string addr_exchange("qmf.default.direct"); Variant::Map map; Variant::Map headers; @@ -724,13 +739,9 @@ void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, headers["qmf.opcode"] = "_agent_locate_response"; headers["qmf.agent"] = name_address; - map["_values"] = attrMap; - map["_values"].asMap()["timestamp"] = uint64_t(Duration(EPOCH, now())); - map["_values"].asMap()["heartbeat_interval"] = interval; - map["_values"].asMap()["epoch"] = bootSequence; - + getHeartbeatContent(map); MapCodec::encode(map, content); - connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyTo); + connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo); QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); @@ -822,13 +833,19 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname, const string& cname, - const uint8_t *md5Sum) + const uint8_t *md5Sum, + uint8_t type) { Variant::Map map_; map_["_package_name"] = pname; map_["_class_name"] = cname; map_["_hash"] = types::Uuid(md5Sum); + if (type == ManagementItem::CLASS_KIND_EVENT) + map_["_type"] = "_event"; + else + map_["_type"] = "_data"; + return map_; } @@ -901,6 +918,8 @@ void ManagementAgentImpl::addClassLocal(uint8_t classKind, // No such class found, create a new class with local information. cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall, classKind))); + schemaTimestamp = Duration(EPOCH, now()); + QPID_LOG(trace, "Updated schema timestamp, now=" << uint64_t(schemaTimestamp)); } void ManagementAgentImpl::encodePackageIndication(Buffer& buf, @@ -1014,7 +1033,7 @@ void ManagementAgentImpl::periodicProcessing() headers["qmf.content"] = "_data"; headers["qmf.agent"] = name_address; - connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", "agent.ind.data", "amqp/list"); + connThreadBody.sendBuffer(content, "", headers, topicExchange, "agent.ind.data", "amqp/list"); QPID_LOG(trace, "SENT DataIndication"); } } @@ -1031,6 +1050,16 @@ void ManagementAgentImpl::periodicProcessing() sendHeartbeat(); } + +void ManagementAgentImpl::getHeartbeatContent(qpid::types::Variant::Map& map) +{ + map["_values"] = attrMap; + map["_values"].asMap()["timestamp"] = uint64_t(Duration(EPOCH, now())); + map["_values"].asMap()["heartbeat_interval"] = interval; + map["_values"].asMap()["epoch"] = bootSequence; + map["_values"].asMap()["schema_timestamp"] = uint64_t(schemaTimestamp); +} + void ManagementAgentImpl::ConnectionThread::run() { static const int delayMin(1); @@ -1055,9 +1084,9 @@ void ManagementAgentImpl::ConnectionThread::run() arg::exclusive=true); session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(), arg::bindingKey=queueName.str()); - session.exchangeBind(arg::exchange="qmf.default.direct", arg::queue=queueName.str(), + session.exchangeBind(arg::exchange=agent.directExchange, arg::queue=queueName.str(), arg::bindingKey=agent.name_address); - session.exchangeBind(arg::exchange="qmf.default.topic", arg::queue=queueName.str(), + session.exchangeBind(arg::exchange=agent.topicExchange, arg::queue=queueName.str(), arg::bindingKey="console.#"); subscriptions->subscribe(agent, queueName.str(), dest); diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h index 4a58807e98..32cbbd7e08 100644 --- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -32,6 +32,7 @@ #include "qpid/sys/Runnable.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/PipeHandle.h" +#include "qpid/sys/Time.h" #include "qpid/framing/Uuid.h" #include <iostream> #include <sstream> @@ -166,6 +167,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen bool connected; bool useMapMsg; std::string lastFailure; + std::string topicExchange; + std::string directExchange; + qpid::sys::Duration schemaTimestamp; bool clientWasAdded; uint32_t requestedBrokerBank; @@ -258,7 +262,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0); qpid::types::Variant::Map mapEncodeSchemaId(const std::string& pname, const std::string& cname, - const uint8_t *md5Sum); + const uint8_t *md5Sum, + uint8_t type=ManagementItem::CLASS_KIND_TABLE); bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq); void sendHeartbeat(); void sendException(const std::string& replyToKey, const std::string& cid, @@ -272,6 +277,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo); void handleConsoleAddedIndication(); + void getHeartbeatContent (qpid::types::Variant::Map& map); }; }} diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py index 385cfdfedd..77de1602f2 100644 --- a/qpid/extras/qmf/src/py/qmf/console.py +++ b/qpid/extras/qmf/src/py/qmf/console.py @@ -1004,6 +1004,7 @@ class Session: def _handleClassInd(self, broker, codec, seq): kind = codec.read_uint8() classKey = ClassKey(codec) + classKey._setType(kind) schema = self.schemaCache.getSchema(classKey) if not schema: @@ -1043,13 +1044,17 @@ class Session: def _handleSchemaResp(self, broker, codec, seq, agent_addr): kind = codec.read_uint8() classKey = ClassKey(codec) + classKey._setType(kind) _class = SchemaClass(kind, classKey, codec, self) - self.schemaCache.declareClass(classKey, _class) + new_pkg, new_cls = self.schemaCache.declareClass(classKey, _class) ctx = self.seqMgr._release(seq) if ctx: broker._decOutstanding() if self.console != None: - self.console.newClass(kind, classKey) + if new_pkg: + self.console.newPackage(classKey.getPackageName()) + if new_cls: + self.console.newClass(kind, classKey) if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode): agent = self._getAgentForAgentAddr(agent_addr) @@ -1086,6 +1091,7 @@ class Session: agent.touch() if self.console and agent: self.console.heartbeat(agent, timestamp) + agent.update_schema_timestamp(values.get("schema_timestamp", 0)) def _v2HandleAgentLocateRsp(self, broker, mp, ah, content): @@ -1297,6 +1303,8 @@ class Session: normally invoked on the object itself. """ schema = self.getSchema(schemaKey) + if not schema: + raise Exception("Schema not present (Key=%s)" % str(schemaKey)) for method in schema.getMethods(): if name == method.name: # @@ -1361,8 +1369,8 @@ class Session: if arg.dir.find("I") != -1: self._encodeValue(sendCodec, argList[aIdx], arg.type) aIdx += 1 - smsg = broker._message(sendCodec.encoded, "agent.%d.%s" % - (objectId.getBrokerBank(), objectId.getAgentBank())) + smsg = broker._message(sendCodec.encoded, "agent.%d.%s" % + (objectId.getBrokerBank(), objectId.getAgentBank())) broker._send(smsg) return seq return None @@ -1443,19 +1451,28 @@ class SchemaCache(object): self.lock.acquire() if packageName in self.packages: for pkey in self.packages[packageName]: - list.append(self.packages[packageName][pkey].getKey()) + if isinstance(self.packages[packageName][pkey], SchemaClass): + list.append(self.packages[packageName][pkey].getKey()) + elif self.packages[packageName][pkey] is not None: + # schema not present yet, but we have schema type + list.append(ClassKey({"_package_name": packageName, + "_class_name": pkey[0], + "_hash": pkey[1], + "_type": self.packages[packageName][pkey]})) finally: self.lock.release() return list def getSchema(self, classKey): - """ Get the schema for a QMF class """ + """ Get the schema for a QMF class, return None if schema not available """ pname = classKey.getPackageName() pkey = classKey.getPackageKey() try: self.lock.acquire() if pname in self.packages: - if pkey in self.packages[pname]: + if (pkey in self.packages[pname] and + isinstance(self.packages[pname][pkey], SchemaClass)): + # hack: value may be schema type info if schema not available return self.packages[pname][pkey] finally: self.lock.release() @@ -1472,21 +1489,31 @@ class SchemaCache(object): self.lock.release() return True - def declareClass(self, classKey, classDef): - """ Maybe add a class definition to the cache. Return True if added, None if pre-existed. """ + def declareClass(self, classKey, classDef=None): + """ Add a class definition to the cache, if supplied. Return a pair + indicating if the package or class is new. + """ + new_package = False + new_class = False pname = classKey.getPackageName() pkey = classKey.getPackageKey() try: self.lock.acquire() if pname not in self.packages: self.packages[pname] = {} + new_package = True packageMap = self.packages[pname] - if pkey in packageMap: - return None - packageMap[pkey] = classDef + if pkey not in packageMap: + new_class = True + # hack: if no classDef given, store the class type code until we get + # the full schema: + if classDef is None: + packageMap[pkey] = classKey.getType() + else: + packageMap[pkey] = classDef finally: self.lock.release() - return True + return (new_package, new_class) #=================================================================================================== @@ -1494,12 +1521,25 @@ class SchemaCache(object): #=================================================================================================== class ClassKey: """ A ClassKey uniquely identifies a class from the schema. """ + + TYPE_DATA = "_data" + TYPE_EVENT = "_event" + def __init__(self, constructor): if constructor.__class__ == str: # construct from __repr__ string try: - self.pname, cls = constructor.split(":") - self.cname, hsh = cls.split("(") + # supports two formats: + # type present = P:C:T(H) + # no type present = P:C(H) + tmp = constructor.split(":") + if len(tmp) == 3: + self.pname, self.cname, rem = tmp + self.type, hsh = rem.split("(") + else: + self.pname, rem = tmp + self.cname, hsh = rem.split("(") + self.type = None hsh = hsh.strip(")") hexValues = hsh.split("-") h0 = int(hexValues[0], 16) @@ -1517,6 +1557,7 @@ class ClassKey: self.pname = constructor['_package_name'] self.cname = constructor['_class_name'] self.hash = constructor['_hash'] + self.type = constructor['_type'] except: raise Exception("Invalid ClassKey map format") else: @@ -1525,14 +1566,20 @@ class ClassKey: self.pname = str(codec.read_str8()) self.cname = str(codec.read_str8()) self.hash = UUID(codec.read_bin128()) + # old V1 codec did not include "type" + self.type = None def encode(self, codec): + # old V1 codec did not include "type" codec.write_str8(self.pname) codec.write_str8(self.cname) codec.write_bin128(self.hash.bytes) def asMap(self): - return {'_package_name': self.pname, '_class_name': self.cname, '_hash': self.hash} + return {'_package_name': self.pname, + '_class_name': self.cname, + '_hash': self.hash, + '_type': self.type} def getPackageName(self): return self.pname @@ -1543,6 +1590,9 @@ class ClassKey: def getHash(self): return self.hash + def getType(self): + return self.type + def getHashString(self): return str(self.hash) @@ -1550,7 +1600,15 @@ class ClassKey: return (self.cname, self.hash) def __repr__(self): - return self.pname + ":" + self.cname + "(" + self.getHashString() + ")" + if self.type is None: + return self.pname + ":" + self.cname + "(" + self.getHashString() + ")" + return self.pname + ":" + self.cname + ":" + self.type + "(" + self.getHashString() + ")" + + def _setType(self, _type): + if _type == 2 or _type == ClassKey.TYPE_EVENT: + self.type = ClassKey.TYPE_EVENT + else: + self.type = ClassKey.TYPE_DATA #=================================================================================================== @@ -2640,6 +2698,7 @@ class Agent: self.lastSeenTime = time() self.closed = None self.epoch = 0 + self.schema_timestamp = None def _checkClosed(self): @@ -2660,7 +2719,21 @@ class Agent: elif 'qmf_event' in kwargs: if self.session.console: self.session.console.event(self.broker, kwargs['qmf_event']) - + elif 'qmf_schema_id' in kwargs: + ckey = kwargs['qmf_schema_id'] + new_pkg, new_cls = self.session.schemaCache.declareClass(ckey) + if self.session.console: + if new_pkg: + self.session.console.newPackage(ckey.getPackageName()) + if new_cls: + # translate V2's string based type value to legacy + # integer value for backward compatibility + cls_type = ckey.getType() + if str(cls_type) == ckey.TYPE_DATA: + cls_type = 1 + elif str(cls_type) == ckey.TYPE_EVENT: + cls_type = 2 + self.session.console.newClass(cls_type, ckey) def touch(self): if self.heartbeatInterval: @@ -2670,6 +2743,27 @@ class Agent: def setEpoch(self, epoch): self.epoch = epoch + def update_schema_timestamp(self, timestamp): + """ Check the latest schema timestamp from the agent V2 heartbeat. Issue a + query for all packages & classes should the timestamp change. + """ + self.lock.acquire() + try: + if self.schema_timestamp == timestamp: + return + self.schema_timestamp = timestamp + + context = RequestContext(self, self) + sequence = self.seqMgr._reserve(context) + + self.contextMap[sequence] = context + context.setSequence(sequence) + + finally: + self.lock.release() + + self._v2SendSchemaIdQuery(sequence, {}) + def epochMismatch(self, epoch): if epoch == 0 or self.epoch == 0: @@ -2914,7 +3008,8 @@ class Agent: def _v2HandleDataInd(self, mp, ah, content): """ - Handle a QMFv2 data indication from the agent + Handle a QMFv2 data indication from the agent. Note: called from context + of the Broker thread. """ if mp.correlation_id: try: @@ -2938,6 +3033,19 @@ class Agent: context.addV2QueryResult(omap) context.processV2Data() + elif kind == "_schema_id": + for sid in content: + try: + ckey = ClassKey(sid) + except: + # @todo: log error + ckey = None + if ckey is not None: + # @todo: for now, the application cannot directly send a query for + # _schema_id. This request _must_ have been initiated by the framework + # in order to update the schema cache. + context.notifiable(qmf_schema_id=ckey) + if 'partial' not in ah: context.signal() @@ -3053,6 +3161,25 @@ class Agent: self.broker._send(smsg) + def _v2SendQuery(self, query, sequence): + """ + Given a query map, construct and send a V2 Query message. + """ + dp = self.broker.amqpSession.delivery_properties() + dp.routing_key = self.getV2RoutingKey() + mp = self.broker.amqpSession.message_properties() + mp.content_type = "amqp/map" + mp.user_id = self.broker.authUser + mp.correlation_id = str(sequence) + mp.app_id = "qmf2" + mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_direct_queue) + mp.application_headers = {'qmf.opcode':'_query_request'} + sendCodec = Codec() + sendCodec.write_map(query) + msg = Message(dp, mp, sendCodec.encoded) + self.broker._send(msg, "qmf.default.direct") + + def _v2SendGetQuery(self, sequence, kwargs): """ Send a get query to a QMFv2 agent. @@ -3071,22 +3198,20 @@ class Agent: elif '_objectId' in kwargs: query['_object_id'] = kwargs['_objectId'].asMap() + self._v2SendQuery(query, sequence) + + + def _v2SendSchemaIdQuery(self, sequence, kwargs): + """ + Send a query for all schema ids to a QMFv2 agent. + """ # - # Construct and transmit the message + # Build the query map # - dp = self.broker.amqpSession.delivery_properties() - dp.routing_key = self.getV2RoutingKey() - mp = self.broker.amqpSession.message_properties() - mp.content_type = "amqp/map" - mp.user_id = self.broker.authUser - mp.correlation_id = str(sequence) - mp.app_id = "qmf2" - mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_direct_queue) - mp.application_headers = {'qmf.opcode':'_query_request'} - sendCodec = Codec() - sendCodec.write_map(query) - msg = Message(dp, mp, sendCodec.encoded) - self.broker._send(msg, "qmf.default.direct") + query = {'_what': 'SCHEMA_ID'} + # @todo - predicate support. For now, return all known schema ids. + + self._v2SendQuery(query, sequence) def _v2SendSchemaRequest(self, schemaId): @@ -3106,7 +3231,8 @@ class Agent: def _handleQmfV1Message(self, opcode, seq, mp, ah, codec): """ - Process QMFv1 messages arriving from an agent. + Process QMFv1 messages arriving from an agent. Note well: this method is + called from the context of the Broker thread. """ if opcode == 'm': self._v1HandleMethodResp(codec, seq) elif opcode == 'e': self._v1HandleEventInd(codec, seq) @@ -3117,7 +3243,8 @@ class Agent: def _handleQmfV2Message(self, opcode, mp, ah, content): """ - Process QMFv2 messages arriving from an agent. + Process QMFv2 messages arriving from an agent. Note well: this method is + called from the context of the Broker thread. """ if opcode == '_data_indication': self._v2HandleDataInd(mp, ah, content) elif opcode == '_query_response': self._v2HandleDataInd(mp, ah, content) @@ -3335,6 +3462,7 @@ class Event: self.session = agent.session self.broker = agent.broker self.classKey = ClassKey(codec) + self.classKey._setType(ClassKey.TYPE_EVENT) self.timestamp = codec.read_int64() self.severity = codec.read_uint8() self.arguments = {} @@ -3377,9 +3505,6 @@ class Event: def getTimestamp(self): return self.timestamp - def getName(self): - return self.name - def getSchema(self): return self.schema @@ -3391,7 +3516,7 @@ class SequenceManager: """ Manage sequence numbers for asynchronous method calls """ def __init__(self): self.lock = Lock() - self.sequence = 0 + self.sequence = long(time()) # pseudo-randomize the start self.pending = {} def _reserve(self, data): |
