summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2011-11-03 15:11:28 +0000
committerAsk Solem <ask@celeryproject.org>2011-11-03 15:11:28 +0000
commita7a7f21bc520bc2c99f5b09722290b0e43436fba (patch)
treea17c4495676d523ac094c7a712902962c1c3bbdb
parente9bfde76ac6757f5ef12b16f48699b0dcc45b32f (diff)
parentf3be023b3c0ce53cbb473754e51fd2a092507194 (diff)
downloadkombu-a7a7f21bc520bc2c99f5b09722290b0e43436fba.tar.gz
Merge branch 'toofishes/connection-tests-changes'
-rw-r--r--kombu/connection.py20
-rw-r--r--kombu/tests/test_connection.py62
-rw-r--r--kombu/tests/test_utils.py69
-rw-r--r--kombu/utils/__init__.py2
4 files changed, 105 insertions, 48 deletions
diff --git a/kombu/connection.py b/kombu/connection.py
index 5719e536..d02a63c9 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -48,7 +48,7 @@ def parse_url(url):
hostname, _, port = partition(netloc, ':')
path = parts.path or ""
if path and path[0] == '/':
- path = path[path.index('/') + 1:]
+ path = path[1:]
return dict({"hostname": hostname,
"port": port and int(port) or None,
"userid": userid or None,
@@ -267,10 +267,8 @@ class BrokerConnection(object):
"""
- max_retries = max_retries or 0
-
@wraps(fun)
- def _insured(*args, **kwargs):
+ def _ensured(*args, **kwargs):
got_connection = 0
for retries in count(0):
try:
@@ -278,14 +276,16 @@ class BrokerConnection(object):
except self.connection_errors + self.channel_errors, exc:
self._debug("ensure got exception: %r" % (exc, ),
exc_info=sys.exc_info())
- if got_connection or \
- max_retries and retries > max_retries:
+ if got_connection:
+ raise
+ if max_retries is not None and retries > max_retries:
raise
errback and errback(exc, 0)
self._connection = None
self.close()
- remaining_retries = max_retries and \
- max(max_retries - retries, 1)
+ remaining_retries = None
+ if max_retries is not None:
+ remaining_retries = max(max_retries - retries, 1)
self.ensure_connection(errback,
remaining_retries,
interval_start,
@@ -298,8 +298,8 @@ class BrokerConnection(object):
on_revive(new_channel)
got_connection += 1
- _insured.func_name = _insured.__name__ = "%s(insured)" % fun.__name__
- return _insured
+ _ensured.func_name = _ensured.__name__ = "%s(ensured)" % fun.__name__
+ return _ensured
def autoretry(self, fun, channel=None, **ensure_options):
"""Decorator for functions supporting a ``channel`` keyword argument.
diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py
index b01fbd44..d5bd3dce 100644
--- a/kombu/tests/test_connection.py
+++ b/kombu/tests/test_connection.py
@@ -1,11 +1,43 @@
import pickle
from kombu.tests.utils import unittest
-from kombu.connection import BrokerConnection, Resource
+from kombu.connection import BrokerConnection, Resource, parse_url
from kombu.tests.mocks import Transport
+class test_connection_utils(unittest.TestCase):
+
+ def setUp(self):
+ self.url = "amqp://user:pass@localhost:5672/my/vhost"
+ self.nopass = "amqp://user@localhost:5672/my/vhost"
+ self.expected = {
+ "transport": "amqp",
+ "userid": "user",
+ "password": "pass",
+ "hostname": "localhost",
+ "port": 5672,
+ "virtual_host": "my/vhost",
+ }
+
+ def test_parse_url(self):
+ result = parse_url(self.url)
+ self.assertDictEqual(result, self.expected)
+
+ def test_parse_generated_as_uri(self):
+ conn = BrokerConnection(self.url)
+ info = conn.info()
+ for k, v in self.expected.items():
+ self.assertEqual(v, self.expected[k])
+ # by default almost the same- no password
+ self.assertEqual(conn.as_uri(), self.nopass)
+ self.assertEqual(conn.as_uri(include_password=True), self.url)
+
+ def test_bogus_scheme(self):
+ conn = BrokerConnection("bogus://localhost:7421")
+ # second parameter must be a callable, thus this little hack
+ self.assertRaises(KeyError, lambda: conn.transport)
+
class test_Connection(unittest.TestCase):
def setUp(self):
@@ -53,6 +85,34 @@ class test_Connection(unittest.TestCase):
def test_ensure_connection(self):
self.assertTrue(self.conn.ensure_connection())
+ def test_ensure_success(self):
+ def publish():
+ return "foobar"
+
+ ensured = self.conn.ensure(None, publish)
+ self.assertEqual(ensured(), "foobar")
+
+ def test_ensure_failure(self):
+ class _CustomError(Exception):
+ pass
+
+ def publish():
+ raise _CustomError("bar")
+
+ ensured = self.conn.ensure(None, publish)
+ self.assertRaises(_CustomError, ensured)
+
+ def test_ensure_connection_failure(self):
+ class _ConnectionError(Exception):
+ pass
+
+ def publish():
+ raise _ConnectionError("failed connection")
+
+ self.conn.transport.connection_errors = (_ConnectionError,)
+ ensured = self.conn.ensure(self.conn, publish)
+ self.assertRaises(_ConnectionError, ensured)
+
def test_SimpleQueue(self):
conn = self.conn
q = conn.SimpleQueue("foo")
diff --git a/kombu/tests/test_utils.py b/kombu/tests/test_utils.py
index 5daf75a2..7c170ddc 100644
--- a/kombu/tests/test_utils.py
+++ b/kombu/tests/test_utils.py
@@ -161,17 +161,12 @@ class test_emergency_dump_state(unittest.TestCase):
self.assertFalse(stdout.getvalue())
-_tried_to_sleep = [None]
-
-
def insomnia(fun):
@wraps(fun)
def _inner(*args, **kwargs):
- _tried_to_sleep[0] = None
-
def mysleep(i):
- _tried_to_sleep[0] = i
+ pass
prev_sleep = utils.sleep
utils.sleep = mysleep
@@ -185,37 +180,39 @@ def insomnia(fun):
class test_retry_over_time(unittest.TestCase):
+ def setUp(self):
+ self.index = 0
+
+ class Predicate(Exception):
+ pass
+
+ def myfun(self):
+ if self.index < 9:
+ raise self.Predicate()
+ return 42
+
+ def errback(self, exc, interval):
+ sleepvals = (None, 2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0, 16.0)
+ self.index += 1
+ self.assertEqual(interval, sleepvals[self.index])
+
@insomnia
def test_simple(self):
- index = [0]
+ x = utils.retry_over_time(self.myfun, self.Predicate,
+ errback=self.errback, interval_max=14)
+ self.assertEqual(x, 42)
+ self.assertEqual(self.index, 9)
- class Predicate(Exception):
- pass
+ @insomnia
+ def test_retry_once(self):
+ self.assertRaises(self.Predicate, utils.retry_over_time,
+ self.myfun, self.Predicate,
+ max_retries=1, errback=self.errback, interval_max=14)
+ self.assertEqual(self.index, 2)
- def myfun():
- sleepvals = {0: None,
- 1: 2.0,
- 2: 4.0,
- 3: 6.0,
- 4: 8.0,
- 5: 10.0,
- 6: 12.0,
- 7: 14.0,
- 8: 16.0,
- 9: 16.0}
- self.assertEqual(_tried_to_sleep[0], sleepvals[index[0]])
- if index[0] < 9:
- raise Predicate()
- return 42
-
- def errback(exc, interval):
- index[0] += 1
-
- x = utils.retry_over_time(myfun, Predicate, errback=errback,
- interval_max=14)
- self.assertEqual(x, 42)
- _tried_to_sleep[0] = None
- index[0] = 0
- self.assertRaises(Predicate,
- utils.retry_over_time, myfun, Predicate,
- max_retries=1, errback=errback, interval_max=14)
+ @insomnia
+ def test_retry_never(self):
+ self.assertRaises(self.Predicate, utils.retry_over_time,
+ self.myfun, self.Predicate,
+ max_retries=0, errback=self.errback, interval_max=14)
+ self.assertEqual(self.index, 1)
diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py
index c8db389c..3d64e71d 100644
--- a/kombu/utils/__init__.py
+++ b/kombu/utils/__init__.py
@@ -142,7 +142,7 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
try:
return fun(*args, **kwargs)
except catch, exc:
- if max_retries and retries > max_retries:
+ if max_retries is not None and retries > max_retries:
raise
if errback:
errback(exc, interval)