summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2013-05-13 18:46:23 +0000
committerAlan Conway <aconway@apache.org>2013-05-13 18:46:23 +0000
commit718202f10df504ba8c956a4725fa06f5a86a2db2 (patch)
treeec8b81ad9c498d94438326eb6916069df08e9363
parent39d7f4c245989ee09b5d6f4b36be0b4ba212ba38 (diff)
downloadqpid-python-718202f10df504ba8c956a4725fa06f5a86a2db2.tar.gz
NO-JIRA: HA moved ha_store_tests.py into ha_tests.py test suite.
It was too easy to forget the store tests in a separate file. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1482023 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/tests/CMakeLists.txt3
-rw-r--r--cpp/src/tests/Makefile.am1
-rwxr-xr-xcpp/src/tests/ha_store_tests.py125
-rwxr-xr-xcpp/src/tests/ha_tests.py88
4 files changed, 88 insertions, 129 deletions
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt
index cee7ae5d20..f3ee40621f 100644
--- a/cpp/src/tests/CMakeLists.txt
+++ b/cpp/src/tests/CMakeLists.txt
@@ -334,9 +334,6 @@ if (PYTHON_EXECUTABLE)
if (BUILD_AMQP)
add_test (interlink_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/interlink_tests.py)
endif (BUILD_AMQP)
- if (BUILD_LEGACYSTORE)
- add_test (ha_store_tests ${test_wrap} ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/ha_store_tests.py)
- endif (BUILD_LEGACYSTORE)
add_test (ipv6_test ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/ipv6_test${test_script_suffix})
add_test (federation_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_federation_tests${test_script_suffix})
add_test (federation_sys_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_federation_sys_tests${test_script_suffix})
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 43a538d6d0..92532895c5 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -350,7 +350,6 @@ EXTRA_DIST += \
run_interlink_tests \
interlink_tests.py \
brokertest.py \
- ha_store_tests.py \
test_env.ps1.in
check_LTLIBRARIES += libdlclose_noop.la
diff --git a/cpp/src/tests/ha_store_tests.py b/cpp/src/tests/ha_store_tests.py
deleted file mode 100755
index c6a514977c..0000000000
--- a/cpp/src/tests/ha_store_tests.py
+++ /dev/null
@@ -1,125 +0,0 @@
-#!/usr/bin/env python
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-"""
-This module contains tests for HA functionality that requires a store.
-It will only be run if the STORE_LIB environment variable is defined.
-"""
-
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
-import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
-from qpid.datatypes import uuid4, UUID
-from brokertest import *
-from ha_test import *
-from threading import Thread, Lock, Condition
-from logging import getLogger, WARN, ERROR, DEBUG, INFO
-from qpidtoollibs import BrokerAgent
-
-
-class StoreTests(BrokerTest):
- """Test for HA with persistence."""
-
- def test_store_recovery(self):
- """Verify basic store and recover functionality"""
- cluster = HaCluster(self, 1)
- sn = cluster[0].connect().session()
- # Create queue qq, exchange exx and binding between them
- s = sn.sender("qq;{create:always,node:{durable:true}}")
- sk = sn.sender("exx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:exx,key:k,queue:qq}]}}")
- for m in ["foo", "bar", "baz"]: s.send(Message(m, durable=True))
- r = cluster[0].connect().session().receiver("qq")
- self.assertEqual(r.fetch().content, "foo")
- r.session.acknowledge()
- # FIXME aconway 2012-09-21: sending this message is an ugly hack to flush
- # the dequeue operation on qq.
- s.send(Message("flush", durable=True))
-
- def verify(broker, x_count):
- sn = broker.connect().session()
- assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count)*["x"])
- sn.sender("exx/k").send(Message("x", durable=True))
- assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count+1)*["x"])
-
- verify(cluster[0], 0) # Sanity check
- cluster.bounce(0)
- cluster[0].wait_status("active")
- verify(cluster[0], 1) # Loaded from store
- cluster.start()
- cluster[1].wait_status("ready")
- cluster.kill(0)
- cluster[1].wait_status("active")
- verify(cluster[1], 2)
- cluster.bounce(1, promote_next=False)
- cluster[1].promote()
- cluster[1].wait_status("active")
- verify(cluster[1], 3)
-
- def test_catchup_store(self):
- """Verify that a backup erases queue data from store recovery before
- doing catch-up from the primary."""
- cluster = HaCluster(self, 2)
- sn = cluster[0].connect().session()
- s1 = sn.sender("q1;{create:always,node:{durable:true}}")
- for m in ["foo","bar"]: s1.send(Message(m, durable=True))
- s2 = sn.sender("q2;{create:always,node:{durable:true}}")
- sk2 = sn.sender("ex/k2;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:ex,key:k2,queue:q2}]}}")
- sk2.send(Message("hello", durable=True))
- # Wait for backup to catch up.
- cluster[1].assert_browse_backup("q1", ["foo","bar"])
- cluster[1].assert_browse_backup("q2", ["hello"])
-
- # Make changes that the backup doesn't see
- cluster.kill(1, promote_next=False)
- r1 = cluster[0].connect().session().receiver("q1")
- for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m)
- r1.session.acknowledge()
- for m in ["x","y","z"]: s1.send(Message(m, durable=True))
- # Use old connection to unbind
- us = cluster[0].connect_old().session(str(uuid4()))
- us.exchange_unbind(exchange="ex", binding_key="k2", queue="q2")
- us.exchange_bind(exchange="ex", binding_key="k1", queue="q1")
- # Restart both brokers from store to get inconsistent sequence numbering.
- cluster.bounce(0, promote_next=False)
- cluster[0].promote()
- cluster[0].wait_status("active")
- cluster.restart(1)
- cluster[1].wait_status("ready")
-
- # Verify state
- cluster[0].assert_browse("q1", ["x","y","z"])
- cluster[1].assert_browse_backup("q1", ["x","y","z"])
- sn = cluster[0].connect().session() # FIXME aconway 2012-09-25: should fail over!
- sn.sender("ex/k1").send("boo")
- cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"])
- cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"])
- sn.sender("ex/k2").send("hoo") # q2 was unbound so this should be dropped.
- sn.sender("q2").send("end") # mark the end of the queue for assert_browse
- cluster[0].assert_browse("q2", ["hello", "end"])
- cluster[1].assert_browse_backup("q2", ["hello", "end"])
-
-if __name__ == "__main__":
- shutil.rmtree("brokertest.tmp", True)
- qpid_ha = os.getenv("QPID_HA_EXEC")
- if qpid_ha and os.path.exists(qpid_ha):
- os.execvp("qpid-python-test",
- ["qpid-python-test", "-m", "ha_store_tests"] + sys.argv[1:])
- else:
- print "Skipping ha_store_tests, %s not available"%(qpid_ha)
diff --git a/cpp/src/tests/ha_tests.py b/cpp/src/tests/ha_tests.py
index 8ee27b5519..db4c7f62e7 100755
--- a/cpp/src/tests/ha_tests.py
+++ b/cpp/src/tests/ha_tests.py
@@ -1206,6 +1206,94 @@ class ConfigurationTests(HaBrokerTest):
b = start("none", "none")
check(b, "", "")
+
+class StoreTests(BrokerTest):
+ """Test for HA with persistence."""
+ def check_skip(self):
+ if not BrokerTest.store_lib:
+ print "WARNING: skipping HA+store tests, no store lib found."
+ return not BrokerTest.store_lib
+
+ def test_store_recovery(self):
+ """Verify basic store and recover functionality"""
+ if self.check_skip(): return
+ cluster = HaCluster(self, 1)
+ sn = cluster[0].connect().session()
+ # Create queue qq, exchange exx and binding between them
+ s = sn.sender("qq;{create:always,node:{durable:true}}")
+ sk = sn.sender("exx/k;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:exx,key:k,queue:qq}]}}")
+ for m in ["foo", "bar", "baz"]: s.send(Message(m, durable=True))
+ r = cluster[0].connect().session().receiver("qq")
+ self.assertEqual(r.fetch().content, "foo")
+ r.session.acknowledge()
+ # FIXME aconway 2012-09-21: sending this message is an ugly hack to flush
+ # the dequeue operation on qq.
+ s.send(Message("flush", durable=True))
+
+ def verify(broker, x_count):
+ sn = broker.connect().session()
+ assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count)*["x"])
+ sn.sender("exx/k").send(Message("x", durable=True))
+ assert_browse(sn, "qq", [ "bar", "baz", "flush" ]+ (x_count+1)*["x"])
+
+ verify(cluster[0], 0) # Sanity check
+ cluster.bounce(0)
+ cluster[0].wait_status("active")
+ verify(cluster[0], 1) # Loaded from store
+ cluster.start()
+ cluster[1].wait_status("ready")
+ cluster.kill(0)
+ cluster[1].wait_status("active")
+ verify(cluster[1], 2)
+ cluster.bounce(1, promote_next=False)
+ cluster[1].promote()
+ cluster[1].wait_status("active")
+ verify(cluster[1], 3)
+
+ def test_catchup_store(self):
+ """Verify that a backup erases queue data from store recovery before
+ doing catch-up from the primary."""
+ if self.check_skip(): return
+ cluster = HaCluster(self, 2)
+ sn = cluster[0].connect().session()
+ s1 = sn.sender("q1;{create:always,node:{durable:true}}")
+ for m in ["foo","bar"]: s1.send(Message(m, durable=True))
+ s2 = sn.sender("q2;{create:always,node:{durable:true}}")
+ sk2 = sn.sender("ex/k2;{create:always,node:{type:topic, durable:true, x-declare:{type:'direct'}, x-bindings:[{exchange:ex,key:k2,queue:q2}]}}")
+ sk2.send(Message("hello", durable=True))
+ # Wait for backup to catch up.
+ cluster[1].assert_browse_backup("q1", ["foo","bar"])
+ cluster[1].assert_browse_backup("q2", ["hello"])
+
+ # Make changes that the backup doesn't see
+ cluster.kill(1, promote_next=False)
+ r1 = cluster[0].connect().session().receiver("q1")
+ for m in ["foo", "bar"]: self.assertEqual(r1.fetch().content, m)
+ r1.session.acknowledge()
+ for m in ["x","y","z"]: s1.send(Message(m, durable=True))
+ # Use old connection to unbind
+ us = cluster[0].connect_old().session(str(uuid4()))
+ us.exchange_unbind(exchange="ex", binding_key="k2", queue="q2")
+ us.exchange_bind(exchange="ex", binding_key="k1", queue="q1")
+ # Restart both brokers from store to get inconsistent sequence numbering.
+ cluster.bounce(0, promote_next=False)
+ cluster[0].promote()
+ cluster[0].wait_status("active")
+ cluster.restart(1)
+ cluster[1].wait_status("ready")
+
+ # Verify state
+ cluster[0].assert_browse("q1", ["x","y","z"])
+ cluster[1].assert_browse_backup("q1", ["x","y","z"])
+ sn = cluster[0].connect().session() # FIXME aconway 2012-09-25: should fail over!
+ sn.sender("ex/k1").send("boo")
+ cluster[0].assert_browse_backup("q1", ["x","y","z", "boo"])
+ cluster[1].assert_browse_backup("q1", ["x","y","z", "boo"])
+ sn.sender("ex/k2").send("hoo") # q2 was unbound so this should be dropped.
+ sn.sender("q2").send("end") # mark the end of the queue for assert_browse
+ cluster[0].assert_browse("q2", ["hello", "end"])
+ cluster[1].assert_browse_backup("q2", ["hello", "end"])
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)
qpid_ha = os.getenv("QPID_HA_EXEC")