From d7522b0fb79bffbe10a2548658a48829dd1a5c33 Mon Sep 17 00:00:00 2001 From: aisch Date: Tue, 16 Feb 2016 21:30:38 -0800 Subject: break up some circular references and close client wake pipe on __del__ --- kafka/util.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) (limited to 'kafka/util.py') diff --git a/kafka/util.py b/kafka/util.py index c6e77fa..7a11910 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -3,6 +3,7 @@ import collections import struct import sys from threading import Thread, Event +import weakref import six @@ -151,3 +152,39 @@ class ReentrantTimer(object): def __del__(self): self.stop() + + +class WeakMethod(object): + """ + Callable that weakly references a method and the object it is bound to. It + is based on http://stackoverflow.com/a/24287465. + + Arguments: + + object_dot_method: A bound instance method (i.e. 'object.method'). + """ + def __init__(self, object_dot_method): + try: + self.target = weakref.ref(object_dot_method.__self__) + except AttributeError: + self.target = weakref.ref(object_dot_method.im_self) + self._target_id = id(self.target()) + try: + self.method = weakref.ref(object_dot_method.__func__) + except AttributeError: + self.method = weakref.ref(object_dot_method.im_func) + self._method_id = id(self.method()) + + def __call__(self, *args, **kwargs): + """ + Calls the method on target with args and kwargs. + """ + return self.method()(self.target(), *args, **kwargs) + + def __hash__(self): + return hash(self.target) ^ hash(self.method) + + def __eq__(self, other): + if not isinstance(other, WeakMethod): + return False + return self._target_id == other._target_id and self._method_id == other._method_id -- cgit v1.2.1