diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-06-17 09:03:52 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-06-18 13:21:36 -0700 |
commit | 6d3f31d276ad823bc466d16284f4894104db3a83 (patch) | |
tree | 545b9148d0dd1ab53a4debcd2a116c77bfebcddf | |
parent | de7eef4718fd99f2a496e2effcece39a14ac7f9c (diff) | |
download | kafka-python-at_exit_weakref.tar.gz |
Use weakref when registering a producer.close atexit to fix normal gcat_exit_weakref
-rw-r--r-- | kafka/producer/kafka.py | 40 | ||||
-rw-r--r-- | kafka/util.py | 10 |
2 files changed, 47 insertions, 3 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 0793c80..2185869 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -5,12 +5,13 @@ import copy import logging import threading import time +import weakref +from .. import errors as Errors from ..client_async import KafkaClient -from ..structs import TopicPartition from ..partitioner.default import DefaultPartitioner from ..protocol.message import Message, MessageSet -from .. import errors as Errors +from ..structs import TopicPartition from .future import FutureRecordMetadata, FutureProduceResult from .record_accumulator import AtomicInteger, RecordAccumulator from .sender import Sender @@ -293,14 +294,47 @@ class KafkaProducer(object): self._sender.daemon = True self._sender.start() self._closed = False - atexit.register(self.close, timeout=0) + + self._cleanup = self._cleanup_factory() + atexit.register(self._cleanup) log.debug("Kafka producer started") + def _cleanup_factory(self): + """Build a cleanup clojure that doesn't increase our ref count""" + _self = weakref.proxy(self) + def wrapper(): + try: + _self.close() + except (ReferenceError, AttributeError): + pass + return wrapper + + def _unregister_cleanup(self): + if getattr(self, '_cleanup'): + if hasattr(atexit, 'unregister'): + atexit.unregister(self._cleanup) # pylint: disable=no-member + + # py2 requires removing from private attribute... + else: + + # ValueError on list.remove() if the exithandler no longer exists + # but that is fine here + try: + atexit._exithandlers.remove( # pylint: disable=no-member + (self._cleanup, (), {})) + except ValueError: + pass + self._cleanup = None + def __del__(self): self.close(timeout=0) def close(self, timeout=None): """Close this producer.""" + + # drop our atexit handler now to avoid leaks + self._unregister_cleanup() + if not hasattr(self, '_closed') or self._closed: log.info('Kafka producer closed') return diff --git a/kafka/util.py b/kafka/util.py index 18c39a4..b3a72f3 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,3 +1,4 @@ +import atexit import binascii import collections import struct @@ -188,3 +189,12 @@ class WeakMethod(object): if not isinstance(other, WeakMethod): return False return self._target_id == other._target_id and self._method_id == other._method_id + + +def try_method_on_system_exit(obj, method, *args, **kwargs): + def wrapper(_obj, _meth, *args, **kwargs): + try: + getattr(_obj, _meth)(*args, **kwargs) + except (ReferenceError, AttributeError): + pass + atexit.register(wrapper, weakref.proxy(obj), method, *args, **kwargs) |