summaryrefslogtreecommitdiff
path: root/kazoo/handlers/threading.py
diff options
context:
space:
mode:
Diffstat (limited to 'kazoo/handlers/threading.py')
-rw-r--r--kazoo/handlers/threading.py35
1 files changed, 15 insertions, 20 deletions
diff --git a/kazoo/handlers/threading.py b/kazoo/handlers/threading.py
index 210c31e..b9acd87 100644
--- a/kazoo/handlers/threading.py
+++ b/kazoo/handlers/threading.py
@@ -12,21 +12,16 @@ environments that use threads.
"""
from __future__ import absolute_import
+import atexit
import logging
+import queue
import socket
import threading
import time
-import six
-
-import kazoo.python2atexit as python2atexit
from kazoo.handlers import utils
from kazoo.handlers.utils import selector_select
-try:
- import Queue
-except ImportError: # pragma: nocover
- import queue as Queue
# sentinel objects
_STOP = object()
@@ -35,11 +30,11 @@ log = logging.getLogger(__name__)
def _to_fileno(obj):
- if isinstance(obj, six.integer_types):
+ if isinstance(obj, int):
fd = int(obj)
elif hasattr(obj, "fileno"):
fd = obj.fileno()
- if not isinstance(fd, six.integer_types):
+ if not isinstance(fd, int):
raise TypeError("fileno() returned a non-integer")
fd = int(fd)
else:
@@ -98,8 +93,8 @@ class SequentialThreadingHandler(object):
name = "sequential_threading_handler"
timeout_exception = KazooTimeoutError
sleep_func = staticmethod(time.sleep)
- queue_impl = Queue.Queue
- queue_empty = Queue.Empty
+ queue_impl = queue.Queue
+ queue_empty = queue.Empty
def __init__(self):
"""Create a :class:`SequentialThreadingHandler` instance"""
@@ -113,11 +108,11 @@ class SequentialThreadingHandler(object):
def running(self):
return self._running
- def _create_thread_worker(self, queue):
+ def _create_thread_worker(self, work_queue):
def _thread_worker(): # pragma: nocover
while True:
try:
- func = queue.get()
+ func = work_queue.get()
try:
if func is _STOP:
break
@@ -125,7 +120,7 @@ class SequentialThreadingHandler(object):
except Exception:
log.exception("Exception in worker queue thread")
finally:
- queue.task_done()
+ work_queue.task_done()
del func # release before possible idle
except self.queue_empty:
continue
@@ -142,11 +137,11 @@ class SequentialThreadingHandler(object):
# Spawn our worker threads, we have
# - A callback worker for watch events to be called
# - A completion worker for completion events to be called
- for queue in (self.completion_queue, self.callback_queue):
- w = self._create_thread_worker(queue)
+ for work_queue in (self.completion_queue, self.callback_queue):
+ w = self._create_thread_worker(work_queue)
self._workers.append(w)
self._running = True
- python2atexit.register(self.stop)
+ atexit.register(self.stop)
def stop(self):
"""Stop the worker threads and empty all queues."""
@@ -156,8 +151,8 @@ class SequentialThreadingHandler(object):
self._running = False
- for queue in (self.completion_queue, self.callback_queue):
- queue.put(_STOP)
+ for work_queue in (self.completion_queue, self.callback_queue):
+ work_queue.put(_STOP)
self._workers.reverse()
while self._workers:
@@ -167,7 +162,7 @@ class SequentialThreadingHandler(object):
# Clear the queues
self.callback_queue = self.queue_impl()
self.completion_queue = self.queue_impl()
- python2atexit.unregister(self.stop)
+ atexit.unregister(self.stop)
def select(self, *args, **kwargs):
return selector_select(*args, **kwargs)