diff options
Diffstat (limited to 'kazoo/handlers/threading.py')
-rw-r--r-- | kazoo/handlers/threading.py | 35 |
1 files changed, 15 insertions, 20 deletions
diff --git a/kazoo/handlers/threading.py b/kazoo/handlers/threading.py index 210c31e..b9acd87 100644 --- a/kazoo/handlers/threading.py +++ b/kazoo/handlers/threading.py @@ -12,21 +12,16 @@ environments that use threads. """ from __future__ import absolute_import +import atexit import logging +import queue import socket import threading import time -import six - -import kazoo.python2atexit as python2atexit from kazoo.handlers import utils from kazoo.handlers.utils import selector_select -try: - import Queue -except ImportError: # pragma: nocover - import queue as Queue # sentinel objects _STOP = object() @@ -35,11 +30,11 @@ log = logging.getLogger(__name__) def _to_fileno(obj): - if isinstance(obj, six.integer_types): + if isinstance(obj, int): fd = int(obj) elif hasattr(obj, "fileno"): fd = obj.fileno() - if not isinstance(fd, six.integer_types): + if not isinstance(fd, int): raise TypeError("fileno() returned a non-integer") fd = int(fd) else: @@ -98,8 +93,8 @@ class SequentialThreadingHandler(object): name = "sequential_threading_handler" timeout_exception = KazooTimeoutError sleep_func = staticmethod(time.sleep) - queue_impl = Queue.Queue - queue_empty = Queue.Empty + queue_impl = queue.Queue + queue_empty = queue.Empty def __init__(self): """Create a :class:`SequentialThreadingHandler` instance""" @@ -113,11 +108,11 @@ class SequentialThreadingHandler(object): def running(self): return self._running - def _create_thread_worker(self, queue): + def _create_thread_worker(self, work_queue): def _thread_worker(): # pragma: nocover while True: try: - func = queue.get() + func = work_queue.get() try: if func is _STOP: break @@ -125,7 +120,7 @@ class SequentialThreadingHandler(object): except Exception: log.exception("Exception in worker queue thread") finally: - queue.task_done() + work_queue.task_done() del func # release before possible idle except self.queue_empty: continue @@ -142,11 +137,11 @@ class SequentialThreadingHandler(object): # Spawn our worker threads, we have # - A callback worker for watch events to be called # - A completion worker for completion events to be called - for queue in (self.completion_queue, self.callback_queue): - w = self._create_thread_worker(queue) + for work_queue in (self.completion_queue, self.callback_queue): + w = self._create_thread_worker(work_queue) self._workers.append(w) self._running = True - python2atexit.register(self.stop) + atexit.register(self.stop) def stop(self): """Stop the worker threads and empty all queues.""" @@ -156,8 +151,8 @@ class SequentialThreadingHandler(object): self._running = False - for queue in (self.completion_queue, self.callback_queue): - queue.put(_STOP) + for work_queue in (self.completion_queue, self.callback_queue): + work_queue.put(_STOP) self._workers.reverse() while self._workers: @@ -167,7 +162,7 @@ class SequentialThreadingHandler(object): # Clear the queues self.callback_queue = self.queue_impl() self.completion_queue = self.queue_impl() - python2atexit.unregister(self.stop) + atexit.unregister(self.stop) def select(self, *args, **kwargs): return selector_select(*args, **kwargs) |