summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests')
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py57
1 files changed, 57 insertions, 0 deletions
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index 2c7e164c42..3b69e3de33 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -939,6 +939,63 @@ class LongTests(HaBrokerTest):
if unexpected_dead:
raise Exception("Brokers not running: %s"%unexpected_dead)
+ def test_qmf_order(self):
+ """QPID 4402: HA QMF events can be out of order.
+ This test mimics the test described in the JIRA. Two threads repeatedly
+ declare the same auto-delete queue and close their connection.
+ """
+ broker = Broker(self)
+ class Receiver(Thread):
+ def __init__(self, qname):
+ Thread.__init__(self)
+ self.qname = qname
+ self.stopped = False
+
+ def run(self):
+ while not self.stopped:
+ self.connection = broker.connect()
+ try:
+ self.connection.session().receiver(
+ self.qname+";{create:always,node:{x-declare:{auto-delete:True}}}")
+ except NotFound: pass # Can occur occasionally, not an error.
+ try: self.connection.close()
+ except: pass
+
+ class QmfObject(object):
+ """Track existance of an object and validate QMF events"""
+ def __init__(self, type_name, name_field, name):
+ self.type_name, self.name_field, self.name = type_name, name_field, name
+ self.exists = False
+
+ def qmf_event(self, event):
+ content = event.content[0]
+ event_type = content['_schema_id']['_class_name']
+ values = content['_values']
+ if event_type == self.type_name+"Declare" and values[self.name_field] == self.name:
+ disp = values['disp']
+ log.debug("Event %s: disp=%s exists=%s"%(
+ event_type, values['disp'], self.exists))
+ if self.exists: assert values['disp'] == 'existing'
+ else: assert values['disp'] == 'created'
+ self.exists = True
+ elif event_type == self.type_name+"Delete" and values[self.name_field] == self.name:
+ log.debug("Event %s: exists=%s"%(event_type, self.exists))
+ assert self.exists
+ self.exists = False
+
+ # Verify order of QMF events.
+ helper = EventHelper()
+ r = broker.connect().session().receiver(helper.eventAddress())
+ threads = [Receiver("qq"), Receiver("qq")]
+ for t in threads: t.start()
+ queue = QmfObject("queue", "qName", "qq")
+ finish = time.time() + self.duration()
+ try:
+ while time.time() < finish:
+ queue.qmf_event(r.fetch())
+ finally:
+ for t in threads: t.stopped = True; t.join()
+
class RecoveryTests(HaBrokerTest):
"""Tests for recovery after a failure."""