summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Dana Powers <dana.powers@gmail.com>, 2016-02-17 10:35:02 -0800
committer: Dana Powers <dana.powers@gmail.com>, 2016-02-17 10:35:02 -0800
commit: bcdefd698d03af65413d9feff522b22dd3b7eebb (patch)
tree: 53b9e5f662a0f583f82513c6b4f6e549f7088478
parent: 9f0db5d38b444f5a93da7bed4a19114aff8701e8 (diff)
parent: d7522b0fb79bffbe10a2548658a48829dd1a5c33 (diff)
download: kafka-python-bcdefd698d03af65413d9feff522b22dd3b7eebb.tar.gz
Merge pull request #553 from aisch/kafka_client_fd_leak
break up some circular references and close client wake pipe on __del__
-rw-r--r-- kafka/client_async.py | 4
-rw-r--r-- kafka/coordinator/base.py | 12
-rw-r--r-- kafka/coordinator/consumer.py | 15
-rw-r--r-- kafka/util.py | 37
-rw-r--r-- test/test_coordinator.py | 3
5 files changed, 64 insertions, 7 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index f048be9..8647e3e 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -97,6 +97,10 @@ class KafkaClient(object):
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
self._wake_r, self._wake_w = os.pipe()
+ def __del__(self):
+ os.close(self._wake_r)
+ os.close(self._wake_w)
+
def _bootstrap(self, hosts):
# Exponential backoff if bootstrap fails
backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 6efdfd0..c49c38b 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -2,6 +2,7 @@ import abc
import copy
import logging
import time
+import weakref
import six
@@ -85,9 +86,12 @@ class BaseCoordinator(object):
self.rejoin_needed = True
self.needs_join_prepare = True
self.heartbeat = Heartbeat(**self.config)
- self.heartbeat_task = HeartbeatTask(self)
+ self.heartbeat_task = HeartbeatTask(weakref.proxy(self))
#self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+ def __del__(self):
+ self.heartbeat_task.disable()
+
@abc.abstractmethod
def protocol_type(self):
"""
@@ -572,6 +576,12 @@ class HeartbeatTask(object):
self._client = coordinator._client
self._request_in_flight = False
+ def disable(self):
+ try:
+ self._client.unschedule(self)
+ except KeyError:
+ pass
+
def reset(self):
# start or restart the heartbeat task to be executed at the next chance
self._heartbeat.reset_session_timeout()
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 515377a..d63d052 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -4,20 +4,20 @@ import copy
import collections
import logging
import time
+import weakref
import six
from .base import BaseCoordinator
from .assignors.range import RangePartitionAssignor
from .assignors.roundrobin import RoundRobinPartitionAssignor
-from .protocol import (
- ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment,
- ConsumerProtocol)
+from .protocol import ConsumerProtocol
from ..common import OffsetAndMetadata, TopicPartition
from ..future import Future
from ..protocol.commit import (
OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
OffsetFetchRequest_v0, OffsetFetchRequest_v1)
+from ..util import WeakMethod
import kafka.common as Errors
@@ -83,7 +83,7 @@ class ConsumerCoordinator(BaseCoordinator):
self._partitions_per_topic = {}
self._cluster = client.cluster
self._cluster.request_update()
- self._cluster.add_listener(self._handle_metadata_update)
+ self._cluster.add_listener(WeakMethod(self._handle_metadata_update))
self._auto_commit_task = None
if self.config['enable_auto_commit']:
@@ -95,13 +95,18 @@ class ConsumerCoordinator(BaseCoordinator):
log.warning('group_id is None: disabling auto-commit.')
else:
interval = self.config['auto_commit_interval_ms'] / 1000.0
- self._auto_commit_task = AutoCommitTask(self, interval)
+ self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval)
# metrics=None,
# metric_group_prefix=None,
# metric_tags=None,
# self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags)
+ def __del__(self):
+ if self._auto_commit_task:
+ self._auto_commit_task.disable()
+ self._cluster.remove_listener(WeakMethod(self._handle_metadata_update))
+
def protocol_type(self):
return ConsumerProtocol.PROTOCOL_TYPE
diff --git a/kafka/util.py b/kafka/util.py
index c6e77fa..7a11910 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -3,6 +3,7 @@ import collections
import struct
import sys
from threading import Thread, Event
+import weakref
import six
@@ -151,3 +152,39 @@ class ReentrantTimer(object):
def __del__(self):
self.stop()
+
+
+class WeakMethod(object):
+ """
+ Callable that weakly references a method and the object it is bound to. It
+ is based on http://stackoverflow.com/a/24287465.
+
+ Arguments:
+
+ object_dot_method: A bound instance method (i.e. 'object.method').
+ """
+ def __init__(self, object_dot_method):
+ try:
+ self.target = weakref.ref(object_dot_method.__self__)
+ except AttributeError:
+ self.target = weakref.ref(object_dot_method.im_self)
+ self._target_id = id(self.target())
+ try:
+ self.method = weakref.ref(object_dot_method.__func__)
+ except AttributeError:
+ self.method = weakref.ref(object_dot_method.im_func)
+ self._method_id = id(self.method())
+
+ def __call__(self, *args, **kwargs):
+ """
+ Calls the method on target with args and kwargs.
+ """
+ return self.method()(self.target(), *args, **kwargs)
+
+ def __hash__(self):
+ return hash(self.target) ^ hash(self.method)
+
+ def __eq__(self, other):
+ if not isinstance(other, WeakMethod):
+ return False
+ return self._target_id == other._target_id and self._method_id == other._method_id
diff --git a/test/test_coordinator.py b/test/test_coordinator.py
index bf48923..e0906c7 100644
--- a/test/test_coordinator.py
+++ b/test/test_coordinator.py
@@ -19,6 +19,7 @@ from kafka.protocol.commit import (
OffsetCommitResponse, OffsetFetchRequest_v0, OffsetFetchRequest_v1,
OffsetFetchResponse)
from kafka.protocol.metadata import MetadataResponse
+from kafka.util import WeakMethod
import kafka.common as Errors
@@ -46,7 +47,7 @@ def test_init(conn):
# metadata update on init
assert cli.cluster._need_update is True
- assert coordinator._handle_metadata_update in cli.cluster._listeners
+ assert WeakMethod(coordinator._handle_metadata_update) in cli.cluster._listeners
@pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)])