diff options
| author | Ted Ross <tross@apache.org> | 2010-03-31 21:13:12 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2010-03-31 21:13:12 +0000 |
| commit | 2e29faa768283390452b7d432db28d43cd4a27aa (patch) | |
| tree | 521e9711b340a330408245ba699b35d12b36ce9c /extras/qmf | |
| parent | 5e50981ac8a35db09723ad19f5994703d00e10d9 (diff) | |
| download | qpid-python-2e29faa768283390452b7d432db28d43cd4a27aa.tar.gz | |
Merged the changes from the qmf-devel0.7a branch back to the trunk.
This is a checkpoint along the QMFv2 development path.
This update introduces portions of QMFv2 into the code:
- The C++ agent (qpid/agent) uses QMFv2 for data and method transfer
o The APIs no longer use qpid::framing::*
o Consequently, boost is no longer referenced from the API headers.
o Agents and Objects are now referenced by strings, not numbers.
o Schema transfer still uses the QMFv1 format.
- The broker-resident agent can use QMFv1 or QMFv2 based on the command line options.
It defaults to QMFv1 for compatibility.
- The pure-python QMF console (qmf.console) can concurrently interact with both
QMFv1 and QMFv2 agents.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@929716 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'extras/qmf')
| -rw-r--r-- | extras/qmf/src/py/qmf/console.py | 1717 |
1 files changed, 1412 insertions, 305 deletions
diff --git a/extras/qmf/src/py/qmf/console.py b/extras/qmf/src/py/qmf/console.py index 7bda233b9a..3d32ae36f7 100644 --- a/extras/qmf/src/py/qmf/console.py +++ b/extras/qmf/src/py/qmf/console.py @@ -41,6 +41,9 @@ from cStringIO import StringIO #import qpid.log #qpid.log.enable(name="qpid.io.cmd", level=qpid.log.DEBUG) +#=================================================================================================== +# CONSOLE +#=================================================================================================== class Console: """ To access the asynchronous operations, a class must be derived from Console with overrides of any combination of the available methods. """ @@ -94,6 +97,10 @@ class Console: """ Invoked when a method response from an asynchronous method call is received. """ pass + +#=================================================================================================== +# BrokerURL +#=================================================================================================== class BrokerURL(URL): def __init__(self, text): URL.__init__(self, text) @@ -115,16 +122,30 @@ class BrokerURL(URL): def match(self, host, port): return socket.getaddrinfo(self.host, self.port)[0][4] == socket.getaddrinfo(host, port)[0][4] + +#=================================================================================================== +# Object +#=================================================================================================== class Object(object): - """ This class defines a 'proxy' object representing a real managed object on an agent. - Actions taken on this proxy are remotely affected on the real managed object. """ - def __init__(self, session, broker, schema, codec, prop, stat, managed=True, kwargs={}): - self._session = session - self._broker = broker + This class defines a 'proxy' object representing a real managed object on an agent. + Actions taken on this proxy are remotely affected on the real managed object. + """ + def __init__(self, agent, schema, codec=None, prop=None, stat=None, v2Map=None, agentName=None, kwargs={}): + self._agent = agent + self._session = None + self._broker = None + if agent: + self._session = agent.session + self._broker = agent.broker self._schema = schema - self._managed = managed - if self._managed: + self._properties = [] + self._statistics = [] + if v2Map: + self.v2Init(v2Map, agentName) + return + + if self._agent: self._currentTime = codec.read_uint64() self._createTime = codec.read_uint64() self._deleteTime = codec.read_uint64() @@ -134,8 +155,6 @@ class Object(object): self._createTime = None self._deleteTime = None self._objectId = None - self._properties = [] - self._statistics = [] if codec: if prop: notPresent = self._parsePresenceMasks(codec, schema) @@ -143,18 +162,38 @@ class Object(object): if property.name in notPresent: self._properties.append((property, None)) else: - self._properties.append((property, self._session._decodeValue(codec, property.type, broker))) + self._properties.append((property, self._session._decodeValue(codec, property.type, self._broker))) if stat: for statistic in schema.getStatistics(): - self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, broker))) + self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, self._broker))) else: for property in schema.getProperties(): if property.optional: self._properties.append((property, None)) else: - self._properties.append((property, self._session._defaultValue(property, broker, kwargs))) + self._properties.append((property, self._session._defaultValue(property, self._broker, kwargs))) for statistic in schema.getStatistics(): - self._statistics.append((statistic, self._session._defaultValue(statistic, broker, kwargs))) + self._statistics.append((statistic, self._session._defaultValue(statistic, self._broker, kwargs))) + + def v2Init(self, omap, agentName): + if omap.__class__ != dict: + raise Exception("QMFv2 object data must be a map/dict") + if '_values' not in omap: + raise Exception("QMFv2 object must have '_values' element") + + values = omap['_values'] + for prop in self._schema.getProperties(): + if prop.name in values: + self._properties.append((prop, values[prop.name])) + for stat in self._schema.getStatistics(): + if stat.name in values: + self._statistics.append((stat, values[stat.name])) + if '_subtypes' in omap: + self._subtypes = omap['_subtypes'] + if '_object_id' in omap: + self._objectId = ObjectId(omap['_object_id'], agentName=agentName) + else: + self._objectId = None def getBroker(self): """ Return the broker from which this object was sent """ @@ -186,7 +225,7 @@ class Object(object): def isManaged(self): """ Return True iff this object is a proxy for a managed object on an agent. """ - return self._managed + return self._objectId and self._agent def getIndex(self): """ Return a string describing this object's primary key. """ @@ -225,7 +264,7 @@ class Object(object): """ Contact the agent and retrieve the lastest property and statistic values for this object. """ if not self.isManaged(): raise Exception("Object is not managed") - obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker) + obj = self._agent.getObjects(_objectId=self._objectId) if obj: self.mergeUpdate(obj[0]) else: @@ -244,17 +283,17 @@ class Object(object): for method in self._schema.getMethods(): if name == method.name: return lambda *args, **kwargs : self._invoke(name, args, kwargs) - for property, value in self._properties: - if name == property.name: + for prop, value in self._properties: + if name == prop.name: return value - if name == "_" + property.name + "_" and property.type == 10: # Dereference references - deref = self._session.getObjects(_objectId=value, _broker=self._broker) + if name == "_" + prop.name + "_" and prop.type == 10: # Dereference references + deref = self._agent.getObjects(_objectId=value) if len(deref) != 1: return None else: return deref[0] - for statistic, value in self._statistics: - if name == statistic.name: + for stat, value in self._statistics: + if name == stat.name: return value raise Exception("Type Object has no attribute '%s'" % name) @@ -282,10 +321,6 @@ class Object(object): aIdx = 0 sendCodec = Codec() seq = self._session.seqMgr._reserve((method, synchronous)) - self._broker._setHeader(sendCodec, 'M', seq) - self._objectId.encode(sendCodec) - self._schema.getKey().encode(sendCodec) - sendCodec.write_str8(name) count = 0 for arg in method.arguments: @@ -294,24 +329,64 @@ class Object(object): if count != len(args): raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args))) - for arg in method.arguments: - if arg.dir.find("I") != -1: - self._session._encodeValue(sendCodec, args[aIdx], arg.type) - aIdx += 1 - if timeWait: - ttl = timeWait * 1000 + if self._agent.isV2: + # + # Compose and send a QMFv2 method request + # + call = {} + call['_object_id'] = self._objectId.asMap() + call['_method_name'] = name + argMap = {} + for arg in method.arguments: + if arg.dir.find("I") != -1: + argMap[arg.name] = args[aIdx] + aIdx += 1 + call['_arguments'] = argMap + + dp = self._broker.amqpSession.delivery_properties() + dp.routing_key = self._objectId.getAgentBank() + mp = self._broker.amqpSession.message_properties() + mp.content_type = "amqp/map" + mp.user_id = self._broker.authUser + mp.correlation_id = str(seq) + mp.app_id = "qmf2" + mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_queue_name) + mp.application_headers = {'qmf.opcode':'_method_request'} + sendCodec.write_map(call) + smsg = Message(dp, mp, sendCodec.encoded) + exchange = "qmf.default.direct" + else: - ttl = None - smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % - (self._objectId.getBrokerBank(), self._objectId.getAgentBank()), - ttl=ttl) + # + # Associate this sequence with the agent hosting the object so we can correctly + # route the method-response + # + agent = self._broker.getAgent(self._broker.getBrokerBank(), self._objectId.getAgentBank()) + self._broker._setSequence(seq, agent) + + # + # Compose and send a QMFv1 method request + # + self._broker._setHeader(sendCodec, 'M', seq) + self._objectId.encode(sendCodec) + self._schema.getKey().encode(sendCodec) + sendCodec.write_str8(name) + + for arg in method.arguments: + if arg.dir.find("I") != -1: + self._session._encodeValue(sendCodec, args[aIdx], arg.type) + aIdx += 1 + smsg = self._broker._message(sendCodec.encoded, "agent.%d.%s" % + (self._objectId.getBrokerBank(), self._objectId.getAgentBank())) + exchange = "qpid.management" + if synchronous: try: self._broker.cv.acquire() self._broker.syncInFlight = True finally: self._broker.cv.release() - self._broker._send(smsg) + self._broker._send(smsg, exchange) return seq return None @@ -352,7 +427,6 @@ class Object(object): raise Exception("Invalid Method (software defect) [%s]" % name) def _encodeUnmanaged(self, codec): - codec.write_uint8(20) codec.write_str8(self._schema.getKey().getPackageName()) codec.write_str8(self._schema.getKey().getClassName()) @@ -399,6 +473,10 @@ class Object(object): bit = 0 return excludeList + +#=================================================================================================== +# Session +#=================================================================================================== class Session: """ An instance of the Session class represents a console session running @@ -423,6 +501,7 @@ class Session: list: 21 } + def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, manageConnections=False, userBindings=False): """ @@ -450,7 +529,7 @@ class Session: """ self.console = console self.brokers = [] - self.packages = {} + self.schemaCache = SchemaCache() self.seqMgr = SequenceManager() self.cv = Condition() self.syncSequenceList = [] @@ -471,9 +550,35 @@ class Session: if self.userBindings and not self.rcvObjects: raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided") + + def _getBrokerForAgentAddr(self, agent_addr): + try: + self.cv.acquire() + key = (1, agent_addr) + for b in self.brokers: + if key in b.agents: + return b + finally: + self.cv.release() + return None + + + def _getAgentForAgentAddr(self, agent_addr): + try: + self.cv.acquire() + key = agent_addr + for b in self.brokers: + if key in b.agents: + return b.agents[key] + finally: + self.cv.release() + return None + + def __repr__(self): return "QMF Console Session Manager (brokers: %d)" % len(self.brokers) + def addBroker(self, target="localhost", timeout=None, mechanisms=None): """ Connect to a Qpid broker. Returns an object of type Broker. """ url = BrokerURL(target) @@ -482,9 +587,12 @@ class Session: self.brokers.append(broker) if not self.manageConnections: - self.getObjects(broker=broker, _class="agent") + agent = broker.getAgent(1,0) + if agent: + agent.getObjects(_class="agent") return broker + def delBroker(self, broker): """ Disconnect from a broker. The 'broker' argument is the object returned from the addBroker call """ @@ -495,34 +603,27 @@ class Session: self.brokers.remove(broker) del broker + def getPackages(self): """ Get the list of known QMF packages """ for broker in self.brokers: broker._waitForStable() - list = [] - for package in self.packages: - list.append(package) - return list + return self.schemaCache.getPackages() + def getClasses(self, packageName): """ Get the list of known classes within a QMF package """ for broker in self.brokers: broker._waitForStable() - list = [] - if packageName in self.packages: - for pkey in self.packages[packageName]: - list.append(self.packages[packageName][pkey].getKey()) - return list + return self.schemaCache.getClasses(packageName) + def getSchema(self, classKey): """ Get the schema for a QMF class """ for broker in self.brokers: broker._waitForStable() - pname = classKey.getPackageName() - pkey = classKey.getPackageKey() - if pname in self.packages: - if pkey in self.packages[pname]: - return self.packages[pname][pkey] + return self.schemaCache.getSchema(classKey) + def bindPackage(self, packageName): """ Request object updates for all table classes within a package. """ @@ -535,6 +636,7 @@ class Session: broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=key) + def bindClass(self, pname, cname): """ Request object updates for a particular table class by package and class name. """ if not self.userBindings or not self.rcvObjects: @@ -545,6 +647,7 @@ class Session: if broker.isConnected(): broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName, binding_key=key) + def bindClassKey(self, classKey): """ Request object updates for a particular table class by class key. """ @@ -552,6 +655,7 @@ class Session: cname = classKey.getClassName() self.bindClass(pname, cname) + def getAgents(self, broker=None): """ Get a list of currently known agents """ brokerList = [] @@ -569,12 +673,14 @@ class Session: agentList.append(a) return agentList - def makeObject(self, classKey, broker=None, **kwargs): + + def makeObject(self, classKey, **kwargs): """ Create a new, unmanaged object of the schema indicated by classKey """ schema = self.getSchema(classKey) if schema == None: raise Exception("Schema not found for classKey") - return Object(self, broker, schema, None, True, True, False, kwargs) + return Object(None, schema, None, True, True, kwargs) + def getObjects(self, **kwargs): """ Get a list of objects from QMF agents. @@ -644,81 +750,24 @@ class Session: if len(agentList) == 0: return [] - pname = None - cname = None - hash = None - classKey = None - if "_schema" in kwargs: classKey = kwargs["_schema"].getKey() - elif "_key" in kwargs: classKey = kwargs["_key"] - elif "_class" in kwargs: - cname = kwargs["_class"] - if "_package" in kwargs: - pname = kwargs["_package"] - if cname == None and classKey == None and "_objectId" not in kwargs: - raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument") - - map = {} - self.getSelect = [] - if "_objectId" in kwargs: - map["_objectid"] = kwargs["_objectId"].__repr__() - else: - if cname == None: - cname = classKey.getClassName() - pname = classKey.getPackageName() - hash = classKey.getHash() - map["_class"] = cname - if pname != None: map["_package"] = pname - if hash != None: map["_hash"] = hash - for item in kwargs: - if item[0] != '_': - self.getSelect.append((item, kwargs[item])) - - self.getResult = [] + # + # We now have a list of agents to query, start the queries and gather the results. + # + request = SessionGetRequest(len(agentList)) for agent in agentList: - broker = agent.broker - sendCodec = Codec() - try: - self.cv.acquire() - seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET) - self.syncSequenceList.append(seq) - finally: - self.cv.release() - broker._setHeader(sendCodec, 'G', seq) - sendCodec.write_map(map) - smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (agent.brokerBank, agent.agentBank)) - broker._send(smsg) + agent.getObjects(request, **kwargs) + timeout = 60 + if '_timeout' in kwargs: + timeout = kwargs['_timeout'] + request.wait(timeout) + return request.result - starttime = time() - timeout = False - if "_timeout" in kwargs: - waitTime = kwargs["_timeout"] - else: - waitTime = self.DEFAULT_GET_WAIT_TIME - try: - self.cv.acquire() - while len(self.syncSequenceList) > 0 and self.error == None: - self.cv.wait(waitTime) - if time() - starttime > waitTime: - for pendingSeq in self.syncSequenceList: - self.seqMgr._release(pendingSeq) - self.syncSequenceList = [] - timeout = True - finally: - self.cv.release() - - if self.error: - errorText = self.error - self.error = None - raise Exception(errorText) - - if len(self.getResult) == 0 and timeout: - raise RuntimeError("No agent responded within timeout period") - return self.getResult def setEventFilter(self, **kwargs): """ """ pass + def _bindingKeys(self): keyList = [] keyList.append("schema.#") @@ -735,18 +784,21 @@ class Session: keyList.append("console.heartbeat.#") return keyList + def _handleBrokerConnect(self, broker): if self.console: for agent in broker.getAgents(): self.console.newAgent(agent) self.console.brokerConnected(broker) + def _handleBrokerDisconnect(self, broker): if self.console: for agent in broker.getAgents(): self.console.delAgent(agent) self.console.brokerDisconnected(broker) + def _handleBrokerResp(self, broker, codec, seq): broker.brokerId = codec.read_uuid() if self.console != None: @@ -760,16 +812,10 @@ class Session: smsg = broker._message(sendCodec.encoded) broker._send(smsg) + def _handlePackageInd(self, broker, codec, seq): pname = str(codec.read_str8()) - notify = False - try: - self.cv.acquire() - if pname not in self.packages: - self.packages[pname] = {} - notify = True - finally: - self.cv.release() + notify = self.schemaCache.declarePackage(pname) if notify and self.console != None: self.console.newPackage(pname) @@ -782,7 +828,8 @@ class Session: smsg = broker._message(sendCodec.encoded) broker._send(smsg) - def _handleCommandComplete(self, broker, codec, seq): + + def _handleCommandComplete(self, broker, codec, seq, agent): code = codec.read_uint32() text = codec.read_str8() context = self.seqMgr._release(seq) @@ -804,20 +851,16 @@ class Session: finally: self.cv.release() + if agent: + agent._handleV1Completion(seq, code, text) + + def _handleClassInd(self, broker, codec, seq): kind = codec.read_uint8() classKey = ClassKey(codec) - unknown = False + schema = self.schemaCache.getSchema(classKey) - try: - self.cv.acquire() - if classKey.getPackageName() in self.packages: - if classKey.getPackageKey() not in self.packages[classKey.getPackageName()]: - unknown = True - finally: - self.cv.release() - - if unknown: + if not schema: # Send a schema request for the unknown class broker._incOutstanding() sendCodec = Codec() @@ -827,30 +870,6 @@ class Session: smsg = broker._message(sendCodec.encoded) broker._send(smsg) - def _handleMethodResp(self, broker, codec, seq): - code = codec.read_uint32() - text = codec.read_str16() - outArgs = {} - pair = self.seqMgr._release(seq) - if pair == None: - return - method, synchronous = pair - if code == 0: - for arg in method.arguments: - if arg.dir.find("O") != -1: - outArgs[arg.name] = self._decodeValue(codec, arg.type, broker) - result = MethodResult(code, text, outArgs) - if synchronous: - try: - broker.cv.acquire() - broker.syncResult = result - broker.syncInFlight = False - broker.cv.notify() - finally: - broker.cv.release() - else: - if self.console: - self.console.methodResponse(broker, seq, result) def _handleHeartbeatInd(self, broker, codec, seq, msg): brokerBank = 1 @@ -873,59 +892,49 @@ class Session: timestamp = codec.read_uint64() if self.console != None and agent != None: self.console.heartbeat(agent, timestamp) + broker._ageAgents() - def _handleEventInd(self, broker, codec, seq): - if self.console != None: - event = Event(self, broker, codec) - self.console.event(broker, event) - def _handleSchemaResp(self, broker, codec, seq): + def _handleSchemaResp(self, broker, codec, seq, agent_addr): kind = codec.read_uint8() classKey = ClassKey(codec) _class = SchemaClass(kind, classKey, codec, self) - try: - self.cv.acquire() - self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class - finally: - self.cv.release() - - self.seqMgr._release(seq) - broker._decOutstanding() + self.schemaCache.declareClass(classKey, _class) + ctx = self.seqMgr._release(seq) + if ctx: + broker._decOutstanding() if self.console != None: self.console.newClass(kind, classKey) - def _handleContentInd(self, broker, codec, seq, prop=False, stat=False): - classKey = ClassKey(codec) - try: - self.cv.acquire() - pname = classKey.getPackageName() - if pname not in self.packages: - return - pkey = classKey.getPackageKey() - if pkey not in self.packages[pname]: - return - schema = self.packages[pname][pkey] - finally: - self.cv.release() + if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode): + agent = self._getAgentForAgentAddr(agent_addr) + if agent: + agent._schemaInfoFromV2Agent() - object = Object(self, broker, schema, codec, prop, stat) - if pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop: - broker._updateAgent(object) + def _v2HandleHeartbeatInd(self, broker, mp, ah, content): try: - self.cv.acquire() - if seq in self.syncSequenceList: - if object.getTimestamps()[2] == 0 and self._selectMatch(object): - self.getResult.append(object) - return - finally: - self.cv.release() + agentName = ah["qmf.agent"] + values = content["_values"] + timestamp = values["timestamp"] + interval = values["heartbeat_interval"] + except: + return + + agent = broker.getAgent(1, agentName) + if agent == None: + agent = Agent(broker, agentName, "QMFv2 Agent", True, interval) + broker._addAgent(agentName, agent) + else: + agent.touch() + if self.console and agent: + self.console.heartbeat(agent, timestamp) + broker._ageAgents() + + + def _v2HandleAgentLocateRsp(self, broker, mp, ah, content): + self._v2HandleHeartbeatInd(broker, mp, ah, content) - if self.console and self.rcvObjects: - if prop: - self.console.objectProps(broker, object) - if stat: - self.console.objectStats(broker, object) def _handleError(self, error): try: @@ -937,6 +946,7 @@ class Session: finally: self.cv.release() + def _selectMatch(self, object): """ Check the object against self.getSelect to check for a match """ for key, value in self.getSelect: @@ -945,6 +955,7 @@ class Session: return False return True + def _decodeValue(self, codec, typecode, broker=None): """ Decode, from the codec, a value based on its typecode. """ if typecode == 1: data = codec.read_uint8() # U8 @@ -971,17 +982,9 @@ class Session: inner_type_code = codec.read_uint8() if inner_type_code == 20: classKey = ClassKey(codec) - try: - self.cv.acquire() - pname = classKey.getPackageName() - if pname not in self.packages: - return None - pkey = classKey.getPackageKey() - if pkey not in self.packages[pname]: - return None - schema = self.packages[pname][pkey] - finally: - self.cv.release() + schema = self.schemaCache.getSchema(classKey) + if not schema: + return None data = Object(self, broker, schema, codec, True, True, False) else: data = self._decodeValue(codec, inner_type_code, broker) @@ -999,6 +1002,7 @@ class Session: raise ValueError("Invalid type code: %d" % typecode) return data + def _encodeValue(self, codec, value, typecode): """ Encode, into the codec, a value based on its typecode. """ if typecode == 1: codec.write_uint8 (int(value)) # U8 @@ -1033,9 +1037,11 @@ class Session: else: raise ValueError ("Invalid type code: %d" % typecode) + def encoding(self, value): return self._encoding(value.__class__) + def _encoding(self, klass): if Session.ENCODINGS.has_key(klass): return self.ENCODINGS[klass] @@ -1044,6 +1050,7 @@ class Session: if result != None: return result + def _displayValue(self, value, typecode): """ """ if typecode == 1: return unicode(value) @@ -1072,6 +1079,7 @@ class Session: else: raise ValueError ("Invalid type code: %d" % typecode) + def _defaultValue(self, stype, broker=None, kwargs={}): """ """ typecode = stype.type @@ -1110,6 +1118,7 @@ class Session: else: raise ValueError ("Invalid type code: %d" % typecode) + def _bestClassKey(self, pname, cname, preferredList): """ """ if pname == None or cname == None: @@ -1125,6 +1134,7 @@ class Session: return c return None + def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList): """ This function can be used to send a method request to an object given only the broker, schemaKey, and objectId. This is an uncommon usage pattern as methods are @@ -1133,14 +1143,10 @@ class Session: schema = self.getSchema(schemaKey) for method in schema.getMethods(): if name == method.name: - aIdx = 0 - sendCodec = Codec() - seq = self.seqMgr._reserve((method, False)) - broker._setHeader(sendCodec, 'M', seq) - objectId.encode(sendCodec) - schemaKey.encode(sendCodec) - sendCodec.write_str8(name) - + # + # Count the arguments supplied and validate that the number is what is expected + # based on the schema. + # count = 0 for arg in method.arguments: if arg.dir.find("I") != -1: @@ -1148,25 +1154,192 @@ class Session: if count != len(argList): raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList))) - for arg in method.arguments: - if arg.dir.find("I") != -1: - self._encodeValue(sendCodec, argList[aIdx], arg.type) - aIdx += 1 - smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % - (objectId.getBrokerBank(), objectId.getAgentBank())) - broker._send(smsg) + aIdx = 0 + sendCodec = Codec() + seq = self.seqMgr._reserve((method, False)) + + if objectId.isV2(): + # + # Compose and send a QMFv2 method request + # + call = {} + call['_object_id'] = objectId.asMap() + call['_method_name'] = name + args = {} + for arg in method.arguments: + if arg.dir.find("I") != -1: + args[arg.name] = argList[aIdx] + aIdx += 1 + call['_arguments'] = args + + dp = broker.amqpSession.delivery_properties() + dp.routing_key = objectId.getAgentBank() + mp = broker.amqpSession.message_properties() + mp.content_type = "amqp/map" + mp.user_id = broker.authUser + mp.correlation_id = str(seq) + mp.app_id = "qmf2" + mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_queue_name) + mp.application_headers = {'qmf.opcode':'_method_request'} + sendCodec.write_map(call) + msg = Message(dp, mp, sendCodec.encoded) + broker._send(msg, "qmf.default.direct") + + else: + # + # Associate this sequence with the agent hosting the object so we can correctly + # route the method-response + # + agent = broker.getAgent(broker.getBrokerBank(), objectId.getAgentBank()) + broker._setSequence(seq, agent) + + # + # Compose and send a QMFv1 method request + # + broker._setHeader(sendCodec, 'M', seq) + objectId.encode(sendCodec) + schemaKey.encode(sendCodec) + sendCodec.write_str8(name) + + for arg in method.arguments: + if arg.dir.find("I") != -1: + self._encodeValue(sendCodec, argList[aIdx], arg.type) + aIdx += 1 + smsg = broker._message(sendCodec.encoded, "agent.%d.%s" % + (objectId.getBrokerBank(), objectId.getAgentBank())) + broker._send(smsg) return seq return None -class Package: - """ """ - def __init__(self, name): - self.name = name +#=================================================================================================== +# SessionGetRequest +#=================================================================================================== +class SessionGetRequest(object): + """ + This class is used to track get-object queries at the Session level. + """ + def __init__(self, agentCount): + self.agentCount = agentCount + self.result = [] + self.cv = Condition() + self.waiting = True + + def __call__(self, **kwargs): + """ + Callable entry point for gathering collected objects. + """ + try: + self.cv.acquire() + if 'qmf_object' in kwargs: + self.result.append(kwargs['qmf_object']) + elif 'qmf_complete' in kwargs or 'qmf_exception' in kwargs: + self.agentCount -= 1 + if self.agentCount == 0: + self.waiting = None + self.cv.notify() + finally: + self.cv.release() + + def wait(self, timeout): + starttime = time() + try: + self.cv.acquire() + while self.waiting: + if (time() - starttime) > timeout: + raise Exception("Timed out after %d seconds" % timeout) + self.cv.wait(1) + finally: + self.cv.release() + + +#=================================================================================================== +# SchemaCache +#=================================================================================================== +class SchemaCache(object): + """ + The SchemaCache is a data structure that stores learned schema information. + """ + def __init__(self): + """ + Create a map of schema packages and a lock to protect this data structure. + Note that this lock is at the bottom of any lock hierarchy. If it is held, no other + lock in the system should attempt to be acquired. + """ + self.packages = {} + self.lock = Lock() + + def getPackages(self): + """ Get the list of known QMF packages """ + list = [] + try: + self.lock.acquire() + for package in self.packages: + list.append(package) + finally: + self.lock.release() + return list + + def getClasses(self, packageName): + """ Get the list of known classes within a QMF package """ + list = [] + try: + self.lock.acquire() + if packageName in self.packages: + for pkey in self.packages[packageName]: + list.append(self.packages[packageName][pkey].getKey()) + finally: + self.lock.release() + return list + + def getSchema(self, classKey): + """ Get the schema for a QMF class """ + pname = classKey.getPackageName() + pkey = classKey.getPackageKey() + try: + self.lock.acquire() + if pname in self.packages: + if pkey in self.packages[pname]: + return self.packages[pname][pkey] + finally: + self.lock.release() + return None + + def declarePackage(self, pname): + """ Maybe add a package to the cache. Return True if package was added, None if it pre-existed. """ + try: + self.lock.acquire() + if pname in self.packages: + return None + self.packages[pname] = {} + finally: + self.lock.release() + return True + + def declareClass(self, classKey, classDef): + """ Maybe add a class definition to the cache. Return True if added, None if pre-existed. """ + pname = classKey.getPackageName() + pkey = classKey.getPackageKey() + try: + self.lock.acquire() + if pname not in self.packages: + self.packages[pname] = {} + packageMap = self.packages[pname] + if pkey in packageMap: + return None + packageMap[pkey] = classDef + finally: + self.lock.release() + return True + + +#=================================================================================================== +# ClassKey +#=================================================================================================== class ClassKey: """ A ClassKey uniquely identifies a class from the schema. """ def __init__(self, constructor): - if type(constructor) == str: + if constructor.__class__ == str: # construct from __repr__ string try: self.pname, cls = constructor.split(":") @@ -1177,20 +1350,33 @@ class ClassKey: h1 = int(hexValues[1], 16) h2 = int(hexValues[2], 16) h3 = int(hexValues[3], 16) - self.hash = struct.pack("!LLLL", h0, h1, h2, h3) + h4 = int(hexValues[4][0:4], 16) + h5 = int(hexValues[4][4:12], 16) + self.hash = UUID(struct.pack("!LHHHHL", h0, h1, h2, h3, h4, h5)) except: raise Exception("Invalid ClassKey format") + elif constructor.__class__ == dict: + # construct from QMFv2 map + try: + self.pname = constructor['_package_name'] + self.cname = constructor['_class_name'] + self.hash = constructor['_hash'] + except: + raise Exception("Invalid ClassKey map format") else: # construct from codec codec = constructor self.pname = str(codec.read_str8()) self.cname = str(codec.read_str8()) - self.hash = codec.read_bin128() + self.hash = UUID(codec.read_bin128()) def encode(self, codec): codec.write_str8(self.pname) codec.write_str8(self.cname) - codec.write_bin128(self.hash) + codec.write_bin128(self.hash.bytes) + + def asMap(self): + return {'_package_name': self.pname, '_class_name': self.cname, '_hash': self.hash} def getPackageName(self): return self.pname @@ -1202,7 +1388,7 @@ class ClassKey: return self.hash def getHashString(self): - return "%08x-%08x-%08x-%08x" % struct.unpack ("!LLLL", self.hash) + return str(self.hash) def getPackageKey(self): return (self.cname, self.hash) @@ -1210,6 +1396,10 @@ class ClassKey: def __repr__(self): return self.pname + ":" + self.cname + "(" + self.getHashString() + ")" + +#=================================================================================================== +# SchemaClass +#=================================================================================================== class SchemaClass: """ """ CLASS_KIND_TABLE = 1 @@ -1292,6 +1482,10 @@ class SchemaClass: else: return self.arguments + self.session.getSchema(self.superTypeKey).getArguments() + +#=================================================================================================== +# SchemaProperty +#=================================================================================================== class SchemaProperty: """ """ def __init__(self, codec): @@ -1321,6 +1515,10 @@ class SchemaProperty: def __repr__(self): return self.name + +#=================================================================================================== +# SchemaStatistic +#=================================================================================================== class SchemaStatistic: """ """ def __init__(self, codec): @@ -1337,6 +1535,10 @@ class SchemaStatistic: def __repr__(self): return self.name + +#=================================================================================================== +# SchemaMethod +#=================================================================================================== class SchemaMethod: """ """ def __init__(self, codec): @@ -1365,6 +1567,10 @@ class SchemaMethod: result += ")" return result + +#=================================================================================================== +# SchemaArgument +#=================================================================================================== class SchemaArgument: """ """ def __init__(self, codec, methodArg): @@ -1392,64 +1598,113 @@ class SchemaArgument: elif key == "refPackage" : self.refPackage = value elif key == "refClass" : self.refClass = value + +#=================================================================================================== +# ObjectId +#=================================================================================================== class ObjectId: """ Object that represents QMF object identifiers """ - def __init__(self, codec, first=0, second=0): - if codec: - self.first = codec.read_uint64() - self.second = codec.read_uint64() + def __init__(self, constructor, first=0, second=0, agentName=None): + if constructor.__class__ == dict: + self.agentName = agentName + self.agentEpoch = 0 + if '_agent_name' in constructor: self.agentName = constructor['_agent_name'] + if '_agent_epoch' in constructor: self.agentEpoch = constructor['_agent_epoch'] + if '_object_name' not in constructor: + raise Exception("QMFv2 OBJECT_ID must have the '_object_name' field.") + self.objectName = constructor['_object_name'] else: - self.first = first - self.second = second + if not constructor: + first = first + second = second + else: + first = constructor.read_uint64() + second = constructor.read_uint64() + self.agentName = str(first & 0x000000000FFFFFFF) + self.agentEpoch = (first & 0x0FFF000000000000) >> 48 + self.objectName = str(second) def __cmp__(self, other): if other == None or not isinstance(other, ObjectId) : return 1 - if self.first < other.first: + + if self.objectName < other.objectName: + return -1 + if self.objectName > other.objectName: + return 1 + + if self.agentName < other.agentName: return -1 - if self.first > other.first: + if self.agentName > other.agentName: return 1 - if self.second < other.second: + + if self.agentEpoch < other.agentEpoch: return -1 - if self.second > other.second: + if self.agentEpoch > other.agentEpoch: return 1 return 0 def __repr__(self): - return "%d-%d-%d-%d-%d" % (self.getFlags(), self.getSequence(), + return "%d-%d-%d-%s-%s" % (self.getFlags(), self.getSequence(), self.getBrokerBank(), self.getAgentBank(), self.getObject()) + def isV2(self): + return not self.agentName.isdigit() + def index(self): - return (self.first, self.second) + return self.__repr__() def getFlags(self): - return (self.first & 0xF000000000000000) >> 60 + return 0 def getSequence(self): - return (self.first & 0x0FFF000000000000) >> 48 + return self.agentEpoch def getBrokerBank(self): - return (self.first & 0x0000FFFFF0000000) >> 28 + return 1 def getAgentBank(self): - return self.first & 0x000000000FFFFFFF + return self.agentName def getObject(self): - return self.second + return self.objectName def isDurable(self): return self.getSequence() == 0 def encode(self, codec): - codec.write_uint64(self.first) - codec.write_uint64(self.second) + first = (self.agentEpoch << 48) + (1 << 28) + second = 0 + + try: + first += int(self.agentName) + except: + pass + + try: + second = int(self.objectName) + except: + pass + + codec.write_uint64(first) + codec.write_uint64(second) + + def asMap(self): + omap = {'_agent_name': self.agentName, '_object_name': self.objectName} + if self.agentEpoch != 0: + omap['_agent_epoch'] = self.agentEpoch + return omap def __hash__(self): - return (self.first, self.second).__hash__() + return self.__repr__().__hash__() def __eq__(self, other): - return (self.first, self.second).__eq__(other) + return self.__repr__().__eq__(other) + +#=================================================================================================== +# MethodResult +#=================================================================================================== class MethodResult(object): """ """ def __init__(self, status, text, outArgs): @@ -1465,6 +1720,10 @@ class MethodResult(object): def __repr__(self): return "%s (%d) - %s" % (self.text, self.status, self.outArgs) + +#=================================================================================================== +# ManagedConnection +#=================================================================================================== class ManagedConnection(Thread): """ Thread class for managing a connection. """ DELAY_MIN = 1 @@ -1527,6 +1786,10 @@ class ManagedConnection(Thread): finally: self.cv.release() + +#=================================================================================================== +# Broker +#=================================================================================================== class Broker: """ This object represents a connection (or potential connection) to a QMF broker. """ SYNC_TIME = 60 @@ -1542,6 +1805,7 @@ class Broker: self.authUser = authUser self.authPass = authPass self.cv = Condition() + self.seqToAgentMap = {} self.error = None self.brokerId = None self.connected = False @@ -1574,9 +1838,13 @@ class Broker: def getAgent(self, brokerBank, agentBank): """ Return the agent object associated with a particular broker and agent bank value.""" - bankKey = (brokerBank, agentBank) - if bankKey in self.agents: - return self.agents[bankKey] + bankKey = str(agentBank) + try: + self.cv.acquire() + if bankKey in self.agents: + return self.agents[bankKey] + finally: + self.cv.release() return None def getSessionId(self): @@ -1585,7 +1853,11 @@ class Broker: def getAgents(self): """ Get the list of agents reachable via this broker """ - return self.agents.values() + try: + self.cv.acquire() + return self.agents.values() + finally: + self.cv.release() def getAmqpSession(self): """ Get the AMQP session object for this connected broker. """ @@ -1612,10 +1884,29 @@ class Broker: else: return "Disconnected Broker" + def _setSequence(self, sequence, agent): + try: + self.cv.acquire() + self.seqToAgentMap[sequence] = agent + finally: + self.cv.release() + + def _clearSequence(self, sequence): + try: + self.cv.acquire() + self.seqToAgentMap.pop(sequence) + finally: + self.cv.release() + def _tryToConnect(self): try: - self.agents = {} - self.agents[(1,0)] = Agent(self, 0, "BrokerAgent") + try: + self.cv.acquire() + self.agents = {} + self.agents['0'] = Agent(self, 0, "BrokerAgent") + finally: + self.cv.release() + self.topicBound = False self.syncInFlight = False self.syncRequest = 0 @@ -1649,7 +1940,7 @@ class Broker: self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest", accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb) + self.amqpSession.incoming("rdest").listen(self._v1Cb, self._exceptionCb) self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1) self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFFL) self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFFL) @@ -1659,11 +1950,29 @@ class Broker: self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest", accept_mode=self.amqpSession.accept_mode.none, acquire_mode=self.amqpSession.acquire_mode.pre_acquired) - self.amqpSession.incoming("tdest").listen(self._replyCb) + self.amqpSession.incoming("tdest").listen(self._v1Cb) self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1) self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL) self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL) + ## + ## Set up connectivity for QMFv2 + ## + self.v2_queue_name = "qmfc-v2-%s" % self.amqpSessionId + self.amqpSession.queue_declare(queue=self.v2_queue_name, exclusive=True, auto_delete=True) + self.amqpSession.exchange_bind(exchange="qmf.default.direct", + queue=self.v2_queue_name, binding_key=self.v2_queue_name) + self.amqpSession.exchange_bind(exchange="qmf.default.topic", + queue=self.v2_queue_name, binding_key="agent.#") + ## Other bindings here... + self.amqpSession.message_subscribe(queue=self.v2_queue_name, destination="v2dest", + accept_mode=self.amqpSession.accept_mode.none, + acquire_mode=self.amqpSession.acquire_mode.pre_acquired) + self.amqpSession.incoming("v2dest").listen(self._v2Cb, self._exceptionCb) + self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=1) + self.amqpSession.message_flow(destination="v2dest", unit=0, value=0xFFFFFFFFL) + self.amqpSession.message_flow(destination="v2dest", unit=1, value=0xFFFFFFFFL) + self.connected = True self.session._handleBrokerConnect(self) @@ -1671,6 +1980,7 @@ class Broker: self._setHeader(codec, 'B') msg = self._message(codec.encoded) self._send(msg) + self._v2SendAgentLocate() except socket.error, e: self.error = "Socket Error %s - %s" % (e.__class__.__name__, e) @@ -1683,18 +1993,73 @@ class Broker: raise def _updateAgent(self, obj): - bankKey = (obj.brokerBank, obj.agentBank) + bankKey = str(obj.agentBank) + agent = None if obj._deleteTime == 0: - if bankKey not in self.agents: - agent = Agent(self, obj.agentBank, obj.label) - self.agents[bankKey] = agent - if self.session.console != None: - self.session.console.newAgent(agent) + try: + self.cv.acquire() + if bankKey not in self.agents: + agent = Agent(self, obj.agentBank, obj.label) + self.agents[bankKey] = agent + finally: + self.cv.release() + if agent and self.session.console: + self.session.console.newAgent(agent) else: - agent = self.agents.pop(bankKey, None) - if agent != None and self.session.console != None: + try: + self.cv.acquire() + agent = self.agents.pop(bankKey, None) + if agent: + agent.close() + finally: + self.cv.release() + if agent and self.session.console: self.session.console.delAgent(agent) + def _addAgent(self, name, agent): + try: + self.cv.acquire() + self.agents[name] = agent + finally: + self.cv.release() + if self.session.console: + self.session.console.newAgent(agent) + + def _ageAgents(self): + try: + self.cv.acquire() + to_delete = [] + to_notify = [] + for key in self.agents: + if self.agents[key].isOld(): + to_delete.append(key) + for key in to_delete: + agent = self.agents.pop(key) + agent.close() + to_notify.append(agent) + finally: + self.cv.release() + if self.session.console: + for agent in to_notify: + self.session.console.delAgent(agent) + + def _v2SendAgentLocate(self, predicate={}): + """ + Broadcast an agent-locate request to cause all agents in the domain to tell us who they are. + """ + dp = self.amqpSession.delivery_properties() + dp.routing_key = "console.request.agent_locate" + mp = self.amqpSession.message_properties() + mp.content_type = "amqp/map" + mp.user_id = self.authUser + mp.app_id = "qmf2" + mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_queue_name) + mp.application_headers = {'qmf.opcode':'_agent_locate_request'} + sendCodec = Codec() + sendCodec.write_map(predicate) + msg = Message(dp, mp, sendCodec.encoded) + self._send(msg, "qmf.default.topic") + def _setHeader(self, codec, opcode, seq=0): """ Compose the header of a management message. """ codec.write_uint8(ord('A')) @@ -1785,24 +2150,105 @@ class Broker: finally: self.cv.release() - def _replyCb(self, msg): + def _v1Cb(self, msg): + try: + self._v1CbProtected(msg) + except Exception, e: + print "EXCEPTION in Broker._v1Cb:", e + + def _v1CbProtected(self, msg): + """ + This is the general message handler for messages received via the QMFv1 exchanges. + """ + agent = None + agent_addr = None + mp = msg.get("message_properties") + ah = mp.application_headers + if ah and 'qmf.agent' in ah: + agent_addr = ah['qmf.agent'] + + if not agent_addr: + # + # See if we can determine the agent identity from the routing key + # + dp = msg.get("delivery_properties") + rkey = None + if dp.routing_key: + rkey = dp.routing_key + items = rkey.split('.') + if len(items) >= 4: + if items[0] == 'console' and items[3].isdigit(): + agent_addr = str(items[3]) # The QMFv1 Agent Bank + if agent_addr != None and agent_addr in self.agents: + agent = self.agents[agent_addr] + codec = Codec(msg.body) + alreadyTried = None while True: opcode, seq = self._checkHeader(codec) + + if not agent and not alreadyTried: + alreadyTried = True + try: + self.cv.acquire() + if seq in self.seqToAgentMap: + agent = self.seqToAgentMap[seq] + finally: + self.cv.release() + if opcode == None: return if opcode == 'b': self.session._handleBrokerResp (self, codec, seq) elif opcode == 'p': self.session._handlePackageInd (self, codec, seq) - elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq) elif opcode == 'q': self.session._handleClassInd (self, codec, seq) - elif opcode == 'm': self.session._handleMethodResp (self, codec, seq) + elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr) elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg) - elif opcode == 'e': self.session._handleEventInd (self, codec, seq) - elif opcode == 's': self.session._handleSchemaResp (self, codec, seq) - elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True) - elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True) - elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True) - self.session.receiver._completed.add(msg.id) - self.session.channel.session_completed(self.session.receiver._completed) + elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq, agent) + elif agent: + agent._handleQmfV1Message(opcode, seq, mp, ah, codec) + + self.amqpSession.receiver._completed.add(msg.id) + self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) + + def _v2Cb(self, msg): + """ + This is the general message handler for messages received via QMFv2 exchanges. + """ + mp = msg.get("message_properties") + ah = mp["application_headers"] + codec = Codec(msg.body) + + if 'qmf.opcode' in ah: + opcode = ah['qmf.opcode'] + if mp.content_type == "amqp/list": + content = codec.read_list() + if not content: + content = [] + elif mp.content_type == "amqp/map": + content = codec.read_map() + if not content: + content = {} + else: + content = None + + if content != None: + ## + ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are + ## used to maintain the broker's list of agent proxies. + ## + if opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content) + elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content) + else: + ## + ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender + ## of the message. + ## + agent_addr = ah['qmf.agent'] + if agent_addr in self.agents: + agent = self.agents[agent_addr] + agent._handleQmfV2Message(opcode, mp, ah, content) + + self.amqpSession.receiver._completed.add(msg.id) + self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed) def _exceptionCb(self, data): self.connected = False @@ -1818,43 +2264,697 @@ class Broker: if self.thread: self.thread.disconnected() + +#=================================================================================================== +# Agent +#=================================================================================================== class Agent: - """ """ - def __init__(self, broker, agentBank, label): + """ + This class represents a proxy for a remote agent being managed + """ + def __init__(self, broker, agentBank, label, isV2=False, interval=0): self.broker = broker + self.session = broker.session + self.schemaCache = self.session.schemaCache self.brokerBank = broker.getBrokerBank() - self.agentBank = agentBank + self.agentBank = str(agentBank) self.label = label + self.isV2 = isV2 + self.heartbeatInterval = interval + self.lock = Lock() + self.seqMgr = self.session.seqMgr + self.contextMap = {} + self.unsolicitedContext = RequestContext(self, self) + self.lastSeenTime = time() + self.closed = None + + + def _checkClosed(self): + if self.closed: + raise Exception("Agent is disconnected") + + + def __call__(self, **kwargs): + """ + This is the handler for unsolicited stuff received from the agent + """ + if 'qmf_object' in kwargs: + if self.session.console: + self.session.console.objectProps(self.broker, kwargs['qmf_object']) + elif 'qmf_object_stats' in kwargs: + if self.session.console: + self.session.console.objectStats(self.broker, kwargs['qmf_object_stats']) + elif 'qmf_event' in kwargs: + if self.session.console: + self.session.console.event(self.broker, kwargs['qmf_event']) + + + def touch(self): + self.lastSeenTime = time() + + + def isOld(self): + if self.heartbeatInterval == 0: + return None + if time() - self.lastSeenTime > (2.0 * self.heartbeatInterval): + return True + return None + + + def close(self): + self.closed = True + copy = {} + try: + self.lock.acquire() + for seq in self.contextMap: + copy[seq] = self.contextMap[seq] + finally: + self.lock.release() + + for seq in copy: + context = copy[seq] + context.cancel("Agent disconnected") + def __repr__(self): - return "Agent at bank %d.%d (%s)" % (self.brokerBank, self.agentBank, self.label) + if self.isV2: + ver = "v2" + else: + ver = "v1" + return "Agent(%s) at bank %d.%s (%s)" % (ver, self.brokerBank, self.agentBank, self.label) + def getBroker(self): + self._checkClosed() return self.broker + def getBrokerBank(self): + self._checkClosed() return self.brokerBank + def getAgentBank(self): + self._checkClosed() return self.agentBank + + def getObjects(self, notifiable=None, **kwargs): + """ Get a list of objects from QMF agents. + All arguments are passed by name(keyword). + + If 'notifiable' is None (default), this call will block until completion or timeout. + If supplied, notifiable is assumed to be a callable object that will be called when the + list of queried objects arrives. The single argument to the call shall be a list of + the returned objects. + + The class for queried objects may be specified in one of the following ways: + + _schema = <schema> - supply a schema object returned from getSchema. + _key = <key> - supply a classKey from the list returned by getClasses. + _class = <name> - supply a class name as a string. If the class name exists + in multiple packages, a _package argument may also be supplied. + _objectId = <id> - get the object referenced by the object-id + + The default timeout for this synchronous operation is 60 seconds. To change the timeout, + use the following argument: + + _timeout = <time in seconds> + + If additional arguments are supplied, they are used as property selectors. For example, + if the argument name="test" is supplied, only objects whose "name" property is "test" + will be returned in the result. + """ + self._checkClosed() + if notifiable: + if not callable(notifiable): + raise Exception("notifiable object must be callable") + + # + # Isolate the selectors from the kwargs + # + selectors = {} + for key in kwargs: + value = kwargs[key] + if key[0] != '_': + selectors[key] = value + + # + # Allocate a context to track this asynchronous request. + # + context = RequestContext(self, notifiable, selectors) + sequence = self.seqMgr._reserve(context) + try: + self.lock.acquire() + self.contextMap[sequence] = context + context.setSequence(sequence) + finally: + self.lock.release() + + # + # Compose and send the query message to the agent using the appropriate protocol for the + # agent's QMF version. + # + if self.isV2: + self._v2SendGetQuery(sequence, kwargs) + else: + self.broker._setSequence(sequence, self) + self._v1SendGetQuery(sequence, kwargs) + + # + # If this is a synchronous call, block and wait for completion. + # + if not notifiable: + timeout = 60 + if '_timeout' in kwargs: + timeout = kwargs['_timeout'] + context.waitForSignal(timeout) + if context.exception: + raise Exception(context.exception) + result = context.queryResults + return result + + + def _clearContext(self, sequence): + try: + self.lock.acquire() + self.contextMap.pop(sequence) + finally: + self.lock.release() + + + def _schemaInfoFromV2Agent(self): + """ + We have just received new schema information from this agent. Check to see if there's + more work that can now be done. + """ + try: + self.lock.acquire() + copy_of_map = {} + for item in self.contextMap: + copy_of_map[item] = self.contextMap[item] + finally: + self.lock.release() + + self.unsolicitedContext.reprocess() + for context in copy_of_map: + copy_of_map[context].reprocess() + + + def _handleV1Completion(self, sequence, code, text): + """ + Called if one of this agent's V1 commands completed + """ + context = None + try: + self.lock.acquire() + if sequence in self.contextMap: + context = self.contextMap[sequence] + finally: + self.lock.release() + + if context: + if code != 0: + ex = "Error %d: %s" % (code, text) + context.setException(ex) + context.signal() + self.broker._clearSequence(sequence) + + + def _v1HandleMethodResp(self, codec, seq): + """ + Handle a QMFv1 method response + """ + code = codec.read_uint32() + text = codec.read_str16() + outArgs = {} + self.broker._clearSequence(seq) + pair = self.seqMgr._release(seq) + if pair == None: + return + method, synchronous = pair + if code == 0: + for arg in method.arguments: + if arg.dir.find("O") != -1: + outArgs[arg.name] = self.session._decodeValue(codec, arg.type, self.broker) + result = MethodResult(code, text, outArgs) + if synchronous: + try: + self.broker.cv.acquire() + self.broker.syncResult = result + self.broker.syncInFlight = False + self.broker.cv.notify() + finally: + self.broker.cv.release() + else: + if self.session.console: + self.session.console.methodResponse(self.broker, seq, result) + + + def _v1HandleEventInd(self, codec, seq): + """ + Handle a QMFv1 event indication + """ + event = Event(self, codec) + self.unsolicitedContext.doEvent(event) + + + def _v1HandleContentInd(self, codec, sequence, prop=False, stat=False): + """ + Handle a QMFv1 content indication + """ + classKey = ClassKey(codec) + schema = self.schemaCache.getSchema(classKey) + if not schema: + return + + obj = Object(self, schema, codec, prop, stat) + if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop: + self.broker._updateAgent(obj) + + context = self.unsolicitedContext + try: + self.lock.acquire() + if sequence in self.contextMap: + context = self.contextMap[sequence] + finally: + self.lock.release() + + context.addV1QueryResult(obj) + + + def _v2HandleDataInd(self, mp, ah, content): + """ + Handle a QMFv2 data indication from the agent + """ + if mp.correlation_id: + try: + self.lock.acquire() + sequence = int(mp.correlation_id) + if sequence not in self.contextMap: + return + context = self.contextMap[sequence] + finally: + self.lock.release() + else: + context = self.unsolicitedContext + + kind = "_data" + if "qmf.content" in ah: + kind = ah["qmf.content"] + if kind == "_data": + if content.__class__ != list: + return + for omap in content: + context.addV2QueryResult(omap) + context.processV2Data() + + if 'partial' not in ah: + context.signal() + + + def _v2HandleMethodResp(self, mp, ah, content): + """ + Handle a QMFv2 method response from the agent + """ + context = None + sequence = None + if mp.correlation_id: + try: + self.lock.acquire() + seq = int(mp.correlation_id) + finally: + self.lock.release() + else: + return + + pair = self.seqMgr._release(seq) + if pair == None: + return + method, synchronous = pair + + result = MethodResult(0, 'OK', content['_arguments']) + if synchronous: + try: + self.broker.cv.acquire() + self.broker.syncResult = result + self.broker.syncInFlight = False + self.broker.cv.notify() + finally: + self.broker.cv.release() + else: + if self.session.console: + self.session.console.methodResponse(self.broker, seq, result) + + def _v2HandleException(self, mp, ah, content): + """ + Handle a QMFv2 exception + """ + context = None + if mp.correlation_id: + try: + self.lock.acquire() + seq = int(mp.correlation_id) + finally: + self.lock.release() + else: + return + + pair = self.seqMgr._release(seq) + if pair == None: + return + method, synchronous = pair + + code = 7 + text = "" + if '_status_code' in content: + code = content['_status_code'] + if '_status_text' in content: + text = content['_status_text'] + else: + text = content + + result = MethodResult(code, text, {}) + if synchronous: + try: + self.broker.cv.acquire() + self.broker.syncResult = result + self.broker.syncInFlight = False + self.broker.cv.notify() + finally: + self.broker.cv.release() + else: + if self.session.console: + self.session.console.methodResponse(self.broker, seq, result) + + + def _v1SendGetQuery(self, sequence, kwargs): + """ + Send a get query to a QMFv1 agent. + """ + # + # Build the query map + # + query = {} + if '_class' in kwargs: + query['_class'] = kwargs['_class'] + if '_package' in kwargs: + query['_package'] = kwargs['_package'] + elif '_key' in kwargs: + key = kwargs['_key'] + query['_class'] = key.getClassName() + query['_package'] = key.getPackageName() + elif '_objectId' in kwargs: + query['_objectid'] = kwargs['_objectId'].__repr__() + + # + # Construct and transmit the message + # + sendCodec = Codec() + self.broker._setHeader(sendCodec, 'G', sequence) + sendCodec.write_map(query) + smsg = self.broker._message(sendCodec.encoded, "agent.%d.%s" % (self.brokerBank, self.agentBank)) + self.broker._send(smsg) + + + def _v2SendGetQuery(self, sequence, kwargs): + """ + Send a get query to a QMFv2 agent. + """ + # + # Build the query map + # + query = {'_what': 'OBJECT'} + if '_class' in kwargs: + schemaMap = {'_class_name': kwargs['_class']} + if '_package' in kwargs: + schemaMap['_package_name'] = kwargs['_package'] + query['_schema_id'] = schemaMap + elif '_key' in kwargs: + query['_schema_id'] = kwargs['_key'].asMap() + elif '_objectId' in kwargs: + query['_object_id'] = kwargs['_objectId'].asMap() + + # + # Construct and transmit the message + # + dp = self.broker.amqpSession.delivery_properties() + dp.routing_key = self.agentBank + mp = self.broker.amqpSession.message_properties() + mp.content_type = "amqp/map" + mp.user_id = self.broker.authUser + mp.correlation_id = str(sequence) + mp.app_id = "qmf2" + mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_queue_name) + mp.application_headers = {'qmf.opcode':'_query_request'} + sendCodec = Codec() + sendCodec.write_map(query) + msg = Message(dp, mp, sendCodec.encoded) + self.broker._send(msg, "qmf.default.direct") + + + def _v2SendSchemaRequest(self, schemaId): + """ + Send a query to an agent to request details on a particular schema class. + IMPORTANT: This function currently sends a QMFv1 schema-request to the address of + the agent. The agent will send its response to amq.direct/<our-key>. + Eventually, this will be converted to a proper QMFv2 schema query. + """ + sendCodec = Codec() + seq = self.seqMgr._reserve(None) + self.broker._setHeader(sendCodec, 'S', seq) + schemaId.encode(sendCodec) + smsg = self.broker._message(sendCodec.encoded, self.agentBank) + self.broker._send(smsg, "qmf.default.direct") + + + def _handleQmfV1Message(self, opcode, seq, mp, ah, codec): + """ + Process QMFv1 messages arriving from an agent. + """ + if opcode == 'm': self._v1HandleMethodResp(codec, seq) + elif opcode == 'e': self._v1HandleEventInd(codec, seq) + elif opcode == 'c': self._v1HandleContentInd(codec, seq, prop=True) + elif opcode == 'i': self._v1HandleContentInd(codec, seq, stat=True) + elif opcode == 'g': self._v1HandleContentInd(codec, seq, prop=True, stat=True) + + + def _handleQmfV2Message(self, opcode, mp, ah, content): + """ + Process QMFv2 messages arriving from an agent. + """ + if opcode == '_data_indication': self._v2HandleDataInd(mp, ah, content) + elif opcode == '_query_response': self._v2HandleDataInd(mp, ah, content) + elif opcode == '_method_response': self._v2HandleMethodResp(mp, ah, content) + elif opcode == '_exception': self._v2HandleException(mp, ah, content) + + +#=================================================================================================== +# RequestContext +#=================================================================================================== +class RequestContext(object): + """ + This class tracks an asynchronous request sent to an agent. + TODO: Add logic for client-side selection and filtering deleted objects from get-queries + """ + def __init__(self, agent, notifiable, selectors={}): + self.sequence = None + self.agent = agent + self.schemaCache = self.agent.schemaCache + self.notifiable = notifiable + self.selectors = selectors + self.startTime = time() + self.rawQueryResults = [] + self.queryResults = [] + self.exception = None + self.waitingForSchema = None + self.pendingSignal = None + self.cv = Condition() + self.blocked = notifiable == None + + + def setSequence(self, sequence): + self.sequence = sequence + + + def addV1QueryResult(self, data): + values = {} + for prop, val in data.getProperties(): + values[prop.name] = val + for stat, val in data.getStatistics(): + values[stat.name] = val + for key in values: + val = values[key] + if key in self.selectors and val != self.selectors[key]: + return + + if self.notifiable: + self.notifiable(qmf_object=data) + else: + self.queryResults.append(data) + + + def addV2QueryResult(self, data): + values = data['_values'] + for key in values: + val = values[key] + if key in self.selectors and val != self.selectors[key]: + return + self.rawQueryResults.append(data) + + + def doEvent(self, data): + if self.notifiable: + self.notifiable(qmf_event=data) + + + def setException(self, ex): + self.exception = ex + + + def getAge(self): + return time() - self.startTime + + + def cancel(self, exception): + self.setException(exception) + try: + self.cv.acquire() + self.blocked = None + self.waitingForSchema = None + self.cv.notify() + finally: + self.cv.release() + self._complete() + + + def waitForSignal(self, timeout): + try: + self.cv.acquire() + while self.blocked: + if (time() - self.startTime) > timeout: + self.exception = "Request timed out after %d seconds" % timeout + return + self.cv.wait(1) + finally: + self.cv.release() + + + def signal(self): + try: + self.cv.acquire() + if self.waitingForSchema: + self.pendingSignal = True + return + else: + self.blocked = None + self.cv.notify() + finally: + self.cv.release() + self._complete() + + + def _complete(self): + if self.notifiable: + if self.exception: + self.notifiable(qmf_exception=self.exception) + else: + self.notifiable(qmf_complete=True) + + if self.sequence: + self.agent._clearContext(self.sequence) + + + def processV2Data(self): + """ + Attempt to make progress on the entries in the raw_query_results queue. If an entry has a schema + that is in our schema cache, process it. Otherwise, send a request for the schema information + to the agent that manages the object. + """ + schemaId = None + queryResults = [] + try: + self.cv.acquire() + if self.waitingForSchema: + return + while (not self.waitingForSchema) and len(self.rawQueryResults) > 0: + head = self.rawQueryResults[0] + schemaId = self._getSchemaIdforV2ObjectLH(head) + schema = self.schemaCache.getSchema(schemaId) + if schema: + obj = Object(self.agent, schema, v2Map=head, agentName=self.agent.agentBank) + queryResults.append(obj) + self.rawQueryResults.pop(0) + else: + self.waitingForSchema = True + finally: + self.cv.release() + + if self.waitingForSchema: + self.agent._v2SendSchemaRequest(schemaId) + + for result in queryResults: + if self.notifiable: + self.notifiable(qmf_object=result) + else: + self.queryResults.append(result) + + complete = None + try: + self.cv.acquire() + if not self.waitingForSchema and self.pendingSignal: + self.blocked = None + self.cv.notify() + complete = True + finally: + self.cv.release() + + if complete: + self._complete() + + + def reprocess(self): + """ + New schema information has been added to the schema-cache. Clear our 'waiting' status + and see if we can make more progress on the raw query list. + """ + try: + self.cv.acquire() + self.waitingForSchema = None + finally: + self.cv.release() + self.processV2Data() + + + def _getSchemaIdforV2ObjectLH(self, data): + """ + Given a data map, extract the schema-identifier. + """ + if data.__class__ != dict: + return None + if '_schema_id' in data: + return ClassKey(data['_schema_id']) + return None + + +#=================================================================================================== +# Event +#=================================================================================================== class Event: """ """ - def __init__(self, session, broker, codec): - self.session = session - self.broker = broker + def __init__(self, agent, codec): + self.agent = agent + self.session = agent.session + self.broker = agent.broker self.classKey = ClassKey(codec) self.timestamp = codec.read_int64() self.severity = codec.read_uint8() - self.schema = None - pname = self.classKey.getPackageName() - pkey = self.classKey.getPackageKey() - if pname in session.packages: - if pkey in session.packages[pname]: - self.schema = session.packages[pname][pkey] - self.arguments = {} - for arg in self.schema.arguments: - self.arguments[arg.name] = session._decodeValue(codec, arg.type, broker) + self.arguments = {} + self.schema = self.session.schemaCache.getSchema(self.classKey) + if not self.schema: + return + for arg in self.schema.arguments: + self.arguments[arg.name] = self.session._decodeValue(codec, arg.type, self.broker) def __repr__(self): if self.schema == None: @@ -1895,6 +2995,10 @@ class Event: def getSchema(self): return self.schema + +#=================================================================================================== +# SequenceManager +#=================================================================================================== class SequenceManager: """ Manage sequence numbers for asynchronous method calls """ def __init__(self): @@ -1926,6 +3030,9 @@ class SequenceManager: return data +#=================================================================================================== +# DebugConsole +#=================================================================================================== class DebugConsole(Console): """ """ def brokerConnected(self, broker): |
