From b6d98c07b418b16061ae92392947d5dd6958a708 Mon Sep 17 00:00:00 2001 From: David Arthur Date: Sat, 30 Mar 2013 00:28:00 -0400 Subject: Big code re-org --- kafka/util.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) (limited to 'kafka/util.py') diff --git a/kafka/util.py b/kafka/util.py index 715fb28..5dc6bc2 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -1,6 +1,7 @@ from collections import defaultdict from itertools import groupby import struct +from threading import Timer def write_int_string(s): if s is None: @@ -56,3 +57,27 @@ class BufferUnderflowError(Exception): class ChecksumError(Exception): pass + +class ReentrantTimer(object): + """ + A timer that can be restarted, unlike threading.Timer (although this uses threading.Timer) + + t: timer interval in milliseconds + fn: a callable to invoke + """ + def __init__(self, t, fn): + self.timer = None + self.t = t + self.fn = fn + + def start(self): + if self.timer is None: + self.timer = Timer(self.t / 1000., self.fn) + self.timer.start() + else: + self.timer.cancel() + self.timer = Timer(self.t / 1000., self.fn) + self.timer.start() + + def stop(self): + self.timer.cancel() -- cgit v1.2.1