diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-17 10:35:02 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-17 10:35:02 -0800 |
commit | bcdefd698d03af65413d9feff522b22dd3b7eebb (patch) | |
tree | 53b9e5f662a0f583f82513c6b4f6e549f7088478 | |
parent | 9f0db5d38b444f5a93da7bed4a19114aff8701e8 (diff) | |
parent | d7522b0fb79bffbe10a2548658a48829dd1a5c33 (diff) | |
download | kafka-python-bcdefd698d03af65413d9feff522b22dd3b7eebb.tar.gz |
Merge pull request #553 from aisch/kafka_client_fd_leak
break up some circular references and close client wake pipe on __del__
-rw-r--r-- | kafka/client_async.py | 4 | ||||
-rw-r--r-- | kafka/coordinator/base.py | 12 | ||||
-rw-r--r-- | kafka/coordinator/consumer.py | 15 | ||||
-rw-r--r-- | kafka/util.py | 37 | ||||
-rw-r--r-- | test/test_coordinator.py | 3 |
5 files changed, 64 insertions, 7 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index f048be9..8647e3e 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -97,6 +97,10 @@ class KafkaClient(object): self._bootstrap(collect_hosts(self.config['bootstrap_servers'])) self._wake_r, self._wake_w = os.pipe() + def __del__(self): + os.close(self._wake_r) + os.close(self._wake_w) + def _bootstrap(self, hosts): # Exponential backoff if bootstrap fails backoff_ms = self.config['reconnect_backoff_ms'] * 2 ** self._bootstrap_fails diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 6efdfd0..c49c38b 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -2,6 +2,7 @@ import abc import copy import logging import time +import weakref import six @@ -85,9 +86,12 @@ class BaseCoordinator(object): self.rejoin_needed = True self.needs_join_prepare = True self.heartbeat = Heartbeat(**self.config) - self.heartbeat_task = HeartbeatTask(self) + self.heartbeat_task = HeartbeatTask(weakref.proxy(self)) #self.sensors = GroupCoordinatorMetrics(metrics, metric_group_prefix, metric_tags) + def __del__(self): + self.heartbeat_task.disable() + @abc.abstractmethod def protocol_type(self): """ @@ -572,6 +576,12 @@ class HeartbeatTask(object): self._client = coordinator._client self._request_in_flight = False + def disable(self): + try: + self._client.unschedule(self) + except KeyError: + pass + def reset(self): # start or restart the heartbeat task to be executed at the next chance self._heartbeat.reset_session_timeout() diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 515377a..d63d052 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -4,20 +4,20 @@ import copy import collections import logging import time +import weakref import six from .base import BaseCoordinator from .assignors.range import RangePartitionAssignor from .assignors.roundrobin import RoundRobinPartitionAssignor -from .protocol import ( - ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, - ConsumerProtocol) +from .protocol import ConsumerProtocol from ..common import OffsetAndMetadata, TopicPartition from ..future import Future from ..protocol.commit import ( OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0, OffsetFetchRequest_v0, OffsetFetchRequest_v1) +from ..util import WeakMethod import kafka.common as Errors @@ -83,7 +83,7 @@ class ConsumerCoordinator(BaseCoordinator): self._partitions_per_topic = {} self._cluster = client.cluster self._cluster.request_update() - self._cluster.add_listener(self._handle_metadata_update) + self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) self._auto_commit_task = None if self.config['enable_auto_commit']: @@ -95,13 +95,18 @@ class ConsumerCoordinator(BaseCoordinator): log.warning('group_id is None: disabling auto-commit.') else: interval = self.config['auto_commit_interval_ms'] / 1000.0 - self._auto_commit_task = AutoCommitTask(self, interval) + self._auto_commit_task = AutoCommitTask(weakref.proxy(self), interval) # metrics=None, # metric_group_prefix=None, # metric_tags=None, # self.sensors = ConsumerCoordinatorMetrics(metrics, metric_group_prefix, metric_tags) + def __del__(self): + if self._auto_commit_task: + self._auto_commit_task.disable() + self._cluster.remove_listener(WeakMethod(self._handle_metadata_update)) + def protocol_type(self): return ConsumerProtocol.PROTOCOL_TYPE diff --git a/kafka/util.py b/kafka/util.py index c6e77fa..7a11910 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -3,6 +3,7 @@ import collections import struct import sys from threading import Thread, Event +import weakref import six @@ -151,3 +152,39 @@ class ReentrantTimer(object): def __del__(self): self.stop() + + +class WeakMethod(object): + """ + Callable that weakly references a method and the object it is bound to. It + is based on http://stackoverflow.com/a/24287465. + + Arguments: + + object_dot_method: A bound instance method (i.e. 'object.method'). + """ + def __init__(self, object_dot_method): + try: + self.target = weakref.ref(object_dot_method.__self__) + except AttributeError: + self.target = weakref.ref(object_dot_method.im_self) + self._target_id = id(self.target()) + try: + self.method = weakref.ref(object_dot_method.__func__) + except AttributeError: + self.method = weakref.ref(object_dot_method.im_func) + self._method_id = id(self.method()) + + def __call__(self, *args, **kwargs): + """ + Calls the method on target with args and kwargs. + """ + return self.method()(self.target(), *args, **kwargs) + + def __hash__(self): + return hash(self.target) ^ hash(self.method) + + def __eq__(self, other): + if not isinstance(other, WeakMethod): + return False + return self._target_id == other._target_id and self._method_id == other._method_id diff --git a/test/test_coordinator.py b/test/test_coordinator.py index bf48923..e0906c7 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -19,6 +19,7 @@ from kafka.protocol.commit import ( OffsetCommitResponse, OffsetFetchRequest_v0, OffsetFetchRequest_v1, OffsetFetchResponse) from kafka.protocol.metadata import MetadataResponse +from kafka.util import WeakMethod import kafka.common as Errors @@ -46,7 +47,7 @@ def test_init(conn): # metadata update on init assert cli.cluster._need_update is True - assert coordinator._handle_metadata_update in cli.cluster._listeners + assert WeakMethod(coordinator._handle_metadata_update) in cli.cluster._listeners @pytest.mark.parametrize("api_version", [(0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9)]) |