diff options
Diffstat (limited to 'qpid/cpp/src/tests')
| -rwxr-xr-x | qpid/cpp/src/tests/ha_tests.py | 57 |
1 files changed, 57 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py index 2c7e164c42..3b69e3de33 100755 --- a/qpid/cpp/src/tests/ha_tests.py +++ b/qpid/cpp/src/tests/ha_tests.py @@ -939,6 +939,63 @@ class LongTests(HaBrokerTest): if unexpected_dead: raise Exception("Brokers not running: %s"%unexpected_dead) + def test_qmf_order(self): + """QPID 4402: HA QMF events can be out of order. + This test mimics the test described in the JIRA. Two threads repeatedly + declare the same auto-delete queue and close their connection. + """ + broker = Broker(self) + class Receiver(Thread): + def __init__(self, qname): + Thread.__init__(self) + self.qname = qname + self.stopped = False + + def run(self): + while not self.stopped: + self.connection = broker.connect() + try: + self.connection.session().receiver( + self.qname+";{create:always,node:{x-declare:{auto-delete:True}}}") + except NotFound: pass # Can occur occasionally, not an error. + try: self.connection.close() + except: pass + + class QmfObject(object): + """Track existance of an object and validate QMF events""" + def __init__(self, type_name, name_field, name): + self.type_name, self.name_field, self.name = type_name, name_field, name + self.exists = False + + def qmf_event(self, event): + content = event.content[0] + event_type = content['_schema_id']['_class_name'] + values = content['_values'] + if event_type == self.type_name+"Declare" and values[self.name_field] == self.name: + disp = values['disp'] + log.debug("Event %s: disp=%s exists=%s"%( + event_type, values['disp'], self.exists)) + if self.exists: assert values['disp'] == 'existing' + else: assert values['disp'] == 'created' + self.exists = True + elif event_type == self.type_name+"Delete" and values[self.name_field] == self.name: + log.debug("Event %s: exists=%s"%(event_type, self.exists)) + assert self.exists + self.exists = False + + # Verify order of QMF events. + helper = EventHelper() + r = broker.connect().session().receiver(helper.eventAddress()) + threads = [Receiver("qq"), Receiver("qq")] + for t in threads: t.start() + queue = QmfObject("queue", "qName", "qq") + finish = time.time() + self.duration() + try: + while time.time() < finish: + queue.qmf_event(r.fetch()) + finally: + for t in threads: t.stopped = True; t.join() + class RecoveryTests(HaBrokerTest): """Tests for recovery after a failure.""" |
