diff options
| author | Ted Ross <tross@apache.org> | 2009-03-27 20:46:24 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2009-03-27 20:46:24 +0000 |
| commit | d66d576676f8fcc0ee3fc11fb34322f499c43ca8 (patch) | |
| tree | e8dc1950a16336c085cc128176e9e15d7a8a91ac /qpid/python | |
| parent | c44a4323bad9d2774e8d26049fd36c02441baede (diff) | |
| download | qpid-python-d66d576676f8fcc0ee3fc11fb34322f499c43ca8.tar.gz | |
QPID-1702 QPID-1706
Updated qmf console in Python and Ruby
- Added support for asynchronous method invocation
- Added option to override timeout for method request and get request
- Added exception handler in delegates.rb to catch Sasl errors
- Added tests for the async method features
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@759341 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
| -rw-r--r-- | qpid/python/qmf/console.py | 60 | ||||
| -rw-r--r-- | qpid/python/qpid/testlib.py | 4 | ||||
| -rw-r--r-- | qpid/python/tests_0-10/management.py | 71 |
3 files changed, 113 insertions, 22 deletions
diff --git a/qpid/python/qmf/console.py b/qpid/python/qmf/console.py index 3b99595f1f..ef2ab264eb 100644 --- a/qpid/python/qmf/console.py +++ b/qpid/python/qmf/console.py @@ -77,15 +77,15 @@ class Console: pass def heartbeat(self, agent, timestamp): - """ """ + """ Invoked when an agent heartbeat is received. """ pass def brokerInfo(self, broker): - """ """ + """ Invoked when the connection sequence reaches the point where broker information is available. """ pass def methodResponse(self, broker, seq, response): - """ """ + """ Invoked when a method response from an asynchronous method call is received. """ pass class BrokerURL(URL): @@ -117,7 +117,7 @@ class Session: _CONTEXT_STARTUP = 2 _CONTEXT_MULTIGET = 3 - GET_WAIT_TIME = 60 + DEFAULT_GET_WAIT_TIME = 60 def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True, manageConnections=False, userBindings=False): @@ -284,6 +284,11 @@ class Session: _broker = <broker> - supply a broker as returned by addBroker. + The default timeout for this synchronous operation is 60 seconds. To change the timeout, + use the following argument: + + _timeout = <time in seconds> + If additional arguments are supplied, they are used as property selectors. For example, if the argument name="test" is supplied, only objects whose "name" property is "test" will be returned in the result. @@ -365,11 +370,15 @@ class Session: starttime = time() timeout = False + if "_timeout" in kwargs: + waitTime = kwargs["_timeout"] + else: + waitTime = self.DEFAULT_GET_WAIT_TIME try: self.cv.acquire() while len(self.syncSequenceList) > 0 and self.error == None: - self.cv.wait(self.GET_WAIT_TIME) - if time() - starttime > self.GET_WAIT_TIME: + self.cv.wait(waitTime) + if time() - starttime > waitTime: for pendingSeq in self.syncSequenceList: self.seqMgr._release(pendingSeq) self.syncSequenceList = [] @@ -498,7 +507,10 @@ class Session: code = codec.read_uint32() text = codec.read_str16() outArgs = {} - method, synchronous = self.seqMgr._release(seq) + pair = self.seqMgr._release(seq) + if pair == None: + return + method, synchronous = pair if code == 0: for arg in method.arguments: if arg.dir.find("O") != -1: @@ -1083,7 +1095,7 @@ class Object(object): return value raise Exception("Type Object has no attribute '%s'" % name) - def _sendMethodRequest(self, name, args, kwargs, synchronous=False): + def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None): for method in self._schema.getMethods(): if name == method.name: aIdx = 0 @@ -1105,8 +1117,13 @@ class Object(object): if arg.dir.find("I") != -1: self._session._encodeValue(sendCodec, args[aIdx], arg.type) aIdx += 1 + if timeWait: + ttl = timeWait * 1000 + else: + ttl = None smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" % - (self._objectId.getBrokerBank(), self._objectId.getAgentBank())) + (self._objectId.getBrokerBank(), self._objectId.getAgentBank()), + ttl=ttl) if synchronous: try: self._broker.cv.acquire() @@ -1118,13 +1135,28 @@ class Object(object): return None def _invoke(self, name, args, kwargs): - if self._sendMethodRequest(name, args, kwargs, True): + if "_timeout" in kwargs: + timeout = kwargs["_timeout"] + else: + timeout = self._broker.SYNC_TIME + + if "_async" in kwargs and kwargs["_async"]: + sync = False + if "_timeout" not in kwargs: + timeout = None + else: + sync = True + + seq = self._sendMethodRequest(name, args, kwargs, sync, timeout) + if seq: + if not sync: + return seq try: self._broker.cv.acquire() starttime = time() while self._broker.syncInFlight and self._broker.error == None: - self._broker.cv.wait(self._broker.SYNC_TIME) - if time() - starttime > self._broker.SYNC_TIME: + self._broker.cv.wait(timeout) + if time() - starttime > timeout: self._session.seqMgr._release(seq) raise RuntimeError("Timed out waiting for method to respond") finally: @@ -1407,9 +1439,11 @@ class Broker: except: return None, None - def _message (self, body, routing_key="broker"): + def _message (self, body, routing_key="broker", ttl=None): dp = self.amqpSession.delivery_properties() dp.routing_key = routing_key + if ttl: + dp.ttl = ttl mp = self.amqpSession.message_properties() mp.content_type = "x-application/qmf" mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName) diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py index 7f5ac1fcd2..114a56de08 100644 --- a/qpid/python/qpid/testlib.py +++ b/qpid/python/qpid/testlib.py @@ -365,8 +365,8 @@ class TestBase010(unittest.TestCase): self.session = self.conn.session("test-session", timeout=10) self.qmf = None - def startQmf(self): - self.qmf = qmf.console.Session() + def startQmf(self, handler=None): + self.qmf = qmf.console.Session(handler) self.qmf_broker = self.qmf.addBroker(str(testrunner.url)) def connect(self, host=None, port=None): diff --git a/qpid/python/tests_0-10/management.py b/qpid/python/tests_0-10/management.py index 545dc3db3b..29394c6a7d 100644 --- a/qpid/python/tests_0-10/management.py +++ b/qpid/python/tests_0-10/management.py @@ -20,6 +20,9 @@ from qpid.datatypes import Message, RangedSet from qpid.testlib import TestBase010 from qpid.management import managementChannel, managementClient +from threading import Condition +from time import sleep +import qmf.console class ManagementTest (TestBase010): """ @@ -52,7 +55,7 @@ class ManagementTest (TestBase010): self.assertEqual (res.body, body) mc.removeChannel (mch) - def test_broker_connectivity (self): + def test_methods_sync (self): """ Call the "echo" method on the broker to verify it is alive and talking. """ @@ -60,16 +63,16 @@ class ManagementTest (TestBase010): self.startQmf() brokers = self.qmf.getObjects(_class="broker") - self.assertEqual (len(brokers), 1) + self.assertEqual(len(brokers), 1) broker = brokers[0] body = "Echo Message Body" - for seq in range (1, 10): + for seq in range(1, 20): res = broker.echo(seq, body) - self.assertEqual (res.status, 0) - self.assertEqual (res.text, "OK") - self.assertEqual (res.sequence, seq) - self.assertEqual (res.body, body) + self.assertEqual(res.status, 0) + self.assertEqual(res.text, "OK") + self.assertEqual(res.sequence, seq) + self.assertEqual(res.body, body) def test_get_objects(self): self.startQmf() @@ -238,3 +241,57 @@ class ManagementTest (TestBase010): pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0] self.assertEqual (pq.msgDepth,0) + def test_methods_async (self): + """ + """ + class Handler (qmf.console.Console): + def __init__(self): + self.cv = Condition() + self.xmtList = {} + self.rcvList = {} + + def methodResponse(self, broker, seq, response): + self.cv.acquire() + try: + self.rcvList[seq] = response + finally: + self.cv.release() + + def request(self, broker, count): + self.count = count + for idx in range(count): + self.cv.acquire() + try: + seq = broker.echo(idx, "Echo Message", _async = True) + self.xmtList[seq] = idx + finally: + self.cv.release() + + def check(self): + if self.count != len(self.xmtList): + return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList)) + lost = 0 + mismatched = 0 + for seq in self.xmtList: + value = self.xmtList[seq] + if seq in self.rcvList: + result = self.rcvList.pop(seq) + if result.sequence != value: + mismatched += 1 + else: + lost += 1 + spurious = len(self.rcvList) + if lost == 0 and mismatched == 0 and spurious == 0: + return "pass" + else: + return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious) + + handler = Handler() + self.startQmf(handler) + brokers = self.qmf.getObjects(_class="broker") + self.assertEqual(len(brokers), 1) + broker = brokers[0] + handler.request(broker, 20) + sleep(1) + self.assertEqual(handler.check(), "pass") + |
