diff options
author | Ask Solem <ask@celeryproject.org> | 2013-04-30 13:19:35 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2013-04-30 13:19:35 +0100 |
commit | fee11d1ea9733a266a5b845872df13e6166804ac (patch) | |
tree | 6e2ef6d48462fee51b2d57d162cdd14fc01d7bc3 /kombu | |
parent | efa14fc7d423ea00f4ec7662a26279a225e33a9b (diff) | |
parent | 778143b34b57feec469a26fd0976b49e25a8150f (diff) | |
download | kombu-fee11d1ea9733a266a5b845872df13e6166804ac.tar.gz |
Merge branch '2.5'
Conflicts:
kombu/pidbox.py
kombu/utils/__init__.py
kombu/utils/eventio.py
Diffstat (limited to 'kombu')
-rw-r--r-- | kombu/abstract.py | 4 | ||||
-rw-r--r-- | kombu/connection.py | 62 | ||||
-rw-r--r-- | kombu/pidbox.py | 13 | ||||
-rw-r--r-- | kombu/tests/test_entities.py | 1 | ||||
-rw-r--r-- | kombu/tests/test_utils.py | 4 | ||||
-rw-r--r-- | kombu/transport/librabbitmq.py | 15 | ||||
-rw-r--r-- | kombu/utils/__init__.py | 2 | ||||
-rw-r--r-- | kombu/utils/eventio.py | 7 |
8 files changed, 78 insertions, 30 deletions
diff --git a/kombu/abstract.py b/kombu/abstract.py index b0143c80..6dff8486 100644 --- a/kombu/abstract.py +++ b/kombu/abstract.py @@ -1,6 +1,6 @@ """ -kombu.compression -================= +kombu.abstract +============== Object utilities. diff --git a/kombu/connection.py b/kombu/connection.py index b291f1ef..7c4e2e55 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -329,6 +329,29 @@ class Connection(object): self._debug('closed') self._closed = True + def collect(self, socket_timeout=None): + # amqp requires communication to close, we don't need that just + # to clear out references, Transport._collect can also be implemented + # by other transports that want fast after fork + try: + gc_transport = self._transport._collect + except AttributeError: + _timeo = socket.getdefaulttimeout() + socket.setdefaulttimeout(socket_timeout) + try: + self._close() + except socket.timeout: + pass + finally: + socket.setdefaulttimeout(_timeo) + else: + gc_transport(self._connection) + if self._transport: + self._transport.client = None + self._transport = None + self.declared_entities.clear() + self._connection = None + def release(self): """Close the connection (if open).""" self._close() @@ -914,6 +937,9 @@ class Resource(object): else: self.close_resource(resource) + def collect_resource(self, resource): + pass + def force_close_all(self): """Closes and removes all resources in the pool (also those in use). @@ -923,32 +949,27 @@ class Resource(object): """ dirty = self._dirty resource = self._resource - while 1: + while 1: # - acquired try: dres = dirty.pop() except KeyError: break try: - self.close_resource(dres) + self.collect_resource(dres) except AttributeError: # Issue #78 pass - - mutex = getattr(resource, 'mutex', None) - if mutex: - mutex.acquire() - try: - while 1: - try: - res = resource.queue.pop() - except IndexError: - break - try: - self.close_resource(res) - except AttributeError: - pass # Issue #78 - finally: - if mutex: # pragma: no cover - mutex.release() + while 1: # - available + # deque supports '.clear', but lists do not, so for that + # reason we use pop here, so that the underlying object can + # be any object supporting '.pop' and '.append'. + try: + res = resource.queue.pop() + except IndexError: + break + try: + self.collect_resource(res) + except AttributeError: + pass # Issue #78 if os.environ.get('KOMBU_DEBUG_POOL'): # pragma: no cover _orig_acquire = acquire @@ -997,6 +1018,9 @@ class ConnectionPool(Resource): def close_resource(self, resource): resource._close() + def collect_resource(self, resource, socket_timeout=0.1): + return resource.collect(socket_timeout) + @contextmanager def acquire_channel(self, block=False): with self.acquire(block=block) as connection: diff --git a/kombu/pidbox.py b/kombu/pidbox.py index ada35116..949754bf 100644 --- a/kombu/pidbox.py +++ b/kombu/pidbox.py @@ -19,6 +19,7 @@ from time import time from . import Exchange, Queue, Consumer, Producer from .clocks import LamportClock from .common import maybe_declare, oid_from +from .exceptions import InconsistencyError from .five import range from .utils import cached_property, kwdict, uuid @@ -215,9 +216,15 @@ class Mailbox(object): delivery_mode='transient', durable=False) producer = Producer(chan, auto_declare=False) - producer.publish(reply, exchange=exchange, routing_key=routing_key, - declare=[exchange], headers={ - 'ticket': ticket, 'clock': self.clock.forward()}) + try: + producer.publish( + reply, exchange=exchange, routing_key=routing_key, + declare=[exchange], headers={ + 'ticket': ticket, 'clock': self.clock.forward(), + }, + ) + except InconsistencyError: + pass # queue probably deleted and no one is expecting a reply. def _publish(self, type, arguments, destination=None, reply_ticket=None, channel=None, timeout=None): diff --git a/kombu/tests/test_entities.py b/kombu/tests/test_entities.py index 20f4446e..bc4a79c1 100644 --- a/kombu/tests/test_entities.py +++ b/kombu/tests/test_entities.py @@ -15,6 +15,7 @@ from .utils import Mock def get_conn(): return Connection(transport=Transport) + class test_binding(TestCase): def test_constructor(self): diff --git a/kombu/tests/test_utils.py b/kombu/tests/test_utils.py index fcc03aab..389f84b7 100644 --- a/kombu/tests/test_utils.py +++ b/kombu/tests/test_utils.py @@ -216,7 +216,7 @@ class test_retry_over_time(TestCase): self.myfun, self.Predicate, max_retries=1, errback=self.errback, interval_max=14, ) - self.assertEqual(self.index, 2) + self.assertEqual(self.index, 1) # no errback self.assertRaises( self.Predicate, utils.retry_over_time, @@ -231,7 +231,7 @@ class test_retry_over_time(TestCase): self.myfun, self.Predicate, max_retries=0, errback=self.errback, interval_max=14, ) - self.assertEqual(self.index, 1) + self.assertEqual(self.index, 0) class test_cached_property(TestCase): diff --git a/kombu/transport/librabbitmq.py b/kombu/transport/librabbitmq.py index ee6246c1..4ada8a55 100644 --- a/kombu/transport/librabbitmq.py +++ b/kombu/transport/librabbitmq.py @@ -9,6 +9,7 @@ kombu.transport.librabbitmq """ from __future__ import absolute_import +import os import socket try: @@ -119,8 +120,22 @@ class Transport(base.Transport): def close_connection(self, connection): """Close the AMQP broker connection.""" + self.client.drain_events = None connection.close() + def _collect(self, connection): + if connection is not None: + for channel in connection.channels.itervalues(): + channel.connection = None + try: + os.close(connection.fileno()) + except OSError: + pass + connection.channels.clear() + connection.callbacks.clear() + self.client.drain_events = None + self.client = None + def verify_connection(self, connection): return connection.connected diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index 55cead1c..7c4acbe7 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -219,7 +219,7 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None, try: return fun(*args, **kwargs) except catch as exc: - if max_retries is not None and retries > max_retries: + if max_retries is not None and retries >= max_retries: raise if callback: callback() diff --git a/kombu/utils/eventio.py b/kombu/utils/eventio.py index 67aef059..4343892f 100644 --- a/kombu/utils/eventio.py +++ b/kombu/utils/eventio.py @@ -198,7 +198,7 @@ class _select(Poller): for fd in self._rfd | self._wfd | self._efd: try: _selectf([fd], [], [], 0) - except _selecterr as exc: + except (_selecterr, socket.error) as exc: if get_errno(exc) in SELECT_BAD_FD: self.unregister(fd) @@ -212,11 +212,12 @@ class _select(Poller): read, write, error = _selectf( self._rfd, self._wfd, self._efd, timeout, ) - except _selecterr as exc: + except (_selecterr, socket.error) as exc: if get_errno(exc) == errno.EINTR: return elif get_errno(exc) in SELECT_BAD_FD: - self._remove_bad() + return self._remove_bad() + raise events = {} for fd in read: |