summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2015-03-03 14:58:01 +0000
committerKeith Wall <kwall@apache.org>2015-03-03 14:58:01 +0000
commit11a201863b9c989151cf117450785504a61df5ce (patch)
treeba96c870aa9ed34edcac0bd07fc0e0138f715bbd /qpid/cpp/src/tests
parent9dc57fe738f366d875c2319dafdfa2c50ce2f20b (diff)
parent83120216de949c1cae3004c74475cc6c54cd61f1 (diff)
downloadqpid-python-11a201863b9c989151cf117450785504a61df5ce.tar.gz
merge from trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/QPID-6262-JavaBrokerNIO@1663719 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/tests')
-rw-r--r--qpid/cpp/src/tests/BrokerFixture.h12
-rw-r--r--qpid/cpp/src/tests/CMakeLists.txt5
-rw-r--r--qpid/cpp/src/tests/Variant.cpp24
-rw-r--r--qpid/cpp/src/tests/brokertest.py46
-rwxr-xr-xqpid/cpp/src/tests/ha_test.py47
-rwxr-xr-xqpid/cpp/src/tests/ha_tests.py35
-rwxr-xr-xqpid/cpp/src/tests/interlink_tests.py1
-rwxr-xr-xqpid/cpp/src/tests/interop_tests.py220
-rw-r--r--qpid/cpp/src/tests/qpid-receive.cpp9
-rw-r--r--qpid/cpp/src/tests/qpid-send.cpp21
-rw-r--r--qpid/cpp/src/tests/qpid-txtest2.cpp5
-rwxr-xr-xqpid/cpp/src/tests/swig_python_tests7
-rw-r--r--qpid/cpp/src/tests/test_env.sh.in16
-rw-r--r--qpid/cpp/src/tests/test_store.cpp19
14 files changed, 374 insertions, 93 deletions
diff --git a/qpid/cpp/src/tests/BrokerFixture.h b/qpid/cpp/src/tests/BrokerFixture.h
index c455dd10fc..474b9d747f 100644
--- a/qpid/cpp/src/tests/BrokerFixture.h
+++ b/qpid/cpp/src/tests/BrokerFixture.h
@@ -101,11 +101,13 @@ struct BrokerFixture : private boost::noncopyable {
opts.auth=false;
// Argument parsing
- std::vector<const char*> argv(args.size());
- for (size_t i = 0; i<args.size(); ++i)
- argv[i] = args[i].c_str();
- Plugin::addOptions(opts);
- opts.parse(argv.size(), &argv[0]);
+ if (args.size() > 0) {
+ std::vector<const char*> argv(args.size());
+ for (size_t i = 0; i<args.size(); ++i)
+ argv[i] = args[i].c_str();
+ Plugin::addOptions(opts);
+ opts.parse(argv.size(), &argv[0]);
+ }
broker = Broker::create(opts);
// TODO aconway 2007-12-05: At one point BrokerFixture
// tests could hang in Connection ctor if the following
diff --git a/qpid/cpp/src/tests/CMakeLists.txt b/qpid/cpp/src/tests/CMakeLists.txt
index c914c50e33..f3443aa57e 100644
--- a/qpid/cpp/src/tests/CMakeLists.txt
+++ b/qpid/cpp/src/tests/CMakeLists.txt
@@ -360,6 +360,11 @@ if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
# paged queue not yet implemented for windows
add_test (paged_queue_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_paged_queue_tests${test_script_suffix})
endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
+
+if (BUILD_AMQP)
+ add_test (interop_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/interop_tests.py)
+endif (BUILD_AMQP)
+
add_test (ha_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/ha_tests.py)
add_test (qpidd_qmfv2_tests ${python_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/qpidd_qmfv2_tests.py)
if (BUILD_AMQP)
diff --git a/qpid/cpp/src/tests/Variant.cpp b/qpid/cpp/src/tests/Variant.cpp
index d2394bfbad..d6605f9fe5 100644
--- a/qpid/cpp/src/tests/Variant.cpp
+++ b/qpid/cpp/src/tests/Variant.cpp
@@ -18,14 +18,16 @@
* under the License.
*
*/
-#include <iostream>
-#include "qpid/types/Variant.h"
-#include "qpid/amqp_0_10/Codecs.h"
#include "unit_test.h"
+#include "qpid/types/Variant.h"
+#include "qpid/amqp_0_10/Codecs.h"
+#include <boost/assign.hpp>
+#include <iostream>
using namespace qpid::types;
using namespace qpid::amqp_0_10;
+using boost::assign::list_of;
namespace qpid {
namespace tests {
@@ -807,6 +809,22 @@ QPID_AUTO_TEST_CASE(parse)
BOOST_CHECK(a.getType()==types::VAR_DOUBLE);
}
+QPID_AUTO_TEST_CASE(described)
+{
+ Variant a;
+ BOOST_CHECK(!a.isDescribed());
+ a.getDescriptors().push_back("foo");
+ BOOST_CHECK(a.isDescribed());
+ BOOST_CHECK_EQUAL(a.getDescriptors(), list_of<Variant>("foo"));
+ a = 42;
+ BOOST_CHECK(a.isDescribed());
+ BOOST_CHECK_EQUAL(a.getDescriptors(), list_of<Variant>("foo"));
+ a.getDescriptors().push_back(33);
+ BOOST_CHECK_EQUAL(a.getDescriptors(), list_of<Variant>("foo")(33));
+ a.getDescriptors().clear();
+ BOOST_CHECK(!a.isDescribed());
+}
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests
diff --git a/qpid/cpp/src/tests/brokertest.py b/qpid/cpp/src/tests/brokertest.py
index 598879d4ad..2566bc527d 100644
--- a/qpid/cpp/src/tests/brokertest.py
+++ b/qpid/cpp/src/tests/brokertest.py
@@ -21,9 +21,9 @@
import os, signal, string, tempfile, subprocess, socket, threading, time, imp, re
import qpid, traceback, signal
+import proton
from qpid import connection, util
from qpid.compat import format_exc
-from qpid.harness import Skipped
from unittest import TestCase
from copy import copy
from threading import Thread, Lock, Condition
@@ -49,13 +49,18 @@ from qpidtoollibs import BrokerAgent
import qpid.messaging
qm = qpid.messaging
qpid_messaging = None
+
+def env_has_log_config():
+ """True if there are qpid log configuratoin settings in the environment."""
+ return "QPID_LOG_ENABLE" in os.environ or "QPID_TRACE" in os.environ
+
if not os.environ.get("QPID_PY_NO_SWIG"):
try:
import qpid_messaging
from qpid.datatypes import uuid4
qm = qpid_messaging
# Silence warnings from swigged messaging library unless enabled in environment.
- if "QPID_LOG_ENABLE" not in os.environ and "QPID_TRACE" not in os.environ:
+ if not env_has_log_config():
qm.Logger.configure(["--log-enable=error"])
except ImportError:
print "Cannot load python SWIG bindings, falling back to native qpid.messaging."
@@ -136,7 +141,7 @@ _popen_id = AtomicCounter() # Popen identifier for use in output file names.
# Constants for file descriptor arguments to Popen
FILE = "FILE" # Write to file named after process
-PIPE = subprocess.PIPE
+from subprocess import PIPE, STDOUT
class Popen(subprocess.Popen):
"""
@@ -202,7 +207,7 @@ class Popen(subprocess.Popen):
def communicate(self, input=None):
ret = subprocess.Popen.communicate(self, input)
- self.cleanup()
+ self._cleanup()
return ret
def is_running(self): return self.poll() is None
@@ -254,6 +259,7 @@ class Popen(subprocess.Popen):
def cmd_str(self): return " ".join([str(s) for s in self.cmd])
+
def checkenv(name):
value = os.getenv(name)
if not value: raise Exception("Environment variable %s is not set" % name)
@@ -308,7 +314,7 @@ class Broker(Popen):
cmd += ["--log-to-stderr=no"]
# Add default --log-enable arguments unless args already has --log arguments.
- if not [l for l in args if l.startswith("--log")]:
+ if not env_has_log_config() and not [l for l in args if l.startswith("--log")]:
args += ["--log-enable=info+"]
if test_store: cmd += ["--load-module", BrokerTest.test_store_lib,
@@ -444,10 +450,11 @@ def browse(session, queue, timeout=0, transform=lambda m: m.content):
finally:
r.close()
-def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg="browse failed"):
+def assert_browse(session, queue, expect_contents, timeout=0, transform=lambda m: m.content, msg=None):
"""Assert that the contents of messages on queue (as retrieved
using session and timeout) exactly match the strings in
expect_contents"""
+ if msg is None: msg = "browse '%s' failed" % queue
actual_contents = browse(session, queue, timeout, transform=transform)
if msg: msg = "%s: %r != %r"%(msg, expect_contents, actual_contents)
assert expect_contents == actual_contents, msg
@@ -486,6 +493,18 @@ class BrokerTest(TestCase):
test_store_lib = os.getenv("TEST_STORE_LIB")
rootdir = os.getcwd()
+ PN_VERSION = (proton.VERSION_MAJOR, proton.VERSION_MINOR)
+ PN_TX_VERSION = (0, 9)
+
+ amqp_tx_supported = PN_VERSION >= PN_TX_VERSION
+
+ @classmethod
+ def amqp_tx_warning(cls):
+ if not cls.amqp_tx_supported:
+ print "WARNING: Cannot test transactions over AMQP 1.0, proton version %s.%s < %s.%s" % (cls.PN_VERSION + cls.PN_TX_VERSION)
+ return False
+ return True
+
def configure(self, config): self.config=config
def setUp(self):
@@ -498,8 +517,8 @@ class BrokerTest(TestCase):
if qpid_messaging and self.amqp_lib: default_protocol="amqp1.0"
else: default_protocol="amqp0-10"
self.protocol = defs.get("PROTOCOL") or default_protocol
- self.tx_protocol = "amqp0-10" # Transactions not yet supported over 1.0
-
+ self.tx_protocol = self.protocol
+ if not self.amqp_tx_supported: self.tx_protocol = "amqp0-10"
def tearDown(self):
err = []
@@ -530,15 +549,22 @@ class BrokerTest(TestCase):
self.teardown_add(p)
return p
- def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False):
+ def broker(self, args=[], name=None, expect=EXPECT_RUNNING, wait=True, port=0, show_cmd=False, **kw):
"""Create and return a broker ready for use"""
- b = Broker(self, args=args, name=name, expect=expect, port=port, show_cmd=show_cmd)
+ b = Broker(self, args=args, name=name, expect=expect, port=port, show_cmd=show_cmd, **kw)
if (wait):
try: b.ready()
except Exception, e:
raise RethrownException("Failed to start broker %s(%s): %s" % (b.name, b.log, e))
return b
+ def check_output(self, args, stdin=None):
+ p = self.popen(args, stdout=PIPE, stderr=STDOUT)
+ out = p.communicate(stdin)
+ if p.returncode != 0:
+ raise Exception("%s exit code %s, output:\n%s" % (args, p.returncode, out[0]))
+ return out[0]
+
def browse(self, *args, **kwargs): browse(*args, **kwargs)
def assert_browse(self, *args, **kwargs): assert_browse(*args, **kwargs)
def assert_browse_retry(self, *args, **kwargs): assert_browse_retry(*args, **kwargs)
diff --git a/qpid/cpp/src/tests/ha_test.py b/qpid/cpp/src/tests/ha_test.py
index 40ea3854c9..82ca808cb1 100755
--- a/qpid/cpp/src/tests/ha_test.py
+++ b/qpid/cpp/src/tests/ha_test.py
@@ -24,6 +24,7 @@ from brokertest import *
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG, INFO
from qpidtoollibs import BrokerAgent
+from qpid.harness import Skipped
log = getLogger(__name__)
@@ -129,12 +130,9 @@ class HaBroker(Broker):
args += ["--load-module", BrokerTest.ha_lib,
# Non-standard settings for faster tests.
"--link-maintenance-interval=0.1",
- # Heartbeat and negotiate time are needed so that a broker wont
- # stall on an address that doesn't currently have a broker running.
- "--max-negotiate-time=1000",
"--ha-cluster=%s"%ha_cluster]
# Add default --log-enable arguments unless args already has --log arguments.
- if not [l for l in args if l.startswith("--log")]:
+ if not env_has_log_config() and not [l for l in args if l.startswith("--log")]:
args += ["--log-enable=info+", "--log-enable=debug+:ha::"]
if not [h for h in args if h.startswith("--link-heartbeat-interval")]:
args += ["--link-heartbeat-interval=%s"%(HaBroker.heartbeat)]
@@ -159,13 +157,20 @@ acl allow all all
Broker.__init__(self, test, args, port=ha_port.port, **kwargs)
# Do some static setup to locate the qpid-config and qpid-ha tools.
- qpid_ha_script=import_script(os.path.join(os.getenv("PYTHON_COMMANDS"),"qpid-ha"))
- qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
- assert os.path.isfile(qpid_config_path)
+ @property
+ def qpid_ha_script(self):
+ if not hasattr(self, "_qpid_ha_script"):
+ qpid_ha_exec = os.getenv("QPID_HA_EXEC")
+ if not qpid_ha_exec or not os.path.isfile(qpid_ha_exec):
+ raise Skipped("qpid-ha not available")
+ self._qpid_ha_script = import_script(qpid_ha_exec)
+ return self._qpid_ha_script
def __repr__(self): return "<HaBroker:%s:%d>"%(self.log, self.port())
def qpid_ha(self, args):
+ if not self.qpid_ha_script:
+ raise Skipped("qpid-ha not available")
try:
cred = self.client_credentials
url = self.host_port()
@@ -195,33 +200,37 @@ acl allow all all
def ha_status(self): return self.qmf().status
- def wait_status(self, status, timeout=5):
+ def wait_status(self, status, timeout=10):
+
def try_get_status():
self._status = "<unknown>"
- # Ignore ConnectionError, the broker may not be up yet.
try:
self._status = self.ha_status()
- return self._status == status;
- except qm.ConnectionError: return False
+ except qm.ConnectionError, e:
+ # Record the error but don't raise, the broker may not be up yet.
+ self._status = "%s: %s" % (type(e).__name__, e)
+ return self._status == status;
assert retry(try_get_status, timeout=timeout), "%s expected=%r, actual=%r"%(
self, status, self._status)
- def wait_queue(self, queue, timeout=1, msg="wait_queue"):
+ def wait_queue(self, queue, timeout=10, msg="wait_queue"):
""" Wait for queue to be visible via QMF"""
agent = self.agent
- assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), msg+"queue %s not present"%queue
+ assert retry(lambda: agent.getQueue(queue) is not None, timeout=timeout), \
+ "%s queue %s not present" % (msg, queue)
- def wait_no_queue(self, queue, timeout=1, msg="wait_no_queue"):
+ def wait_no_queue(self, queue, timeout=10, msg="wait_no_queue"):
""" Wait for queue to be invisible via QMF"""
agent = self.agent
assert retry(lambda: agent.getQueue(queue) is None, timeout=timeout), "%s: queue %s still present"%(msg,queue)
- # TODO aconway 2012-05-01: do direct python call to qpid-config code.
def qpid_config(self, args):
+ qpid_config_exec = os.getenv("QPID_CONFIG_EXEC")
+ if not qpid_config_exec or not os.path.isfile(qpid_config_exec):
+ raise Skipped("qpid-config not available")
assert subprocess.call(
- [self.qpid_config_path, "--broker", self.host_port()]+args,
- stdout=1, stderr=subprocess.STDOUT
- ) == 0
+ [qpid_config_exec, "--broker", self.host_port()]+args, stdout=1, stderr=subprocess.STDOUT
+ ) == 0, "qpid-config failed"
def config_replicate(self, from_broker, queue):
self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
@@ -325,7 +334,7 @@ class HaCluster(object):
ha_port = self._ports[i]
b = HaBroker(ha_port.test, ha_port, brokers_url=self.url, name=name,
args=args, **self.kwargs)
- b.ready(timeout=5)
+ b.ready(timeout=10)
return b
def start(self):
diff --git a/qpid/cpp/src/tests/ha_tests.py b/qpid/cpp/src/tests/ha_tests.py
index a43b939ee3..180831569f 100755
--- a/qpid/cpp/src/tests/ha_tests.py
+++ b/qpid/cpp/src/tests/ha_tests.py
@@ -1025,8 +1025,8 @@ class LongTests(HaBrokerTest):
"--broker", brokers[0].host_port(),
"--address", "q;{create:always}",
"--messages=1000",
- "--tx=10"
- # TODO aconway 2014-02-21: can't use amqp1.0 for transactions yet
+ "--tx=10",
+ "--connection-options={protocol:%s}" % self.tx_protocol
])
receiver = self.popen(
["qpid-receive",
@@ -1034,8 +1034,8 @@ class LongTests(HaBrokerTest):
"--address", "q;{create:always}",
"--messages=990",
"--timeout=10",
- "--tx=10"
- # TODO aconway 2014-02-21: can't use amqp1.0 for transactions yet
+ "--tx=10",
+ "--connection-options={protocol:%s}" % self.tx_protocol
])
self.assertEqual(sender.wait(), 0)
self.assertEqual(receiver.wait(), 0)
@@ -1268,7 +1268,7 @@ class StoreTests(HaBrokerTest):
"""Verify that a backup erases queue data from store recovery before
doing catch-up from the primary."""
if self.check_skip(): return
- cluster = HaCluster(self, 2, args=['--log-enable=trace+:ha', '--log-enable=trace+:Store'])
+ cluster = HaCluster(self, 2)
sn = cluster[0].connect(heartbeat=HaBroker.heartbeat).session()
s1 = sn.sender("q1;{create:always,node:{durable:true}}")
for m in ["foo","bar"]: s1.send(qm.Message(m, durable=True))
@@ -1532,7 +1532,7 @@ class TransactionTests(HaBrokerTest):
except qm.TransactionUnknown: pass
for b in cluster: self.assert_tx_clean(b)
try: tx.connection.close()
- except TransactionUnknown: pass # Occasionally get exception on close.
+ except qm.TransactionUnknown: pass # Occasionally get exception on close.
finally: l.restore()
def test_tx_no_backups(self):
@@ -1622,21 +1622,26 @@ class TransactionTests(HaBrokerTest):
import qpid_tests.broker_0_10
except ImportError:
raise Skipped("Tests not found")
-
cluster = HaCluster(self, 3)
- self.popen(["qpid-txtest", "-p%s"%cluster[0].port()]).assert_exit_ok()
+ if "QPID_PORT" in os.environ: del os.environ["QPID_PORT"]
+ self.popen(["qpid-txtest2", "--broker", cluster[0].host_port()]).assert_exit_ok()
+ print
self.popen(["qpid-python-test",
"-m", "qpid_tests.broker_0_10",
+ "-m", "qpid_tests.broker_1_0",
"-b", "localhost:%s"%(cluster[0].port()),
- "*.tx.*"]).assert_exit_ok()
+ "*.tx.*"], stdout=None, stderr=None).assert_exit_ok()
if __name__ == "__main__":
- outdir = "ha_tests.tmp"
- shutil.rmtree(outdir, True)
- qpid_ha = os.getenv("QPID_HA_EXEC")
- if qpid_ha and os.path.exists(qpid_ha):
+ qpid_ha_exec = os.getenv("QPID_HA_EXEC")
+ if qpid_ha_exec and os.path.isfile(qpid_ha_exec):
+ BrokerTest.amqp_tx_warning()
+ outdir = "ha_tests.tmp"
+ shutil.rmtree(outdir, True)
os.execvp("qpid-python-test",
- ["qpid-python-test", "-m", "ha_tests", "-DOUTDIR=%s"%outdir]
+ ["qpid-python-test", "-m", "ha_tests", "-DOUTDIR=%s"%outdir]
+ sys.argv[1:])
else:
- print "Skipping ha_tests, %s not available"%(qpid_ha)
+ print "Skipping ha_tests, qpid-ha not available"
+
+
diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py
index 20ce6167a8..3eec2422f1 100755
--- a/qpid/cpp/src/tests/interlink_tests.py
+++ b/qpid/cpp/src/tests/interlink_tests.py
@@ -88,6 +88,7 @@ class AmqpBrokerTest(BrokerTest):
result = self.popen(cmd, stdout=PIPE)
r.fetch(timeout=1) # wait until receiver is actually ready
s.acknowledge()
+ r.close()
s.close()
return result
diff --git a/qpid/cpp/src/tests/interop_tests.py b/qpid/cpp/src/tests/interop_tests.py
new file mode 100755
index 0000000000..d5533ead21
--- /dev/null
+++ b/qpid/cpp/src/tests/interop_tests.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+A set of tests that can be run against a foreign AMQP 1.0 broker.
+
+RUNNING WITH A FOREIGN BROKER:
+
+1. Start the broker
+2. Create persistent queues named: interop-a interop-b interop-q tx-1 tx-2
+3. Export the environment variable QPID_INTEROP_URL with the URL to connect to your broker
+ in the form [user[:password]@]host[:port]
+4. From the build directory run this test:
+ ctest -VV -R interop_tests
+
+If QPID_INTEROP_URL is not set, a qpidd broker will be started for the test.
+"""
+
+import os, sys, shutil, subprocess
+import qpid_messaging as qm
+from brokertest import *
+
+URL='QPID_INTEROP_URL'
+
+class InteropTest(BrokerTest):
+
+ def setUp(self):
+ super(InteropTest, self).setUp()
+ self.url = os.environ[URL]
+ self.connect_opts = ['--broker', self.url, '--connection-options', '{protocol:amqp1.0}']
+
+ def connect(self, **kwargs):
+ """Python connection to interop URL"""
+ c = qm.Connection.establish(self.url, protocol='amqp1.0', **kwargs)
+ self.teardown_add(c)
+ return c
+
+ def drain(self, queue, connection=None):
+ """
+ Drain a queue to make sure it is empty. Throw away the messages.
+ """
+ c = connection or self.connect()
+ r = c.session().receiver(queue)
+ try:
+ while True:
+ r.fetch(timeout=0)
+ r.session.acknowledge()
+ except qm.Empty:
+ pass
+ r.close()
+
+ def clear_queue(self, queue, connection=None, properties=None, durable=False):
+ """
+ Make empty queue, prefix with self.id(). Create if needed, drain if needed
+ @return queue name.
+ """
+ queue = "interop-%s" % queue
+ c = connection or self.connect()
+ props = {'create':'always'}
+ if durable: props['node'] = {'durable':True}
+ if properties: props.update(properties)
+ self.drain("%s;%s" % (queue, props), c)
+ return queue
+
+
+class SimpleTest(InteropTest):
+ """Simple test to check the broker is responding."""
+
+ def test_send_receive_python(self):
+ c = self.connect()
+ q = self.clear_queue('q', c)
+ s = c.session()
+ s.sender(q).send('foo')
+ self.assertEqual('foo', s.receiver(q).fetch().content)
+
+ def test_send_receive_cpp(self):
+ q = self.clear_queue('q')
+ args = ['-b', self.url, '-a', q]
+ self.check_output(['qpid-send', '--content-string=cpp_foo'] + args)
+ self.assertEqual('cpp_foo', self.check_output(['qpid-receive'] + args).strip())
+
+
+class PythonTxTest(InteropTest):
+
+ def tx_simple_setup(self):
+ """Start a transaction, remove messages from queue a, add messages to queue b"""
+ c = self.connect()
+ qa, qb = self.clear_queue('a', c, durable=True), self.clear_queue('b', c, durable=True)
+
+ # Send messages to a, no transaction.
+ sa = c.session().sender(qa+";{create:always,node:{durable:true}}")
+ tx_msgs = ['x', 'y', 'z']
+ for m in tx_msgs: sa.send(qm.Message(content=m, durable=True))
+
+ # Receive messages from a, in transaction.
+ tx = c.session(transactional=True)
+ txr = tx.receiver(qa)
+ self.assertEqual(tx_msgs, [txr.fetch(1).content for i in xrange(3)])
+ tx.acknowledge()
+
+ # Send messages to b, transactional, mixed with non-transactional.
+ sb = c.session().sender(qb+";{create:always,node:{durable:true}}")
+ txs = tx.sender(qb)
+ msgs = [str(i) for i in xrange(3)]
+ for tx_m, m in zip(tx_msgs, msgs):
+ txs.send(tx_m);
+ sb.send(m)
+ tx.sync()
+ return tx, qa, qb
+
+ def test_tx_simple_commit(self):
+ tx, qa, qb = self.tx_simple_setup()
+ s = self.connect().session()
+ assert_browse(s, qa, [])
+ assert_browse(s, qb, ['0', '1', '2'])
+ tx.commit()
+ assert_browse(s, qa, [])
+ assert_browse(s, qb, ['0', '1', '2', 'x', 'y', 'z'])
+
+ def test_tx_simple_rollback(self):
+ tx, qa, qb = self.tx_simple_setup()
+ s = self.connect().session()
+ assert_browse(s, qa, [])
+ assert_browse(s, qb, ['0', '1', '2'])
+ tx.rollback()
+ assert_browse(s, qa, ['x', 'y', 'z'])
+ assert_browse(s, qb, ['0', '1', '2'])
+
+ def test_tx_sequence(self):
+ tx = self.connect().session(transactional=True)
+ notx = self.connect().session()
+ q = self.clear_queue('q', tx.connection, durable=True)
+ s = tx.sender(q)
+ r = tx.receiver(q)
+ s.send('a')
+ tx.commit()
+ assert_browse(notx, q, ['a'])
+ s.send('b')
+ tx.commit()
+ assert_browse(notx, q, ['a', 'b'])
+ self.assertEqual('a', r.fetch().content)
+ tx.acknowledge();
+ tx.commit()
+ assert_browse(notx, q, ['b'])
+ s.send('z')
+ tx.rollback()
+ assert_browse(notx, q, ['b'])
+ self.assertEqual('b', r.fetch().content)
+ tx.acknowledge();
+ tx.rollback()
+ assert_browse(notx, q, ['b'])
+
+
+class CppTxTest(InteropTest):
+
+ def test_txtest2(self):
+ self.popen(["qpid-txtest2"] + self.connect_opts).assert_exit_ok()
+
+ def test_send_receive(self):
+ q = self.clear_queue('q', durable=True)
+ sender = self.popen(["qpid-send",
+ "--address", q,
+ "--messages=100",
+ "--tx=10",
+ "--durable=yes"] + self.connect_opts)
+ receiver = self.popen(["qpid-receive",
+ "--address", q,
+ "--messages=90",
+ "--timeout=10",
+ "--tx=10"] + self.connect_opts)
+ sender.assert_exit_ok()
+ receiver.assert_exit_ok()
+ expect = [long(i) for i in range(91, 101)]
+ sn = lambda m: m.properties["sn"]
+ assert_browse(self.connect().session(), q, expect, transform=sn)
+
+
+if __name__ == "__main__":
+ if not BrokerTest.amqp_tx_supported:
+ BrokerTest.amqp_tx_warning()
+ print "Skipping interop_tests"
+ exit(0)
+ outdir = "interop_tests.tmp"
+ shutil.rmtree(outdir, True)
+ cmd = ["qpid-python-test", "-m", "interop_tests", "-DOUTDIR=%s"%outdir] + sys.argv[1:]
+ if "QPID_PORT" in os.environ: del os.environ["QPID_PORT"]
+ if os.environ.get(URL):
+ os.execvp(cmd[0], cmd)
+ else:
+ dir = os.getcwd()
+ class StartBroker(BrokerTest):
+ def start_qpidd(self): pass
+ test = StartBroker('start_qpidd')
+ class Config:
+ def __init__(self):
+ self.defines = { 'OUTDIR': outdir }
+ test.configure(Config())
+ test.setUp()
+ os.environ[URL] = test.broker().host_port()
+ os.chdir(dir)
+ p = subprocess.Popen(cmd)
+ status = p.wait()
+ test.tearDown()
+ sys.exit(status)
diff --git a/qpid/cpp/src/tests/qpid-receive.cpp b/qpid/cpp/src/tests/qpid-receive.cpp
index 05a1a6df10..a71fd11fa7 100644
--- a/qpid/cpp/src/tests/qpid-receive.cpp
+++ b/qpid/cpp/src/tests/qpid-receive.cpp
@@ -197,7 +197,7 @@ int main(int argc, char ** argv)
std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession();
Receiver receiver = session.createReceiver(opts.address);
- receiver.setCapacity(opts.capacity);
+ receiver.setCapacity(std::min(opts.capacity, opts.messages));
Message msg;
uint count = 0;
uint txCount = 0;
@@ -207,9 +207,9 @@ int main(int argc, char ** argv)
Reporter<ThroughputAndLatency> reporter(std::cout, opts.reportEvery, opts.reportHeader);
if (!opts.readyAddress.empty()) {
session.createSender(opts.readyAddress).send(msg);
- if (opts.tx)
- session.commit();
- }
+ if (opts.tx)
+ session.commit();
+ }
// For receive rate calculation
qpid::sys::AbsTime start = qpid::sys::now();
int64_t interval = 0;
@@ -290,6 +290,7 @@ int main(int argc, char ** argv)
connection.close();
return 0;
}
+ return 1;
} catch(const std::exception& error) {
std::cerr << "qpid-receive: " << error.what() << std::endl;
connection.close();
diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp
index 498dc96ce9..970944f8d0 100644
--- a/qpid/cpp/src/tests/qpid-send.cpp
+++ b/qpid/cpp/src/tests/qpid-send.cpp
@@ -112,14 +112,14 @@ struct Options : public qpid::Options
log(argv0),
reportTotal(false),
reportEvery(0),
- reportHeader(true),
- sendRate(0),
- sequence(true),
- timestamp(true),
- groupPrefix("GROUP-"),
- groupSize(10),
- groupRandSize(false),
- groupInterleave(1)
+ reportHeader(true),
+ sendRate(0),
+ sequence(true),
+ timestamp(true),
+ groupPrefix("GROUP-"),
+ groupSize(10),
+ groupRandSize(false),
+ groupInterleave(1)
{
addOptions()
("broker,b", qpid::optValue(url, "URL"), "url of broker to connect to")
@@ -272,7 +272,7 @@ class MapContentGenerator : public ContentGenerator {
// tag each generated message with a group identifer
//
class GroupGenerator {
-public:
+ public:
GroupGenerator(const std::string& key,
const std::string& prefix,
const uint size,
@@ -351,7 +351,7 @@ int main(int argc, char ** argv)
try {
Options opts;
if (opts.parse(argc, argv)) {
- connection = Connection(opts.url, opts.connectionOptions);
+ connection = Connection(opts.url, opts.connectionOptions);
connection.open();
std::auto_ptr<FailoverUpdates> updates(opts.failoverUpdates ? new FailoverUpdates(connection) : 0);
Session session = opts.tx ? connection.createTransactionalSession() : connection.createSession();
@@ -447,6 +447,7 @@ int main(int argc, char ** argv)
connection.close();
return 0;
}
+ return 1;
} catch(const std::exception& error) {
std::cerr << "qpid-send: " << error.what() << std::endl;
connection.close();
diff --git a/qpid/cpp/src/tests/qpid-txtest2.cpp b/qpid/cpp/src/tests/qpid-txtest2.cpp
index 2393ec2396..58c48f9a8d 100644
--- a/qpid/cpp/src/tests/qpid-txtest2.cpp
+++ b/qpid/cpp/src/tests/qpid-txtest2.cpp
@@ -353,10 +353,11 @@ int main(int argc, char** argv)
if (opts.init) controller.init();
if (opts.transfer) controller.transfer();
if (opts.check) return controller.check();
+ return 0;
}
- return 0;
+ return 1;
} catch(const std::exception& e) {
- std::cout << argv[0] << ": " << e.what() << std::endl;
+ std::cerr << argv[0] << ": " << e.what() << std::endl;
}
return 2;
}
diff --git a/qpid/cpp/src/tests/swig_python_tests b/qpid/cpp/src/tests/swig_python_tests
index 4d9e5e35d4..40c35ac0fa 100755
--- a/qpid/cpp/src/tests/swig_python_tests
+++ b/qpid/cpp/src/tests/swig_python_tests
@@ -39,7 +39,8 @@ skip() {
}
start_broker() {
- QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no) || fail "Could not start broker"
+ rm -f swig_python_tests.log
+ QPID_PORT=$($QPIDD_EXEC --daemon --port 0 --interface 127.0.0.1 --no-data-dir $MODULES --auth no --log-to-file swig_python_tests.log) || fail "Could not start broker"
}
stop_broker() {
@@ -54,9 +55,9 @@ echo "Running swigged python tests using broker on port $QPID_PORT"
export PYTHONPATH=$PYTHONPATH:$PYTHONPATH_SWIG
export QPID_USE_SWIG_CLIENT=1
-$QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests || FAILED=1
+$QPID_PYTHON_TEST -m qpid.tests.messaging.message -m qpid_tests.broker_0_10.priority -m qpid_tests.broker_0_10.lvq -m qpid_tests.broker_0_10.new_api -b localhost:$QPID_PORT -I $srcdir/failing-amqp0-10-python-tests $* || FAILED=1
if [[ -a $AMQP_LIB ]] ; then
- $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -m policies -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests || FAILED=1
+ $QPID_PYTHON_TEST --define="protocol_version=amqp1.0" -m qpid_tests.broker_1_0 -m qpid_tests.broker_0_10.new_api -m assertions -m reject_release -m misc -m policies -b localhost:$QPID_PORT -I $srcdir/failing-amqp1.0-python-tests $* || FAILED=1
fi
stop_broker
if [[ $FAILED -eq 1 ]]; then
diff --git a/qpid/cpp/src/tests/test_env.sh.in b/qpid/cpp/src/tests/test_env.sh.in
index 96f1596890..1c4c117e4b 100644
--- a/qpid/cpp/src/tests/test_env.sh.in
+++ b/qpid/cpp/src/tests/test_env.sh.in
@@ -20,14 +20,14 @@
absdir() { echo `cd $1 && pwd`; }
# Environment variables substituted by cmake.
-srcdir=`absdir @abs_srcdir@`
-builddir=`absdir @abs_builddir@`
-top_srcdir=`absdir @abs_top_srcdir@`
-top_builddir=`absdir @abs_top_builddir@`
-moduledir=$top_builddir/src@builddir_lib_suffix@
-pythonswigdir=$top_builddir/bindings/qpid/python/
-pythonswiglibdir=$top_builddir/bindings/qpid/python@builddir_lib_suffix@
-testmoduledir=$builddir@builddir_lib_suffix@
+export srcdir=`absdir @abs_srcdir@`
+export builddir=`absdir @abs_builddir@`
+export top_srcdir=`absdir @abs_top_srcdir@`
+export top_builddir=`absdir @abs_top_builddir@`
+export moduledir=$top_builddir/src@builddir_lib_suffix@
+export pythonswigdir=$top_builddir/bindings/qpid/python/
+export pythonswiglibdir=$top_builddir/bindings/qpid/python@builddir_lib_suffix@
+export testmoduledir=$builddir@builddir_lib_suffix@
export QPID_INSTALL_PREFIX=@prefix@
# Tools substituted by cmake
diff --git a/qpid/cpp/src/tests/test_store.cpp b/qpid/cpp/src/tests/test_store.cpp
index ee04dddd6a..14aee7b648 100644
--- a/qpid/cpp/src/tests/test_store.cpp
+++ b/qpid/cpp/src/tests/test_store.cpp
@@ -223,27 +223,18 @@ class TestStore : public NullMessageStore {
const boost::intrusive_ptr<PersistableMessage>& pmsg,
const PersistableQueue& queue)
{
- qpid::broker::amqp_0_10::MessageTransfer* msg =
- dynamic_cast<qpid::broker::amqp_0_10::MessageTransfer*>(pmsg.get());
- assert(msg);
-
ostringstream o;
- o << "<enqueue " << queue.getName() << " " << getContent(msg);
+ string data = getContent(pmsg);
+ o << "<enqueue " << queue.getName() << " " << data;
if (tx) o << " tx=" << getId(*tx);
o << ">";
log(o.str());
// Dump the message if there is a dump file.
if (dump.get()) {
- msg->getFrames().getMethod()->print(*dump);
- *dump << endl << " ";
- msg->getFrames().getHeaders()->print(*dump);
- *dump << endl << " ";
- *dump << msg->getFrames().getContentSize() << endl;
+ *dump << "Message(" << data.size() << "): " << data << endl;
}
string logPrefix = "TestStore "+name+": ";
- // Check the message for special instructions for this store.
- string data = msg->getFrames().getContent();
Action action(data);
bool doComplete = true;
if (action.index && action.executeIn(name)) {
@@ -258,7 +249,7 @@ class TestStore : public NullMessageStore {
QPID_LOG(error, logPrefix << "async-id needs argument: " << data);
break;
}
- asyncIds[action.args[0]] = msg;
+ asyncIds[action.args[0]] = pmsg;
QPID_LOG(debug, logPrefix << "delayed completion " << action.args[0]);
doComplete = false;
break;
@@ -284,7 +275,7 @@ class TestStore : public NullMessageStore {
QPID_LOG(error, logPrefix << "unknown action: " << data);
}
}
- if (doComplete) msg->enqueueComplete();
+ if (doComplete) pmsg->enqueueComplete();
}
void dequeue(TransactionContext* tx,