From 248f1fe188fe2307b9dcf2c87a83b653eaa1920c Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Sat, 26 Dec 2009 12:42:57 +0000 Subject: synchronized with trunk except for ruby dir git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid.rnr@893970 13f79535-47bb-0310-9956-ffa450edef68 --- python/tests/__init__.py | 10 +---- python/tests/assembler.py | 77 ------------------------------------- python/tests/codec.py | 14 ++----- python/tests/codec010.py | 79 +++++++++++++++++++++++++++++--------- python/tests/connection.py | 44 ++++++++++++--------- python/tests/datatypes.py | 95 ++++++++++++++++++++++++++++++++++++++++------ python/tests/framer.py | 94 --------------------------------------------- python/tests/spec.py | 56 --------------------------- python/tests/spec010.py | 70 +++++++++++++++------------------- 9 files changed, 206 insertions(+), 333 deletions(-) delete mode 100644 python/tests/assembler.py delete mode 100644 python/tests/framer.py delete mode 100644 python/tests/spec.py (limited to 'python/tests') diff --git a/python/tests/__init__.py b/python/tests/__init__.py index 8ad514fc2f..1e495f3af3 100644 --- a/python/tests/__init__.py +++ b/python/tests/__init__.py @@ -19,12 +19,4 @@ # under the License. # -from codec import * -from queue import * -from spec import * -from framer import * -from assembler import * -from datatypes import * -from connection import * -from spec010 import * -from codec010 import * +import codec, queue, datatypes, connection, spec010, codec010 diff --git a/python/tests/assembler.py b/python/tests/assembler.py deleted file mode 100644 index b76924e59d..0000000000 --- a/python/tests/assembler.py +++ /dev/null @@ -1,77 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from threading import * -from unittest import TestCase -from qpid.util import connect, listen -from qpid.assembler import * - -PORT = 1234 - -class AssemblerTest(TestCase): - - def setUp(self): - started = Event() - self.running = True - - def run(): - running = True - for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()): - asm = Assembler(s) - try: - asm.write_header(*asm.read_header()[-2:]) - while True: - seg = asm.read_segment() - asm.write_segment(seg) - except Closed: - pass - - self.server = Thread(target=run) - self.server.setDaemon(True) - self.server.start() - - started.wait(3) - - def tearDown(self): - self.running = False - self.server.join() - - def test(self): - asm = Assembler(connect("0.0.0.0", PORT), max_payload = 1) - asm.write_header(0, 10) - asm.write_segment(Segment(True, False, 1, 2, 3, "TEST")) - asm.write_segment(Segment(False, True, 1, 2, 3, "ING")) - - assert asm.read_header() == ("AMQP", 1, 1, 0, 10) - - seg = asm.read_segment() - assert seg.first == True - assert seg.last == False - assert seg.type == 1 - assert seg.track == 2 - assert seg.channel == 3 - assert seg.payload == "TEST" - - seg = asm.read_segment() - assert seg.first == False - assert seg.last == True - assert seg.type == 1 - assert seg.track == 2 - assert seg.channel == 3 - assert seg.payload == "ING" diff --git a/python/tests/codec.py b/python/tests/codec.py index 4bd3675af8..9b51b4713c 100644 --- a/python/tests/codec.py +++ b/python/tests/codec.py @@ -23,7 +23,6 @@ from qpid.codec import Codec from qpid.spec import load from cStringIO import StringIO from qpid.reference import ReferenceId -from qpid.testlib import testrunner __doc__ = """ @@ -54,13 +53,8 @@ __doc__ = """ """ -SPEC = None - -def spec(): - global SPEC - if SPEC == None: - SPEC = load(testrunner.get_spec_file("amqp.0-8.xml")) - return SPEC +from qpid_config import amqp_spec_0_8 +SPEC = load(amqp_spec_0_8) # -------------------------------------- # -------------------------------------- @@ -76,7 +70,7 @@ class BaseDataTypes(unittest.TestCase): """ standard setUp for unitetest (refer unittest documentation for details) """ - self.codec = Codec(StringIO(), spec()) + self.codec = Codec(StringIO(), SPEC) # ------------------ def tearDown(self): @@ -507,7 +501,7 @@ def test(type, value): else: values = [value] stream = StringIO() - codec = Codec(stream, spec()) + codec = Codec(stream, SPEC) for v in values: codec.encode(type, v) codec.flush() diff --git a/python/tests/codec010.py b/python/tests/codec010.py index 835966e103..787ebc146f 100644 --- a/python/tests/codec010.py +++ b/python/tests/codec010.py @@ -17,31 +17,54 @@ # under the License. # +import time + from unittest import TestCase -from qpid.spec010 import load from qpid.codec010 import StringCodec -from qpid.testlib import testrunner +from qpid.datatypes import timestamp, uuid4 +from qpid.ops import PRIMITIVE class CodecTest(TestCase): - def setUp(self): - self.spec = load(testrunner.get_spec_file("amqp.0-10.xml")) - - def check(self, type, value): - t = self.spec[type] - sc = StringCodec(self.spec) - t.encode(sc, value) - decoded = t.decode(sc) - assert decoded == value, "%s, %s" % (decoded, value) + def check(self, type, value, compare=True): + t = PRIMITIVE[type] + sc = StringCodec() + sc.write_primitive(t, value) + decoded = sc.read_primitive(t) + if compare: + assert decoded == value, "%s, %s" % (decoded, value) + return decoded def testMapString(self): self.check("map", {"string": "this is a test"}) + def testMapUnicode(self): + self.check("map", {"unicode": u"this is a unicode test"}) + + def testMapBinary(self): + self.check("map", {"binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5"}) + + def testMapBuffer(self): + s = "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5" + dec = self.check("map", {"buffer": buffer(s)}, False) + assert dec["buffer"] == s + def testMapInt(self): self.check("map", {"int": 3}) def testMapLong(self): self.check("map", {"long": 2**32}) + self.check("map", {"long": 1 << 34}) + self.check("map", {"long": -(1 << 34)}) + + def testMapTimestamp(self): + decoded = self.check("map", {"timestamp": timestamp(0)}) + assert isinstance(decoded["timestamp"], timestamp) + + def testMapDatetime(self): + decoded = self.check("map", {"datetime": timestamp(0).datetime()}, compare=False) + assert isinstance(decoded["datetime"], timestamp) + assert decoded["datetime"] == 0.0 def testMapNone(self): self.check("map", {"none": None}) @@ -52,13 +75,21 @@ class CodecTest(TestCase): def testMapList(self): self.check("map", {"list": [1, "two", 3.0, -4]}) + def testMapUUID(self): + self.check("map", {"uuid": uuid4()}) + def testMapAll(self): - self.check("map", {"string": "this is a test", - "int": 3, - "long": 2**32, - "none": None, - "map": {"string": "nested map"}, - "list": [1, "two", 3.0, -4]}) + decoded = self.check("map", {"string": "this is a test", + "unicode": u"this is a unicode test", + "binary": "\x7f\xb4R^\xe5\xf0:\x89\x96E1\xf6\xfe\xb9\x1b\xf5", + "int": 3, + "long": 2**32, + "timestamp": timestamp(0), + "none": None, + "map": {"string": "nested map"}, + "list": [1, "two", 3.0, -4], + "uuid": uuid4()}) + assert isinstance(decoded["timestamp"], timestamp) def testMapEmpty(self): self.check("map", {}) @@ -86,3 +117,17 @@ class CodecTest(TestCase): def testArrayNone(self): self.check("array", None) + + def testInt16(self): + self.check("int16", 3) + self.check("int16", -3) + + def testInt64(self): + self.check("int64", 3) + self.check("int64", -3) + self.check("int64", 1<<34) + self.check("int64", -(1<<34)) + + def testDatetime(self): + self.check("datetime", timestamp(0)) + self.check("datetime", timestamp(long(time.time()))) diff --git a/python/tests/connection.py b/python/tests/connection.py index 23e0c937fb..8c00df56e1 100644 --- a/python/tests/connection.py +++ b/python/tests/connection.py @@ -22,11 +22,10 @@ from unittest import TestCase from qpid.util import connect, listen from qpid.connection import * from qpid.datatypes import Message -from qpid.testlib import testrunner from qpid.delegates import Server from qpid.queue import Queue -from qpid.spec010 import load from qpid.session import Delegate +from qpid.ops import QueueQueryResult PORT = 1234 @@ -52,23 +51,24 @@ class TestSession(Delegate): pass def queue_query(self, qq): - return qq._type.result.type.new((qq.queue,), {}) + return QueueQueryResult(qq.queue) - def message_transfer(self, cmd, headers, body): + def message_transfer(self, cmd): if cmd.destination == "echo": - m = Message(body) - m.headers = headers + m = Message(cmd.payload) + m.headers = cmd.headers self.session.message_transfer(cmd.destination, cmd.accept_mode, cmd.acquire_mode, m) elif cmd.destination == "abort": self.session.channel.connection.sock.close() + elif cmd.destination == "heartbeat": + self.session.channel.connection_heartbeat() else: - self.queue.put((cmd, headers, body)) + self.queue.put(cmd) class ConnectionTest(TestCase): def setUp(self): - self.spec = load(testrunner.get_spec_file("amqp.0-10.xml")) self.queue = Queue() self.running = True started = Event() @@ -76,7 +76,7 @@ class ConnectionTest(TestCase): def run(): ts = TestServer(self.queue) for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()): - conn = Connection(s, self.spec, ts.connection) + conn = Connection(s, delegate=ts.connection) try: conn.start(5) except Closed: @@ -87,14 +87,15 @@ class ConnectionTest(TestCase): self.server.start() started.wait(3) + assert started.isSet() def tearDown(self): self.running = False - connect("0.0.0.0", PORT).close() + connect("127.0.0.1", PORT).close() self.server.join(3) - def connect(self): - return Connection(connect("0.0.0.0", PORT), self.spec) + def connect(self, **kwargs): + return Connection(connect("127.0.0.1", PORT), **kwargs) def test(self): c = self.connect() @@ -133,17 +134,17 @@ class ConnectionTest(TestCase): ssn.message_transfer(d) for d in destinations: - cmd, header, body = self.queue.get(10) + cmd = self.queue.get(10) assert cmd.destination == d - assert header == None - assert body == None + assert cmd.headers == None + assert cmd.payload == None msg = Message("this is a test") ssn.message_transfer("four", message=msg) - cmd, header, body = self.queue.get(10) + cmd = self.queue.get(10) assert cmd.destination == "four" - assert header == None - assert body == msg.body + assert cmd.headers == None + assert cmd.payload == msg.body qq = ssn.queue_query("asdf") assert qq.queue == "asdf" @@ -212,3 +213,10 @@ class ConnectionTest(TestCase): s.auto_sync = False s.message_transfer("echo", message=Message("test")) s.sync(10) + + def testHeartbeat(self): + c = self.connect(heartbeat=10) + c.start(10) + s = c.session("test") + s.channel.connection_heartbeat() + s.message_transfer("heartbeat") diff --git a/python/tests/datatypes.py b/python/tests/datatypes.py index ef98e81da0..00e649d6cf 100644 --- a/python/tests/datatypes.py +++ b/python/tests/datatypes.py @@ -18,23 +18,22 @@ # from unittest import TestCase -from qpid.testlib import testrunner -from qpid.spec010 import load from qpid.datatypes import * +from qpid.ops import DeliveryProperties, FragmentProperties, MessageProperties class SerialTest(TestCase): def test(self): - for s in (serial(0), serial(0x8FFFFFFF), serial(0xFFFFFFFF)): + for s in (serial(0), serial(0x8FFFFFFFL), serial(0xFFFFFFFFL)): assert s + 1 > s assert s - 1 < s assert s < s + 1 assert s > s - 1 - assert serial(0xFFFFFFFF) + 1 == serial(0) + assert serial(0xFFFFFFFFL) + 1 == serial(0) - assert min(serial(0xFFFFFFFF), serial(0x0)) == serial(0xFFFFFFFF) - assert max(serial(0xFFFFFFFF), serial(0x0)) == serial(0x0) + assert min(serial(0xFFFFFFFFL), serial(0x0)) == serial(0xFFFFFFFFL) + assert max(serial(0xFFFFFFFFL), serial(0x0)) == serial(0x0) def testIncr(self): s = serial(0) @@ -44,7 +43,7 @@ class SerialTest(TestCase): def testIn(self): l = [serial(1), serial(2), serial(3), serial(4)] assert serial(1) in l - assert serial(0xFFFFFFFF + 2) in l + assert serial(0xFFFFFFFFL + 2) in l assert 4 in l def testNone(self): @@ -55,6 +54,19 @@ class SerialTest(TestCase): d[serial(0)] = "zero" assert d[0] == "zero" + def testAdd(self): + assert serial(2) + 2 == serial(4) + assert serial(2) + 2 == 4 + + def testSub(self): + delta = serial(4) - serial(2) + assert isinstance(delta, int) or isinstance(delta, long) + assert delta == 2 + + delta = serial(4) - 2 + assert isinstance(delta, Serial) + assert delta == serial(2) + class RangedSetTest(TestCase): def check(self, ranges): @@ -136,6 +148,34 @@ class RangedSetTest(TestCase): assert range.lower == 0 assert range.upper == 8 + def testEmpty(self): + s = RangedSet() + assert s.empty() + s.add(0, -1) + assert s.empty() + s.add(0, 0) + assert not s.empty() + + def testMinMax(self): + s = RangedSet() + assert s.max() is None + assert s.min() is None + s.add(0, 10) + assert s.max() == 10 + assert s.min() == 0 + s.add(0, 5) + assert s.max() == 10 + assert s.min() == 0 + s.add(0, 11) + assert s.max() == 11 + assert s.min() == 0 + s.add(15, 20) + assert s.max() == 20 + assert s.min() == 0 + s.add(-10, -5) + assert s.max() == 20 + assert s.min() == -10 + class RangeTest(TestCase): def testIntersect1(self): @@ -176,10 +216,9 @@ class UUIDTest(TestCase): class MessageTest(TestCase): def setUp(self): - self.spec = load(testrunner.get_spec_file("amqp.0-10-qpid-errata.xml")) - self.mp = Struct(self.spec["message.message_properties"]) - self.dp = Struct(self.spec["message.delivery_properties"]) - self.fp = Struct(self.spec["message.fragment_properties"]) + self.mp = MessageProperties() + self.dp = DeliveryProperties() + self.fp = FragmentProperties() def testHas(self): m = Message(self.mp, self.dp, self.fp, "body") @@ -207,7 +246,7 @@ class MessageTest(TestCase): def testSetReplace(self): m = Message(self.mp, self.dp, self.fp, "body") - dp = Struct(self.spec["message.delivery_properties"]) + dp = DeliveryProperties() assert m.get("delivery_properties") == self.dp assert m.get("delivery_properties") != dp m.set(dp) @@ -223,3 +262,35 @@ class MessageTest(TestCase): assert m.get("fragment_properties") is None assert m.get("message_properties") == self.mp assert m.get("delivery_properties") == self.dp + +class TimestampTest(TestCase): + + def check(self, expected, *values): + for v in values: + assert isinstance(v, timestamp) + assert v == expected + assert v == timestamp(expected) + + def testAdd(self): + self.check(4.0, + timestamp(2.0) + 2.0, + 2.0 + timestamp(2.0)) + + def testSub(self): + self.check(2.0, + timestamp(4.0) - 2.0, + 4.0 - timestamp(2.0)) + + def testNeg(self): + self.check(-4.0, -timestamp(4.0)) + + def testPos(self): + self.check(+4.0, +timestamp(4.0)) + + def testAbs(self): + self.check(4.0, abs(timestamp(-4.0))) + + def testConversion(self): + dt = timestamp(0).datetime() + t = timestamp(dt) + assert t == 0 diff --git a/python/tests/framer.py b/python/tests/framer.py deleted file mode 100644 index 05bb467bbe..0000000000 --- a/python/tests/framer.py +++ /dev/null @@ -1,94 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from threading import * -from unittest import TestCase -from qpid.util import connect, listen -from qpid.framer import * - -PORT = 1234 - -class FramerTest(TestCase): - - def setUp(self): - self.running = True - started = Event() - def run(): - for s in listen("0.0.0.0", PORT, lambda: self.running, lambda: started.set()): - conn = Framer(s) - try: - conn.write_header(*conn.read_header()[-2:]) - while True: - frame = conn.read_frame() - conn.write_frame(frame) - conn.flush() - except Closed: - pass - - self.server = Thread(target=run) - self.server.setDaemon(True) - self.server.start() - - started.wait(3) - - def tearDown(self): - self.running = False - self.server.join(3) - - def test(self): - c = Framer(connect("0.0.0.0", PORT)) - - c.write_header(0, 10) - assert c.read_header() == ("AMQP", 1, 1, 0, 10) - - c.write_frame(Frame(FIRST_FRM, 1, 2, 3, "THIS")) - c.write_frame(Frame(0, 1, 2, 3, "IS")) - c.write_frame(Frame(0, 1, 2, 3, "A")) - c.write_frame(Frame(LAST_FRM, 1, 2, 3, "TEST")) - c.flush() - - f = c.read_frame() - assert f.flags & FIRST_FRM - assert not (f.flags & LAST_FRM) - assert f.type == 1 - assert f.track == 2 - assert f.channel == 3 - assert f.payload == "THIS" - - f = c.read_frame() - assert f.flags == 0 - assert f.type == 1 - assert f.track == 2 - assert f.channel == 3 - assert f.payload == "IS" - - f = c.read_frame() - assert f.flags == 0 - assert f.type == 1 - assert f.track == 2 - assert f.channel == 3 - assert f.payload == "A" - - f = c.read_frame() - assert f.flags & LAST_FRM - assert not (f.flags & FIRST_FRM) - assert f.type == 1 - assert f.track == 2 - assert f.channel == 3 - assert f.payload == "TEST" diff --git a/python/tests/spec.py b/python/tests/spec.py deleted file mode 100644 index ce03640493..0000000000 --- a/python/tests/spec.py +++ /dev/null @@ -1,56 +0,0 @@ -from unittest import TestCase -from qpid.spec import load -from qpid.testlib import testrunner - -class SpecTest(TestCase): - - def check_load(self, *urls): - spec = load(*map(testrunner.get_spec_file, urls)) - qdecl = spec.method("queue_declare") - assert qdecl != None - assert not qdecl.content - - queue = qdecl.fields.byname["queue"] - assert queue != None - assert queue.domain.name == "queue_name" - assert queue.type == "shortstr" - - qdecl_ok = spec.method("queue_declare_ok") - - # 0-8 is actually 8-0 - if (spec.major == 8 and spec.minor == 0 or - spec.major == 0 and spec.minor == 9): - assert qdecl_ok != None - - assert len(qdecl.responses) == 1 - assert qdecl_ok in qdecl.responses - - publish = spec.method("basic_publish") - assert publish != None - assert publish.content - - if (spec.major == 0 and spec.minor == 10): - assert qdecl_ok == None - reply_to = spec.domains.byname["reply_to"] - assert reply_to.type.size == 2 - assert reply_to.type.pack == 2 - assert len(reply_to.type.fields) == 2 - - qq = spec.method("queue_query") - assert qq != None - assert qq.result.size == 4 - assert qq.result.type != None - args = qq.result.fields.byname["arguments"] - assert args.type == "table" - - def test_load_0_8(self): - self.check_load("amqp.0-8.xml") - - def test_load_0_9(self): - self.check_load("amqp.0-9.xml") - - def test_load_0_9_errata(self): - self.check_load("amqp.0-9.xml", "amqp-errata.0-9.xml") - - def test_load_0_10(self): - self.check_load("amqp.0-10-preview.xml") diff --git a/python/tests/spec010.py b/python/tests/spec010.py index df9cb9590a..ac04e1ee02 100644 --- a/python/tests/spec010.py +++ b/python/tests/spec010.py @@ -19,66 +19,56 @@ import os, tempfile, shutil, stat from unittest import TestCase -from qpid.spec010 import load from qpid.codec010 import Codec, StringCodec -from qpid.testlib import testrunner -from qpid.datatypes import Struct +from qpid.ops import * class SpecTest(TestCase): - def setUp(self): - self.spec = load(testrunner.get_spec_file("amqp.0-10-qpid-errata.xml")) - def testSessionHeader(self): - hdr = self.spec["session.header"] - sc = StringCodec(self.spec) - hdr.encode(sc, Struct(hdr, sync=True)) + sc = StringCodec() + sc.write_compound(Header(sync=True)) assert sc.encoded == "\x01\x01" - sc = StringCodec(self.spec) - hdr.encode(sc, Struct(hdr, sync=False)) + sc = StringCodec() + sc.write_compound(Header(sync=False)) assert sc.encoded == "\x01\x00" - def encdec(self, type, value): - sc = StringCodec(self.spec) - type.encode(sc, value) - decoded = type.decode(sc) + def encdec(self, value): + sc = StringCodec() + sc.write_compound(value) + decoded = sc.read_compound(value.__class__) return decoded def testMessageProperties(self): - mp = self.spec["message.message_properties"] - rt = self.spec["message.reply_to"] - - props = Struct(mp, content_length=3735928559L, - reply_to=Struct(rt, exchange="the exchange name", - routing_key="the routing key")) - dec = self.encdec(mp, props) + props = MessageProperties(content_length=3735928559L, + reply_to=ReplyTo(exchange="the exchange name", + routing_key="the routing key")) + dec = self.encdec(props) assert props.content_length == dec.content_length assert props.reply_to.exchange == dec.reply_to.exchange assert props.reply_to.routing_key == dec.reply_to.routing_key def testMessageSubscribe(self): - ms = self.spec["message.subscribe"] - cmd = Struct(ms, exclusive=True, destination="this is a test") - dec = self.encdec(self.spec["message.subscribe"], cmd) + cmd = MessageSubscribe(exclusive=True, destination="this is a test") + dec = self.encdec(cmd) assert cmd.exclusive == dec.exclusive assert cmd.destination == dec.destination def testXid(self): - xid = self.spec["dtx.xid"] - sc = StringCodec(self.spec) - st = Struct(xid, format=0, global_id="gid", branch_id="bid") - xid.encode(sc, st) + sc = StringCodec() + xid = Xid(format=0, global_id="gid", branch_id="bid") + sc.write_compound(xid) assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid' - assert xid.decode(sc).__dict__ == st.__dict__ + dec = sc.read_compound(Xid) + assert xid.__dict__ == dec.__dict__ - def testLoadReadOnly(self): - spec = "amqp.0-10-qpid-errata.xml" - f = testrunner.get_spec_file(spec) - dest = tempfile.mkdtemp() - shutil.copy(f, dest) - shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest) - os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR) - fname = os.path.join(dest, spec) - load(fname) - assert not os.path.exists("%s.pcl" % fname) +# def testLoadReadOnly(self): +# spec = "amqp.0-10-qpid-errata.xml" +# f = testrunner.get_spec_file(spec) +# dest = tempfile.mkdtemp() +# shutil.copy(f, dest) +# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest) +# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR) +# fname = os.path.join(dest, spec) +# load(fname) +# assert not os.path.exists("%s.pcl" % fname) -- cgit v1.2.1