summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-03-27 20:46:24 +0000
committerTed Ross <tross@apache.org>2009-03-27 20:46:24 +0000
commitd66d576676f8fcc0ee3fc11fb34322f499c43ca8 (patch)
treee8dc1950a16336c085cc128176e9e15d7a8a91ac /qpid/python
parentc44a4323bad9d2774e8d26049fd36c02441baede (diff)
downloadqpid-python-d66d576676f8fcc0ee3fc11fb34322f499c43ca8.tar.gz
QPID-1702 QPID-1706
Updated qmf console in Python and Ruby - Added support for asynchronous method invocation - Added option to override timeout for method request and get request - Added exception handler in delegates.rb to catch Sasl errors - Added tests for the async method features git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@759341 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/qmf/console.py60
-rw-r--r--qpid/python/qpid/testlib.py4
-rw-r--r--qpid/python/tests_0-10/management.py71
3 files changed, 113 insertions, 22 deletions
diff --git a/qpid/python/qmf/console.py b/qpid/python/qmf/console.py
index 3b99595f1f..ef2ab264eb 100644
--- a/qpid/python/qmf/console.py
+++ b/qpid/python/qmf/console.py
@@ -77,15 +77,15 @@ class Console:
pass
def heartbeat(self, agent, timestamp):
- """ """
+ """ Invoked when an agent heartbeat is received. """
pass
def brokerInfo(self, broker):
- """ """
+ """ Invoked when the connection sequence reaches the point where broker information is available. """
pass
def methodResponse(self, broker, seq, response):
- """ """
+ """ Invoked when a method response from an asynchronous method call is received. """
pass
class BrokerURL(URL):
@@ -117,7 +117,7 @@ class Session:
_CONTEXT_STARTUP = 2
_CONTEXT_MULTIGET = 3
- GET_WAIT_TIME = 60
+ DEFAULT_GET_WAIT_TIME = 60
def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
manageConnections=False, userBindings=False):
@@ -284,6 +284,11 @@ class Session:
_broker = <broker> - supply a broker as returned by addBroker.
+ The default timeout for this synchronous operation is 60 seconds. To change the timeout,
+ use the following argument:
+
+ _timeout = <time in seconds>
+
If additional arguments are supplied, they are used as property selectors. For example,
if the argument name="test" is supplied, only objects whose "name" property is "test"
will be returned in the result.
@@ -365,11 +370,15 @@ class Session:
starttime = time()
timeout = False
+ if "_timeout" in kwargs:
+ waitTime = kwargs["_timeout"]
+ else:
+ waitTime = self.DEFAULT_GET_WAIT_TIME
try:
self.cv.acquire()
while len(self.syncSequenceList) > 0 and self.error == None:
- self.cv.wait(self.GET_WAIT_TIME)
- if time() - starttime > self.GET_WAIT_TIME:
+ self.cv.wait(waitTime)
+ if time() - starttime > waitTime:
for pendingSeq in self.syncSequenceList:
self.seqMgr._release(pendingSeq)
self.syncSequenceList = []
@@ -498,7 +507,10 @@ class Session:
code = codec.read_uint32()
text = codec.read_str16()
outArgs = {}
- method, synchronous = self.seqMgr._release(seq)
+ pair = self.seqMgr._release(seq)
+ if pair == None:
+ return
+ method, synchronous = pair
if code == 0:
for arg in method.arguments:
if arg.dir.find("O") != -1:
@@ -1083,7 +1095,7 @@ class Object(object):
return value
raise Exception("Type Object has no attribute '%s'" % name)
- def _sendMethodRequest(self, name, args, kwargs, synchronous=False):
+ def _sendMethodRequest(self, name, args, kwargs, synchronous=False, timeWait=None):
for method in self._schema.getMethods():
if name == method.name:
aIdx = 0
@@ -1105,8 +1117,13 @@ class Object(object):
if arg.dir.find("I") != -1:
self._session._encodeValue(sendCodec, args[aIdx], arg.type)
aIdx += 1
+ if timeWait:
+ ttl = timeWait * 1000
+ else:
+ ttl = None
smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
- (self._objectId.getBrokerBank(), self._objectId.getAgentBank()))
+ (self._objectId.getBrokerBank(), self._objectId.getAgentBank()),
+ ttl=ttl)
if synchronous:
try:
self._broker.cv.acquire()
@@ -1118,13 +1135,28 @@ class Object(object):
return None
def _invoke(self, name, args, kwargs):
- if self._sendMethodRequest(name, args, kwargs, True):
+ if "_timeout" in kwargs:
+ timeout = kwargs["_timeout"]
+ else:
+ timeout = self._broker.SYNC_TIME
+
+ if "_async" in kwargs and kwargs["_async"]:
+ sync = False
+ if "_timeout" not in kwargs:
+ timeout = None
+ else:
+ sync = True
+
+ seq = self._sendMethodRequest(name, args, kwargs, sync, timeout)
+ if seq:
+ if not sync:
+ return seq
try:
self._broker.cv.acquire()
starttime = time()
while self._broker.syncInFlight and self._broker.error == None:
- self._broker.cv.wait(self._broker.SYNC_TIME)
- if time() - starttime > self._broker.SYNC_TIME:
+ self._broker.cv.wait(timeout)
+ if time() - starttime > timeout:
self._session.seqMgr._release(seq)
raise RuntimeError("Timed out waiting for method to respond")
finally:
@@ -1407,9 +1439,11 @@ class Broker:
except:
return None, None
- def _message (self, body, routing_key="broker"):
+ def _message (self, body, routing_key="broker", ttl=None):
dp = self.amqpSession.delivery_properties()
dp.routing_key = routing_key
+ if ttl:
+ dp.ttl = ttl
mp = self.amqpSession.message_properties()
mp.content_type = "x-application/qmf"
mp.reply_to = self.amqpSession.reply_to("amq.direct", self.replyName)
diff --git a/qpid/python/qpid/testlib.py b/qpid/python/qpid/testlib.py
index 7f5ac1fcd2..114a56de08 100644
--- a/qpid/python/qpid/testlib.py
+++ b/qpid/python/qpid/testlib.py
@@ -365,8 +365,8 @@ class TestBase010(unittest.TestCase):
self.session = self.conn.session("test-session", timeout=10)
self.qmf = None
- def startQmf(self):
- self.qmf = qmf.console.Session()
+ def startQmf(self, handler=None):
+ self.qmf = qmf.console.Session(handler)
self.qmf_broker = self.qmf.addBroker(str(testrunner.url))
def connect(self, host=None, port=None):
diff --git a/qpid/python/tests_0-10/management.py b/qpid/python/tests_0-10/management.py
index 545dc3db3b..29394c6a7d 100644
--- a/qpid/python/tests_0-10/management.py
+++ b/qpid/python/tests_0-10/management.py
@@ -20,6 +20,9 @@
from qpid.datatypes import Message, RangedSet
from qpid.testlib import TestBase010
from qpid.management import managementChannel, managementClient
+from threading import Condition
+from time import sleep
+import qmf.console
class ManagementTest (TestBase010):
"""
@@ -52,7 +55,7 @@ class ManagementTest (TestBase010):
self.assertEqual (res.body, body)
mc.removeChannel (mch)
- def test_broker_connectivity (self):
+ def test_methods_sync (self):
"""
Call the "echo" method on the broker to verify it is alive and talking.
"""
@@ -60,16 +63,16 @@ class ManagementTest (TestBase010):
self.startQmf()
brokers = self.qmf.getObjects(_class="broker")
- self.assertEqual (len(brokers), 1)
+ self.assertEqual(len(brokers), 1)
broker = brokers[0]
body = "Echo Message Body"
- for seq in range (1, 10):
+ for seq in range(1, 20):
res = broker.echo(seq, body)
- self.assertEqual (res.status, 0)
- self.assertEqual (res.text, "OK")
- self.assertEqual (res.sequence, seq)
- self.assertEqual (res.body, body)
+ self.assertEqual(res.status, 0)
+ self.assertEqual(res.text, "OK")
+ self.assertEqual(res.sequence, seq)
+ self.assertEqual(res.body, body)
def test_get_objects(self):
self.startQmf()
@@ -238,3 +241,57 @@ class ManagementTest (TestBase010):
pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
self.assertEqual (pq.msgDepth,0)
+ def test_methods_async (self):
+ """
+ """
+ class Handler (qmf.console.Console):
+ def __init__(self):
+ self.cv = Condition()
+ self.xmtList = {}
+ self.rcvList = {}
+
+ def methodResponse(self, broker, seq, response):
+ self.cv.acquire()
+ try:
+ self.rcvList[seq] = response
+ finally:
+ self.cv.release()
+
+ def request(self, broker, count):
+ self.count = count
+ for idx in range(count):
+ self.cv.acquire()
+ try:
+ seq = broker.echo(idx, "Echo Message", _async = True)
+ self.xmtList[seq] = idx
+ finally:
+ self.cv.release()
+
+ def check(self):
+ if self.count != len(self.xmtList):
+ return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList))
+ lost = 0
+ mismatched = 0
+ for seq in self.xmtList:
+ value = self.xmtList[seq]
+ if seq in self.rcvList:
+ result = self.rcvList.pop(seq)
+ if result.sequence != value:
+ mismatched += 1
+ else:
+ lost += 1
+ spurious = len(self.rcvList)
+ if lost == 0 and mismatched == 0 and spurious == 0:
+ return "pass"
+ else:
+ return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious)
+
+ handler = Handler()
+ self.startQmf(handler)
+ brokers = self.qmf.getObjects(_class="broker")
+ self.assertEqual(len(brokers), 1)
+ broker = brokers[0]
+ handler.request(broker, 20)
+ sleep(1)
+ self.assertEqual(handler.check(), "pass")
+