summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorKenneth Anthony Giusti <kgiusti@apache.org>2010-07-12 20:38:07 +0000
committerKenneth Anthony Giusti <kgiusti@apache.org>2010-07-12 20:38:07 +0000
commita6f17cb659cd73005f55edebdd1b78db043d73a9 (patch)
treeccb7d97f34ebd10aa38b087bac488ea234dfe868 /qpid
parentd1882643cfca013fd9ee0b9cb0687975cab7aa60 (diff)
downloadqpid-python-a6f17cb659cd73005f55edebdd1b78db043d73a9.tar.gz
QMF: enable python console to pull schema info from agent.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@963479 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp233
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h8
-rw-r--r--qpid/extras/qmf/src/py/qmf/console.py203
3 files changed, 302 insertions, 142 deletions
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 351e0bfd00..d431e4ca16 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -88,6 +88,8 @@ ManagementAgentImpl::ManagementAgentImpl() :
interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0),
notifyable(0), inCallback(false),
initialized(false), connected(false), useMapMsg(false), lastFailure("never connected"),
+ topicExchange("qmf.default.topic"), directExchange("qmf.default.direct"),
+ schemaTimestamp(Duration(EPOCH, now())),
clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
connThreadBody(*this), connThread(connThreadBody),
@@ -296,7 +298,8 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
event.getEventName(),
- event.getMd5Sum());
+ event.getMd5Sum(),
+ ManagementItem::CLASS_KIND_EVENT);
event.mapEncode(values);
map_["_values"] = values;
map_["_timestamp"] = uint64_t(Duration(EPOCH, now()));
@@ -308,7 +311,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
headers["qmf.agent"] = name_address;
MapCodec::encode(map_, content);
- connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", key.str());
+ connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str());
}
uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
@@ -404,7 +407,6 @@ void ManagementAgentImpl::retrieveData()
void ManagementAgentImpl::sendHeartbeat()
{
- static const string addr_exchange("qmf.default.topic");
static const string addr_key_base("agent.ind.heartbeat");
Variant::Map map;
@@ -429,13 +431,9 @@ void ManagementAgentImpl::sendHeartbeat()
headers["qmf.opcode"] = "_agent_heartbeat_indication";
headers["qmf.agent"] = name_address;
- map["_values"] = attrMap;
- map["_values"].asMap()["timestamp"] = uint64_t(Duration(EPOCH, now()));
- map["_values"].asMap()["heartbeat_interval"] = interval;
- map["_values"].asMap()["epoch"] = bootSequence;
-
+ getHeartbeatContent(map);
MapCodec::encode(map, content);
- connThreadBody.sendBuffer(content, "", headers, addr_exchange, addr_key.str());
+ connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str());
QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
}
@@ -443,8 +441,6 @@ void ManagementAgentImpl::sendHeartbeat()
void ManagementAgentImpl::sendException(const string& replyToKey, const string& cid,
const string& text, uint32_t code)
{
- static const string addr_exchange("qmf.default.direct");
-
Variant::Map map;
Variant::Map headers;
Variant::Map values;
@@ -459,7 +455,7 @@ void ManagementAgentImpl::sendException(const string& replyToKey, const string&
map["_values"] = values;
MapCodec::encode(map, content);
- connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyToKey);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyToKey);
QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
}
@@ -573,7 +569,7 @@ void ManagementAgentImpl::invokeMethodRequest(const string& body, const string&
headers["qmf.opcode"] = "_method_response";
QPID_LOG(trace, "SENT MethodResponse map=" << outMap);
MapCodec::encode(outMap, content);
- connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo);
}
}
@@ -590,7 +586,6 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid,
headers["method"] = "response";
headers["qmf.opcode"] = "_query_response";
- headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
headers["partial"] = Variant();
@@ -614,80 +609,25 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid,
return;
}
- if (i->second.asString() != "OBJECT") {
- sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
- return;
- }
-
- string className;
- string packageName;
-
- /*
- * Handle the _schema_id element, if supplied.
- */
- i = inMap.find("_schema_id");
- if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
- const Variant::Map& schemaIdMap(i->second.asMap());
-
- Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
- if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
- className = s_iter->second.asString();
-
- s_iter = schemaIdMap.find("_package_name");
- if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
- packageName = s_iter->second.asString();
- }
-
- /*
- * Unpack the _object_id element of the query if it is present. If it is present, find that one
- * object and return it. If it is not present, send a class-based result.
- */
- i = inMap.find("_object_id");
- if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
- ObjectId objId(i->second.asMap());
-
- ManagementObjectMap::iterator iter = managementObjects.find(objId);
- if (iter != managementObjects.end()) {
- ManagementObject* object = iter->second;
-
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
-
- object->mapEncodeValues(values, true, true); // write both stats and properties
- objId.mapEncode(oidMap);
- map_["_values"] = values;
- map_["_object_id"] = oidMap;
- object->writeTimestamps(map_);
- map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
- object->getClassName(),
- object->getMd5Sum());
-
- list_.push_back(map_);
- headers.erase("partial");
-
- ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
- return;
- }
- } else {
- for (ManagementObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
- iter++) {
- ManagementObject* object = iter->second;
- if (object->getClassName() == className &&
- (packageName.empty() || object->getPackageName() == packageName)) {
+ if (i->second.asString() == "OBJECT") {
+ headers["qmf.content"] = "_data";
+ /*
+ * Unpack the _object_id element of the query if it is present. If it is present, find that one
+ * object and return it. If it is not present, send a class-based result.
+ */
+ i = inMap.find("_object_id");
+ if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
+ ObjectId objId(i->second.asMap());
- // @todo support multiple object reply per message
- values.clear();
- list_.clear();
- oidMap.clear();
+ ManagementObjectMap::iterator iter = managementObjects.find(objId);
+ if (iter != managementObjects.end()) {
+ ManagementObject* object = iter->second;
if (object->getConfigChanged() || object->getInstChanged())
object->setUpdateTime();
object->mapEncodeValues(values, true, true); // write both stats and properties
- iter->first.mapEncode(oidMap);
+ objId.mapEncode(oidMap);
map_["_values"] = values;
map_["_object_id"] = oidMap;
object->writeTimestamps(map_);
@@ -695,26 +635,101 @@ void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid,
object->getClassName(),
object->getMd5Sum());
list_.push_back(map_);
+ headers.erase("partial");
ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << replyTo);
+ return;
+ }
+ } else { // match using schema_id, if supplied
+
+ string className;
+ string packageName;
+
+ i = inMap.find("_schema_id");
+ if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
+ const Variant::Map& schemaIdMap(i->second.asMap());
+
+ Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
+ className = s_iter->second.asString();
+
+ s_iter = schemaIdMap.find("_package_name");
+ if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
+ packageName = s_iter->second.asString();
+
+ for (ManagementObjectMap::iterator iter = managementObjects.begin();
+ iter != managementObjects.end();
+ iter++) {
+ ManagementObject* object = iter->second;
+ if (object->getClassName() == className &&
+ (packageName.empty() || object->getPackageName() == packageName)) {
+
+ // @todo support multiple object reply per message
+ values.clear();
+ list_.clear();
+ oidMap.clear();
+
+ if (object->getConfigChanged() || object->getInstChanged())
+ object->setUpdateTime();
+
+ object->mapEncodeValues(values, true, true); // write both stats and properties
+ iter->first.mapEncode(oidMap);
+ map_["_values"] = values;
+ map_["_object_id"] = oidMap;
+ object->writeTimestamps(map_);
+ map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
+ object->getClassName(),
+ object->getMd5Sum());
+ list_.push_back(map_);
+
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << replyTo);
+ }
+ }
}
}
- }
- // Send empty "non-partial" message to indicate CommandComplete
- list_.clear();
- headers.erase("partial");
- ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, "qmf.default.direct", replyTo, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo);
+ // Send empty "non-partial" message to indicate CommandComplete
+ list_.clear();
+ headers.erase("partial");
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (empty with no 'partial' indicator) to=" << replyTo);
+
+ } else if (i->second.asString() == "SCHEMA_ID") {
+ headers["qmf.content"] = "_schema_id";
+ /**
+ * @todo - support for a predicate. For now, send a list of all known schema class keys.
+ */
+ for (PackageMap::iterator pIter = packages.begin();
+ pIter != packages.end(); pIter++) {
+ for (ClassMap::iterator cIter = pIter->second.begin();
+ cIter != pIter->second.end(); cIter++) {
+
+ list_.push_back(mapEncodeSchemaId( pIter->first,
+ cIter->first.name,
+ cIter->first.hash,
+ cIter->second.kind ));
+ }
+ }
+
+ headers.erase("partial");
+ ListCodec::encode(list_, content);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo, "amqp/list");
+ QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << replyTo);
+
+ } else {
+ // Unknown query target
+ sendException(replyTo, cid, "Query for _what => '" + i->second.asString() + "' not supported");
+ }
}
void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& replyTo)
{
QPID_LOG(trace, "RCVD AgentLocateRequest");
- static const string addr_exchange("qmf.default.direct");
Variant::Map map;
Variant::Map headers;
@@ -724,13 +739,9 @@ void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid,
headers["qmf.opcode"] = "_agent_locate_response";
headers["qmf.agent"] = name_address;
- map["_values"] = attrMap;
- map["_values"].asMap()["timestamp"] = uint64_t(Duration(EPOCH, now()));
- map["_values"].asMap()["heartbeat_interval"] = interval;
- map["_values"].asMap()["epoch"] = bootSequence;
-
+ getHeartbeatContent(map);
MapCodec::encode(map, content);
- connThreadBody.sendBuffer(content, cid, headers, addr_exchange, replyTo);
+ connThreadBody.sendBuffer(content, cid, headers, directExchange, replyTo);
QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
@@ -822,13 +833,19 @@ void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq
Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname,
const string& cname,
- const uint8_t *md5Sum)
+ const uint8_t *md5Sum,
+ uint8_t type)
{
Variant::Map map_;
map_["_package_name"] = pname;
map_["_class_name"] = cname;
map_["_hash"] = types::Uuid(md5Sum);
+ if (type == ManagementItem::CLASS_KIND_EVENT)
+ map_["_type"] = "_event";
+ else
+ map_["_type"] = "_data";
+
return map_;
}
@@ -901,6 +918,8 @@ void ManagementAgentImpl::addClassLocal(uint8_t classKind,
// No such class found, create a new class with local information.
cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall, classKind)));
+ schemaTimestamp = Duration(EPOCH, now());
+ QPID_LOG(trace, "Updated schema timestamp, now=" << uint64_t(schemaTimestamp));
}
void ManagementAgentImpl::encodePackageIndication(Buffer& buf,
@@ -1014,7 +1033,7 @@ void ManagementAgentImpl::periodicProcessing()
headers["qmf.content"] = "_data";
headers["qmf.agent"] = name_address;
- connThreadBody.sendBuffer(content, "", headers, "qmf.default.topic", "agent.ind.data", "amqp/list");
+ connThreadBody.sendBuffer(content, "", headers, topicExchange, "agent.ind.data", "amqp/list");
QPID_LOG(trace, "SENT DataIndication");
}
}
@@ -1031,6 +1050,16 @@ void ManagementAgentImpl::periodicProcessing()
sendHeartbeat();
}
+
+void ManagementAgentImpl::getHeartbeatContent(qpid::types::Variant::Map& map)
+{
+ map["_values"] = attrMap;
+ map["_values"].asMap()["timestamp"] = uint64_t(Duration(EPOCH, now()));
+ map["_values"].asMap()["heartbeat_interval"] = interval;
+ map["_values"].asMap()["epoch"] = bootSequence;
+ map["_values"].asMap()["schema_timestamp"] = uint64_t(schemaTimestamp);
+}
+
void ManagementAgentImpl::ConnectionThread::run()
{
static const int delayMin(1);
@@ -1055,9 +1084,9 @@ void ManagementAgentImpl::ConnectionThread::run()
arg::exclusive=true);
session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
arg::bindingKey=queueName.str());
- session.exchangeBind(arg::exchange="qmf.default.direct", arg::queue=queueName.str(),
+ session.exchangeBind(arg::exchange=agent.directExchange, arg::queue=queueName.str(),
arg::bindingKey=agent.name_address);
- session.exchangeBind(arg::exchange="qmf.default.topic", arg::queue=queueName.str(),
+ session.exchangeBind(arg::exchange=agent.topicExchange, arg::queue=queueName.str(),
arg::bindingKey="console.#");
subscriptions->subscribe(agent, queueName.str(), dest);
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
index 4a58807e98..32cbbd7e08 100644
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -32,6 +32,7 @@
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Mutex.h"
#include "qpid/sys/PipeHandle.h"
+#include "qpid/sys/Time.h"
#include "qpid/framing/Uuid.h"
#include <iostream>
#include <sstream>
@@ -166,6 +167,9 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
bool connected;
bool useMapMsg;
std::string lastFailure;
+ std::string topicExchange;
+ std::string directExchange;
+ qpid::sys::Duration schemaTimestamp;
bool clientWasAdded;
uint32_t requestedBrokerBank;
@@ -258,7 +262,8 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
qpid::types::Variant::Map mapEncodeSchemaId(const std::string& pname,
const std::string& cname,
- const uint8_t *md5Sum);
+ const uint8_t *md5Sum,
+ uint8_t type=ManagementItem::CLASS_KIND_TABLE);
bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
void sendHeartbeat();
void sendException(const std::string& replyToKey, const std::string& cid,
@@ -272,6 +277,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& replyTo);
void handleConsoleAddedIndication();
+ void getHeartbeatContent (qpid::types::Variant::Map& map);
};
}}
diff --git a/qpid/extras/qmf/src/py/qmf/console.py b/qpid/extras/qmf/src/py/qmf/console.py
index 385cfdfedd..77de1602f2 100644
--- a/qpid/extras/qmf/src/py/qmf/console.py
+++ b/qpid/extras/qmf/src/py/qmf/console.py
@@ -1004,6 +1004,7 @@ class Session:
def _handleClassInd(self, broker, codec, seq):
kind = codec.read_uint8()
classKey = ClassKey(codec)
+ classKey._setType(kind)
schema = self.schemaCache.getSchema(classKey)
if not schema:
@@ -1043,13 +1044,17 @@ class Session:
def _handleSchemaResp(self, broker, codec, seq, agent_addr):
kind = codec.read_uint8()
classKey = ClassKey(codec)
+ classKey._setType(kind)
_class = SchemaClass(kind, classKey, codec, self)
- self.schemaCache.declareClass(classKey, _class)
+ new_pkg, new_cls = self.schemaCache.declareClass(classKey, _class)
ctx = self.seqMgr._release(seq)
if ctx:
broker._decOutstanding()
if self.console != None:
- self.console.newClass(kind, classKey)
+ if new_pkg:
+ self.console.newPackage(classKey.getPackageName())
+ if new_cls:
+ self.console.newClass(kind, classKey)
if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode):
agent = self._getAgentForAgentAddr(agent_addr)
@@ -1086,6 +1091,7 @@ class Session:
agent.touch()
if self.console and agent:
self.console.heartbeat(agent, timestamp)
+ agent.update_schema_timestamp(values.get("schema_timestamp", 0))
def _v2HandleAgentLocateRsp(self, broker, mp, ah, content):
@@ -1297,6 +1303,8 @@ class Session:
normally invoked on the object itself.
"""
schema = self.getSchema(schemaKey)
+ if not schema:
+ raise Exception("Schema not present (Key=%s)" % str(schemaKey))
for method in schema.getMethods():
if name == method.name:
#
@@ -1361,8 +1369,8 @@ class Session:
if arg.dir.find("I") != -1:
self._encodeValue(sendCodec, argList[aIdx], arg.type)
aIdx += 1
- smsg = broker._message(sendCodec.encoded, "agent.%d.%s" %
- (objectId.getBrokerBank(), objectId.getAgentBank()))
+ smsg = broker._message(sendCodec.encoded, "agent.%d.%s" %
+ (objectId.getBrokerBank(), objectId.getAgentBank()))
broker._send(smsg)
return seq
return None
@@ -1443,19 +1451,28 @@ class SchemaCache(object):
self.lock.acquire()
if packageName in self.packages:
for pkey in self.packages[packageName]:
- list.append(self.packages[packageName][pkey].getKey())
+ if isinstance(self.packages[packageName][pkey], SchemaClass):
+ list.append(self.packages[packageName][pkey].getKey())
+ elif self.packages[packageName][pkey] is not None:
+ # schema not present yet, but we have schema type
+ list.append(ClassKey({"_package_name": packageName,
+ "_class_name": pkey[0],
+ "_hash": pkey[1],
+ "_type": self.packages[packageName][pkey]}))
finally:
self.lock.release()
return list
def getSchema(self, classKey):
- """ Get the schema for a QMF class """
+ """ Get the schema for a QMF class, return None if schema not available """
pname = classKey.getPackageName()
pkey = classKey.getPackageKey()
try:
self.lock.acquire()
if pname in self.packages:
- if pkey in self.packages[pname]:
+ if (pkey in self.packages[pname] and
+ isinstance(self.packages[pname][pkey], SchemaClass)):
+ # hack: value may be schema type info if schema not available
return self.packages[pname][pkey]
finally:
self.lock.release()
@@ -1472,21 +1489,31 @@ class SchemaCache(object):
self.lock.release()
return True
- def declareClass(self, classKey, classDef):
- """ Maybe add a class definition to the cache. Return True if added, None if pre-existed. """
+ def declareClass(self, classKey, classDef=None):
+ """ Add a class definition to the cache, if supplied. Return a pair
+ indicating if the package or class is new.
+ """
+ new_package = False
+ new_class = False
pname = classKey.getPackageName()
pkey = classKey.getPackageKey()
try:
self.lock.acquire()
if pname not in self.packages:
self.packages[pname] = {}
+ new_package = True
packageMap = self.packages[pname]
- if pkey in packageMap:
- return None
- packageMap[pkey] = classDef
+ if pkey not in packageMap:
+ new_class = True
+ # hack: if no classDef given, store the class type code until we get
+ # the full schema:
+ if classDef is None:
+ packageMap[pkey] = classKey.getType()
+ else:
+ packageMap[pkey] = classDef
finally:
self.lock.release()
- return True
+ return (new_package, new_class)
#===================================================================================================
@@ -1494,12 +1521,25 @@ class SchemaCache(object):
#===================================================================================================
class ClassKey:
""" A ClassKey uniquely identifies a class from the schema. """
+
+ TYPE_DATA = "_data"
+ TYPE_EVENT = "_event"
+
def __init__(self, constructor):
if constructor.__class__ == str:
# construct from __repr__ string
try:
- self.pname, cls = constructor.split(":")
- self.cname, hsh = cls.split("(")
+ # supports two formats:
+ # type present = P:C:T(H)
+ # no type present = P:C(H)
+ tmp = constructor.split(":")
+ if len(tmp) == 3:
+ self.pname, self.cname, rem = tmp
+ self.type, hsh = rem.split("(")
+ else:
+ self.pname, rem = tmp
+ self.cname, hsh = rem.split("(")
+ self.type = None
hsh = hsh.strip(")")
hexValues = hsh.split("-")
h0 = int(hexValues[0], 16)
@@ -1517,6 +1557,7 @@ class ClassKey:
self.pname = constructor['_package_name']
self.cname = constructor['_class_name']
self.hash = constructor['_hash']
+ self.type = constructor['_type']
except:
raise Exception("Invalid ClassKey map format")
else:
@@ -1525,14 +1566,20 @@ class ClassKey:
self.pname = str(codec.read_str8())
self.cname = str(codec.read_str8())
self.hash = UUID(codec.read_bin128())
+ # old V1 codec did not include "type"
+ self.type = None
def encode(self, codec):
+ # old V1 codec did not include "type"
codec.write_str8(self.pname)
codec.write_str8(self.cname)
codec.write_bin128(self.hash.bytes)
def asMap(self):
- return {'_package_name': self.pname, '_class_name': self.cname, '_hash': self.hash}
+ return {'_package_name': self.pname,
+ '_class_name': self.cname,
+ '_hash': self.hash,
+ '_type': self.type}
def getPackageName(self):
return self.pname
@@ -1543,6 +1590,9 @@ class ClassKey:
def getHash(self):
return self.hash
+ def getType(self):
+ return self.type
+
def getHashString(self):
return str(self.hash)
@@ -1550,7 +1600,15 @@ class ClassKey:
return (self.cname, self.hash)
def __repr__(self):
- return self.pname + ":" + self.cname + "(" + self.getHashString() + ")"
+ if self.type is None:
+ return self.pname + ":" + self.cname + "(" + self.getHashString() + ")"
+ return self.pname + ":" + self.cname + ":" + self.type + "(" + self.getHashString() + ")"
+
+ def _setType(self, _type):
+ if _type == 2 or _type == ClassKey.TYPE_EVENT:
+ self.type = ClassKey.TYPE_EVENT
+ else:
+ self.type = ClassKey.TYPE_DATA
#===================================================================================================
@@ -2640,6 +2698,7 @@ class Agent:
self.lastSeenTime = time()
self.closed = None
self.epoch = 0
+ self.schema_timestamp = None
def _checkClosed(self):
@@ -2660,7 +2719,21 @@ class Agent:
elif 'qmf_event' in kwargs:
if self.session.console:
self.session.console.event(self.broker, kwargs['qmf_event'])
-
+ elif 'qmf_schema_id' in kwargs:
+ ckey = kwargs['qmf_schema_id']
+ new_pkg, new_cls = self.session.schemaCache.declareClass(ckey)
+ if self.session.console:
+ if new_pkg:
+ self.session.console.newPackage(ckey.getPackageName())
+ if new_cls:
+ # translate V2's string based type value to legacy
+ # integer value for backward compatibility
+ cls_type = ckey.getType()
+ if str(cls_type) == ckey.TYPE_DATA:
+ cls_type = 1
+ elif str(cls_type) == ckey.TYPE_EVENT:
+ cls_type = 2
+ self.session.console.newClass(cls_type, ckey)
def touch(self):
if self.heartbeatInterval:
@@ -2670,6 +2743,27 @@ class Agent:
def setEpoch(self, epoch):
self.epoch = epoch
+ def update_schema_timestamp(self, timestamp):
+ """ Check the latest schema timestamp from the agent V2 heartbeat. Issue a
+ query for all packages & classes should the timestamp change.
+ """
+ self.lock.acquire()
+ try:
+ if self.schema_timestamp == timestamp:
+ return
+ self.schema_timestamp = timestamp
+
+ context = RequestContext(self, self)
+ sequence = self.seqMgr._reserve(context)
+
+ self.contextMap[sequence] = context
+ context.setSequence(sequence)
+
+ finally:
+ self.lock.release()
+
+ self._v2SendSchemaIdQuery(sequence, {})
+
def epochMismatch(self, epoch):
if epoch == 0 or self.epoch == 0:
@@ -2914,7 +3008,8 @@ class Agent:
def _v2HandleDataInd(self, mp, ah, content):
"""
- Handle a QMFv2 data indication from the agent
+ Handle a QMFv2 data indication from the agent. Note: called from context
+ of the Broker thread.
"""
if mp.correlation_id:
try:
@@ -2938,6 +3033,19 @@ class Agent:
context.addV2QueryResult(omap)
context.processV2Data()
+ elif kind == "_schema_id":
+ for sid in content:
+ try:
+ ckey = ClassKey(sid)
+ except:
+ # @todo: log error
+ ckey = None
+ if ckey is not None:
+ # @todo: for now, the application cannot directly send a query for
+ # _schema_id. This request _must_ have been initiated by the framework
+ # in order to update the schema cache.
+ context.notifiable(qmf_schema_id=ckey)
+
if 'partial' not in ah:
context.signal()
@@ -3053,6 +3161,25 @@ class Agent:
self.broker._send(smsg)
+ def _v2SendQuery(self, query, sequence):
+ """
+ Given a query map, construct and send a V2 Query message.
+ """
+ dp = self.broker.amqpSession.delivery_properties()
+ dp.routing_key = self.getV2RoutingKey()
+ mp = self.broker.amqpSession.message_properties()
+ mp.content_type = "amqp/map"
+ mp.user_id = self.broker.authUser
+ mp.correlation_id = str(sequence)
+ mp.app_id = "qmf2"
+ mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_direct_queue)
+ mp.application_headers = {'qmf.opcode':'_query_request'}
+ sendCodec = Codec()
+ sendCodec.write_map(query)
+ msg = Message(dp, mp, sendCodec.encoded)
+ self.broker._send(msg, "qmf.default.direct")
+
+
def _v2SendGetQuery(self, sequence, kwargs):
"""
Send a get query to a QMFv2 agent.
@@ -3071,22 +3198,20 @@ class Agent:
elif '_objectId' in kwargs:
query['_object_id'] = kwargs['_objectId'].asMap()
+ self._v2SendQuery(query, sequence)
+
+
+ def _v2SendSchemaIdQuery(self, sequence, kwargs):
+ """
+ Send a query for all schema ids to a QMFv2 agent.
+ """
#
- # Construct and transmit the message
+ # Build the query map
#
- dp = self.broker.amqpSession.delivery_properties()
- dp.routing_key = self.getV2RoutingKey()
- mp = self.broker.amqpSession.message_properties()
- mp.content_type = "amqp/map"
- mp.user_id = self.broker.authUser
- mp.correlation_id = str(sequence)
- mp.app_id = "qmf2"
- mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_direct_queue)
- mp.application_headers = {'qmf.opcode':'_query_request'}
- sendCodec = Codec()
- sendCodec.write_map(query)
- msg = Message(dp, mp, sendCodec.encoded)
- self.broker._send(msg, "qmf.default.direct")
+ query = {'_what': 'SCHEMA_ID'}
+ # @todo - predicate support. For now, return all known schema ids.
+
+ self._v2SendQuery(query, sequence)
def _v2SendSchemaRequest(self, schemaId):
@@ -3106,7 +3231,8 @@ class Agent:
def _handleQmfV1Message(self, opcode, seq, mp, ah, codec):
"""
- Process QMFv1 messages arriving from an agent.
+ Process QMFv1 messages arriving from an agent. Note well: this method is
+ called from the context of the Broker thread.
"""
if opcode == 'm': self._v1HandleMethodResp(codec, seq)
elif opcode == 'e': self._v1HandleEventInd(codec, seq)
@@ -3117,7 +3243,8 @@ class Agent:
def _handleQmfV2Message(self, opcode, mp, ah, content):
"""
- Process QMFv2 messages arriving from an agent.
+ Process QMFv2 messages arriving from an agent. Note well: this method is
+ called from the context of the Broker thread.
"""
if opcode == '_data_indication': self._v2HandleDataInd(mp, ah, content)
elif opcode == '_query_response': self._v2HandleDataInd(mp, ah, content)
@@ -3335,6 +3462,7 @@ class Event:
self.session = agent.session
self.broker = agent.broker
self.classKey = ClassKey(codec)
+ self.classKey._setType(ClassKey.TYPE_EVENT)
self.timestamp = codec.read_int64()
self.severity = codec.read_uint8()
self.arguments = {}
@@ -3377,9 +3505,6 @@ class Event:
def getTimestamp(self):
return self.timestamp
- def getName(self):
- return self.name
-
def getSchema(self):
return self.schema
@@ -3391,7 +3516,7 @@ class SequenceManager:
""" Manage sequence numbers for asynchronous method calls """
def __init__(self):
self.lock = Lock()
- self.sequence = 0
+ self.sequence = long(time()) # pseudo-randomize the start
self.pending = {}
def _reserve(self, data):