From 62ea3272e4e7a6d0acf338820068b816418b826e Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Fri, 5 Feb 2010 15:27:54 +0000 Subject: moved tests/*.py underneath qpid/tests/*.py git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@906969 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/python/qpid-python-test | 4 +- qpid/python/qpid/tests/__init__.py | 4 + qpid/python/qpid/tests/codec.py | 601 +++++++++++++++++++++++++++++++++++ qpid/python/qpid/tests/codec010.py | 133 ++++++++ qpid/python/qpid/tests/connection.py | 222 +++++++++++++ qpid/python/qpid/tests/datatypes.py | 296 +++++++++++++++++ qpid/python/qpid/tests/queue.py | 71 +++++ qpid/python/qpid/tests/spec010.py | 74 +++++ qpid/python/tests/__init__.py | 22 -- qpid/python/tests/codec.py | 601 ----------------------------------- qpid/python/tests/codec010.py | 133 -------- qpid/python/tests/connection.py | 222 ------------- qpid/python/tests/datatypes.py | 296 ----------------- qpid/python/tests/queue.py | 71 ----- qpid/python/tests/spec010.py | 74 ----- 15 files changed, 1403 insertions(+), 1421 deletions(-) create mode 100644 qpid/python/qpid/tests/codec.py create mode 100644 qpid/python/qpid/tests/codec010.py create mode 100644 qpid/python/qpid/tests/connection.py create mode 100644 qpid/python/qpid/tests/datatypes.py create mode 100644 qpid/python/qpid/tests/queue.py create mode 100644 qpid/python/qpid/tests/spec010.py delete mode 100644 qpid/python/tests/__init__.py delete mode 100644 qpid/python/tests/codec.py delete mode 100644 qpid/python/tests/codec010.py delete mode 100644 qpid/python/tests/connection.py delete mode 100644 qpid/python/tests/datatypes.py delete mode 100644 qpid/python/tests/queue.py delete mode 100644 qpid/python/tests/spec010.py (limited to 'qpid/python') diff --git a/qpid/python/qpid-python-test b/qpid/python/qpid-python-test index 5aedcb68ea..f7955a4a9d 100755 --- a/qpid/python/qpid-python-test +++ b/qpid/python/qpid-python-test @@ -107,7 +107,7 @@ if not includes: if opts.modules: includes.append("*") else: - includes.extend(["qpid.tests.*", "tests.*"]) + includes.extend(["qpid.tests.*"]) def is_ignored(path): for p in excludes: @@ -512,7 +512,7 @@ class Harness: modules = opts.modules if not modules: - modules.extend(["qpid.tests", "tests"]) + modules.extend(["qpid.tests"]) h = Harness() for name in modules: m = __import__(name, None, None, ["dummy"]) diff --git a/qpid/python/qpid/tests/__init__.py b/qpid/python/qpid/tests/__init__.py index 2f0fcfdf67..039214ca42 100644 --- a/qpid/python/qpid/tests/__init__.py +++ b/qpid/python/qpid/tests/__init__.py @@ -25,4 +25,8 @@ class Test: def configure(self, config): self.config = config +# API Tests import address, framing, mimetype, messaging + +# Legacy Tests +import codec, queue, datatypes, connection, spec010, codec010 diff --git a/qpid/python/qpid/tests/codec.py b/qpid/python/qpid/tests/codec.py new file mode 100644 index 0000000000..9b51b4713c --- /dev/null +++ b/qpid/python/qpid/tests/codec.py @@ -0,0 +1,601 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import unittest +from qpid.codec import Codec +from qpid.spec import load +from cStringIO import StringIO +from qpid.reference import ReferenceId + +__doc__ = """ + + This is a unit test script for qpid/codec.py + + It can be run standalone or as part of the existing test framework. + + To run standalone: + ------------------- + + Place in the qpid/python/tests/ directory and type... + + python codec.py + + A brief output will be printed on screen. The verbose output will be placed inn a file called + codec_unit_test_output.txt. [TODO: make this filename configurable] + + To run as part of the existing test framework: + ----------------------------------------------- + + python run-tests tests.codec + + Change History: + ----------------- + Jimmy John 05/19/2007 Initial draft + Jimmy John 05/22/2007 Implemented comments by Rafael Schloming + + +""" + +from qpid_config import amqp_spec_0_8 +SPEC = load(amqp_spec_0_8) + +# -------------------------------------- +# -------------------------------------- +class BaseDataTypes(unittest.TestCase): + + + """ + Base class containing common functions + """ + + # --------------- + def setUp(self): + """ + standard setUp for unitetest (refer unittest documentation for details) + """ + self.codec = Codec(StringIO(), SPEC) + + # ------------------ + def tearDown(self): + """ + standard tearDown for unitetest (refer unittest documentation for details) + """ + self.codec.stream.flush() + self.codec.stream.close() + + # ---------------------------------------- + def callFunc(self, functionName, *args): + """ + helper function - given a function name and arguments, calls the function with the args and + returns the contents of the stream + """ + getattr(self.codec, functionName)(args[0]) + return self.codec.stream.getvalue() + + # ---------------------------------------- + def readFunc(self, functionName, *args): + """ + helper function - creates a input stream and then calls the function with arguments as have been + supplied + """ + self.codec.stream = StringIO(args[0]) + return getattr(self.codec, functionName)() + + +# ---------------------------------------- +# ---------------------------------------- +class IntegerTestCase(BaseDataTypes): + + """ + Handles octet, short, long, long long + + """ + + # ------------------------- + def __init__(self, *args): + """ + sets constants for use in tests + """ + + BaseDataTypes.__init__(self, *args) + self.const_integer = 2 + self.const_integer_octet_encoded = '\x02' + self.const_integer_short_encoded = '\x00\x02' + self.const_integer_long_encoded = '\x00\x00\x00\x02' + self.const_integer_long_long_encoded = '\x00\x00\x00\x00\x00\x00\x00\x02' + + # -------------------------- # + # Unsigned Octect - 8 bits # + # -------------------------- # + + # -------------------------- + def test_unsigned_octet(self): + """ + ubyte format requires 0<=number<=255 + """ + self.failUnlessEqual(self.callFunc('encode_octet', self.const_integer), self.const_integer_octet_encoded, 'octect encoding FAILED...') + + # ------------------------------------------- + def test_octet_out_of_upper_range(self): + """ + testing for input above acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_octet, 256) + + # ------------------------------------------- + def test_uoctet_out_of_lower_range(self): + """ + testing for input below acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_octet, -1) + + # --------------------------------- + def test_uoctet_with_fraction(self): + """ + the fractional part should be ignored... + """ + self.failUnlessEqual(self.callFunc('encode_octet', 2.5), self.const_integer_octet_encoded, 'octect encoding FAILED with fractions...') + + # ------------------------------------ + def test_unsigned_octet_decode(self): + """ + octet decoding + """ + self.failUnlessEqual(self.readFunc('decode_octet', self.const_integer_octet_encoded), self.const_integer, 'octect decoding FAILED...') + + # ----------------------------------- # + # Unsigned Short Integers - 16 bits # + # ----------------------------------- # + + # ----------------------- + def test_ushort_int(self): + """ + testing unsigned short integer + """ + self.failUnlessEqual(self.callFunc('encode_short', self.const_integer), self.const_integer_short_encoded, 'short encoding FAILED...') + + # ------------------------------------------- + def test_ushort_int_out_of_upper_range(self): + """ + testing for input above acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_short, 65536) + + # ------------------------------------------- + def test_ushort_int_out_of_lower_range(self): + """ + testing for input below acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_short, -1) + + # --------------------------------- + def test_ushort_int_with_fraction(self): + """ + the fractional part should be ignored... + """ + self.failUnlessEqual(self.callFunc('encode_short', 2.5), self.const_integer_short_encoded, 'short encoding FAILED with fractions...') + + # ------------------------------------ + def test_ushort_int_decode(self): + """ + unsigned short decoding + """ + self.failUnlessEqual(self.readFunc('decode_short', self.const_integer_short_encoded), self.const_integer, 'unsigned short decoding FAILED...') + + + # ---------------------------------- # + # Unsigned Long Integers - 32 bits # + # ---------------------------------- # + + # ----------------------- + def test_ulong_int(self): + """ + testing unsigned long iteger + """ + self.failUnlessEqual(self.callFunc('encode_long', self.const_integer), self.const_integer_long_encoded, 'long encoding FAILED...') + + # ------------------------------------------- + def test_ulong_int_out_of_upper_range(self): + """ + testing for input above acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_long, 4294967296) + + # ------------------------------------------- + def test_ulong_int_out_of_lower_range(self): + """ + testing for input below acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_long, -1) + + # --------------------------------- + def test_ulong_int_with_fraction(self): + """ + the fractional part should be ignored... + """ + self.failUnlessEqual(self.callFunc('encode_long', 2.5), self.const_integer_long_encoded, 'long encoding FAILED with fractions...') + + # ------------------------------- + def test_ulong_int_decode(self): + """ + unsigned long decoding + """ + self.failUnlessEqual(self.readFunc('decode_long', self.const_integer_long_encoded), self.const_integer, 'unsigned long decoding FAILED...') + + + # --------------------------------------- # + # Unsigned Long Long Integers - 64 bits # + # --------------------------------------- # + + # ----------------------- + def test_ulong_long_int(self): + """ + testing unsinged long long integer + """ + self.failUnlessEqual(self.callFunc('encode_longlong', self.const_integer), self.const_integer_long_long_encoded, 'long long encoding FAILED...') + + # ------------------------------------------- + def test_ulong_long_int_out_of_upper_range(self): + """ + testing for input above acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_longlong, 18446744073709551616) + + # ------------------------------------------- + def test_ulong_long_int_out_of_lower_range(self): + """ + testing for input below acceptable range + """ + self.failUnlessRaises(Exception, self.codec.encode_longlong, -1) + + # --------------------------------- + def test_ulong_long_int_with_fraction(self): + """ + the fractional part should be ignored... + """ + self.failUnlessEqual(self.callFunc('encode_longlong', 2.5), self.const_integer_long_long_encoded, 'long long encoding FAILED with fractions...') + + # ------------------------------------ + def test_ulong_long_int_decode(self): + """ + unsigned long long decoding + """ + self.failUnlessEqual(self.readFunc('decode_longlong', self.const_integer_long_long_encoded), self.const_integer, 'unsigned long long decoding FAILED...') + +# ----------------------------------- +# ----------------------------------- +class BitTestCase(BaseDataTypes): + + """ + Handles bits + """ + + # ---------------------------------------------- + def callFunc(self, functionName, *args): + """ + helper function + """ + for ele in args: + getattr(self.codec, functionName)(ele) + + self.codec.flush() + return self.codec.stream.getvalue() + + # ------------------- + def test_bit1(self): + """ + sends in 11 + """ + self.failUnlessEqual(self.callFunc('encode_bit', 1, 1), '\x03', '11 bit encoding FAILED...') + + # ------------------- + def test_bit2(self): + """ + sends in 10011 + """ + self.failUnlessEqual(self.callFunc('encode_bit', 1, 1, 0, 0, 1), '\x13', '10011 bit encoding FAILED...') + + # ------------------- + def test_bit3(self): + """ + sends in 1110100111 [10 bits(right to left), should be compressed into two octets] + """ + self.failUnlessEqual(self.callFunc('encode_bit', 1,1,1,0,0,1,0,1,1,1), '\xa7\x03', '1110100111(right to left) bit encoding FAILED...') + + # ------------------------------------ + def test_bit_decode_1(self): + """ + decode bit 1 + """ + self.failUnlessEqual(self.readFunc('decode_bit', '\x01'), 1, 'decode bit 1 FAILED...') + + # ------------------------------------ + def test_bit_decode_0(self): + """ + decode bit 0 + """ + self.failUnlessEqual(self.readFunc('decode_bit', '\x00'), 0, 'decode bit 0 FAILED...') + +# ----------------------------------- +# ----------------------------------- +class StringTestCase(BaseDataTypes): + + """ + Handles short strings, long strings + """ + + # ------------------------------------------------------------- # + # Short Strings - 8 bit length followed by zero or more octets # + # ------------------------------------------------------------- # + + # --------------------------------------- + def test_short_string_zero_length(self): + """ + 0 length short string + """ + self.failUnlessEqual(self.callFunc('encode_shortstr', ''), '\x00', '0 length short string encoding FAILED...') + + # ------------------------------------------- + def test_short_string_positive_length(self): + """ + positive length short string + """ + self.failUnlessEqual(self.callFunc('encode_shortstr', 'hello world'), '\x0bhello world', 'positive length short string encoding FAILED...') + + # ------------------------------------------- + def test_short_string_out_of_upper_range(self): + """ + string length > 255 + """ + self.failUnlessRaises(Exception, self.codec.encode_shortstr, 'x'*256) + + # ------------------------------------ + def test_short_string_decode(self): + """ + short string decode + """ + self.failUnlessEqual(self.readFunc('decode_shortstr', '\x0bhello world'), 'hello world', 'short string decode FAILED...') + + + # ------------------------------------------------------------- # + # Long Strings - 32 bit length followed by zero or more octets # + # ------------------------------------------------------------- # + + # --------------------------------------- + def test_long_string_zero_length(self): + """ + 0 length long string + """ + self.failUnlessEqual(self.callFunc('encode_longstr', ''), '\x00\x00\x00\x00', '0 length long string encoding FAILED...') + + # ------------------------------------------- + def test_long_string_positive_length(self): + """ + positive length long string + """ + self.failUnlessEqual(self.callFunc('encode_longstr', 'hello world'), '\x00\x00\x00\x0bhello world', 'positive length long string encoding FAILED...') + + # ------------------------------------ + def test_long_string_decode(self): + """ + long string decode + """ + self.failUnlessEqual(self.readFunc('decode_longstr', '\x00\x00\x00\x0bhello world'), 'hello world', 'long string decode FAILED...') + + +# -------------------------------------- +# -------------------------------------- +class TimestampTestCase(BaseDataTypes): + + """ + No need of any test cases here as timestamps are implemented as long long which is tested above + """ + pass + +# --------------------------------------- +# --------------------------------------- +class FieldTableTestCase(BaseDataTypes): + + """ + Handles Field Tables + + Only S/I type messages seem to be implemented currently + """ + + # ------------------------- + def __init__(self, *args): + """ + sets constants for use in tests + """ + + BaseDataTypes.__init__(self, *args) + self.const_field_table_dummy_dict = {'$key1':'value1','$key2':'value2'} + self.const_field_table_dummy_dict_encoded = '\x00\x00\x00\x22\x05$key2S\x00\x00\x00\x06value2\x05$key1S\x00\x00\x00\x06value1' + + # ------------------------------------------- + def test_field_table_name_value_pair(self): + """ + valid name value pair + """ + self.failUnlessEqual(self.callFunc('encode_table', {'$key1':'value1'}), '\x00\x00\x00\x11\x05$key1S\x00\x00\x00\x06value1', 'valid name value pair encoding FAILED...') + + # --------------------------------------------------- + def test_field_table_multiple_name_value_pair(self): + """ + multiple name value pair + """ + self.failUnlessEqual(self.callFunc('encode_table', self.const_field_table_dummy_dict), self.const_field_table_dummy_dict_encoded, 'multiple name value pair encoding FAILED...') + + # ------------------------------------ + def test_field_table_decode(self): + """ + field table decode + """ + self.failUnlessEqual(self.readFunc('decode_table', self.const_field_table_dummy_dict_encoded), self.const_field_table_dummy_dict, 'field table decode FAILED...') + + +# ------------------------------------ +# ------------------------------------ +class ContentTestCase(BaseDataTypes): + + """ + Handles Content data types + """ + + # ----------------------------- + def test_content_inline(self): + """ + inline content + """ + self.failUnlessEqual(self.callFunc('encode_content', 'hello inline message'), '\x00\x00\x00\x00\x14hello inline message', 'inline content encoding FAILED...') + + # -------------------------------- + def test_content_reference(self): + """ + reference content + """ + self.failUnlessEqual(self.callFunc('encode_content', ReferenceId('dummyId')), '\x01\x00\x00\x00\x07dummyId', 'reference content encoding FAILED...') + + # ------------------------------------ + def test_content_inline_decode(self): + """ + inline content decode + """ + self.failUnlessEqual(self.readFunc('decode_content', '\x00\x00\x00\x00\x14hello inline message'), 'hello inline message', 'inline content decode FAILED...') + + # ------------------------------------ + def test_content_reference_decode(self): + """ + reference content decode + """ + self.failUnlessEqual(self.readFunc('decode_content', '\x01\x00\x00\x00\x07dummyId').id, 'dummyId', 'reference content decode FAILED...') + +# ------------------------ # +# Pre - existing test code # +# ------------------------ # + +# --------------------- +def test(type, value): + """ + old test function cut/copy/paste from qpid/codec.py + """ + if isinstance(value, (list, tuple)): + values = value + else: + values = [value] + stream = StringIO() + codec = Codec(stream, SPEC) + for v in values: + codec.encode(type, v) + codec.flush() + enc = stream.getvalue() + stream.reset() + dup = [] + for i in xrange(len(values)): + dup.append(codec.decode(type)) + if values != dup: + raise AssertionError("%r --> %r --> %r" % (values, enc, dup)) + +# ----------------------- +def dotest(type, value): + """ + old test function cut/copy/paste from qpid/codec.py + """ + args = (type, value) + test(*args) + +# ------------- +def oldtests(): + """ + old test function cut/copy/paste from qpid/codec.py + """ + for value in ("1", "0", "110", "011", "11001", "10101", "10011"): + for i in range(10): + dotest("bit", map(lambda x: x == "1", value*i)) + + for value in ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1}): + dotest("table", value) + + for type in ("octet", "short", "long", "longlong"): + for value in range(0, 256): + dotest(type, value) + + for type in ("shortstr", "longstr"): + for value in ("", "a", "asdf"): + dotest(type, value) + +# ----------------------------------------- +class oldTests(unittest.TestCase): + + """ + class to handle pre-existing test cases + """ + + # --------------------------- + def test_oldtestcases(self): + """ + call the old tests + """ + return oldtests() + +# --------------------------- +# --------------------------- +if __name__ == '__main__': + + codec_test_suite = unittest.TestSuite() + + #adding all the test suites... + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(IntegerTestCase)) + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(BitTestCase)) + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(StringTestCase)) + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TimestampTestCase)) + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(FieldTableTestCase)) + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ContentTestCase)) + + #loading pre-existing test case from qpid/codec.py + codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(oldTests)) + + run_output_stream = StringIO() + test_runner = unittest.TextTestRunner(run_output_stream, '', '') + test_result = test_runner.run(codec_test_suite) + + print '\n%d test run...' % (test_result.testsRun) + + if test_result.wasSuccessful(): + print '\nAll tests successful\n' + + if test_result.failures: + print '\n----------' + print '%d FAILURES:' % (len(test_result.failures)) + print '----------\n' + for failure in test_result.failures: + print str(failure[0]) + ' ... FAIL' + + if test_result.errors: + print '\n---------' + print '%d ERRORS:' % (len(test_result.errors)) + print '---------\n' + + for error in test_result.errors: + print str(error[0]) + ' ... ERROR' + + f = open('codec_unit_test_output.txt', 'w') + f.write(str(run_output_stream.getvalue())) + f.close() diff --git a/qpid/python/qpid/tests/codec010.py b/qpid/python/qpid/tests/codec010.py new file mode 100644 index 0000000000..787ebc146f --- /dev/null +++ b/qpid/python/qpid/tests/codec010.py @@ -0,0 +1,133 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import time + +from unittest import TestCase +from qpid.codec010 import StringCodec +from qpid.datatypes import timestamp, uuid4 +from qpid.ops import PRIMITIVE + +class CodecTest(TestCase): + + def check(self, type, value, compare=True): + t = PRIMITIVE[type] + sc = StringCodec() + sc.write_primitive(t, value) + decoded = sc.read_primitive(t) + if compare: + assert decoded == value, "%s, %s" % (decoded, value) + return decoded + + def testMapString(self): + self.check("map", {"string": "this is a test"}) + + def testMapUnicode(self): + self.check("map", {"unicode": u"this is a unicode test"}) + + def testMapBinary(self): + self.check("map", {"binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5"}) + + def testMapBuffer(self): + s = "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5" + dec = self.check("map", {"buffer": buffer(s)}, False) + assert dec["buffer"] == s + + def testMapInt(self): + self.check("map", {"int": 3}) + + def testMapLong(self): + self.check("map", {"long": 2**32}) + self.check("map", {"long": 1 << 34}) + self.check("map", {"long": -(1 << 34)}) + + def testMapTimestamp(self): + decoded = self.check("map", {"timestamp": timestamp(0)}) + assert isinstance(decoded["timestamp"], timestamp) + + def testMapDatetime(self): + decoded = self.check("map", {"datetime": timestamp(0).datetime()}, compare=False) + assert isinstance(decoded["datetime"], timestamp) + assert decoded["datetime"] == 0.0 + + def testMapNone(self): + self.check("map", {"none": None}) + + def testMapNested(self): + self.check("map", {"map": {"string": "nested test"}}) + + def testMapList(self): + self.check("map", {"list": [1, "two", 3.0, -4]}) + + def testMapUUID(self): + self.check("map", {"uuid": uuid4()}) + + def testMapAll(self): + decoded = self.check("map", {"string": "this is a test", + "unicode": u"this is a unicode test", + "binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5", + "int": 3, + "long": 2**32, + "timestamp": timestamp(0), + "none": None, + "map": {"string": "nested map"}, + "list": [1, "two", 3.0, -4], + "uuid": uuid4()}) + assert isinstance(decoded["timestamp"], timestamp) + + def testMapEmpty(self): + self.check("map", {}) + + def testMapNone(self): + self.check("map", None) + + def testList(self): + self.check("list", [1, "two", 3.0, -4]) + + def testListEmpty(self): + self.check("list", []) + + def testListNone(self): + self.check("list", None) + + def testArrayInt(self): + self.check("array", [1, 2, 3, 4]) + + def testArrayString(self): + self.check("array", ["one", "two", "three", "four"]) + + def testArrayEmpty(self): + self.check("array", []) + + def testArrayNone(self): + self.check("array", None) + + def testInt16(self): + self.check("int16", 3) + self.check("int16", -3) + + def testInt64(self): + self.check("int64", 3) + self.check("int64", -3) + self.check("int64", 1<<34) + self.check("int64", -(1<<34)) + + def testDatetime(self): + self.check("datetime", timestamp(0)) + self.check("datetime", timestamp(long(time.time()))) diff --git a/qpid/python/qpid/tests/connection.py b/qpid/python/qpid/tests/connection.py new file mode 100644 index 0000000000..8c00df56e1 --- /dev/null +++ b/qpid/python/qpid/tests/connection.py @@ -0,0 +1,222 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from threading import * +from unittest import TestCase +from qpid.util import connect, listen +from qpid.connection import * +from qpid.datatypes import Message +from qpid.delegates import Server +from qpid.queue import Queue +from qpid.session import Delegate +from qpid.ops import QueueQueryResult + +PORT = 1234 + +class TestServer: + + def __init__(self, queue): + self.queue = queue + + def connection(self, connection): + return Server(connection, delegate=self.session) + + def session(self, session): + session.auto_sync = False + return TestSession(session, self.queue) + +class TestSession(Delegate): + + def __init__(self, session, queue): + self.session = session + self.queue = queue + + def execution_sync(self, es): + pass + + def queue_query(self, qq): + return QueueQueryResult(qq.queue) + + def message_transfer(self, cmd): + if cmd.destination == "echo": + m = Message(cmd.payload) + m.headers = cmd.headers + self.session.message_transfer(cmd.destination, cmd.accept_mode, + cmd.acquire_mode, m) + elif cmd.destination == "abort": + self.session.channel.connection.sock.close() + elif cmd.destination == "heartbeat": + self.session.channel.connection_heartbeat() + else: + self.queue.put(cmd) + +class ConnectionTest(TestCase): + + def setUp(self): + self.queue = Queue() + self.running = True + started = Event() + + def run(): + ts = TestServer(self.queue) + for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()): + conn = Connection(s, delegate=ts.connection) + try: + conn.start(5) + except Closed: + pass + + self.server = Thread(target=run) + self.server.setDaemon(True) + self.server.start() + + started.wait(3) + assert started.isSet() + + def tearDown(self): + self.running = False + connect("127.0.0.1", PORT).close() + self.server.join(3) + + def connect(self, **kwargs): + return Connection(connect("127.0.0.1", PORT), **kwargs) + + def test(self): + c = self.connect() + c.start(10) + + ssn1 = c.session("test1", timeout=10) + ssn2 = c.session("test2", timeout=10) + + assert ssn1 == c.sessions["test1"] + assert ssn2 == c.sessions["test2"] + assert ssn1.channel != None + assert ssn2.channel != None + assert ssn1 in c.attached.values() + assert ssn2 in c.attached.values() + + ssn1.close(5) + + assert ssn1.channel == None + assert ssn1 not in c.attached.values() + assert ssn2 in c.sessions.values() + + ssn2.close(5) + + assert ssn2.channel == None + assert ssn2 not in c.attached.values() + assert ssn2 not in c.sessions.values() + + ssn = c.session("session", timeout=10) + + assert ssn.channel != None + assert ssn in c.sessions.values() + + destinations = ("one", "two", "three") + + for d in destinations: + ssn.message_transfer(d) + + for d in destinations: + cmd = self.queue.get(10) + assert cmd.destination == d + assert cmd.headers == None + assert cmd.payload == None + + msg = Message("this is a test") + ssn.message_transfer("four", message=msg) + cmd = self.queue.get(10) + assert cmd.destination == "four" + assert cmd.headers == None + assert cmd.payload == msg.body + + qq = ssn.queue_query("asdf") + assert qq.queue == "asdf" + c.close(5) + + def testCloseGet(self): + c = self.connect() + c.start(10) + ssn = c.session("test", timeout=10) + echos = ssn.incoming("echo") + + for i in range(10): + ssn.message_transfer("echo", message=Message("test%d" % i)) + + ssn.auto_sync=False + ssn.message_transfer("abort") + + for i in range(10): + m = echos.get(timeout=10) + assert m.body == "test%d" % i + + try: + m = echos.get(timeout=10) + assert False + except Closed, e: + pass + + def testCloseListen(self): + c = self.connect() + c.start(10) + ssn = c.session("test", timeout=10) + echos = ssn.incoming("echo") + + messages = [] + exceptions = [] + condition = Condition() + def listener(m): messages.append(m) + def exc_listener(e): + exceptions.append(e) + condition.acquire() + condition.notify() + condition.release() + + echos.listen(listener, exc_listener) + + for i in range(10): + ssn.message_transfer("echo", message=Message("test%d" % i)) + + ssn.auto_sync=False + ssn.message_transfer("abort") + + condition.acquire() + condition.wait(10) + condition.release() + + for i in range(10): + m = messages.pop(0) + assert m.body == "test%d" % i + + assert len(exceptions) == 1 + + def testSync(self): + c = self.connect() + c.start(10) + s = c.session("test") + s.auto_sync = False + s.message_transfer("echo", message=Message("test")) + s.sync(10) + + def testHeartbeat(self): + c = self.connect(heartbeat=10) + c.start(10) + s = c.session("test") + s.channel.connection_heartbeat() + s.message_transfer("heartbeat") diff --git a/qpid/python/qpid/tests/datatypes.py b/qpid/python/qpid/tests/datatypes.py new file mode 100644 index 0000000000..00e649d6cf --- /dev/null +++ b/qpid/python/qpid/tests/datatypes.py @@ -0,0 +1,296 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from unittest import TestCase +from qpid.datatypes import * +from qpid.ops import DeliveryProperties, FragmentProperties, MessageProperties + +class SerialTest(TestCase): + + def test(self): + for s in (serial(0), serial(0x8FFFFFFFL), serial(0xFFFFFFFFL)): + assert s + 1 > s + assert s - 1 < s + assert s < s + 1 + assert s > s - 1 + + assert serial(0xFFFFFFFFL) + 1 == serial(0) + + assert min(serial(0xFFFFFFFFL), serial(0x0)) == serial(0xFFFFFFFFL) + assert max(serial(0xFFFFFFFFL), serial(0x0)) == serial(0x0) + + def testIncr(self): + s = serial(0) + s += 1 + assert s == serial(1) + + def testIn(self): + l = [serial(1), serial(2), serial(3), serial(4)] + assert serial(1) in l + assert serial(0xFFFFFFFFL + 2) in l + assert 4 in l + + def testNone(self): + assert serial(0) != None + + def testHash(self): + d = {} + d[serial(0)] = "zero" + assert d[0] == "zero" + + def testAdd(self): + assert serial(2) + 2 == serial(4) + assert serial(2) + 2 == 4 + + def testSub(self): + delta = serial(4) - serial(2) + assert isinstance(delta, int) or isinstance(delta, long) + assert delta == 2 + + delta = serial(4) - 2 + assert isinstance(delta, Serial) + assert delta == serial(2) + +class RangedSetTest(TestCase): + + def check(self, ranges): + posts = [] + for range in ranges: + posts.append(range.lower) + posts.append(range.upper) + + sorted = posts[:] + sorted.sort() + + assert posts == sorted + + idx = 1 + while idx + 1 < len(posts): + assert posts[idx] + 1 != posts[idx+1] + idx += 2 + + def test(self): + rs = RangedSet() + + self.check(rs.ranges) + + rs.add(1) + + assert 1 in rs + assert 2 not in rs + assert 0 not in rs + self.check(rs.ranges) + + rs.add(2) + + assert 0 not in rs + assert 1 in rs + assert 2 in rs + assert 3 not in rs + self.check(rs.ranges) + + rs.add(0) + + assert -1 not in rs + assert 0 in rs + assert 1 in rs + assert 2 in rs + assert 3 not in rs + self.check(rs.ranges) + + rs.add(37) + + assert -1 not in rs + assert 0 in rs + assert 1 in rs + assert 2 in rs + assert 3 not in rs + assert 36 not in rs + assert 37 in rs + assert 38 not in rs + self.check(rs.ranges) + + rs.add(-1) + self.check(rs.ranges) + + rs.add(-3) + self.check(rs.ranges) + + rs.add(1, 20) + assert 21 not in rs + assert 20 in rs + self.check(rs.ranges) + + def testAddSelf(self): + a = RangedSet() + a.add(0, 8) + self.check(a.ranges) + a.add(0, 8) + self.check(a.ranges) + assert len(a.ranges) == 1 + range = a.ranges[0] + assert range.lower == 0 + assert range.upper == 8 + + def testEmpty(self): + s = RangedSet() + assert s.empty() + s.add(0, -1) + assert s.empty() + s.add(0, 0) + assert not s.empty() + + def testMinMax(self): + s = RangedSet() + assert s.max() is None + assert s.min() is None + s.add(0, 10) + assert s.max() == 10 + assert s.min() == 0 + s.add(0, 5) + assert s.max() == 10 + assert s.min() == 0 + s.add(0, 11) + assert s.max() == 11 + assert s.min() == 0 + s.add(15, 20) + assert s.max() == 20 + assert s.min() == 0 + s.add(-10, -5) + assert s.max() == 20 + assert s.min() == -10 + +class RangeTest(TestCase): + + def testIntersect1(self): + a = Range(0, 10) + b = Range(9, 20) + i1 = a.intersect(b) + i2 = b.intersect(a) + assert i1.upper == 10 + assert i2.upper == 10 + assert i1.lower == 9 + assert i2.lower == 9 + + def testIntersect2(self): + a = Range(0, 10) + b = Range(11, 20) + assert a.intersect(b) == None + assert b.intersect(a) == None + + def testIntersect3(self): + a = Range(0, 10) + b = Range(3, 5) + i1 = a.intersect(b) + i2 = b.intersect(a) + assert i1.upper == 5 + assert i2.upper == 5 + assert i1.lower == 3 + assert i2.lower == 3 + +class UUIDTest(TestCase): + + def test(self): + # this test is kind of lame, but it does excercise the basic + # functionality of the class + u = uuid4() + for i in xrange(1024): + assert u != uuid4() + +class MessageTest(TestCase): + + def setUp(self): + self.mp = MessageProperties() + self.dp = DeliveryProperties() + self.fp = FragmentProperties() + + def testHas(self): + m = Message(self.mp, self.dp, self.fp, "body") + assert m.has("message_properties") + assert m.has("delivery_properties") + assert m.has("fragment_properties") + + def testGet(self): + m = Message(self.mp, self.dp, self.fp, "body") + assert m.get("message_properties") == self.mp + assert m.get("delivery_properties") == self.dp + assert m.get("fragment_properties") == self.fp + + def testSet(self): + m = Message(self.mp, self.dp, "body") + assert m.get("fragment_properties") is None + m.set(self.fp) + assert m.get("fragment_properties") == self.fp + + def testSetOnEmpty(self): + m = Message("body") + assert m.get("delivery_properties") is None + m.set(self.dp) + assert m.get("delivery_properties") == self.dp + + def testSetReplace(self): + m = Message(self.mp, self.dp, self.fp, "body") + dp = DeliveryProperties() + assert m.get("delivery_properties") == self.dp + assert m.get("delivery_properties") != dp + m.set(dp) + assert m.get("delivery_properties") != self.dp + assert m.get("delivery_properties") == dp + + def testClear(self): + m = Message(self.mp, self.dp, self.fp, "body") + assert m.get("message_properties") == self.mp + assert m.get("delivery_properties") == self.dp + assert m.get("fragment_properties") == self.fp + m.clear("fragment_properties") + assert m.get("fragment_properties") is None + assert m.get("message_properties") == self.mp + assert m.get("delivery_properties") == self.dp + +class TimestampTest(TestCase): + + def check(self, expected, *values): + for v in values: + assert isinstance(v, timestamp) + assert v == expected + assert v == timestamp(expected) + + def testAdd(self): + self.check(4.0, + timestamp(2.0) + 2.0, + 2.0 + timestamp(2.0)) + + def testSub(self): + self.check(2.0, + timestamp(4.0) - 2.0, + 4.0 - timestamp(2.0)) + + def testNeg(self): + self.check(-4.0, -timestamp(4.0)) + + def testPos(self): + self.check(+4.0, +timestamp(4.0)) + + def testAbs(self): + self.check(4.0, abs(timestamp(-4.0))) + + def testConversion(self): + dt = timestamp(0).datetime() + t = timestamp(dt) + assert t == 0 diff --git a/qpid/python/qpid/tests/queue.py b/qpid/python/qpid/tests/queue.py new file mode 100644 index 0000000000..e12354eb43 --- /dev/null +++ b/qpid/python/qpid/tests/queue.py @@ -0,0 +1,71 @@ +# Do not delete - marks this directory as a python package. + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import threading, time +from unittest import TestCase +from qpid.queue import Queue, Empty, Closed + + +class QueueTest (TestCase): + + # The qpid queue class just provides sime simple extensions to + # python's standard queue data structure, so we don't need to test + # all the queue functionality. + + def test_listen(self): + values = [] + heard = threading.Event() + def listener(x): + values.append(x) + heard.set() + + q = Queue(0) + q.listen(listener) + heard.clear() + q.put(1) + heard.wait() + assert values[-1] == 1 + heard.clear() + q.put(2) + heard.wait() + assert values[-1] == 2 + + q.listen(None) + q.put(3) + assert q.get(3) == 3 + q.listen(listener) + + heard.clear() + q.put(4) + heard.wait() + assert values[-1] == 4 + + def test_close(self): + q = Queue(0) + q.put(1); q.put(2); q.put(3); q.close() + assert q.get() == 1 + assert q.get() == 2 + assert q.get() == 3 + for i in range(10): + try: + q.get() + raise AssertionError("expected Closed") + except Closed: + pass diff --git a/qpid/python/qpid/tests/spec010.py b/qpid/python/qpid/tests/spec010.py new file mode 100644 index 0000000000..ac04e1ee02 --- /dev/null +++ b/qpid/python/qpid/tests/spec010.py @@ -0,0 +1,74 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import os, tempfile, shutil, stat +from unittest import TestCase +from qpid.codec010 import Codec, StringCodec +from qpid.ops import * + +class SpecTest(TestCase): + + def testSessionHeader(self): + sc = StringCodec() + sc.write_compound(Header(sync=True)) + assert sc.encoded == "\x01\x01" + + sc = StringCodec() + sc.write_compound(Header(sync=False)) + assert sc.encoded == "\x01\x00" + + def encdec(self, value): + sc = StringCodec() + sc.write_compound(value) + decoded = sc.read_compound(value.__class__) + return decoded + + def testMessageProperties(self): + props = MessageProperties(content_length=3735928559L, + reply_to=ReplyTo(exchange="the exchange name", + routing_key="the routing key")) + dec = self.encdec(props) + assert props.content_length == dec.content_length + assert props.reply_to.exchange == dec.reply_to.exchange + assert props.reply_to.routing_key == dec.reply_to.routing_key + + def testMessageSubscribe(self): + cmd = MessageSubscribe(exclusive=True, destination="this is a test") + dec = self.encdec(cmd) + assert cmd.exclusive == dec.exclusive + assert cmd.destination == dec.destination + + def testXid(self): + sc = StringCodec() + xid = Xid(format=0, global_id="gid", branch_id="bid") + sc.write_compound(xid) + assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid' + dec = sc.read_compound(Xid) + assert xid.__dict__ == dec.__dict__ + +# def testLoadReadOnly(self): +# spec = "amqp.0-10-qpid-errata.xml" +# f = testrunner.get_spec_file(spec) +# dest = tempfile.mkdtemp() +# shutil.copy(f, dest) +# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest) +# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR) +# fname = os.path.join(dest, spec) +# load(fname) +# assert not os.path.exists("%s.pcl" % fname) diff --git a/qpid/python/tests/__init__.py b/qpid/python/tests/__init__.py deleted file mode 100644 index 1e495f3af3..0000000000 --- a/qpid/python/tests/__init__.py +++ /dev/null @@ -1,22 +0,0 @@ -# Do not delete - marks this directory as a python package. - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import codec, queue, datatypes, connection, spec010, codec010 diff --git a/qpid/python/tests/codec.py b/qpid/python/tests/codec.py deleted file mode 100644 index 9b51b4713c..0000000000 --- a/qpid/python/tests/codec.py +++ /dev/null @@ -1,601 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import unittest -from qpid.codec import Codec -from qpid.spec import load -from cStringIO import StringIO -from qpid.reference import ReferenceId - -__doc__ = """ - - This is a unit test script for qpid/codec.py - - It can be run standalone or as part of the existing test framework. - - To run standalone: - ------------------- - - Place in the qpid/python/tests/ directory and type... - - python codec.py - - A brief output will be printed on screen. The verbose output will be placed inn a file called - codec_unit_test_output.txt. [TODO: make this filename configurable] - - To run as part of the existing test framework: - ----------------------------------------------- - - python run-tests tests.codec - - Change History: - ----------------- - Jimmy John 05/19/2007 Initial draft - Jimmy John 05/22/2007 Implemented comments by Rafael Schloming - - -""" - -from qpid_config import amqp_spec_0_8 -SPEC = load(amqp_spec_0_8) - -# -------------------------------------- -# -------------------------------------- -class BaseDataTypes(unittest.TestCase): - - - """ - Base class containing common functions - """ - - # --------------- - def setUp(self): - """ - standard setUp for unitetest (refer unittest documentation for details) - """ - self.codec = Codec(StringIO(), SPEC) - - # ------------------ - def tearDown(self): - """ - standard tearDown for unitetest (refer unittest documentation for details) - """ - self.codec.stream.flush() - self.codec.stream.close() - - # ---------------------------------------- - def callFunc(self, functionName, *args): - """ - helper function - given a function name and arguments, calls the function with the args and - returns the contents of the stream - """ - getattr(self.codec, functionName)(args[0]) - return self.codec.stream.getvalue() - - # ---------------------------------------- - def readFunc(self, functionName, *args): - """ - helper function - creates a input stream and then calls the function with arguments as have been - supplied - """ - self.codec.stream = StringIO(args[0]) - return getattr(self.codec, functionName)() - - -# ---------------------------------------- -# ---------------------------------------- -class IntegerTestCase(BaseDataTypes): - - """ - Handles octet, short, long, long long - - """ - - # ------------------------- - def __init__(self, *args): - """ - sets constants for use in tests - """ - - BaseDataTypes.__init__(self, *args) - self.const_integer = 2 - self.const_integer_octet_encoded = '\x02' - self.const_integer_short_encoded = '\x00\x02' - self.const_integer_long_encoded = '\x00\x00\x00\x02' - self.const_integer_long_long_encoded = '\x00\x00\x00\x00\x00\x00\x00\x02' - - # -------------------------- # - # Unsigned Octect - 8 bits # - # -------------------------- # - - # -------------------------- - def test_unsigned_octet(self): - """ - ubyte format requires 0<=number<=255 - """ - self.failUnlessEqual(self.callFunc('encode_octet', self.const_integer), self.const_integer_octet_encoded, 'octect encoding FAILED...') - - # ------------------------------------------- - def test_octet_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_octet, 256) - - # ------------------------------------------- - def test_uoctet_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_octet, -1) - - # --------------------------------- - def test_uoctet_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_octet', 2.5), self.const_integer_octet_encoded, 'octect encoding FAILED with fractions...') - - # ------------------------------------ - def test_unsigned_octet_decode(self): - """ - octet decoding - """ - self.failUnlessEqual(self.readFunc('decode_octet', self.const_integer_octet_encoded), self.const_integer, 'octect decoding FAILED...') - - # ----------------------------------- # - # Unsigned Short Integers - 16 bits # - # ----------------------------------- # - - # ----------------------- - def test_ushort_int(self): - """ - testing unsigned short integer - """ - self.failUnlessEqual(self.callFunc('encode_short', self.const_integer), self.const_integer_short_encoded, 'short encoding FAILED...') - - # ------------------------------------------- - def test_ushort_int_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_short, 65536) - - # ------------------------------------------- - def test_ushort_int_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_short, -1) - - # --------------------------------- - def test_ushort_int_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_short', 2.5), self.const_integer_short_encoded, 'short encoding FAILED with fractions...') - - # ------------------------------------ - def test_ushort_int_decode(self): - """ - unsigned short decoding - """ - self.failUnlessEqual(self.readFunc('decode_short', self.const_integer_short_encoded), self.const_integer, 'unsigned short decoding FAILED...') - - - # ---------------------------------- # - # Unsigned Long Integers - 32 bits # - # ---------------------------------- # - - # ----------------------- - def test_ulong_int(self): - """ - testing unsigned long iteger - """ - self.failUnlessEqual(self.callFunc('encode_long', self.const_integer), self.const_integer_long_encoded, 'long encoding FAILED...') - - # ------------------------------------------- - def test_ulong_int_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_long, 4294967296) - - # ------------------------------------------- - def test_ulong_int_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_long, -1) - - # --------------------------------- - def test_ulong_int_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_long', 2.5), self.const_integer_long_encoded, 'long encoding FAILED with fractions...') - - # ------------------------------- - def test_ulong_int_decode(self): - """ - unsigned long decoding - """ - self.failUnlessEqual(self.readFunc('decode_long', self.const_integer_long_encoded), self.const_integer, 'unsigned long decoding FAILED...') - - - # --------------------------------------- # - # Unsigned Long Long Integers - 64 bits # - # --------------------------------------- # - - # ----------------------- - def test_ulong_long_int(self): - """ - testing unsinged long long integer - """ - self.failUnlessEqual(self.callFunc('encode_longlong', self.const_integer), self.const_integer_long_long_encoded, 'long long encoding FAILED...') - - # ------------------------------------------- - def test_ulong_long_int_out_of_upper_range(self): - """ - testing for input above acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_longlong, 18446744073709551616) - - # ------------------------------------------- - def test_ulong_long_int_out_of_lower_range(self): - """ - testing for input below acceptable range - """ - self.failUnlessRaises(Exception, self.codec.encode_longlong, -1) - - # --------------------------------- - def test_ulong_long_int_with_fraction(self): - """ - the fractional part should be ignored... - """ - self.failUnlessEqual(self.callFunc('encode_longlong', 2.5), self.const_integer_long_long_encoded, 'long long encoding FAILED with fractions...') - - # ------------------------------------ - def test_ulong_long_int_decode(self): - """ - unsigned long long decoding - """ - self.failUnlessEqual(self.readFunc('decode_longlong', self.const_integer_long_long_encoded), self.const_integer, 'unsigned long long decoding FAILED...') - -# ----------------------------------- -# ----------------------------------- -class BitTestCase(BaseDataTypes): - - """ - Handles bits - """ - - # ---------------------------------------------- - def callFunc(self, functionName, *args): - """ - helper function - """ - for ele in args: - getattr(self.codec, functionName)(ele) - - self.codec.flush() - return self.codec.stream.getvalue() - - # ------------------- - def test_bit1(self): - """ - sends in 11 - """ - self.failUnlessEqual(self.callFunc('encode_bit', 1, 1), '\x03', '11 bit encoding FAILED...') - - # ------------------- - def test_bit2(self): - """ - sends in 10011 - """ - self.failUnlessEqual(self.callFunc('encode_bit', 1, 1, 0, 0, 1), '\x13', '10011 bit encoding FAILED...') - - # ------------------- - def test_bit3(self): - """ - sends in 1110100111 [10 bits(right to left), should be compressed into two octets] - """ - self.failUnlessEqual(self.callFunc('encode_bit', 1,1,1,0,0,1,0,1,1,1), '\xa7\x03', '1110100111(right to left) bit encoding FAILED...') - - # ------------------------------------ - def test_bit_decode_1(self): - """ - decode bit 1 - """ - self.failUnlessEqual(self.readFunc('decode_bit', '\x01'), 1, 'decode bit 1 FAILED...') - - # ------------------------------------ - def test_bit_decode_0(self): - """ - decode bit 0 - """ - self.failUnlessEqual(self.readFunc('decode_bit', '\x00'), 0, 'decode bit 0 FAILED...') - -# ----------------------------------- -# ----------------------------------- -class StringTestCase(BaseDataTypes): - - """ - Handles short strings, long strings - """ - - # ------------------------------------------------------------- # - # Short Strings - 8 bit length followed by zero or more octets # - # ------------------------------------------------------------- # - - # --------------------------------------- - def test_short_string_zero_length(self): - """ - 0 length short string - """ - self.failUnlessEqual(self.callFunc('encode_shortstr', ''), '\x00', '0 length short string encoding FAILED...') - - # ------------------------------------------- - def test_short_string_positive_length(self): - """ - positive length short string - """ - self.failUnlessEqual(self.callFunc('encode_shortstr', 'hello world'), '\x0bhello world', 'positive length short string encoding FAILED...') - - # ------------------------------------------- - def test_short_string_out_of_upper_range(self): - """ - string length > 255 - """ - self.failUnlessRaises(Exception, self.codec.encode_shortstr, 'x'*256) - - # ------------------------------------ - def test_short_string_decode(self): - """ - short string decode - """ - self.failUnlessEqual(self.readFunc('decode_shortstr', '\x0bhello world'), 'hello world', 'short string decode FAILED...') - - - # ------------------------------------------------------------- # - # Long Strings - 32 bit length followed by zero or more octets # - # ------------------------------------------------------------- # - - # --------------------------------------- - def test_long_string_zero_length(self): - """ - 0 length long string - """ - self.failUnlessEqual(self.callFunc('encode_longstr', ''), '\x00\x00\x00\x00', '0 length long string encoding FAILED...') - - # ------------------------------------------- - def test_long_string_positive_length(self): - """ - positive length long string - """ - self.failUnlessEqual(self.callFunc('encode_longstr', 'hello world'), '\x00\x00\x00\x0bhello world', 'positive length long string encoding FAILED...') - - # ------------------------------------ - def test_long_string_decode(self): - """ - long string decode - """ - self.failUnlessEqual(self.readFunc('decode_longstr', '\x00\x00\x00\x0bhello world'), 'hello world', 'long string decode FAILED...') - - -# -------------------------------------- -# -------------------------------------- -class TimestampTestCase(BaseDataTypes): - - """ - No need of any test cases here as timestamps are implemented as long long which is tested above - """ - pass - -# --------------------------------------- -# --------------------------------------- -class FieldTableTestCase(BaseDataTypes): - - """ - Handles Field Tables - - Only S/I type messages seem to be implemented currently - """ - - # ------------------------- - def __init__(self, *args): - """ - sets constants for use in tests - """ - - BaseDataTypes.__init__(self, *args) - self.const_field_table_dummy_dict = {'$key1':'value1','$key2':'value2'} - self.const_field_table_dummy_dict_encoded = '\x00\x00\x00\x22\x05$key2S\x00\x00\x00\x06value2\x05$key1S\x00\x00\x00\x06value1' - - # ------------------------------------------- - def test_field_table_name_value_pair(self): - """ - valid name value pair - """ - self.failUnlessEqual(self.callFunc('encode_table', {'$key1':'value1'}), '\x00\x00\x00\x11\x05$key1S\x00\x00\x00\x06value1', 'valid name value pair encoding FAILED...') - - # --------------------------------------------------- - def test_field_table_multiple_name_value_pair(self): - """ - multiple name value pair - """ - self.failUnlessEqual(self.callFunc('encode_table', self.const_field_table_dummy_dict), self.const_field_table_dummy_dict_encoded, 'multiple name value pair encoding FAILED...') - - # ------------------------------------ - def test_field_table_decode(self): - """ - field table decode - """ - self.failUnlessEqual(self.readFunc('decode_table', self.const_field_table_dummy_dict_encoded), self.const_field_table_dummy_dict, 'field table decode FAILED...') - - -# ------------------------------------ -# ------------------------------------ -class ContentTestCase(BaseDataTypes): - - """ - Handles Content data types - """ - - # ----------------------------- - def test_content_inline(self): - """ - inline content - """ - self.failUnlessEqual(self.callFunc('encode_content', 'hello inline message'), '\x00\x00\x00\x00\x14hello inline message', 'inline content encoding FAILED...') - - # -------------------------------- - def test_content_reference(self): - """ - reference content - """ - self.failUnlessEqual(self.callFunc('encode_content', ReferenceId('dummyId')), '\x01\x00\x00\x00\x07dummyId', 'reference content encoding FAILED...') - - # ------------------------------------ - def test_content_inline_decode(self): - """ - inline content decode - """ - self.failUnlessEqual(self.readFunc('decode_content', '\x00\x00\x00\x00\x14hello inline message'), 'hello inline message', 'inline content decode FAILED...') - - # ------------------------------------ - def test_content_reference_decode(self): - """ - reference content decode - """ - self.failUnlessEqual(self.readFunc('decode_content', '\x01\x00\x00\x00\x07dummyId').id, 'dummyId', 'reference content decode FAILED...') - -# ------------------------ # -# Pre - existing test code # -# ------------------------ # - -# --------------------- -def test(type, value): - """ - old test function cut/copy/paste from qpid/codec.py - """ - if isinstance(value, (list, tuple)): - values = value - else: - values = [value] - stream = StringIO() - codec = Codec(stream, SPEC) - for v in values: - codec.encode(type, v) - codec.flush() - enc = stream.getvalue() - stream.reset() - dup = [] - for i in xrange(len(values)): - dup.append(codec.decode(type)) - if values != dup: - raise AssertionError("%r --> %r --> %r" % (values, enc, dup)) - -# ----------------------- -def dotest(type, value): - """ - old test function cut/copy/paste from qpid/codec.py - """ - args = (type, value) - test(*args) - -# ------------- -def oldtests(): - """ - old test function cut/copy/paste from qpid/codec.py - """ - for value in ("1", "0", "110", "011", "11001", "10101", "10011"): - for i in range(10): - dotest("bit", map(lambda x: x == "1", value*i)) - - for value in ({}, {"asdf": "fdsa", "fdsa": 1, "three": 3}, {"one": 1}): - dotest("table", value) - - for type in ("octet", "short", "long", "longlong"): - for value in range(0, 256): - dotest(type, value) - - for type in ("shortstr", "longstr"): - for value in ("", "a", "asdf"): - dotest(type, value) - -# ----------------------------------------- -class oldTests(unittest.TestCase): - - """ - class to handle pre-existing test cases - """ - - # --------------------------- - def test_oldtestcases(self): - """ - call the old tests - """ - return oldtests() - -# --------------------------- -# --------------------------- -if __name__ == '__main__': - - codec_test_suite = unittest.TestSuite() - - #adding all the test suites... - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(IntegerTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(BitTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(StringTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TimestampTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(FieldTableTestCase)) - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(ContentTestCase)) - - #loading pre-existing test case from qpid/codec.py - codec_test_suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(oldTests)) - - run_output_stream = StringIO() - test_runner = unittest.TextTestRunner(run_output_stream, '', '') - test_result = test_runner.run(codec_test_suite) - - print '\n%d test run...' % (test_result.testsRun) - - if test_result.wasSuccessful(): - print '\nAll tests successful\n' - - if test_result.failures: - print '\n----------' - print '%d FAILURES:' % (len(test_result.failures)) - print '----------\n' - for failure in test_result.failures: - print str(failure[0]) + ' ... FAIL' - - if test_result.errors: - print '\n---------' - print '%d ERRORS:' % (len(test_result.errors)) - print '---------\n' - - for error in test_result.errors: - print str(error[0]) + ' ... ERROR' - - f = open('codec_unit_test_output.txt', 'w') - f.write(str(run_output_stream.getvalue())) - f.close() diff --git a/qpid/python/tests/codec010.py b/qpid/python/tests/codec010.py deleted file mode 100644 index 787ebc146f..0000000000 --- a/qpid/python/tests/codec010.py +++ /dev/null @@ -1,133 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import time - -from unittest import TestCase -from qpid.codec010 import StringCodec -from qpid.datatypes import timestamp, uuid4 -from qpid.ops import PRIMITIVE - -class CodecTest(TestCase): - - def check(self, type, value, compare=True): - t = PRIMITIVE[type] - sc = StringCodec() - sc.write_primitive(t, value) - decoded = sc.read_primitive(t) - if compare: - assert decoded == value, "%s, %s" % (decoded, value) - return decoded - - def testMapString(self): - self.check("map", {"string": "this is a test"}) - - def testMapUnicode(self): - self.check("map", {"unicode": u"this is a unicode test"}) - - def testMapBinary(self): - self.check("map", {"binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5"}) - - def testMapBuffer(self): - s = "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5" - dec = self.check("map", {"buffer": buffer(s)}, False) - assert dec["buffer"] == s - - def testMapInt(self): - self.check("map", {"int": 3}) - - def testMapLong(self): - self.check("map", {"long": 2**32}) - self.check("map", {"long": 1 << 34}) - self.check("map", {"long": -(1 << 34)}) - - def testMapTimestamp(self): - decoded = self.check("map", {"timestamp": timestamp(0)}) - assert isinstance(decoded["timestamp"], timestamp) - - def testMapDatetime(self): - decoded = self.check("map", {"datetime": timestamp(0).datetime()}, compare=False) - assert isinstance(decoded["datetime"], timestamp) - assert decoded["datetime"] == 0.0 - - def testMapNone(self): - self.check("map", {"none": None}) - - def testMapNested(self): - self.check("map", {"map": {"string": "nested test"}}) - - def testMapList(self): - self.check("map", {"list": [1, "two", 3.0, -4]}) - - def testMapUUID(self): - self.check("map", {"uuid": uuid4()}) - - def testMapAll(self): - decoded = self.check("map", {"string": "this is a test", - "unicode": u"this is a unicode test", - "binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5", - "int": 3, - "long": 2**32, - "timestamp": timestamp(0), - "none": None, - "map": {"string": "nested map"}, - "list": [1, "two", 3.0, -4], - "uuid": uuid4()}) - assert isinstance(decoded["timestamp"], timestamp) - - def testMapEmpty(self): - self.check("map", {}) - - def testMapNone(self): - self.check("map", None) - - def testList(self): - self.check("list", [1, "two", 3.0, -4]) - - def testListEmpty(self): - self.check("list", []) - - def testListNone(self): - self.check("list", None) - - def testArrayInt(self): - self.check("array", [1, 2, 3, 4]) - - def testArrayString(self): - self.check("array", ["one", "two", "three", "four"]) - - def testArrayEmpty(self): - self.check("array", []) - - def testArrayNone(self): - self.check("array", None) - - def testInt16(self): - self.check("int16", 3) - self.check("int16", -3) - - def testInt64(self): - self.check("int64", 3) - self.check("int64", -3) - self.check("int64", 1<<34) - self.check("int64", -(1<<34)) - - def testDatetime(self): - self.check("datetime", timestamp(0)) - self.check("datetime", timestamp(long(time.time()))) diff --git a/qpid/python/tests/connection.py b/qpid/python/tests/connection.py deleted file mode 100644 index 8c00df56e1..0000000000 --- a/qpid/python/tests/connection.py +++ /dev/null @@ -1,222 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from threading import * -from unittest import TestCase -from qpid.util import connect, listen -from qpid.connection import * -from qpid.datatypes import Message -from qpid.delegates import Server -from qpid.queue import Queue -from qpid.session import Delegate -from qpid.ops import QueueQueryResult - -PORT = 1234 - -class TestServer: - - def __init__(self, queue): - self.queue = queue - - def connection(self, connection): - return Server(connection, delegate=self.session) - - def session(self, session): - session.auto_sync = False - return TestSession(session, self.queue) - -class TestSession(Delegate): - - def __init__(self, session, queue): - self.session = session - self.queue = queue - - def execution_sync(self, es): - pass - - def queue_query(self, qq): - return QueueQueryResult(qq.queue) - - def message_transfer(self, cmd): - if cmd.destination == "echo": - m = Message(cmd.payload) - m.headers = cmd.headers - self.session.message_transfer(cmd.destination, cmd.accept_mode, - cmd.acquire_mode, m) - elif cmd.destination == "abort": - self.session.channel.connection.sock.close() - elif cmd.destination == "heartbeat": - self.session.channel.connection_heartbeat() - else: - self.queue.put(cmd) - -class ConnectionTest(TestCase): - - def setUp(self): - self.queue = Queue() - self.running = True - started = Event() - - def run(): - ts = TestServer(self.queue) - for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()): - conn = Connection(s, delegate=ts.connection) - try: - conn.start(5) - except Closed: - pass - - self.server = Thread(target=run) - self.server.setDaemon(True) - self.server.start() - - started.wait(3) - assert started.isSet() - - def tearDown(self): - self.running = False - connect("127.0.0.1", PORT).close() - self.server.join(3) - - def connect(self, **kwargs): - return Connection(connect("127.0.0.1", PORT), **kwargs) - - def test(self): - c = self.connect() - c.start(10) - - ssn1 = c.session("test1", timeout=10) - ssn2 = c.session("test2", timeout=10) - - assert ssn1 == c.sessions["test1"] - assert ssn2 == c.sessions["test2"] - assert ssn1.channel != None - assert ssn2.channel != None - assert ssn1 in c.attached.values() - assert ssn2 in c.attached.values() - - ssn1.close(5) - - assert ssn1.channel == None - assert ssn1 not in c.attached.values() - assert ssn2 in c.sessions.values() - - ssn2.close(5) - - assert ssn2.channel == None - assert ssn2 not in c.attached.values() - assert ssn2 not in c.sessions.values() - - ssn = c.session("session", timeout=10) - - assert ssn.channel != None - assert ssn in c.sessions.values() - - destinations = ("one", "two", "three") - - for d in destinations: - ssn.message_transfer(d) - - for d in destinations: - cmd = self.queue.get(10) - assert cmd.destination == d - assert cmd.headers == None - assert cmd.payload == None - - msg = Message("this is a test") - ssn.message_transfer("four", message=msg) - cmd = self.queue.get(10) - assert cmd.destination == "four" - assert cmd.headers == None - assert cmd.payload == msg.body - - qq = ssn.queue_query("asdf") - assert qq.queue == "asdf" - c.close(5) - - def testCloseGet(self): - c = self.connect() - c.start(10) - ssn = c.session("test", timeout=10) - echos = ssn.incoming("echo") - - for i in range(10): - ssn.message_transfer("echo", message=Message("test%d" % i)) - - ssn.auto_sync=False - ssn.message_transfer("abort") - - for i in range(10): - m = echos.get(timeout=10) - assert m.body == "test%d" % i - - try: - m = echos.get(timeout=10) - assert False - except Closed, e: - pass - - def testCloseListen(self): - c = self.connect() - c.start(10) - ssn = c.session("test", timeout=10) - echos = ssn.incoming("echo") - - messages = [] - exceptions = [] - condition = Condition() - def listener(m): messages.append(m) - def exc_listener(e): - exceptions.append(e) - condition.acquire() - condition.notify() - condition.release() - - echos.listen(listener, exc_listener) - - for i in range(10): - ssn.message_transfer("echo", message=Message("test%d" % i)) - - ssn.auto_sync=False - ssn.message_transfer("abort") - - condition.acquire() - condition.wait(10) - condition.release() - - for i in range(10): - m = messages.pop(0) - assert m.body == "test%d" % i - - assert len(exceptions) == 1 - - def testSync(self): - c = self.connect() - c.start(10) - s = c.session("test") - s.auto_sync = False - s.message_transfer("echo", message=Message("test")) - s.sync(10) - - def testHeartbeat(self): - c = self.connect(heartbeat=10) - c.start(10) - s = c.session("test") - s.channel.connection_heartbeat() - s.message_transfer("heartbeat") diff --git a/qpid/python/tests/datatypes.py b/qpid/python/tests/datatypes.py deleted file mode 100644 index 00e649d6cf..0000000000 --- a/qpid/python/tests/datatypes.py +++ /dev/null @@ -1,296 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from unittest import TestCase -from qpid.datatypes import * -from qpid.ops import DeliveryProperties, FragmentProperties, MessageProperties - -class SerialTest(TestCase): - - def test(self): - for s in (serial(0), serial(0x8FFFFFFFL), serial(0xFFFFFFFFL)): - assert s + 1 > s - assert s - 1 < s - assert s < s + 1 - assert s > s - 1 - - assert serial(0xFFFFFFFFL) + 1 == serial(0) - - assert min(serial(0xFFFFFFFFL), serial(0x0)) == serial(0xFFFFFFFFL) - assert max(serial(0xFFFFFFFFL), serial(0x0)) == serial(0x0) - - def testIncr(self): - s = serial(0) - s += 1 - assert s == serial(1) - - def testIn(self): - l = [serial(1), serial(2), serial(3), serial(4)] - assert serial(1) in l - assert serial(0xFFFFFFFFL + 2) in l - assert 4 in l - - def testNone(self): - assert serial(0) != None - - def testHash(self): - d = {} - d[serial(0)] = "zero" - assert d[0] == "zero" - - def testAdd(self): - assert serial(2) + 2 == serial(4) - assert serial(2) + 2 == 4 - - def testSub(self): - delta = serial(4) - serial(2) - assert isinstance(delta, int) or isinstance(delta, long) - assert delta == 2 - - delta = serial(4) - 2 - assert isinstance(delta, Serial) - assert delta == serial(2) - -class RangedSetTest(TestCase): - - def check(self, ranges): - posts = [] - for range in ranges: - posts.append(range.lower) - posts.append(range.upper) - - sorted = posts[:] - sorted.sort() - - assert posts == sorted - - idx = 1 - while idx + 1 < len(posts): - assert posts[idx] + 1 != posts[idx+1] - idx += 2 - - def test(self): - rs = RangedSet() - - self.check(rs.ranges) - - rs.add(1) - - assert 1 in rs - assert 2 not in rs - assert 0 not in rs - self.check(rs.ranges) - - rs.add(2) - - assert 0 not in rs - assert 1 in rs - assert 2 in rs - assert 3 not in rs - self.check(rs.ranges) - - rs.add(0) - - assert -1 not in rs - assert 0 in rs - assert 1 in rs - assert 2 in rs - assert 3 not in rs - self.check(rs.ranges) - - rs.add(37) - - assert -1 not in rs - assert 0 in rs - assert 1 in rs - assert 2 in rs - assert 3 not in rs - assert 36 not in rs - assert 37 in rs - assert 38 not in rs - self.check(rs.ranges) - - rs.add(-1) - self.check(rs.ranges) - - rs.add(-3) - self.check(rs.ranges) - - rs.add(1, 20) - assert 21 not in rs - assert 20 in rs - self.check(rs.ranges) - - def testAddSelf(self): - a = RangedSet() - a.add(0, 8) - self.check(a.ranges) - a.add(0, 8) - self.check(a.ranges) - assert len(a.ranges) == 1 - range = a.ranges[0] - assert range.lower == 0 - assert range.upper == 8 - - def testEmpty(self): - s = RangedSet() - assert s.empty() - s.add(0, -1) - assert s.empty() - s.add(0, 0) - assert not s.empty() - - def testMinMax(self): - s = RangedSet() - assert s.max() is None - assert s.min() is None - s.add(0, 10) - assert s.max() == 10 - assert s.min() == 0 - s.add(0, 5) - assert s.max() == 10 - assert s.min() == 0 - s.add(0, 11) - assert s.max() == 11 - assert s.min() == 0 - s.add(15, 20) - assert s.max() == 20 - assert s.min() == 0 - s.add(-10, -5) - assert s.max() == 20 - assert s.min() == -10 - -class RangeTest(TestCase): - - def testIntersect1(self): - a = Range(0, 10) - b = Range(9, 20) - i1 = a.intersect(b) - i2 = b.intersect(a) - assert i1.upper == 10 - assert i2.upper == 10 - assert i1.lower == 9 - assert i2.lower == 9 - - def testIntersect2(self): - a = Range(0, 10) - b = Range(11, 20) - assert a.intersect(b) == None - assert b.intersect(a) == None - - def testIntersect3(self): - a = Range(0, 10) - b = Range(3, 5) - i1 = a.intersect(b) - i2 = b.intersect(a) - assert i1.upper == 5 - assert i2.upper == 5 - assert i1.lower == 3 - assert i2.lower == 3 - -class UUIDTest(TestCase): - - def test(self): - # this test is kind of lame, but it does excercise the basic - # functionality of the class - u = uuid4() - for i in xrange(1024): - assert u != uuid4() - -class MessageTest(TestCase): - - def setUp(self): - self.mp = MessageProperties() - self.dp = DeliveryProperties() - self.fp = FragmentProperties() - - def testHas(self): - m = Message(self.mp, self.dp, self.fp, "body") - assert m.has("message_properties") - assert m.has("delivery_properties") - assert m.has("fragment_properties") - - def testGet(self): - m = Message(self.mp, self.dp, self.fp, "body") - assert m.get("message_properties") == self.mp - assert m.get("delivery_properties") == self.dp - assert m.get("fragment_properties") == self.fp - - def testSet(self): - m = Message(self.mp, self.dp, "body") - assert m.get("fragment_properties") is None - m.set(self.fp) - assert m.get("fragment_properties") == self.fp - - def testSetOnEmpty(self): - m = Message("body") - assert m.get("delivery_properties") is None - m.set(self.dp) - assert m.get("delivery_properties") == self.dp - - def testSetReplace(self): - m = Message(self.mp, self.dp, self.fp, "body") - dp = DeliveryProperties() - assert m.get("delivery_properties") == self.dp - assert m.get("delivery_properties") != dp - m.set(dp) - assert m.get("delivery_properties") != self.dp - assert m.get("delivery_properties") == dp - - def testClear(self): - m = Message(self.mp, self.dp, self.fp, "body") - assert m.get("message_properties") == self.mp - assert m.get("delivery_properties") == self.dp - assert m.get("fragment_properties") == self.fp - m.clear("fragment_properties") - assert m.get("fragment_properties") is None - assert m.get("message_properties") == self.mp - assert m.get("delivery_properties") == self.dp - -class TimestampTest(TestCase): - - def check(self, expected, *values): - for v in values: - assert isinstance(v, timestamp) - assert v == expected - assert v == timestamp(expected) - - def testAdd(self): - self.check(4.0, - timestamp(2.0) + 2.0, - 2.0 + timestamp(2.0)) - - def testSub(self): - self.check(2.0, - timestamp(4.0) - 2.0, - 4.0 - timestamp(2.0)) - - def testNeg(self): - self.check(-4.0, -timestamp(4.0)) - - def testPos(self): - self.check(+4.0, +timestamp(4.0)) - - def testAbs(self): - self.check(4.0, abs(timestamp(-4.0))) - - def testConversion(self): - dt = timestamp(0).datetime() - t = timestamp(dt) - assert t == 0 diff --git a/qpid/python/tests/queue.py b/qpid/python/tests/queue.py deleted file mode 100644 index e12354eb43..0000000000 --- a/qpid/python/tests/queue.py +++ /dev/null @@ -1,71 +0,0 @@ -# Do not delete - marks this directory as a python package. - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import threading, time -from unittest import TestCase -from qpid.queue import Queue, Empty, Closed - - -class QueueTest (TestCase): - - # The qpid queue class just provides sime simple extensions to - # python's standard queue data structure, so we don't need to test - # all the queue functionality. - - def test_listen(self): - values = [] - heard = threading.Event() - def listener(x): - values.append(x) - heard.set() - - q = Queue(0) - q.listen(listener) - heard.clear() - q.put(1) - heard.wait() - assert values[-1] == 1 - heard.clear() - q.put(2) - heard.wait() - assert values[-1] == 2 - - q.listen(None) - q.put(3) - assert q.get(3) == 3 - q.listen(listener) - - heard.clear() - q.put(4) - heard.wait() - assert values[-1] == 4 - - def test_close(self): - q = Queue(0) - q.put(1); q.put(2); q.put(3); q.close() - assert q.get() == 1 - assert q.get() == 2 - assert q.get() == 3 - for i in range(10): - try: - q.get() - raise AssertionError("expected Closed") - except Closed: - pass diff --git a/qpid/python/tests/spec010.py b/qpid/python/tests/spec010.py deleted file mode 100644 index ac04e1ee02..0000000000 --- a/qpid/python/tests/spec010.py +++ /dev/null @@ -1,74 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import os, tempfile, shutil, stat -from unittest import TestCase -from qpid.codec010 import Codec, StringCodec -from qpid.ops import * - -class SpecTest(TestCase): - - def testSessionHeader(self): - sc = StringCodec() - sc.write_compound(Header(sync=True)) - assert sc.encoded == "\x01\x01" - - sc = StringCodec() - sc.write_compound(Header(sync=False)) - assert sc.encoded == "\x01\x00" - - def encdec(self, value): - sc = StringCodec() - sc.write_compound(value) - decoded = sc.read_compound(value.__class__) - return decoded - - def testMessageProperties(self): - props = MessageProperties(content_length=3735928559L, - reply_to=ReplyTo(exchange="the exchange name", - routing_key="the routing key")) - dec = self.encdec(props) - assert props.content_length == dec.content_length - assert props.reply_to.exchange == dec.reply_to.exchange - assert props.reply_to.routing_key == dec.reply_to.routing_key - - def testMessageSubscribe(self): - cmd = MessageSubscribe(exclusive=True, destination="this is a test") - dec = self.encdec(cmd) - assert cmd.exclusive == dec.exclusive - assert cmd.destination == dec.destination - - def testXid(self): - sc = StringCodec() - xid = Xid(format=0, global_id="gid", branch_id="bid") - sc.write_compound(xid) - assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid' - dec = sc.read_compound(Xid) - assert xid.__dict__ == dec.__dict__ - -# def testLoadReadOnly(self): -# spec = "amqp.0-10-qpid-errata.xml" -# f = testrunner.get_spec_file(spec) -# dest = tempfile.mkdtemp() -# shutil.copy(f, dest) -# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest) -# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR) -# fname = os.path.join(dest, spec) -# load(fname) -# assert not os.path.exists("%s.pcl" % fname) -- cgit v1.2.1