summaryrefslogtreecommitdiff
path: root/kombu/tests/test_connection.py
diff options
context:
space:
mode:
authorAsk Solem <askh@opera.com>2010-11-09 15:27:43 +0100
committerAsk Solem <askh@opera.com>2010-11-09 15:27:43 +0100
commit402260bf9f3c39986d2fa611758b7d20ddfef9fb (patch)
tree4a27adf0f5f64ecbb62cb5a6df0e04e17ae3db11 /kombu/tests/test_connection.py
parent9b103e43b6ad84310a796d28a623f2c62713fd15 (diff)
downloadkombu-402260bf9f3c39986d2fa611758b7d20ddfef9fb.tar.gz
94% coverage of the core library, but still missing coverage of compat, pidbox, and functional tests for concrete transports beanstalk, redis, couchdb, pika and mongodb
Diffstat (limited to 'kombu/tests/test_connection.py')
-rw-r--r--kombu/tests/test_connection.py76
1 files changed, 73 insertions, 3 deletions
diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py
index 00f62e7f..a6118579 100644
--- a/kombu/tests/test_connection.py
+++ b/kombu/tests/test_connection.py
@@ -1,14 +1,18 @@
+import pickle
import unittest2 as unittest
-from kombu.connection import BrokerConnection
+from kombu.connection import BrokerConnection, Resource
from kombu.tests.mocks import Transport
class test_Connection(unittest.TestCase):
+ def setUp(self):
+ self.conn = BrokerConnection(port=5672, transport=Transport)
+
def test_establish_connection(self):
- conn = BrokerConnection(port=5672, transport=Transport)
+ conn = self.conn
conn.connect()
self.assertTrue(conn.connection.connected)
self.assertEqual(conn.host, "localhost:5672")
@@ -21,7 +25,7 @@ class test_Connection(unittest.TestCase):
self.assertIsInstance(conn.transport, Transport)
def test__enter____exit__(self):
- conn = BrokerConnection(transport=Transport)
+ conn = self.conn
context = conn.__enter__()
self.assertIs(context, conn)
conn.connect()
@@ -30,6 +34,68 @@ class test_Connection(unittest.TestCase):
self.assertIsNone(conn.connection)
conn.close() # again
+ def test_close_survives_connerror(self):
+
+ class _CustomError(Exception):
+ pass
+
+ class MyTransport(Transport):
+ connection_errors = (_CustomError, )
+
+ def close_connection(self, connection):
+ raise _CustomError("foo")
+
+ conn = BrokerConnection(transport=MyTransport)
+ conn.connect()
+ conn.close()
+ self.assertTrue(conn._closed)
+
+ def test_ensure_connection(self):
+ self.assertTrue(self.conn.ensure_connection())
+
+ def test_SimpleQueue(self):
+ conn = self.conn
+ q = conn.SimpleQueue("foo")
+ self.assertTrue(q.channel)
+ self.assertTrue(q.channel_autoclose)
+ chan = conn.channel()
+ q2 = conn.SimpleQueue("foo", channel=chan)
+ self.assertIs(q2.channel, chan)
+ self.assertFalse(q2.channel_autoclose)
+
+ def test_SimpleBuffer(self):
+ conn = self.conn
+ q = conn.SimpleBuffer("foo")
+ self.assertTrue(q.channel)
+ self.assertTrue(q.channel_autoclose)
+ chan = conn.channel()
+ q2 = conn.SimpleBuffer("foo", channel=chan)
+ self.assertIs(q2.channel, chan)
+ self.assertFalse(q2.channel_autoclose)
+
+ def test__repr__(self):
+ self.assertTrue(repr(self.conn))
+
+ def test__reduce__(self):
+ x = pickle.loads(pickle.dumps(self.conn))
+ self.assertDictEqual(x.info(), self.conn.info())
+
+ def test_channel_errors(self):
+
+ class MyTransport(Transport):
+ channel_errors = (KeyError, ValueError)
+
+ conn = BrokerConnection(transport=MyTransport)
+ self.assertTupleEqual(conn.channel_errors, (KeyError, ValueError))
+
+ def test_connection_errors(self):
+
+ class MyTransport(Transport):
+ connection_errors = (KeyError, ValueError)
+
+ conn = BrokerConnection(transport=MyTransport)
+ self.assertTupleEqual(conn.connection_errors, (KeyError, ValueError))
+
class ResourceCase(unittest.TestCase):
abstract = True
@@ -41,6 +107,10 @@ class ResourceCase(unittest.TestCase):
self.assertEqual(P._resource.qsize(), avail)
self.assertEqual(len(P._dirty), dirty)
+ def test_setup(self):
+ if self.abstract:
+ self.assertRaises(NotImplementedError, Resource)
+
def test_acquire__release(self):
if self.abstract:
return