diff options
Diffstat (limited to 't/oldint')
-rw-r--r-- | t/oldint/__init__.py | 6 | ||||
-rw-r--r-- | t/oldint/tests/__init__.py | 8 | ||||
-rw-r--r-- | t/oldint/tests/test_SLMQ.py | 19 | ||||
-rw-r--r-- | t/oldint/tests/test_SQS.py | 20 | ||||
-rw-r--r-- | t/oldint/tests/test_amqp.py | 7 | ||||
-rw-r--r-- | t/oldint/tests/test_librabbitmq.py | 10 | ||||
-rw-r--r-- | t/oldint/tests/test_mongodb.py | 73 | ||||
-rw-r--r-- | t/oldint/tests/test_pyamqp.py | 7 | ||||
-rw-r--r-- | t/oldint/tests/test_qpid.py | 10 | ||||
-rw-r--r-- | t/oldint/tests/test_redis.py | 19 | ||||
-rw-r--r-- | t/oldint/tests/test_zookeeper.py | 14 | ||||
-rw-r--r-- | t/oldint/transport.py | 294 |
12 files changed, 487 insertions, 0 deletions
diff --git a/t/oldint/__init__.py b/t/oldint/__init__.py new file mode 100644 index 00000000..23ffb4c1 --- /dev/null +++ b/t/oldint/__init__.py @@ -0,0 +1,6 @@ + +import os +import sys + +sys.path.insert(0, os.pardir) +sys.path.insert(0, os.getcwd()) diff --git a/t/oldint/tests/__init__.py b/t/oldint/tests/__init__.py new file mode 100644 index 00000000..094b08e0 --- /dev/null +++ b/t/oldint/tests/__init__.py @@ -0,0 +1,8 @@ + +import os +import sys + +sys.path.insert(0, os.path.join(os.getcwd(), os.pardir)) +print(sys.path[0]) +sys.path.insert(0, os.getcwd()) +print(sys.path[0]) diff --git a/t/oldint/tests/test_SLMQ.py b/t/oldint/tests/test_SLMQ.py new file mode 100644 index 00000000..8428f7d1 --- /dev/null +++ b/t/oldint/tests/test_SLMQ.py @@ -0,0 +1,19 @@ + +from funtests import transport + +from kombu.tests.case import skip + + +@skip.unless_environ('SLMQ_ACCOUNT') +@skip.unless_environ('SL_USERNAME') +@skip.unless_environ('SL_API_KEY') +@skip.unless_environ('SLMQ_HOST') +@skip.unless_environ('SLMQ_SECURE') +class test_SLMQ(transport.TransportCase): + transport = 'SLMQ' + prefix = 'slmq' + event_loop_max = 100 + message_size_limit = 4192 + reliable_purge = False + #: does not guarantee FIFO order, even in simple cases. + suppress_disorder_warning = True diff --git a/t/oldint/tests/test_SQS.py b/t/oldint/tests/test_SQS.py new file mode 100644 index 00000000..571cff24 --- /dev/null +++ b/t/oldint/tests/test_SQS.py @@ -0,0 +1,20 @@ + +from funtests import transport + +from kombu.tests.case import skip + + +@skip.unless_environ('AWS_ACCESS_KEY_ID') +@skip.unless_environ('AWS_SECRET_ACCESS_KEY') +@skip.unless_module('boto') +class test_SQS(transport.TransportCase): + transport = 'SQS' + prefix = 'sqs' + event_loop_max = 100 + message_size_limit = 4192 # SQS max body size / 2. + reliable_purge = False + #: does not guarantee FIFO order, even in simple cases + suppress_disorder_warning = True + + def after_connect(self, connection): + connection.channel().sqs diff --git a/t/oldint/tests/test_amqp.py b/t/oldint/tests/test_amqp.py new file mode 100644 index 00000000..f7aa762e --- /dev/null +++ b/t/oldint/tests/test_amqp.py @@ -0,0 +1,7 @@ + +from funtests import transport + + +class test_pyamqp(transport.TransportCase): + transport = 'pyamqp' + prefix = 'pyamqp' diff --git a/t/oldint/tests/test_librabbitmq.py b/t/oldint/tests/test_librabbitmq.py new file mode 100644 index 00000000..41a21b3c --- /dev/null +++ b/t/oldint/tests/test_librabbitmq.py @@ -0,0 +1,10 @@ + +from funtests import transport + +from kombu.tests.case import skip + + +@skip.unless_module('librabbitmq') +class test_librabbitmq(transport.TransportCase): + transport = 'librabbitmq' + prefix = 'librabbitmq' diff --git a/t/oldint/tests/test_mongodb.py b/t/oldint/tests/test_mongodb.py new file mode 100644 index 00000000..495208c7 --- /dev/null +++ b/t/oldint/tests/test_mongodb.py @@ -0,0 +1,73 @@ +from kombu import Consumer, Producer, Exchange, Queue +from kombu.utils.compat import nested + +from funtests import transport + +from kombu.tests.case import skip + + +@skip.unless_module('pymongo') +class test_mongodb(transport.TransportCase): + transport = 'mongodb' + prefix = 'mongodb' + event_loop_max = 100 + + def after_connect(self, connection): + connection.channel().client # evaluate connection. + self.c = self.connection # shortcut + + def test_fanout(self, name='test_mongodb_fanout'): + if not self.verify_alive(): + return + c = self.connection + self.e = Exchange(name, type='fanout') + self.q = Queue(name, exchange=self.e, routing_key=name) + self.q2 = Queue(name + '2', exchange=self.e, routing_key=name + '2') + + channel = c.default_channel + producer = Producer(channel, self.e) + consumer1 = Consumer(channel, self.q) + consumer2 = Consumer(channel, self.q2) + self.q2(channel).declare() + + for i in range(10): + producer.publish({'foo': i}, routing_key=name) + for i in range(10): + producer.publish({'foo': i}, routing_key=name + '2') + + _received1 = [] + _received2 = [] + + def callback1(message_data, message): + _received1.append(message) + message.ack() + + def callback2(message_data, message): + _received2.append(message) + message.ack() + + consumer1.register_callback(callback1) + consumer2.register_callback(callback2) + + with nested(consumer1, consumer2): + + while 1: + if len(_received1) + len(_received2) == 20: + break + c.drain_events(timeout=60) + self.assertEqual(len(_received1) + len(_received2), 20) + + # queue.delete + for i in range(10): + producer.publish({'foo': i}, routing_key=name) + self.assertTrue(self.q(channel).get()) + self.q(channel).delete() + self.q(channel).declare() + self.assertIsNone(self.q(channel).get()) + + # queue.purge + for i in range(10): + producer.publish({'foo': i}, routing_key=name + '2') + self.assertTrue(self.q2(channel).get()) + self.q2(channel).purge() + self.assertIsNone(self.q2(channel).get()) diff --git a/t/oldint/tests/test_pyamqp.py b/t/oldint/tests/test_pyamqp.py new file mode 100644 index 00000000..f7aa762e --- /dev/null +++ b/t/oldint/tests/test_pyamqp.py @@ -0,0 +1,7 @@ + +from funtests import transport + + +class test_pyamqp(transport.TransportCase): + transport = 'pyamqp' + prefix = 'pyamqp' diff --git a/t/oldint/tests/test_qpid.py b/t/oldint/tests/test_qpid.py new file mode 100644 index 00000000..a5c9141d --- /dev/null +++ b/t/oldint/tests/test_qpid.py @@ -0,0 +1,10 @@ + +from funtests import transport + +from kombu.tests.case import skip + + +@skip.unless_module('qpid.messaging') +class test_qpid(transport.TransportCase): + transport = 'qpid' + prefix = 'qpid' diff --git a/t/oldint/tests/test_redis.py b/t/oldint/tests/test_redis.py new file mode 100644 index 00000000..610b0149 --- /dev/null +++ b/t/oldint/tests/test_redis.py @@ -0,0 +1,19 @@ + +from funtests import transport + +from kombu.tests.case import skip + + +@skip.unless_module('redis') +class test_redis(transport.TransportCase): + transport = 'redis' + prefix = 'redis' + + def after_connect(self, connection): + client = connection.channel().client + client.info() + + def test_cannot_connect_raises_connection_error(self): + conn = self.get_connection(port=65534) + with self.assertRaises(conn.connection_errors): + conn.connect() diff --git a/t/oldint/tests/test_zookeeper.py b/t/oldint/tests/test_zookeeper.py new file mode 100644 index 00000000..150c4a35 --- /dev/null +++ b/t/oldint/tests/test_zookeeper.py @@ -0,0 +1,14 @@ + +from funtests import transport + +from kombu.tests.case import skip + + +@skip.unless_module('kazoo') +class test_zookeeper(transport.TransportCase): + transport = 'zookeeper' + prefix = 'zookeeper' + event_loop_max = 100 + + def after_connect(self, connection): + connection.channel().client diff --git a/t/oldint/transport.py b/t/oldint/transport.py new file mode 100644 index 00000000..36b4d33f --- /dev/null +++ b/t/oldint/transport.py @@ -0,0 +1,294 @@ +import random +import sha +import socket +import string +import time +import unittest2 as unittest +import warnings +import weakref +from case.skip import SkipTest +from kombu import Connection +from kombu import Exchange, Queue + + +def consumeN(conn, consumer, n=1, timeout=30): + messages = [] + + def callback(message_data, message): + messages.append(message_data) + message.ack() + + prev, consumer.callbacks = consumer.callbacks, [callback] + consumer.consume() + + seconds = 0 + while True: + try: + conn.drain_events(timeout=1) + except socket.timeout: + seconds += 1 + msg = 'Received %s/%s messages. %s seconds passed.' % ( + len(messages), n, seconds) + if seconds >= timeout: + raise socket.timeout(msg) + if seconds > 1: + print(msg) + if len(messages) >= n: + break + + consumer.cancel() + consumer.callback = prev + return messages + + +class TransportCase(unittest.TestCase): + transport = None + prefix = None + sep = '.' + userid = None + password = None + event_loop_max = 100 + connection_options = {} + suppress_disorder_warning = False + reliable_purge = True + + connected = False + skip_test_reason = None + + message_size_limit = None + + def before_connect(self): + ... + + def after_connect(self, connection): + ... + + def setUp(self): + if self.transport: + try: + self.before_connect() + except SkipTest as exc: + self.skip_test_reason = str(exc) + else: + self.do_connect() + self.exchange = Exchange(self.prefix, 'direct') + self.queue = Queue(self.prefix, self.exchange, self.prefix) + + def purge(self, names): + chan = self.connection.channel() + total = 0 + for queue in names: + while 1: + # ensure the queue is completly empty + purged = chan.queue_purge(queue=queue) + if not purged: + break + total += purged + chan.close() + return total + + def get_connection(self, **options): + if self.userid: + options.setdefault('userid', self.userid) + if self.password: + options.setdefault('password', self.password) + return Connection(transport=self.transport, **options) + + def do_connect(self): + self.connection = self.get_connection(**self.connection_options) + try: + self.connection.connect() + self.after_connect(self.connection) + except self.connection.connection_errors: + self.skip_test_reason = '{0} transport cannot connect'.format( + self.transport, + ) + else: + self.connected = True + + def verify_alive(self): + if self.transport: + if not self.connected: + raise SkipTest(self.skip_test_reason) + return True + + def purge_consumer(self, consumer): + return self.purge([queue.name for queue in consumer.queues]) + + def test_produce__consume(self): + if not self.verify_alive(): + return + chan1 = self.connection.channel() + consumer = chan1.Consumer(self.queue) + self.purge_consumer(consumer) + producer = chan1.Producer(self.exchange) + producer.publish({'foo': 'bar'}, routing_key=self.prefix) + message = consumeN(self.connection, consumer) + self.assertDictEqual(message[0], {'foo': 'bar'}) + chan1.close() + self.purge([self.queue.name]) + + def test_purge(self): + if not self.verify_alive(): + return + chan1 = self.connection.channel() + consumer = chan1.Consumer(self.queue) + self.purge_consumer(consumer) + + producer = chan1.Producer(self.exchange) + for i in range(10): + producer.publish({'foo': 'bar'}, routing_key=self.prefix) + if self.reliable_purge: + self.assertEqual(consumer.purge(), 10) + self.assertEqual(consumer.purge(), 0) + else: + purged = 0 + while purged < 9: + purged += self.purge_consumer(consumer) + + def _digest(self, data): + return sha.new(data).hexdigest() + + def test_produce__consume_large_messages( + self, bytes=1048576, n=10, + charset=string.punctuation + string.ascii_letters + string.digits): + if not self.verify_alive(): + return + bytes = min(x for x in [bytes, self.message_size_limit] if x) + messages = [''.join(random.choice(charset) + for j in range(bytes)) + '--%s' % n + for i in range(n)] + digests = [] + chan1 = self.connection.channel() + consumer = chan1.Consumer(self.queue) + self.purge_consumer(consumer) + producer = chan1.Producer(self.exchange) + for i, message in enumerate(messages): + producer.publish({'text': message, + 'i': i}, routing_key=self.prefix) + digests.append(self._digest(message)) + + received = [(msg['i'], msg['text']) + for msg in consumeN(self.connection, consumer, n)] + self.assertEqual(len(received), n) + ordering = [i for i, _ in received] + if ordering != list(range(n)) and not self.suppress_disorder_warning: + warnings.warn( + '%s did not deliver messages in FIFO order: %r' % ( + self.transport, ordering)) + + for i, text in received: + if text != messages[i]: + raise AssertionError('%i: %r is not %r' % ( + i, text[-100:], messages[i][-100:])) + self.assertEqual(self._digest(text), digests[i]) + + chan1.close() + self.purge([self.queue.name]) + + def P(self, rest): + return '%s%s%s' % (self.prefix, self.sep, rest) + + def test_produce__consume_multiple(self): + if not self.verify_alive(): + return + chan1 = self.connection.channel() + producer = chan1.Producer(self.exchange) + b1 = Queue(self.P('b1'), self.exchange, 'b1')(chan1) + b2 = Queue(self.P('b2'), self.exchange, 'b2')(chan1) + b3 = Queue(self.P('b3'), self.exchange, 'b3')(chan1) + [q.declare() for q in (b1, b2, b3)] + self.purge([b1.name, b2.name, b3.name]) + + producer.publish('b1', routing_key='b1') + producer.publish('b2', routing_key='b2') + producer.publish('b3', routing_key='b3') + chan1.close() + + chan2 = self.connection.channel() + consumer = chan2.Consumer([b1, b2, b3]) + messages = consumeN(self.connection, consumer, 3) + self.assertItemsEqual(messages, ['b1', 'b2', 'b3']) + chan2.close() + self.purge([self.P('b1'), self.P('b2'), self.P('b3')]) + + def test_timeout(self): + if not self.verify_alive(): + return + chan = self.connection.channel() + self.purge([self.queue.name]) + consumer = chan.Consumer(self.queue) + self.assertRaises( + socket.timeout, self.connection.drain_events, timeout=0.3, + ) + consumer.cancel() + chan.close() + + def test_basic_get(self): + if not self.verify_alive(): + return + chan1 = self.connection.channel() + producer = chan1.Producer(self.exchange) + chan2 = self.connection.channel() + queue = Queue(self.P('basic_get'), self.exchange, 'basic_get') + queue = queue(chan2) + queue.declare() + producer.publish({'basic.get': 'this'}, routing_key='basic_get') + chan1.close() + + for i in range(self.event_loop_max): + m = queue.get() + if m: + break + time.sleep(0.1) + self.assertEqual(m.payload, {'basic.get': 'this'}) + self.purge([queue.name]) + chan2.close() + + def test_cyclic_reference_transport(self): + if not self.verify_alive(): + return + + def _createref(): + conn = self.get_connection() + conn.transport + conn.close() + return weakref.ref(conn) + + self.assertIsNone(_createref()()) + + def test_cyclic_reference_connection(self): + if not self.verify_alive(): + return + + def _createref(): + conn = self.get_connection() + conn.connect() + conn.close() + return weakref.ref(conn) + + self.assertIsNone(_createref()()) + + def test_cyclic_reference_channel(self): + if not self.verify_alive(): + return + + def _createref(): + conn = self.get_connection() + conn.connect() + chanrefs = [] + try: + for i in range(100): + channel = conn.channel() + chanrefs.append(weakref.ref(channel)) + channel.close() + finally: + conn.close() + return chanrefs + + for chanref in _createref(): + self.assertIsNone(chanref()) + + def tearDown(self): + if self.transport and self.connected: + self.connection.close() |