summaryrefslogtreecommitdiff
path: root/kafka/util.py
diff options
context:
space:
mode:
authorDavid Arthur <mumrah@gmail.com>2013-03-30 00:28:00 -0400
committerDavid Arthur <mumrah@gmail.com>2013-04-02 20:19:30 -0400
commitb6d98c07b418b16061ae92392947d5dd6958a708 (patch)
treee777fcf3019ef0ddc6c278ef733c487f5b0532c3 /kafka/util.py
parent3499e2f6ead76e1c2db6ac754358bd57f9a15268 (diff)
downloadkafka-python-b6d98c07b418b16061ae92392947d5dd6958a708.tar.gz
Big code re-org
Diffstat (limited to 'kafka/util.py')
-rw-r--r--kafka/util.py25
1 files changed, 25 insertions, 0 deletions
diff --git a/kafka/util.py b/kafka/util.py
index 715fb28..5dc6bc2 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -1,6 +1,7 @@
from collections import defaultdict
from itertools import groupby
import struct
+from threading import Timer
def write_int_string(s):
if s is None:
@@ -56,3 +57,27 @@ class BufferUnderflowError(Exception):
class ChecksumError(Exception):
pass
+
+class ReentrantTimer(object):
+ """
+ A timer that can be restarted, unlike threading.Timer (although this uses threading.Timer)
+
+ t: timer interval in milliseconds
+ fn: a callable to invoke
+ """
+ def __init__(self, t, fn):
+ self.timer = None
+ self.t = t
+ self.fn = fn
+
+ def start(self):
+ if self.timer is None:
+ self.timer = Timer(self.t / 1000., self.fn)
+ self.timer.start()
+ else:
+ self.timer.cancel()
+ self.timer = Timer(self.t / 1000., self.fn)
+ self.timer.start()
+
+ def stop(self):
+ self.timer.cancel()