diff options
Diffstat (limited to 't/oldint/transport.py')
-rw-r--r-- | t/oldint/transport.py | 294 |
1 files changed, 294 insertions, 0 deletions
diff --git a/t/oldint/transport.py b/t/oldint/transport.py new file mode 100644 index 00000000..36b4d33f --- /dev/null +++ b/t/oldint/transport.py @@ -0,0 +1,294 @@ +import random +import sha +import socket +import string +import time +import unittest2 as unittest +import warnings +import weakref +from case.skip import SkipTest +from kombu import Connection +from kombu import Exchange, Queue + + +def consumeN(conn, consumer, n=1, timeout=30): + messages = [] + + def callback(message_data, message): + messages.append(message_data) + message.ack() + + prev, consumer.callbacks = consumer.callbacks, [callback] + consumer.consume() + + seconds = 0 + while True: + try: + conn.drain_events(timeout=1) + except socket.timeout: + seconds += 1 + msg = 'Received %s/%s messages. %s seconds passed.' % ( + len(messages), n, seconds) + if seconds >= timeout: + raise socket.timeout(msg) + if seconds > 1: + print(msg) + if len(messages) >= n: + break + + consumer.cancel() + consumer.callback = prev + return messages + + +class TransportCase(unittest.TestCase): + transport = None + prefix = None + sep = '.' + userid = None + password = None + event_loop_max = 100 + connection_options = {} + suppress_disorder_warning = False + reliable_purge = True + + connected = False + skip_test_reason = None + + message_size_limit = None + + def before_connect(self): + ... + + def after_connect(self, connection): + ... + + def setUp(self): + if self.transport: + try: + self.before_connect() + except SkipTest as exc: + self.skip_test_reason = str(exc) + else: + self.do_connect() + self.exchange = Exchange(self.prefix, 'direct') + self.queue = Queue(self.prefix, self.exchange, self.prefix) + + def purge(self, names): + chan = self.connection.channel() + total = 0 + for queue in names: + while 1: + # ensure the queue is completly empty + purged = chan.queue_purge(queue=queue) + if not purged: + break + total += purged + chan.close() + return total + + def get_connection(self, **options): + if self.userid: + options.setdefault('userid', self.userid) + if self.password: + options.setdefault('password', self.password) + return Connection(transport=self.transport, **options) + + def do_connect(self): + self.connection = self.get_connection(**self.connection_options) + try: + self.connection.connect() + self.after_connect(self.connection) + except self.connection.connection_errors: + self.skip_test_reason = '{0} transport cannot connect'.format( + self.transport, + ) + else: + self.connected = True + + def verify_alive(self): + if self.transport: + if not self.connected: + raise SkipTest(self.skip_test_reason) + return True + + def purge_consumer(self, consumer): + return self.purge([queue.name for queue in consumer.queues]) + + def test_produce__consume(self): + if not self.verify_alive(): + return + chan1 = self.connection.channel() + consumer = chan1.Consumer(self.queue) + self.purge_consumer(consumer) + producer = chan1.Producer(self.exchange) + producer.publish({'foo': 'bar'}, routing_key=self.prefix) + message = consumeN(self.connection, consumer) + self.assertDictEqual(message[0], {'foo': 'bar'}) + chan1.close() + self.purge([self.queue.name]) + + def test_purge(self): + if not self.verify_alive(): + return + chan1 = self.connection.channel() + consumer = chan1.Consumer(self.queue) + self.purge_consumer(consumer) + + producer = chan1.Producer(self.exchange) + for i in range(10): + producer.publish({'foo': 'bar'}, routing_key=self.prefix) + if self.reliable_purge: + self.assertEqual(consumer.purge(), 10) + self.assertEqual(consumer.purge(), 0) + else: + purged = 0 + while purged < 9: + purged += self.purge_consumer(consumer) + + def _digest(self, data): + return sha.new(data).hexdigest() + + def test_produce__consume_large_messages( + self, bytes=1048576, n=10, + charset=string.punctuation + string.ascii_letters + string.digits): + if not self.verify_alive(): + return + bytes = min(x for x in [bytes, self.message_size_limit] if x) + messages = [''.join(random.choice(charset) + for j in range(bytes)) + '--%s' % n + for i in range(n)] + digests = [] + chan1 = self.connection.channel() + consumer = chan1.Consumer(self.queue) + self.purge_consumer(consumer) + producer = chan1.Producer(self.exchange) + for i, message in enumerate(messages): + producer.publish({'text': message, + 'i': i}, routing_key=self.prefix) + digests.append(self._digest(message)) + + received = [(msg['i'], msg['text']) + for msg in consumeN(self.connection, consumer, n)] + self.assertEqual(len(received), n) + ordering = [i for i, _ in received] + if ordering != list(range(n)) and not self.suppress_disorder_warning: + warnings.warn( + '%s did not deliver messages in FIFO order: %r' % ( + self.transport, ordering)) + + for i, text in received: + if text != messages[i]: + raise AssertionError('%i: %r is not %r' % ( + i, text[-100:], messages[i][-100:])) + self.assertEqual(self._digest(text), digests[i]) + + chan1.close() + self.purge([self.queue.name]) + + def P(self, rest): + return '%s%s%s' % (self.prefix, self.sep, rest) + + def test_produce__consume_multiple(self): + if not self.verify_alive(): + return + chan1 = self.connection.channel() + producer = chan1.Producer(self.exchange) + b1 = Queue(self.P('b1'), self.exchange, 'b1')(chan1) + b2 = Queue(self.P('b2'), self.exchange, 'b2')(chan1) + b3 = Queue(self.P('b3'), self.exchange, 'b3')(chan1) + [q.declare() for q in (b1, b2, b3)] + self.purge([b1.name, b2.name, b3.name]) + + producer.publish('b1', routing_key='b1') + producer.publish('b2', routing_key='b2') + producer.publish('b3', routing_key='b3') + chan1.close() + + chan2 = self.connection.channel() + consumer = chan2.Consumer([b1, b2, b3]) + messages = consumeN(self.connection, consumer, 3) + self.assertItemsEqual(messages, ['b1', 'b2', 'b3']) + chan2.close() + self.purge([self.P('b1'), self.P('b2'), self.P('b3')]) + + def test_timeout(self): + if not self.verify_alive(): + return + chan = self.connection.channel() + self.purge([self.queue.name]) + consumer = chan.Consumer(self.queue) + self.assertRaises( + socket.timeout, self.connection.drain_events, timeout=0.3, + ) + consumer.cancel() + chan.close() + + def test_basic_get(self): + if not self.verify_alive(): + return + chan1 = self.connection.channel() + producer = chan1.Producer(self.exchange) + chan2 = self.connection.channel() + queue = Queue(self.P('basic_get'), self.exchange, 'basic_get') + queue = queue(chan2) + queue.declare() + producer.publish({'basic.get': 'this'}, routing_key='basic_get') + chan1.close() + + for i in range(self.event_loop_max): + m = queue.get() + if m: + break + time.sleep(0.1) + self.assertEqual(m.payload, {'basic.get': 'this'}) + self.purge([queue.name]) + chan2.close() + + def test_cyclic_reference_transport(self): + if not self.verify_alive(): + return + + def _createref(): + conn = self.get_connection() + conn.transport + conn.close() + return weakref.ref(conn) + + self.assertIsNone(_createref()()) + + def test_cyclic_reference_connection(self): + if not self.verify_alive(): + return + + def _createref(): + conn = self.get_connection() + conn.connect() + conn.close() + return weakref.ref(conn) + + self.assertIsNone(_createref()()) + + def test_cyclic_reference_channel(self): + if not self.verify_alive(): + return + + def _createref(): + conn = self.get_connection() + conn.connect() + chanrefs = [] + try: + for i in range(100): + channel = conn.channel() + chanrefs.append(weakref.ref(channel)) + channel.close() + finally: + conn.close() + return chanrefs + + for chanref in _createref(): + self.assertIsNone(chanref()) + + def tearDown(self): + if self.transport and self.connected: + self.connection.close() |