diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-06-02 19:25:57 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-06-02 19:25:57 +0000 |
| commit | 4032bf8071990156a95889f01f505d757263647a (patch) | |
| tree | d88ab7ab66e5e056deaa19072df4e4247c111c66 /qpid/python | |
| parent | c2484203245c55e1286b19cf31124aef6e411ba7 (diff) | |
| download | qpid-python-4032bf8071990156a95889f01f505d757263647a.tar.gz | |
modified start and stop to function independently of fetch vs listen, added Receiver.pending() and added tests for Receiver.start(), Receiver.stop(), and Receiver.pending()
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@781132 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
| -rw-r--r-- | qpid/python/qpid/messaging.py | 22 | ||||
| -rw-r--r-- | qpid/python/qpid/tests/messaging.py | 84 |
2 files changed, 87 insertions, 19 deletions
diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py index 7063dfe684..931784024e 100644 --- a/qpid/python/qpid/messaging.py +++ b/qpid/python/qpid/messaging.py @@ -365,6 +365,14 @@ class Session(Lockable): receiver._link() return receiver + @synchronized + def _count(self, predicate): + result = 0 + for msg in self.incoming: + if predicate(msg): + result += 1 + return result + def _peek(self, predicate): for msg in self.incoming: if predicate(msg): @@ -647,6 +655,10 @@ class Receiver(Lockable): self._ssn = None @synchronized + def pending(self): + return self.session._count(self._pred) + + @synchronized def listen(self, listener=None): """ Sets the message listener for this receiver. @@ -655,13 +667,6 @@ class Receiver(Lockable): @param listener: a callable object to be notified on message arrival """ self.listener = listener - if self.listener is None: - self._ssn.message_stop(self.destination) - self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL, - sync=True) - self._ssn.sync() - else: - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL) def _pred(self, msg): return msg._receiver == self @@ -692,8 +697,7 @@ class Receiver(Lockable): def _start(self): self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, 0xFFFFFFFFL) - if self.listener is not None: - self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL) + self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 0xFFFFFFFFL) @synchronized def start(self): diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py index 53216a249a..ef82d87f13 100644 --- a/qpid/python/qpid/tests/messaging.py +++ b/qpid/python/qpid/tests/messaging.py @@ -60,10 +60,10 @@ class Base(Test): ssn.acknowledge() assert msg.content == content - def drain(self, rcv): + def drain(self, rcv, limit=None): msgs = [] try: - while True: + while limit is None or len(msgs) < limit: msgs.append(rcv.fetch(0)) except Empty: pass @@ -245,14 +245,21 @@ class ReceiverTests(Base): def setup_receiver(self): return self.ssn.receiver("test-receiver-queue") + def send(self, base, count = None): + if count is None: + content = "%s[%s]" % (base, uuid4()) + else: + content = "%s[%s, %s]" % (base, count, uuid4()) + self.snd.send(content) + return content + def testListen(self): msgs = Queue() def listener(m): msgs.put(m) self.ssn.acknowledge(m) self.rcv.listen(listener) - content = "testListen[%s]" % uuid4() - self.snd.send(content) + content = self.send("testListen") try: msg = msgs.get(timeout=3) assert False, "did not expect message: %s" % msg @@ -276,18 +283,75 @@ class ReceiverTests(Base): elapsed = time.time() - start assert elapsed >= 3 - content = "testListen[%s]" % uuid4() - for i in range(3): - self.snd.send(content) + one = self.send("testListen", 1) + two = self.send("testListen", 2) + three = self.send("testListen", 3) msg = self.rcv.fetch(0) - assert msg.content == content + assert msg.content == one msg = self.rcv.fetch(3) - assert msg.content == content + assert msg.content == two msg = self.rcv.fetch() + assert msg.content == three + self.ssn.acknowledge() + + def testStart(self): + content = self.send("testStart") + time.sleep(2) + assert self.rcv.pending() == 0 + self.rcv.start() + time.sleep(2) + assert self.rcv.pending() == 1 + msg = self.rcv.fetch(0) assert msg.content == content + assert self.rcv.pending() == 0 + self.ssn.acknowledge() + + def testStop(self): + self.rcv.start() + one = self.send("testStop", 1) + time.sleep(2) + assert self.rcv.pending() == 1 + msg = self.rcv.fetch(0) + assert msg.content == one + + self.rcv.stop() + + two = self.send("testStop", 2) + time.sleep(2) + assert self.rcv.pending() == 0 + msg = self.rcv.fetch(0) + assert msg.content == two + + self.ssn.acknowledge() + + def testPending(self): + self.rcv.start() + + assert self.rcv.pending() == 0 + + for i in range(3): + self.send("testPending", i) + time.sleep(2) + + assert self.rcv.pending() == 3 + + for i in range(3, 10): + self.send("testPending", i) + time.sleep(2) + + assert self.rcv.pending() == 10 + + self.drain(self.rcv, limit=3) + + assert self.rcv.pending() == 7 + + self.drain(self.rcv) + + assert self.rcv.pending() == 0 + self.ssn.acknowledge() - # XXX: need testStart, testStop and testClose + # XXX: need testClose class MessageTests(Base): |
