summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2017-02-09 12:27:16 -0800
committerDana Powers <dana.powers@gmail.com>2017-02-09 12:27:16 -0800
commit8fde79dbb5a3793b1a9ebd10e032d5f3dd535645 (patch)
treea991daae07aa142d936b37a2af7f55030355357b
parente825483d49bda41f13420311cbc9ffd59f7cee3d (diff)
downloadkafka-python-8fde79dbb5a3793b1a9ebd10e032d5f3dd535645.tar.gz
PEP-8: Spacing & removed unused imports (#899)
-rw-r--r--kafka/client.py25
-rw-r--r--kafka/client_async.py22
-rw-r--r--kafka/consumer/fetcher.py16
-rw-r--r--kafka/consumer/group.py6
-rw-r--r--kafka/coordinator/base.py12
-rw-r--r--kafka/producer/base.py4
-rw-r--r--kafka/producer/kafka.py8
-rw-r--r--kafka/protocol/fetch.py2
-rw-r--r--kafka/protocol/legacy.py14
-rw-r--r--kafka/protocol/message.py7
-rw-r--r--kafka/protocol/struct.py3
-rw-r--r--test/test_fetcher.py6
12 files changed, 58 insertions, 67 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 46955e2..ff0169b 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -248,7 +248,6 @@ class SimpleClient(object):
failed_payloads(broker_payloads)
continue
-
host, port, afi = get_ip_port_afi(broker.host)
try:
conn = self._get_conn(host, broker.port, afi)
@@ -348,20 +347,20 @@ class SimpleClient(object):
# Send the list of request payloads and collect the responses and
# errors
responses = {}
- requestId = self._next_id()
- log.debug('Request %s to %s: %s', requestId, broker, payloads)
+ request_id = self._next_id()
+ log.debug('Request %s to %s: %s', request_id, broker, payloads)
request = encoder_fn(client_id=self.client_id,
- correlation_id=requestId, payloads=payloads)
+ correlation_id=request_id, payloads=payloads)
# Send the request, recv the response
try:
host, port, afi = get_ip_port_afi(broker.host)
conn = self._get_conn(host, broker.port, afi)
- conn.send(requestId, request)
+ conn.send(request_id, request)
except ConnectionError as e:
log.warning('ConnectionError attempting to send request %s '
- 'to server %s: %s', requestId, broker, e)
+ 'to server %s: %s', request_id, broker, e)
for payload in payloads:
topic_partition = (payload.topic, payload.partition)
@@ -375,18 +374,18 @@ class SimpleClient(object):
# ProduceRequest w/ acks = 0
if decoder_fn is None:
log.debug('Request %s does not expect a response '
- '(skipping conn.recv)', requestId)
+ '(skipping conn.recv)', request_id)
for payload in payloads:
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = None
return []
try:
- response = conn.recv(requestId)
+ response = conn.recv(request_id)
except ConnectionError as e:
log.warning('ConnectionError attempting to receive a '
'response to request %s from server %s: %s',
- requestId, broker, e)
+ request_id, broker, e)
for payload in payloads:
topic_partition = (payload.topic, payload.partition)
@@ -399,7 +398,7 @@ class SimpleClient(object):
payload_response.partition)
responses[topic_partition] = payload_response
_resps.append(payload_response)
- log.debug('Response %s: %s', requestId, _resps)
+ log.debug('Response %s: %s', request_id, _resps)
# Return responses in the same order as provided
return [responses[tp] for tp in original_ordering]
@@ -473,8 +472,8 @@ class SimpleClient(object):
def has_metadata_for_topic(self, topic):
return (
- topic in self.topic_partitions
- and len(self.topic_partitions[topic]) > 0
+ topic in self.topic_partitions
+ and len(self.topic_partitions[topic]) > 0
)
def get_partition_ids_for_topic(self, topic):
@@ -487,7 +486,7 @@ class SimpleClient(object):
def topics(self):
return list(self.topic_partitions.keys())
- def ensure_topic_exists(self, topic, timeout = 30):
+ def ensure_topic_exists(self, topic, timeout=30):
start_time = time.time()
while not self.has_metadata_for_topic(topic):
diff --git a/kafka/client_async.py b/kafka/client_async.py
index e94b65d..1513f39 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -10,7 +10,7 @@ import threading
# selectors in stdlib as of py3.4
try:
- import selectors # pylint: disable=import-error
+ import selectors # pylint: disable=import-error
except ImportError:
# vendored backport module
from .vendor import selectors34 as selectors
@@ -175,7 +175,7 @@ class KafkaClient(object):
self.config['api_version'], str(self.API_VERSIONS)))
self.cluster = ClusterMetadata(**self.config)
- self._topics = set() # empty set will fetch all topic metadata
+ self._topics = set() # empty set will fetch all topic metadata
self._metadata_refresh_in_progress = False
self._last_no_node_available_ms = 0
self._selector = self.config['selector']()
@@ -343,7 +343,7 @@ class KafkaClient(object):
return self._conns[node_id].connected()
def close(self, node_id=None):
- """Closes one or all broker connections.
+ """Close one or all broker connections.
Arguments:
node_id (int, optional): the id of the node to close
@@ -381,7 +381,7 @@ class KafkaClient(object):
def connection_delay(self, node_id):
"""
- Returns the number of milliseconds to wait, based on the connection
+ Return the number of milliseconds to wait, based on the connection
state, before attempting to send data. When disconnected, this respects
the reconnect backoff time. When connecting, returns 0 to allow
non-blocking connect to finish. When connected, returns a very large
@@ -507,7 +507,7 @@ class KafkaClient(object):
metadata_timeout_ms,
self._delayed_tasks.next_at() * 1000,
self.config['request_timeout_ms'])
- timeout = max(0, timeout / 1000.0) # avoid negative timeouts
+ timeout = max(0, timeout / 1000.0) # avoid negative timeouts
responses.extend(self._poll(timeout, sleep=sleep))
@@ -562,7 +562,7 @@ class KafkaClient(object):
# Accumulate as many responses as the connection has pending
while conn.in_flight_requests:
- response = conn.recv() # Note: conn.recv runs callbacks / errbacks
+ response = conn.recv() # Note: conn.recv runs callbacks / errbacks
# Incomplete responses are buffered internally
# while conn.in_flight_requests retains the request
@@ -770,9 +770,9 @@ class KafkaClient(object):
self._delayed_tasks.remove(task)
def check_version(self, node_id=None, timeout=2, strict=False):
- """Attempt to guess a broker version
+ """Attempt to guess the version of a Kafka broker.
- Note: it is possible that this method blocks longer than the
+ Note: It is possible that this method blocks longer than the
specified timeout. This can happen if the entire cluster
is down and the client enters a bootstrap backoff sleep.
This is only possible if node_id is None.
@@ -831,9 +831,9 @@ class KafkaClient(object):
class DelayedTaskQueue(object):
# see https://docs.python.org/2/library/heapq.html
def __init__(self):
- self._tasks = [] # list of entries arranged in a heap
- self._task_map = {} # mapping of tasks to entries
- self._counter = itertools.count() # unique sequence count
+ self._tasks = [] # list of entries arranged in a heap
+ self._task_map = {} # mapping of tasks to entries
+ self._counter = itertools.count() # unique sequence count
def add(self, task, at):
"""Add a task to run at a later time.
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 00d26c6..73daa36 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -44,7 +44,7 @@ class Fetcher(six.Iterator):
'max_poll_records': sys.maxsize,
'check_crcs': True,
'skip_double_compressed_messages': False,
- 'iterator_refetch_records': 1, # undocumented -- interface may change
+ 'iterator_refetch_records': 1, # undocumented -- interface may change
'metric_group_prefix': 'consumer',
'api_version': (0, 8, 0),
}
@@ -91,10 +91,10 @@ class Fetcher(six.Iterator):
self._client = client
self._subscriptions = subscriptions
- self._records = collections.deque() # (offset, topic_partition, messages)
+ self._records = collections.deque() # (offset, topic_partition, messages)
self._unauthorized_topics = set()
- self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
- self._record_too_large_partitions = dict() # {topic_partition: offset}
+ self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
+ self._record_too_large_partitions = dict() # {topic_partition: offset}
self._iterator = None
self._fetch_futures = collections.deque()
self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
@@ -217,7 +217,7 @@ class Fetcher(six.Iterator):
return future.value
if not future.retriable():
- raise future.exception # pylint: disable-msg=raising-bad-type
+ raise future.exception # pylint: disable-msg=raising-bad-type
if future.exception.invalid_metadata:
refresh_future = self._client.cluster.request_update()
@@ -494,10 +494,10 @@ class Fetcher(six.Iterator):
# of a compressed message depends on the
# typestamp type of the wrapper message:
- if msg.timestamp_type == 0: # CREATE_TIME (0)
+ if msg.timestamp_type == 0: # CREATE_TIME (0)
inner_timestamp = inner_msg.timestamp
- elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1)
+ elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1)
inner_timestamp = msg.timestamp
else:
@@ -673,7 +673,7 @@ class Fetcher(six.Iterator):
requests = {}
for node_id, partition_data in six.iteritems(fetchable):
requests[node_id] = FetchRequest[version](
- -1, # replica_id
+ -1, # replica_id
self.config['fetch_max_wait_ms'],
self.config['fetch_min_bytes'],
partition_data.items())
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 10d293c..47c721f 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -239,7 +239,7 @@ class KafkaConsumer(six.Iterator):
'ssl_password': None,
'api_version': None,
'api_version_auto_timeout_ms': 2000,
- 'connections_max_idle_ms': 9 * 60 * 1000, # Not implemented yet
+ 'connections_max_idle_ms': 9 * 60 * 1000, # Not implemented yet
'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
@@ -831,8 +831,8 @@ class KafkaConsumer(six.Iterator):
NoOffsetForPartitionError: If no offset is stored for a given
partition and no offset reset policy is defined.
"""
- if (self.config['api_version'] >= (0, 8, 1)
- and self.config['group_id'] is not None):
+ if (self.config['api_version'] >= (0, 8, 1) and
+ self.config['group_id'] is not None):
# Refresh commits for all assigned partitions
self._coordinator.refresh_committed_offsets_if_needed()
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index e4ebcb0..66d7e6c 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -15,7 +15,7 @@ from ..metrics import AnonMeasurable
from ..metrics.stats import Avg, Count, Max, Rate
from ..protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
from ..protocol.group import (HeartbeatRequest, JoinGroupRequest,
- LeaveGroupRequest, SyncGroupRequest)
+ LeaveGroupRequest, SyncGroupRequest)
log = logging.getLogger('kafka.coordinator')
@@ -220,7 +220,7 @@ class BaseCoordinator(object):
metadata_update = self._client.cluster.request_update()
self._client.poll(future=metadata_update)
else:
- raise future.exception # pylint: disable-msg=raising-bad-type
+ raise future.exception # pylint: disable-msg=raising-bad-type
def need_rejoin(self):
"""Check whether the group should be rejoined (e.g. if metadata changes)
@@ -270,7 +270,7 @@ class BaseCoordinator(object):
Errors.IllegalGenerationError)):
continue
elif not future.retriable():
- raise exception # pylint: disable-msg=raising-bad-type
+ raise exception # pylint: disable-msg=raising-bad-type
time.sleep(self.config['retry_backoff_ms'] / 1000)
def _send_join_group_request(self):
@@ -428,7 +428,7 @@ class BaseCoordinator(object):
error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
log.info("Successfully joined group %s with generation %s",
- self.group_id, self.generation)
+ self.group_id, self.generation)
self.sensors.sync_latency.record((time.time() - send_time) * 1000)
future.success(response.member_assignment)
return
@@ -554,7 +554,7 @@ class BaseCoordinator(object):
def _send_heartbeat_request(self):
"""Send a heartbeat request"""
request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id)
- log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member
+ log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
future = Future()
_f = self._client.send(self.coordinator_id, request)
_f.add_callback(self._handle_heartbeat_response, future, time.time())
@@ -627,7 +627,7 @@ class HeartbeatTask(object):
def __call__(self):
if (self._coordinator.generation < 0 or
- self._coordinator.need_rejoin()):
+ self._coordinator.need_rejoin()):
# no need to send the heartbeat we're not using auto-assignment
# or if we are awaiting a rebalance
log.info("Skipping heartbeat: no auto-assignment"
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 30b6fd7..4079e22 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -282,7 +282,7 @@ class Producer(object):
codec_compresslevel=None,
sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
async=False,
- batch_send=False, # deprecated, use async
+ batch_send=False, # deprecated, use async
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
async_retry_limit=ASYNC_RETRY_LIMIT,
@@ -452,7 +452,7 @@ class Producer(object):
# py3 supports unregistering
if hasattr(atexit, 'unregister'):
- atexit.unregister(self._cleanup_func) # pylint: disable=no-member
+ atexit.unregister(self._cleanup_func) # pylint: disable=no-member
# py2 requires removing from private attribute...
else:
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 785919b..98d4426 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -265,7 +265,7 @@ class KafkaProducer(object):
'linger_ms': 0,
'partitioner': DefaultPartitioner(),
'buffer_memory': 33554432,
- 'connections_max_idle_ms': 600000, # not implemented yet
+ 'connections_max_idle_ms': 600000, # not implemented yet
'max_block_ms': 60000,
'max_request_size': 1048576,
'metadata_max_age_ms': 300000,
@@ -296,7 +296,7 @@ class KafkaProducer(object):
}
def __init__(self, **configs):
- log.debug("Starting the Kafka producer") # trace
+ log.debug("Starting the Kafka producer") # trace
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
if key in configs:
@@ -369,7 +369,7 @@ class KafkaProducer(object):
def _unregister_cleanup(self):
if getattr(self, '_cleanup', None):
if hasattr(atexit, 'unregister'):
- atexit.unregister(self._cleanup) # pylint: disable=no-member
+ atexit.unregister(self._cleanup) # pylint: disable=no-member
# py2 requires removing from private attribute...
else:
@@ -549,7 +549,7 @@ class KafkaProducer(object):
Arguments:
timeout (float, optional): timeout in seconds to wait for completion.
"""
- log.debug("Flushing accumulated records in producer.") # trace
+ log.debug("Flushing accumulated records in producer.") # trace
self._accumulator.begin_flush()
self._sender.wakeup()
self._accumulator.await_flush_completion(timeout=timeout)
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index 79b010f..6a9ad5b 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -37,7 +37,7 @@ class FetchResponse_v1(Struct):
class FetchResponse_v2(Struct):
API_KEY = 1
API_VERSION = 2
- SCHEMA = FetchResponse_v1.SCHEMA # message format changed internally
+ SCHEMA = FetchResponse_v1.SCHEMA # message format changed internally
class FetchResponse_v3(Struct):
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index f2ae44a..6d9329d 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -3,9 +3,7 @@ from __future__ import absolute_import
import logging
import struct
-from kafka.vendor import six # pylint: disable=import-error
-
-from kafka.vendor.six.moves import xrange # pylint: disable=import-error
+from kafka.vendor import six # pylint: disable=import-error
import kafka.protocol.commit
import kafka.protocol.fetch
@@ -15,13 +13,12 @@ import kafka.protocol.offset
import kafka.protocol.produce
import kafka.structs
-from kafka.codec import (
- gzip_encode, gzip_decode, snappy_encode, snappy_decode)
-from kafka.errors import ProtocolError, ChecksumError, UnsupportedCodecError
+from kafka.codec import gzip_encode, snappy_encode
+from kafka.errors import ProtocolError, UnsupportedCodecError
from kafka.structs import ConsumerMetadataResponse
from kafka.util import (
- crc32, read_short_string, read_int_string, relative_unpack,
- write_short_string, write_int_string, group_by_topic_and_partition)
+ crc32, read_short_string, relative_unpack,
+ write_int_string, group_by_topic_and_partition)
log = logging.getLogger(__name__)
@@ -320,7 +317,6 @@ class KafkaProtocol(object):
for partition, payload in six.iteritems(topic_payloads)])
for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
-
@classmethod
def decode_offset_commit_response(cls, response):
"""
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 36f03ca..bfad127 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -6,7 +6,6 @@ import time
from ..codec import (has_gzip, has_snappy, has_lz4,
gzip_decode, snappy_decode,
lz4_decode, lz4_decode_old_kafka)
-from . import pickle
from .struct import Struct
from .types import (
Int8, Int32, Int64, Bytes, Schema, AbstractType
@@ -36,7 +35,7 @@ class Message(Struct):
CODEC_SNAPPY = 0x02
CODEC_LZ4 = 0x03
TIMESTAMP_TYPE_MASK = 0x08
- HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
+ HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
def __init__(self, value, key=None, magic=0, attributes=0, crc=0,
timestamp=None):
@@ -127,7 +126,7 @@ class Message(Struct):
else:
raw_bytes = lz4_decode(self.value)
else:
- raise Exception('This should be impossible')
+ raise Exception('This should be impossible')
return MessageSet.decode(raw_bytes, bytes_to_read=len(raw_bytes))
@@ -145,7 +144,7 @@ class MessageSet(AbstractType):
('offset', Int64),
('message', Bytes)
)
- HEADER_SIZE = 12 # offset + message_size
+ HEADER_SIZE = 12 # offset + message_size
@classmethod
def encode(cls, items):
diff --git a/kafka/protocol/struct.py b/kafka/protocol/struct.py
index 602cfb8..a3d28d7 100644
--- a/kafka/protocol/struct.py
+++ b/kafka/protocol/struct.py
@@ -1,6 +1,5 @@
from __future__ import absolute_import
-#from collections import namedtuple
from io import BytesIO
from .abstract import AbstractType
@@ -23,7 +22,7 @@ class Struct(AbstractType):
self.encode = self._encode_self
@classmethod
- def encode(cls, item): # pylint: disable=E0202
+ def encode(cls, item): # pylint: disable=E0202
bits = []
for i, field in enumerate(cls.SCHEMA.fields):
bits.append(field.encode(item[i]))
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index fea3f7d..984de88 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -6,11 +6,9 @@ import pytest
from kafka.client_async import KafkaClient
from kafka.consumer.fetcher import Fetcher
from kafka.consumer.subscription_state import SubscriptionState
-import kafka.errors as Errors
-from kafka.future import Future
from kafka.metrics import Metrics
from kafka.protocol.fetch import FetchRequest
-from kafka.structs import TopicPartition, OffsetAndMetadata
+from kafka.structs import TopicPartition
@pytest.fixture
@@ -51,7 +49,7 @@ def test_send_fetches(fetcher, mocker):
]
mocker.patch.object(fetcher, '_create_fetch_requests',
- return_value = dict(enumerate(fetch_requests)))
+ return_value=dict(enumerate(fetch_requests)))
ret = fetcher.send_fetches()
for node, request in enumerate(fetch_requests):