summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer/kafka.py40
-rw-r--r--kafka/util.py10
-rw-r--r--test/test_producer.py14
3 files changed, 61 insertions, 3 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 0793c80..2185869 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -5,12 +5,13 @@ import copy
import logging
import threading
import time
+import weakref
+from .. import errors as Errors
from ..client_async import KafkaClient
-from ..structs import TopicPartition
from ..partitioner.default import DefaultPartitioner
from ..protocol.message import Message, MessageSet
-from .. import errors as Errors
+from ..structs import TopicPartition
from .future import FutureRecordMetadata, FutureProduceResult
from .record_accumulator import AtomicInteger, RecordAccumulator
from .sender import Sender
@@ -293,14 +294,47 @@ class KafkaProducer(object):
self._sender.daemon = True
self._sender.start()
self._closed = False
- atexit.register(self.close, timeout=0)
+
+ self._cleanup = self._cleanup_factory()
+ atexit.register(self._cleanup)
log.debug("Kafka producer started")
+ def _cleanup_factory(self):
+ """Build a cleanup closure that doesn't increase our ref count"""
+ _self = weakref.proxy(self)
+ def wrapper():
+ try:
+ _self.close()
+ except (ReferenceError, AttributeError):
+ pass
+ return wrapper
+
+ def _unregister_cleanup(self):
+ if getattr(self, '_cleanup', None):
+ if hasattr(atexit, 'unregister'):
+ atexit.unregister(self._cleanup) # pylint: disable=no-member
+
+ # py2 requires removing from private attribute...
+ else:
+
+ # ValueError on list.remove() if the exithandler no longer exists
+ # but that is fine here
+ try:
+ atexit._exithandlers.remove( # pylint: disable=no-member
+ (self._cleanup, (), {}))
+ except ValueError:
+ pass
+ self._cleanup = None
+
def __del__(self):
self.close(timeout=0)
def close(self, timeout=None):
"""Close this producer."""
+
+ # drop our atexit handler now to avoid leaks
+ self._unregister_cleanup()
+
if not hasattr(self, '_closed') or self._closed:
log.info('Kafka producer closed')
return
diff --git a/kafka/util.py b/kafka/util.py
index 18c39a4..b3a72f3 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,3 +1,4 @@
+import atexit
import binascii
import collections
import struct
@@ -188,3 +189,12 @@ class WeakMethod(object):
if not isinstance(other, WeakMethod):
return False
return self._target_id == other._target_id and self._method_id == other._method_id
+
+
+def try_method_on_system_exit(obj, method, *args, **kwargs):
+ def wrapper(_obj, _meth, *args, **kwargs):
+ try:
+ getattr(_obj, _meth)(*args, **kwargs)
+ except (ReferenceError, AttributeError):
+ pass
+ atexit.register(wrapper, weakref.proxy(obj), method, *args, **kwargs)
diff --git a/test/test_producer.py b/test/test_producer.py
index f11bb05..125737b 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -1,4 +1,7 @@
+import gc
+import platform
import sys
+import threading
import pytest
@@ -64,3 +67,14 @@ def test_end_to_end(kafka_broker, compression):
break
assert msgs == set(['msg %d' % i for i in range(messages)])
+
+
+@pytest.mark.skipif(platform.python_implementation() != 'CPython',
+ reason='Test relies on CPython-specific gc policies')
+def test_kafka_producer_gc_cleanup():
+ threads = threading.active_count()
+ producer = KafkaProducer(api_version='0.9') # set api_version explicitly to avoid auto-detection
+ assert threading.active_count() == threads + 1
+ del(producer)
+ gc.collect()
+ assert threading.active_count() == threads