summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2017-02-09 12:27:16 -0800
committerDana Powers <dana.powers@gmail.com>2017-02-09 12:27:16 -0800
commit8fde79dbb5a3793b1a9ebd10e032d5f3dd535645 (patch)
treea991daae07aa142d936b37a2af7f55030355357b /kafka/client_async.py
parente825483d49bda41f13420311cbc9ffd59f7cee3d (diff)
downloadkafka-python-8fde79dbb5a3793b1a9ebd10e032d5f3dd535645.tar.gz
PEP-8: Spacing & removed unused imports (#899)
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py22
1 files changed, 11 insertions, 11 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index e94b65d..1513f39 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -10,7 +10,7 @@ import threading
# selectors in stdlib as of py3.4
try:
- import selectors # pylint: disable=import-error
+ import selectors # pylint: disable=import-error
except ImportError:
# vendored backport module
from .vendor import selectors34 as selectors
@@ -175,7 +175,7 @@ class KafkaClient(object):
self.config['api_version'], str(self.API_VERSIONS)))
self.cluster = ClusterMetadata(**self.config)
- self._topics = set() # empty set will fetch all topic metadata
+ self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
self._last_no_node_available_ms = 0
self._selector = self.config['selector']()
@@ -343,7 +343,7 @@ class KafkaClient(object):
return self._conns[node_id].connected()
def close(self, node_id=None):
- """Closes one or all broker connections.
+ """Close one or all broker connections.
Arguments:
node_id (int, optional): the id of the node to close
@@ -381,7 +381,7 @@ class KafkaClient(object):
def connection_delay(self, node_id):
"""
- Returns the number of milliseconds to wait, based on the connection
+ Return the number of milliseconds to wait, based on the connection
state, before attempting to send data. When disconnected, this respects
the reconnect backoff time. When connecting, returns 0 to allow
non-blocking connect to finish. When connected, returns a very large
@@ -507,7 +507,7 @@ class KafkaClient(object):
metadata_timeout_ms,
self._delayed_tasks.next_at() * 1000,
self.config['request_timeout_ms'])
- timeout = max(0, timeout / 1000.0) # avoid negative timeouts
+ timeout = max(0, timeout / 1000.0) # avoid negative timeouts
responses.extend(self._poll(timeout, sleep=sleep))
@@ -562,7 +562,7 @@ class KafkaClient(object):
# Accumulate as many responses as the connection has pending
while conn.in_flight_requests:
- response = conn.recv() # Note: conn.recv runs callbacks / errbacks
+ response = conn.recv() # Note: conn.recv runs callbacks / errbacks
# Incomplete responses are buffered internally
# while conn.in_flight_requests retains the request
@@ -770,9 +770,9 @@ class KafkaClient(object):
self._delayed_tasks.remove(task)
def check_version(self, node_id=None, timeout=2, strict=False):
- """Attempt to guess a broker version
+ """Attempt to guess the version of a Kafka broker.
- Note: it is possible that this method blocks longer than the
+ Note: It is possible that this method blocks longer than the
specified timeout. This can happen if the entire cluster
is down and the client enters a bootstrap backoff sleep.
This is only possible if node_id is None.
@@ -831,9 +831,9 @@ class KafkaClient(object):
class DelayedTaskQueue(object):
# see https://docs.python.org/2/library/heapq.html
def __init__(self):
- self._tasks = [] # list of entries arranged in a heap
- self._task_map = {} # mapping of tasks to entries
- self._counter = itertools.count() # unique sequence count
+ self._tasks = [] # list of entries arranged in a heap
+ self._task_map = {} # mapping of tasks to entries
+ self._counter = itertools.count() # unique sequence count
def add(self, task, at):
"""Add a task to run at a later time.