summaryrefslogtreecommitdiff
path: root/kombu
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2013-04-30 13:19:35 +0100
committerAsk Solem <ask@celeryproject.org>2013-04-30 13:19:35 +0100
commitfee11d1ea9733a266a5b845872df13e6166804ac (patch)
tree6e2ef6d48462fee51b2d57d162cdd14fc01d7bc3 /kombu
parentefa14fc7d423ea00f4ec7662a26279a225e33a9b (diff)
parent778143b34b57feec469a26fd0976b49e25a8150f (diff)
downloadkombu-fee11d1ea9733a266a5b845872df13e6166804ac.tar.gz
Merge branch '2.5'
Conflicts: kombu/pidbox.py kombu/utils/__init__.py kombu/utils/eventio.py
Diffstat (limited to 'kombu')
-rw-r--r--kombu/abstract.py4
-rw-r--r--kombu/connection.py62
-rw-r--r--kombu/pidbox.py13
-rw-r--r--kombu/tests/test_entities.py1
-rw-r--r--kombu/tests/test_utils.py4
-rw-r--r--kombu/transport/librabbitmq.py15
-rw-r--r--kombu/utils/__init__.py2
-rw-r--r--kombu/utils/eventio.py7
8 files changed, 78 insertions, 30 deletions
diff --git a/kombu/abstract.py b/kombu/abstract.py
index b0143c80..6dff8486 100644
--- a/kombu/abstract.py
+++ b/kombu/abstract.py
@@ -1,6 +1,6 @@
"""
-kombu.compression
-=================
+kombu.abstract
+==============
Object utilities.
diff --git a/kombu/connection.py b/kombu/connection.py
index b291f1ef..7c4e2e55 100644
--- a/kombu/connection.py
+++ b/kombu/connection.py
@@ -329,6 +329,29 @@ class Connection(object):
self._debug('closed')
self._closed = True
+ def collect(self, socket_timeout=None):
+ # amqp requires communication to close, we don't need that just
+ # to clear out references, Transport._collect can also be implemented
+ # by other transports that want fast after fork
+ try:
+ gc_transport = self._transport._collect
+ except AttributeError:
+ _timeo = socket.getdefaulttimeout()
+ socket.setdefaulttimeout(socket_timeout)
+ try:
+ self._close()
+ except socket.timeout:
+ pass
+ finally:
+ socket.setdefaulttimeout(_timeo)
+ else:
+ gc_transport(self._connection)
+ if self._transport:
+ self._transport.client = None
+ self._transport = None
+ self.declared_entities.clear()
+ self._connection = None
+
def release(self):
"""Close the connection (if open)."""
self._close()
@@ -914,6 +937,9 @@ class Resource(object):
else:
self.close_resource(resource)
+ def collect_resource(self, resource):
+ pass
+
def force_close_all(self):
"""Closes and removes all resources in the pool (also those in use).
@@ -923,32 +949,27 @@ class Resource(object):
"""
dirty = self._dirty
resource = self._resource
- while 1:
+ while 1: # - acquired
try:
dres = dirty.pop()
except KeyError:
break
try:
- self.close_resource(dres)
+ self.collect_resource(dres)
except AttributeError: # Issue #78
pass
-
- mutex = getattr(resource, 'mutex', None)
- if mutex:
- mutex.acquire()
- try:
- while 1:
- try:
- res = resource.queue.pop()
- except IndexError:
- break
- try:
- self.close_resource(res)
- except AttributeError:
- pass # Issue #78
- finally:
- if mutex: # pragma: no cover
- mutex.release()
+ while 1: # - available
+ # deque supports '.clear', but lists do not, so for that
+ # reason we use pop here, so that the underlying object can
+ # be any object supporting '.pop' and '.append'.
+ try:
+ res = resource.queue.pop()
+ except IndexError:
+ break
+ try:
+ self.collect_resource(res)
+ except AttributeError:
+ pass # Issue #78
if os.environ.get('KOMBU_DEBUG_POOL'): # pragma: no cover
_orig_acquire = acquire
@@ -997,6 +1018,9 @@ class ConnectionPool(Resource):
def close_resource(self, resource):
resource._close()
+ def collect_resource(self, resource, socket_timeout=0.1):
+ return resource.collect(socket_timeout)
+
@contextmanager
def acquire_channel(self, block=False):
with self.acquire(block=block) as connection:
diff --git a/kombu/pidbox.py b/kombu/pidbox.py
index ada35116..949754bf 100644
--- a/kombu/pidbox.py
+++ b/kombu/pidbox.py
@@ -19,6 +19,7 @@ from time import time
from . import Exchange, Queue, Consumer, Producer
from .clocks import LamportClock
from .common import maybe_declare, oid_from
+from .exceptions import InconsistencyError
from .five import range
from .utils import cached_property, kwdict, uuid
@@ -215,9 +216,15 @@ class Mailbox(object):
delivery_mode='transient',
durable=False)
producer = Producer(chan, auto_declare=False)
- producer.publish(reply, exchange=exchange, routing_key=routing_key,
- declare=[exchange], headers={
- 'ticket': ticket, 'clock': self.clock.forward()})
+ try:
+ producer.publish(
+ reply, exchange=exchange, routing_key=routing_key,
+ declare=[exchange], headers={
+ 'ticket': ticket, 'clock': self.clock.forward(),
+ },
+ )
+ except InconsistencyError:
+ pass # queue probably deleted and no one is expecting a reply.
def _publish(self, type, arguments, destination=None,
reply_ticket=None, channel=None, timeout=None):
diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py
index 20f4446e..bc4a79c1 100644
--- a/kombu/tests/test_entities.py
+++ b/kombu/tests/test_entities.py
@@ -15,6 +15,7 @@ from .utils import Mock
def get_conn():
return Connection(transport=Transport)
+
class test_binding(TestCase):
def test_constructor(self):
diff --git a/kombu/tests/test_utils.py b/kombu/tests/test_utils.py
index fcc03aab..389f84b7 100644
--- a/kombu/tests/test_utils.py
+++ b/kombu/tests/test_utils.py
@@ -216,7 +216,7 @@ class test_retry_over_time(TestCase):
self.myfun, self.Predicate,
max_retries=1, errback=self.errback, interval_max=14,
)
- self.assertEqual(self.index, 2)
+ self.assertEqual(self.index, 1)
# no errback
self.assertRaises(
self.Predicate, utils.retry_over_time,
@@ -231,7 +231,7 @@ class test_retry_over_time(TestCase):
self.myfun, self.Predicate,
max_retries=0, errback=self.errback, interval_max=14,
)
- self.assertEqual(self.index, 1)
+ self.assertEqual(self.index, 0)
class test_cached_property(TestCase):
diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py
index ee6246c1..4ada8a55 100644
--- a/kombu/transport/librabbitmq.py
+++ b/kombu/transport/librabbitmq.py
@@ -9,6 +9,7 @@ kombu.transport.librabbitmq
"""
from __future__ import absolute_import
+import os
import socket
try:
@@ -119,8 +120,22 @@ class Transport(base.Transport):
def close_connection(self, connection):
"""Close the AMQP broker connection."""
+ self.client.drain_events = None
connection.close()
+ def _collect(self, connection):
+ if connection is not None:
+ for channel in connection.channels.itervalues():
+ channel.connection = None
+ try:
+ os.close(connection.fileno())
+ except OSError:
+ pass
+ connection.channels.clear()
+ connection.callbacks.clear()
+ self.client.drain_events = None
+ self.client = None
+
def verify_connection(self, connection):
return connection.connected
diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py
index 55cead1c..7c4acbe7 100644
--- a/kombu/utils/__init__.py
+++ b/kombu/utils/__init__.py
@@ -219,7 +219,7 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
try:
return fun(*args, **kwargs)
except catch as exc:
- if max_retries is not None and retries > max_retries:
+ if max_retries is not None and retries >= max_retries:
raise
if callback:
callback()
diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py
index 67aef059..4343892f 100644
--- a/kombu/utils/eventio.py
+++ b/kombu/utils/eventio.py
@@ -198,7 +198,7 @@ class _select(Poller):
for fd in self._rfd | self._wfd | self._efd:
try:
_selectf([fd], [], [], 0)
- except _selecterr as exc:
+ except (_selecterr, socket.error) as exc:
if get_errno(exc) in SELECT_BAD_FD:
self.unregister(fd)
@@ -212,11 +212,12 @@ class _select(Poller):
read, write, error = _selectf(
self._rfd, self._wfd, self._efd, timeout,
)
- except _selecterr as exc:
+ except (_selecterr, socket.error) as exc:
if get_errno(exc) == errno.EINTR:
return
elif get_errno(exc) in SELECT_BAD_FD:
- self._remove_bad()
+ return self._remove_bad()
+ raise
events = {}
for fd in read: