summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-06-17 09:03:52 -0700
committerDana Powers <dana.powers@gmail.com>2016-06-18 13:21:36 -0700
commit6d3f31d276ad823bc466d16284f4894104db3a83 (patch)
tree545b9148d0dd1ab53a4debcd2a116c77bfebcddf
parentde7eef4718fd99f2a496e2effcece39a14ac7f9c (diff)
downloadkafka-python-at_exit_weakref.tar.gz
Use weakref when registering a producer.close atexit to fix normal gcat_exit_weakref
-rw-r--r--kafka/producer/kafka.py40
-rw-r--r--kafka/util.py10
2 files changed, 47 insertions, 3 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 0793c80..2185869 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -5,12 +5,13 @@ import copy
import logging
import threading
import time
+import weakref
+from .. import errors as Errors
from ..client_async import KafkaClient
-from ..structs import TopicPartition
from ..partitioner.default import DefaultPartitioner
from ..protocol.message import Message, MessageSet
-from .. import errors as Errors
+from ..structs import TopicPartition
from .future import FutureRecordMetadata, FutureProduceResult
from .record_accumulator import AtomicInteger, RecordAccumulator
from .sender import Sender
@@ -293,14 +294,47 @@ class KafkaProducer(object):
self._sender.daemon = True
self._sender.start()
self._closed = False
- atexit.register(self.close, timeout=0)
+
+ self._cleanup = self._cleanup_factory()
+ atexit.register(self._cleanup)
log.debug("Kafka producer started")
+ def _cleanup_factory(self):
+ """Build a cleanup clojure that doesn't increase our ref count"""
+ _self = weakref.proxy(self)
+ def wrapper():
+ try:
+ _self.close()
+ except (ReferenceError, AttributeError):
+ pass
+ return wrapper
+
+ def _unregister_cleanup(self):
+ if getattr(self, '_cleanup'):
+ if hasattr(atexit, 'unregister'):
+ atexit.unregister(self._cleanup) # pylint: disable=no-member
+
+ # py2 requires removing from private attribute...
+ else:
+
+ # ValueError on list.remove() if the exithandler no longer exists
+ # but that is fine here
+ try:
+ atexit._exithandlers.remove( # pylint: disable=no-member
+ (self._cleanup, (), {}))
+ except ValueError:
+ pass
+ self._cleanup = None
+
def __del__(self):
self.close(timeout=0)
def close(self, timeout=None):
"""Close this producer."""
+
+ # drop our atexit handler now to avoid leaks
+ self._unregister_cleanup()
+
if not hasattr(self, '_closed') or self._closed:
log.info('Kafka producer closed')
return
diff --git a/kafka/util.py b/kafka/util.py
index 18c39a4..b3a72f3 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,3 +1,4 @@
+import atexit
import binascii
import collections
import struct
@@ -188,3 +189,12 @@ class WeakMethod(object):
if not isinstance(other, WeakMethod):
return False
return self._target_id == other._target_id and self._method_id == other._method_id
+
+
+def try_method_on_system_exit(obj, method, *args, **kwargs):
+ def wrapper(_obj, _meth, *args, **kwargs):
+ try:
+ getattr(_obj, _meth)(*args, **kwargs)
+ except (ReferenceError, AttributeError):
+ pass
+ atexit.register(wrapper, weakref.proxy(obj), method, *args, **kwargs)