summaryrefslogtreecommitdiff
path: root/t/oldint
diff options
context:
space:
mode:
Diffstat (limited to 't/oldint')
-rw-r--r--t/oldint/__init__.py6
-rw-r--r--t/oldint/tests/__init__.py8
-rw-r--r--t/oldint/tests/test_SLMQ.py19
-rw-r--r--t/oldint/tests/test_SQS.py20
-rw-r--r--t/oldint/tests/test_amqp.py7
-rw-r--r--t/oldint/tests/test_librabbitmq.py10
-rw-r--r--t/oldint/tests/test_mongodb.py73
-rw-r--r--t/oldint/tests/test_pyamqp.py7
-rw-r--r--t/oldint/tests/test_qpid.py10
-rw-r--r--t/oldint/tests/test_redis.py19
-rw-r--r--t/oldint/tests/test_zookeeper.py14
-rw-r--r--t/oldint/transport.py294
12 files changed, 487 insertions, 0 deletions
diff --git a/t/oldint/__init__.py b/t/oldint/__init__.py
new file mode 100644
index 00000000..23ffb4c1
--- /dev/null
+++ b/t/oldint/__init__.py
@@ -0,0 +1,6 @@
+
+import os
+import sys
+
+sys.path.insert(0, os.pardir)
+sys.path.insert(0, os.getcwd())
diff --git a/t/oldint/tests/__init__.py b/t/oldint/tests/__init__.py
new file mode 100644
index 00000000..094b08e0
--- /dev/null
+++ b/t/oldint/tests/__init__.py
@@ -0,0 +1,8 @@
+
+import os
+import sys
+
+sys.path.insert(0, os.path.join(os.getcwd(), os.pardir))
+print(sys.path[0])
+sys.path.insert(0, os.getcwd())
+print(sys.path[0])
diff --git a/t/oldint/tests/test_SLMQ.py b/t/oldint/tests/test_SLMQ.py
new file mode 100644
index 00000000..8428f7d1
--- /dev/null
+++ b/t/oldint/tests/test_SLMQ.py
@@ -0,0 +1,19 @@
+
+from funtests import transport
+
+from kombu.tests.case import skip
+
+
+@skip.unless_environ('SLMQ_ACCOUNT')
+@skip.unless_environ('SL_USERNAME')
+@skip.unless_environ('SL_API_KEY')
+@skip.unless_environ('SLMQ_HOST')
+@skip.unless_environ('SLMQ_SECURE')
+class test_SLMQ(transport.TransportCase):
+ transport = 'SLMQ'
+ prefix = 'slmq'
+ event_loop_max = 100
+ message_size_limit = 4192
+ reliable_purge = False
+ #: does not guarantee FIFO order, even in simple cases.
+ suppress_disorder_warning = True
diff --git a/t/oldint/tests/test_SQS.py b/t/oldint/tests/test_SQS.py
new file mode 100644
index 00000000..571cff24
--- /dev/null
+++ b/t/oldint/tests/test_SQS.py
@@ -0,0 +1,20 @@
+
+from funtests import transport
+
+from kombu.tests.case import skip
+
+
+@skip.unless_environ('AWS_ACCESS_KEY_ID')
+@skip.unless_environ('AWS_SECRET_ACCESS_KEY')
+@skip.unless_module('boto')
+class test_SQS(transport.TransportCase):
+ transport = 'SQS'
+ prefix = 'sqs'
+ event_loop_max = 100
+ message_size_limit = 4192 # SQS max body size / 2.
+ reliable_purge = False
+ #: does not guarantee FIFO order, even in simple cases
+ suppress_disorder_warning = True
+
+ def after_connect(self, connection):
+ connection.channel().sqs
diff --git a/t/oldint/tests/test_amqp.py b/t/oldint/tests/test_amqp.py
new file mode 100644
index 00000000..f7aa762e
--- /dev/null
+++ b/t/oldint/tests/test_amqp.py
@@ -0,0 +1,7 @@
+
+from funtests import transport
+
+
+class test_pyamqp(transport.TransportCase):
+ transport = 'pyamqp'
+ prefix = 'pyamqp'
diff --git a/t/oldint/tests/test_librabbitmq.py b/t/oldint/tests/test_librabbitmq.py
new file mode 100644
index 00000000..41a21b3c
--- /dev/null
+++ b/t/oldint/tests/test_librabbitmq.py
@@ -0,0 +1,10 @@
+
+from funtests import transport
+
+from kombu.tests.case import skip
+
+
+@skip.unless_module('librabbitmq')
+class test_librabbitmq(transport.TransportCase):
+ transport = 'librabbitmq'
+ prefix = 'librabbitmq'
diff --git a/t/oldint/tests/test_mongodb.py b/t/oldint/tests/test_mongodb.py
new file mode 100644
index 00000000..495208c7
--- /dev/null
+++ b/t/oldint/tests/test_mongodb.py
@@ -0,0 +1,73 @@
+from kombu import Consumer, Producer, Exchange, Queue
+from kombu.utils.compat import nested
+
+from funtests import transport
+
+from kombu.tests.case import skip
+
+
+@skip.unless_module('pymongo')
+class test_mongodb(transport.TransportCase):
+ transport = 'mongodb'
+ prefix = 'mongodb'
+ event_loop_max = 100
+
+ def after_connect(self, connection):
+ connection.channel().client # evaluate connection.
+ self.c = self.connection # shortcut
+
+ def test_fanout(self, name='test_mongodb_fanout'):
+ if not self.verify_alive():
+ return
+ c = self.connection
+ self.e = Exchange(name, type='fanout')
+ self.q = Queue(name, exchange=self.e, routing_key=name)
+ self.q2 = Queue(name + '2', exchange=self.e, routing_key=name + '2')
+
+ channel = c.default_channel
+ producer = Producer(channel, self.e)
+ consumer1 = Consumer(channel, self.q)
+ consumer2 = Consumer(channel, self.q2)
+ self.q2(channel).declare()
+
+ for i in range(10):
+ producer.publish({'foo': i}, routing_key=name)
+ for i in range(10):
+ producer.publish({'foo': i}, routing_key=name + '2')
+
+ _received1 = []
+ _received2 = []
+
+ def callback1(message_data, message):
+ _received1.append(message)
+ message.ack()
+
+ def callback2(message_data, message):
+ _received2.append(message)
+ message.ack()
+
+ consumer1.register_callback(callback1)
+ consumer2.register_callback(callback2)
+
+ with nested(consumer1, consumer2):
+
+ while 1:
+ if len(_received1) + len(_received2) == 20:
+ break
+ c.drain_events(timeout=60)
+ self.assertEqual(len(_received1) + len(_received2), 20)
+
+ # queue.delete
+ for i in range(10):
+ producer.publish({'foo': i}, routing_key=name)
+ self.assertTrue(self.q(channel).get())
+ self.q(channel).delete()
+ self.q(channel).declare()
+ self.assertIsNone(self.q(channel).get())
+
+ # queue.purge
+ for i in range(10):
+ producer.publish({'foo': i}, routing_key=name + '2')
+ self.assertTrue(self.q2(channel).get())
+ self.q2(channel).purge()
+ self.assertIsNone(self.q2(channel).get())
diff --git a/t/oldint/tests/test_pyamqp.py b/t/oldint/tests/test_pyamqp.py
new file mode 100644
index 00000000..f7aa762e
--- /dev/null
+++ b/t/oldint/tests/test_pyamqp.py
@@ -0,0 +1,7 @@
+
+from funtests import transport
+
+
+class test_pyamqp(transport.TransportCase):
+ transport = 'pyamqp'
+ prefix = 'pyamqp'
diff --git a/t/oldint/tests/test_qpid.py b/t/oldint/tests/test_qpid.py
new file mode 100644
index 00000000..a5c9141d
--- /dev/null
+++ b/t/oldint/tests/test_qpid.py
@@ -0,0 +1,10 @@
+
+from funtests import transport
+
+from kombu.tests.case import skip
+
+
+@skip.unless_module('qpid.messaging')
+class test_qpid(transport.TransportCase):
+ transport = 'qpid'
+ prefix = 'qpid'
diff --git a/t/oldint/tests/test_redis.py b/t/oldint/tests/test_redis.py
new file mode 100644
index 00000000..610b0149
--- /dev/null
+++ b/t/oldint/tests/test_redis.py
@@ -0,0 +1,19 @@
+
+from funtests import transport
+
+from kombu.tests.case import skip
+
+
+@skip.unless_module('redis')
+class test_redis(transport.TransportCase):
+ transport = 'redis'
+ prefix = 'redis'
+
+ def after_connect(self, connection):
+ client = connection.channel().client
+ client.info()
+
+ def test_cannot_connect_raises_connection_error(self):
+ conn = self.get_connection(port=65534)
+ with self.assertRaises(conn.connection_errors):
+ conn.connect()
diff --git a/t/oldint/tests/test_zookeeper.py b/t/oldint/tests/test_zookeeper.py
new file mode 100644
index 00000000..150c4a35
--- /dev/null
+++ b/t/oldint/tests/test_zookeeper.py
@@ -0,0 +1,14 @@
+
+from funtests import transport
+
+from kombu.tests.case import skip
+
+
+@skip.unless_module('kazoo')
+class test_zookeeper(transport.TransportCase):
+ transport = 'zookeeper'
+ prefix = 'zookeeper'
+ event_loop_max = 100
+
+ def after_connect(self, connection):
+ connection.channel().client
diff --git a/t/oldint/transport.py b/t/oldint/transport.py
new file mode 100644
index 00000000..36b4d33f
--- /dev/null
+++ b/t/oldint/transport.py
@@ -0,0 +1,294 @@
+import random
+import sha
+import socket
+import string
+import time
+import unittest2 as unittest
+import warnings
+import weakref
+from case.skip import SkipTest
+from kombu import Connection
+from kombu import Exchange, Queue
+
+
+def consumeN(conn, consumer, n=1, timeout=30):
+ messages = []
+
+ def callback(message_data, message):
+ messages.append(message_data)
+ message.ack()
+
+ prev, consumer.callbacks = consumer.callbacks, [callback]
+ consumer.consume()
+
+ seconds = 0
+ while True:
+ try:
+ conn.drain_events(timeout=1)
+ except socket.timeout:
+ seconds += 1
+ msg = 'Received %s/%s messages. %s seconds passed.' % (
+ len(messages), n, seconds)
+ if seconds >= timeout:
+ raise socket.timeout(msg)
+ if seconds > 1:
+ print(msg)
+ if len(messages) >= n:
+ break
+
+ consumer.cancel()
+ consumer.callback = prev
+ return messages
+
+
+class TransportCase(unittest.TestCase):
+ transport = None
+ prefix = None
+ sep = '.'
+ userid = None
+ password = None
+ event_loop_max = 100
+ connection_options = {}
+ suppress_disorder_warning = False
+ reliable_purge = True
+
+ connected = False
+ skip_test_reason = None
+
+ message_size_limit = None
+
+ def before_connect(self):
+ ...
+
+ def after_connect(self, connection):
+ ...
+
+ def setUp(self):
+ if self.transport:
+ try:
+ self.before_connect()
+ except SkipTest as exc:
+ self.skip_test_reason = str(exc)
+ else:
+ self.do_connect()
+ self.exchange = Exchange(self.prefix, 'direct')
+ self.queue = Queue(self.prefix, self.exchange, self.prefix)
+
+ def purge(self, names):
+ chan = self.connection.channel()
+ total = 0
+ for queue in names:
+ while 1:
+ # ensure the queue is completly empty
+ purged = chan.queue_purge(queue=queue)
+ if not purged:
+ break
+ total += purged
+ chan.close()
+ return total
+
+ def get_connection(self, **options):
+ if self.userid:
+ options.setdefault('userid', self.userid)
+ if self.password:
+ options.setdefault('password', self.password)
+ return Connection(transport=self.transport, **options)
+
+ def do_connect(self):
+ self.connection = self.get_connection(**self.connection_options)
+ try:
+ self.connection.connect()
+ self.after_connect(self.connection)
+ except self.connection.connection_errors:
+ self.skip_test_reason = '{0} transport cannot connect'.format(
+ self.transport,
+ )
+ else:
+ self.connected = True
+
+ def verify_alive(self):
+ if self.transport:
+ if not self.connected:
+ raise SkipTest(self.skip_test_reason)
+ return True
+
+ def purge_consumer(self, consumer):
+ return self.purge([queue.name for queue in consumer.queues])
+
+ def test_produce__consume(self):
+ if not self.verify_alive():
+ return
+ chan1 = self.connection.channel()
+ consumer = chan1.Consumer(self.queue)
+ self.purge_consumer(consumer)
+ producer = chan1.Producer(self.exchange)
+ producer.publish({'foo': 'bar'}, routing_key=self.prefix)
+ message = consumeN(self.connection, consumer)
+ self.assertDictEqual(message[0], {'foo': 'bar'})
+ chan1.close()
+ self.purge([self.queue.name])
+
+ def test_purge(self):
+ if not self.verify_alive():
+ return
+ chan1 = self.connection.channel()
+ consumer = chan1.Consumer(self.queue)
+ self.purge_consumer(consumer)
+
+ producer = chan1.Producer(self.exchange)
+ for i in range(10):
+ producer.publish({'foo': 'bar'}, routing_key=self.prefix)
+ if self.reliable_purge:
+ self.assertEqual(consumer.purge(), 10)
+ self.assertEqual(consumer.purge(), 0)
+ else:
+ purged = 0
+ while purged < 9:
+ purged += self.purge_consumer(consumer)
+
+ def _digest(self, data):
+ return sha.new(data).hexdigest()
+
+ def test_produce__consume_large_messages(
+ self, bytes=1048576, n=10,
+ charset=string.punctuation + string.ascii_letters + string.digits):
+ if not self.verify_alive():
+ return
+ bytes = min(x for x in [bytes, self.message_size_limit] if x)
+ messages = [''.join(random.choice(charset)
+ for j in range(bytes)) + '--%s' % n
+ for i in range(n)]
+ digests = []
+ chan1 = self.connection.channel()
+ consumer = chan1.Consumer(self.queue)
+ self.purge_consumer(consumer)
+ producer = chan1.Producer(self.exchange)
+ for i, message in enumerate(messages):
+ producer.publish({'text': message,
+ 'i': i}, routing_key=self.prefix)
+ digests.append(self._digest(message))
+
+ received = [(msg['i'], msg['text'])
+ for msg in consumeN(self.connection, consumer, n)]
+ self.assertEqual(len(received), n)
+ ordering = [i for i, _ in received]
+ if ordering != list(range(n)) and not self.suppress_disorder_warning:
+ warnings.warn(
+ '%s did not deliver messages in FIFO order: %r' % (
+ self.transport, ordering))
+
+ for i, text in received:
+ if text != messages[i]:
+ raise AssertionError('%i: %r is not %r' % (
+ i, text[-100:], messages[i][-100:]))
+ self.assertEqual(self._digest(text), digests[i])
+
+ chan1.close()
+ self.purge([self.queue.name])
+
+ def P(self, rest):
+ return '%s%s%s' % (self.prefix, self.sep, rest)
+
+ def test_produce__consume_multiple(self):
+ if not self.verify_alive():
+ return
+ chan1 = self.connection.channel()
+ producer = chan1.Producer(self.exchange)
+ b1 = Queue(self.P('b1'), self.exchange, 'b1')(chan1)
+ b2 = Queue(self.P('b2'), self.exchange, 'b2')(chan1)
+ b3 = Queue(self.P('b3'), self.exchange, 'b3')(chan1)
+ [q.declare() for q in (b1, b2, b3)]
+ self.purge([b1.name, b2.name, b3.name])
+
+ producer.publish('b1', routing_key='b1')
+ producer.publish('b2', routing_key='b2')
+ producer.publish('b3', routing_key='b3')
+ chan1.close()
+
+ chan2 = self.connection.channel()
+ consumer = chan2.Consumer([b1, b2, b3])
+ messages = consumeN(self.connection, consumer, 3)
+ self.assertItemsEqual(messages, ['b1', 'b2', 'b3'])
+ chan2.close()
+ self.purge([self.P('b1'), self.P('b2'), self.P('b3')])
+
+ def test_timeout(self):
+ if not self.verify_alive():
+ return
+ chan = self.connection.channel()
+ self.purge([self.queue.name])
+ consumer = chan.Consumer(self.queue)
+ self.assertRaises(
+ socket.timeout, self.connection.drain_events, timeout=0.3,
+ )
+ consumer.cancel()
+ chan.close()
+
+ def test_basic_get(self):
+ if not self.verify_alive():
+ return
+ chan1 = self.connection.channel()
+ producer = chan1.Producer(self.exchange)
+ chan2 = self.connection.channel()
+ queue = Queue(self.P('basic_get'), self.exchange, 'basic_get')
+ queue = queue(chan2)
+ queue.declare()
+ producer.publish({'basic.get': 'this'}, routing_key='basic_get')
+ chan1.close()
+
+ for i in range(self.event_loop_max):
+ m = queue.get()
+ if m:
+ break
+ time.sleep(0.1)
+ self.assertEqual(m.payload, {'basic.get': 'this'})
+ self.purge([queue.name])
+ chan2.close()
+
+ def test_cyclic_reference_transport(self):
+ if not self.verify_alive():
+ return
+
+ def _createref():
+ conn = self.get_connection()
+ conn.transport
+ conn.close()
+ return weakref.ref(conn)
+
+ self.assertIsNone(_createref()())
+
+ def test_cyclic_reference_connection(self):
+ if not self.verify_alive():
+ return
+
+ def _createref():
+ conn = self.get_connection()
+ conn.connect()
+ conn.close()
+ return weakref.ref(conn)
+
+ self.assertIsNone(_createref()())
+
+ def test_cyclic_reference_channel(self):
+ if not self.verify_alive():
+ return
+
+ def _createref():
+ conn = self.get_connection()
+ conn.connect()
+ chanrefs = []
+ try:
+ for i in range(100):
+ channel = conn.channel()
+ chanrefs.append(weakref.ref(channel))
+ channel.close()
+ finally:
+ conn.close()
+ return chanrefs
+
+ for chanref in _createref():
+ self.assertIsNone(chanref())
+
+ def tearDown(self):
+ if self.transport and self.connected:
+ self.connection.close()