| author | Jeff Widman <jeff@jeffwidman.com> | 2017-02-09 12:27:16 -0800 |
|---|---|---|
| committer | Dana Powers <dana.powers@gmail.com> | 2017-02-09 12:27:16 -0800 |
| commit | 8fde79dbb5a3793b1a9ebd10e032d5f3dd535645 | |
| tree | a991daae07aa142d936b37a2af7f55030355357b | |
| parent | e825483d49bda41f13420311cbc9ffd59f7cee3d | |
| download | kafka-python-8fde79dbb5a3793b1a9ebd10e032d5f3dd535645.tar.gz | |
PEP-8: Spacing & removed unused imports (#899)
| File | Lines changed |
|---|---|
| kafka/client.py | 25 |
| kafka/client_async.py | 22 |
| kafka/consumer/fetcher.py | 16 |
| kafka/consumer/group.py | 6 |
| kafka/coordinator/base.py | 12 |
| kafka/producer/base.py | 4 |
| kafka/producer/kafka.py | 8 |
| kafka/protocol/fetch.py | 2 |
| kafka/protocol/legacy.py | 14 |
| kafka/protocol/message.py | 7 |
| kafka/protocol/struct.py | 3 |
| test/test_fetcher.py | 6 |
12 files changed, 58 insertions, 67 deletions
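The diff below is a pure style cleanup. As an illustration (not code from this commit; the names here are hypothetical), the following sketch shows the PEP-8 conventions being enforced: no spaces around `=` in keyword and default arguments (pycodestyle E251), snake_case names instead of camelCase, at least two spaces before inline comments (E261), and binary operators kept at the end of the line when a condition wraps.

```python
# Illustrative sketch of the PEP-8 rules this commit applies.

# E251: no spaces around '=' for a default parameter value
# (old style: def ensure_topic_exists(self, topic, timeout = 30))
def ensure_topic_exists(topic, timeout=30):
    return topic, timeout

# Naming: snake_case instead of camelCase (requestId -> request_id)
request_id = 1

# E261: at least two spaces before an inline comment
# (old style: topics = set() # fetch all topic metadata)
topics = set()  # empty set will fetch all topic metadata

# Wrapped conditions keep the binary operator at the end of the line
api_version, group_id = (0, 9), 'my-group'
if (api_version >= (0, 8, 1) and
        group_id is not None):
    request_id += 1
```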
```diff
diff --git a/kafka/client.py b/kafka/client.py
index 46955e2..ff0169b 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -248,7 +248,6 @@ class SimpleClient(object):

                 failed_payloads(broker_payloads)
                 continue
-
             host, port, afi = get_ip_port_afi(broker.host)
             try:
                 conn = self._get_conn(host, broker.port, afi)
@@ -348,20 +347,20 @@ class SimpleClient(object):
        # Send the list of request payloads and collect the responses and
        # errors
        responses = {}
-       requestId = self._next_id()
-       log.debug('Request %s to %s: %s', requestId, broker, payloads)
+       request_id = self._next_id()
+       log.debug('Request %s to %s: %s', request_id, broker, payloads)
        request = encoder_fn(client_id=self.client_id,
-                            correlation_id=requestId, payloads=payloads)
+                            correlation_id=request_id, payloads=payloads)

        # Send the request, recv the response
        try:
            host, port, afi = get_ip_port_afi(broker.host)
            conn = self._get_conn(host, broker.port, afi)
-           conn.send(requestId, request)
+           conn.send(request_id, request)
        except ConnectionError as e:
            log.warning('ConnectionError attempting to send request %s '
-                       'to server %s: %s', requestId, broker, e)
+                       'to server %s: %s', request_id, broker, e)

            for payload in payloads:
                topic_partition = (payload.topic, payload.partition)
@@ -375,18 +374,18 @@ class SimpleClient(object):
        # ProduceRequest w/ acks = 0
        if decoder_fn is None:
            log.debug('Request %s does not expect a response '
-                     '(skipping conn.recv)', requestId)
+                     '(skipping conn.recv)', request_id)
            for payload in payloads:
                topic_partition = (payload.topic, payload.partition)
                responses[topic_partition] = None
            return []

        try:
-           response = conn.recv(requestId)
+           response = conn.recv(request_id)
        except ConnectionError as e:
            log.warning('ConnectionError attempting to receive a '
                        'response to request %s from server %s: %s',
-                       requestId, broker, e)
+                       request_id, broker, e)

            for payload in payloads:
                topic_partition = (payload.topic, payload.partition)
@@ -399,7 +398,7 @@ class SimpleClient(object):
                               payload_response.partition)
            responses[topic_partition] = payload_response
            _resps.append(payload_response)
-       log.debug('Response %s: %s', requestId, _resps)
+       log.debug('Response %s: %s', request_id, _resps)

        # Return responses in the same order as provided
        return [responses[tp] for tp in original_ordering]
@@ -473,8 +472,8 @@ class SimpleClient(object):
    def has_metadata_for_topic(self, topic):
        return (
-         topic in self.topic_partitions
-         and len(self.topic_partitions[topic]) > 0
+           topic in self.topic_partitions
+           and len(self.topic_partitions[topic]) > 0
        )

    def get_partition_ids_for_topic(self, topic):
@@ -487,7 +486,7 @@ class SimpleClient(object):
    def topics(self):
        return list(self.topic_partitions.keys())

-   def ensure_topic_exists(self, topic, timeout = 30):
+   def ensure_topic_exists(self, topic, timeout=30):
        start_time = time.time()

        while not self.has_metadata_for_topic(topic):
diff --git a/kafka/client_async.py b/kafka/client_async.py
index e94b65d..1513f39 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -10,7 +10,7 @@ import threading

 # selectors in stdlib as of py3.4
 try:
-    import selectors # pylint: disable=import-error
+    import selectors  # pylint: disable=import-error
 except ImportError:
     # vendored backport module
     from .vendor import selectors34 as selectors
@@ -175,7 +175,7 @@ class KafkaClient(object):
                 self.config['api_version'], str(self.API_VERSIONS)))
         self.cluster = ClusterMetadata(**self.config)
-        self._topics = set() # empty set will fetch all topic metadata
+        self._topics = set()  # empty set will fetch all topic metadata
         self._metadata_refresh_in_progress = False
         self._last_no_node_available_ms = 0
         self._selector = self.config['selector']()
@@ -343,7 +343,7 @@ class KafkaClient(object):
         return self._conns[node_id].connected()

     def close(self, node_id=None):
-        """Closes one or all broker connections.
+        """Close one or all broker connections.

         Arguments:
             node_id (int, optional): the id of the node to close
@@ -381,7 +381,7 @@ class KafkaClient(object):

     def connection_delay(self, node_id):
         """
-        Returns the number of milliseconds to wait, based on the connection
+        Return the number of milliseconds to wait, based on the connection
         state, before attempting to send data. When disconnected, this respects
         the reconnect backoff time. When connecting, returns 0 to allow
         non-blocking connect to finish. When connected, returns a very large
@@ -507,7 +507,7 @@ class KafkaClient(object):
                 metadata_timeout_ms,
                 self._delayed_tasks.next_at() * 1000,
                 self.config['request_timeout_ms'])
-            timeout = max(0, timeout / 1000.0) # avoid negative timeouts
+            timeout = max(0, timeout / 1000.0)  # avoid negative timeouts

             responses.extend(self._poll(timeout, sleep=sleep))
@@ -562,7 +562,7 @@ class KafkaClient(object):

             # Accumulate as many responses as the connection has pending
             while conn.in_flight_requests:
-                response = conn.recv() # Note: conn.recv runs callbacks / errbacks
+                response = conn.recv()  # Note: conn.recv runs callbacks / errbacks

                 # Incomplete responses are buffered internally
                 # while conn.in_flight_requests retains the request
@@ -770,9 +770,9 @@ class KafkaClient(object):
         self._delayed_tasks.remove(task)

     def check_version(self, node_id=None, timeout=2, strict=False):
-        """Attempt to guess a broker version
+        """Attempt to guess the version of a Kafka broker.

-        Note: it is possible that this method blocks longer than the
+        Note: It is possible that this method blocks longer than the
         specified timeout. This can happen if the entire cluster is down
         and the client enters a bootstrap backoff sleep. This is only possible
         if node_id is None.
@@ -831,9 +831,9 @@ class KafkaClient(object):
 class DelayedTaskQueue(object):
     # see https://docs.python.org/2/library/heapq.html
     def __init__(self):
-        self._tasks = [] # list of entries arranged in a heap
-        self._task_map = {} # mapping of tasks to entries
-        self._counter = itertools.count() # unique sequence count
+        self._tasks = []  # list of entries arranged in a heap
+        self._task_map = {}  # mapping of tasks to entries
+        self._counter = itertools.count()  # unique sequence count

     def add(self, task, at):
         """Add a task to run at a later time.
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 00d26c6..73daa36 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -44,7 +44,7 @@ class Fetcher(six.Iterator):
         'max_poll_records': sys.maxsize,
         'check_crcs': True,
         'skip_double_compressed_messages': False,
-        'iterator_refetch_records': 1, # undocumented -- interface may change
+        'iterator_refetch_records': 1,  # undocumented -- interface may change
         'metric_group_prefix': 'consumer',
         'api_version': (0, 8, 0),
     }
@@ -91,10 +91,10 @@ class Fetcher(six.Iterator):
         self._client = client
         self._subscriptions = subscriptions
-        self._records = collections.deque() # (offset, topic_partition, messages)
+        self._records = collections.deque()  # (offset, topic_partition, messages)
         self._unauthorized_topics = set()
-        self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
-        self._record_too_large_partitions = dict() # {topic_partition: offset}
+        self._offset_out_of_range_partitions = dict()  # {topic_partition: offset}
+        self._record_too_large_partitions = dict()  # {topic_partition: offset}
         self._iterator = None
         self._fetch_futures = collections.deque()
         self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
@@ -217,7 +217,7 @@ class Fetcher(six.Iterator):
             return future.value

         if not future.retriable():
-            raise future.exception # pylint: disable-msg=raising-bad-type
+            raise future.exception  # pylint: disable-msg=raising-bad-type

         if future.exception.invalid_metadata:
             refresh_future = self._client.cluster.request_update()
@@ -494,10 +494,10 @@ class Fetcher(six.Iterator):
                             # of a compressed message depends on the
                             # typestamp type of the wrapper message:

-                            if msg.timestamp_type == 0: # CREATE_TIME (0)
+                            if msg.timestamp_type == 0:  # CREATE_TIME (0)
                                 inner_timestamp = inner_msg.timestamp
-                            elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1)
+                            elif msg.timestamp_type == 1:  # LOG_APPEND_TIME (1)
                                 inner_timestamp = msg.timestamp
                             else:
@@ -673,7 +673,7 @@ class Fetcher(six.Iterator):
         requests = {}
         for node_id, partition_data in six.iteritems(fetchable):
             requests[node_id] = FetchRequest[version](
-                -1, # replica_id
+                -1,  # replica_id
                 self.config['fetch_max_wait_ms'],
                 self.config['fetch_min_bytes'],
                 partition_data.items())
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 10d293c..47c721f 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -239,7 +239,7 @@ class KafkaConsumer(six.Iterator):
         'ssl_password': None,
         'api_version': None,
         'api_version_auto_timeout_ms': 2000,
-        'connections_max_idle_ms': 9 * 60 * 1000, # Not implemented yet
+        'connections_max_idle_ms': 9 * 60 * 1000,  # Not implemented yet
         'metric_reporters': [],
         'metrics_num_samples': 2,
         'metrics_sample_window_ms': 30000,
@@ -831,8 +831,8 @@ class KafkaConsumer(six.Iterator):
             NoOffsetForPartitionError: If no offset is stored for a given
                 partition and no offset reset policy is defined.
         """
-        if (self.config['api_version'] >= (0, 8, 1)
-                and self.config['group_id'] is not None):
+        if (self.config['api_version'] >= (0, 8, 1) and
+                self.config['group_id'] is not None):
             # Refresh commits for all assigned partitions
             self._coordinator.refresh_committed_offsets_if_needed()
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index e4ebcb0..66d7e6c 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -15,7 +15,7 @@ from ..metrics import AnonMeasurable
 from ..metrics.stats import Avg, Count, Max, Rate
 from ..protocol.commit import GroupCoordinatorRequest, OffsetCommitRequest
 from ..protocol.group import (HeartbeatRequest, JoinGroupRequest,
-                             LeaveGroupRequest, SyncGroupRequest)
+                              LeaveGroupRequest, SyncGroupRequest)

 log = logging.getLogger('kafka.coordinator')
@@ -220,7 +220,7 @@ class BaseCoordinator(object):
                     metadata_update = self._client.cluster.request_update()
                     self._client.poll(future=metadata_update)
             else:
-                raise future.exception # pylint: disable-msg=raising-bad-type
+                raise future.exception  # pylint: disable-msg=raising-bad-type

     def need_rejoin(self):
         """Check whether the group should be rejoined (e.g. if metadata changes)
@@ -270,7 +270,7 @@ class BaseCoordinator(object):
                               Errors.IllegalGenerationError)):
                 continue
             elif not future.retriable():
-                raise exception # pylint: disable-msg=raising-bad-type
+                raise exception  # pylint: disable-msg=raising-bad-type
             time.sleep(self.config['retry_backoff_ms'] / 1000)

     def _send_join_group_request(self):
@@ -428,7 +428,7 @@ class BaseCoordinator(object):
         error_type = Errors.for_code(response.error_code)
         if error_type is Errors.NoError:
             log.info("Successfully joined group %s with generation %s",
-                      self.group_id, self.generation)
+                     self.group_id, self.generation)
             self.sensors.sync_latency.record((time.time() - send_time) * 1000)
             future.success(response.member_assignment)
             return
@@ -554,7 +554,7 @@ class BaseCoordinator(object):
     def _send_heartbeat_request(self):
         """Send a heartbeat request"""
         request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id)
-        log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) #pylint: disable-msg=no-member
+        log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id)  # pylint: disable-msg=no-member
         future = Future()
         _f = self._client.send(self.coordinator_id, request)
         _f.add_callback(self._handle_heartbeat_response, future, time.time())
@@ -627,7 +627,7 @@ class HeartbeatTask(object):

     def __call__(self):
         if (self._coordinator.generation < 0 or
-            self._coordinator.need_rejoin()):
+                self._coordinator.need_rejoin()):
             # no need to send the heartbeat we're not using auto-assignment
             # or if we are awaiting a rebalance
             log.info("Skipping heartbeat: no auto-assignment"
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 30b6fd7..4079e22 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -282,7 +282,7 @@ class Producer(object):
                  codec_compresslevel=None,
                  sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
                  async=False,
-                 batch_send=False, # deprecated, use async
+                 batch_send=False,  # deprecated, use async
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
                  async_retry_limit=ASYNC_RETRY_LIMIT,
@@ -452,7 +452,7 @@ class Producer(object):

         # py3 supports unregistering
         if hasattr(atexit, 'unregister'):
-            atexit.unregister(self._cleanup_func) # pylint: disable=no-member
+            atexit.unregister(self._cleanup_func)  # pylint: disable=no-member

         # py2 requires removing from private attribute...
         else:
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 785919b..98d4426 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -265,7 +265,7 @@ class KafkaProducer(object):
         'linger_ms': 0,
         'partitioner': DefaultPartitioner(),
         'buffer_memory': 33554432,
-        'connections_max_idle_ms': 600000, # not implemented yet
+        'connections_max_idle_ms': 600000,  # not implemented yet
         'max_block_ms': 60000,
         'max_request_size': 1048576,
         'metadata_max_age_ms': 300000,
@@ -296,7 +296,7 @@ class KafkaProducer(object):
     }

     def __init__(self, **configs):
-        log.debug("Starting the Kafka producer") # trace
+        log.debug("Starting the Kafka producer")  # trace
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
             if key in configs:
@@ -369,7 +369,7 @@ class KafkaProducer(object):
     def _unregister_cleanup(self):
         if getattr(self, '_cleanup', None):
             if hasattr(atexit, 'unregister'):
-                atexit.unregister(self._cleanup) # pylint: disable=no-member
+                atexit.unregister(self._cleanup)  # pylint: disable=no-member

             # py2 requires removing from private attribute...
             else:
@@ -549,7 +549,7 @@ class KafkaProducer(object):
         Arguments:
             timeout (float, optional): timeout in seconds to wait for completion.
         """
-        log.debug("Flushing accumulated records in producer.") # trace
+        log.debug("Flushing accumulated records in producer.")  # trace
         self._accumulator.begin_flush()
         self._sender.wakeup()
         self._accumulator.await_flush_completion(timeout=timeout)
diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py
index 79b010f..6a9ad5b 100644
--- a/kafka/protocol/fetch.py
+++ b/kafka/protocol/fetch.py
@@ -37,7 +37,7 @@ class FetchResponse_v1(Struct):
 class FetchResponse_v2(Struct):
     API_KEY = 1
     API_VERSION = 2
-    SCHEMA = FetchResponse_v1.SCHEMA # message format changed internally
+    SCHEMA = FetchResponse_v1.SCHEMA  # message format changed internally


 class FetchResponse_v3(Struct):
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index f2ae44a..6d9329d 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -3,9 +3,7 @@ from __future__ import absolute_import
 import logging
 import struct

-from kafka.vendor import six # pylint: disable=import-error
-
-from kafka.vendor.six.moves import xrange # pylint: disable=import-error
+from kafka.vendor import six  # pylint: disable=import-error

 import kafka.protocol.commit
 import kafka.protocol.fetch
@@ -15,13 +13,12 @@ import kafka.protocol.offset
 import kafka.protocol.produce
 import kafka.structs

-from kafka.codec import (
-    gzip_encode, gzip_decode, snappy_encode, snappy_decode)
-from kafka.errors import ProtocolError, ChecksumError, UnsupportedCodecError
+from kafka.codec import gzip_encode, snappy_encode
+from kafka.errors import ProtocolError, UnsupportedCodecError
 from kafka.structs import ConsumerMetadataResponse
 from kafka.util import (
-    crc32, read_short_string, read_int_string, relative_unpack,
-    write_short_string, write_int_string, group_by_topic_and_partition)
+    crc32, read_short_string, relative_unpack,
+    write_int_string, group_by_topic_and_partition)

 log = logging.getLogger(__name__)
@@ -320,7 +317,6 @@ class KafkaProtocol(object):
                 for partition, payload in six.iteritems(topic_payloads)])
             for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])

-
     @classmethod
     def decode_offset_commit_response(cls, response):
         """
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 36f03ca..bfad127 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -6,7 +6,6 @@ import time
 from ..codec import (has_gzip, has_snappy, has_lz4,
                      gzip_decode, snappy_decode,
                      lz4_decode, lz4_decode_old_kafka)
-from . import pickle
 from .struct import Struct
 from .types import (
     Int8, Int32, Int64, Bytes, Schema, AbstractType
@@ -36,7 +35,7 @@ class Message(Struct):
     CODEC_SNAPPY = 0x02
     CODEC_LZ4 = 0x03
     TIMESTAMP_TYPE_MASK = 0x08
-    HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
+    HEADER_SIZE = 22  # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)

     def __init__(self, value, key=None, magic=0, attributes=0, crc=0,
                  timestamp=None):
@@ -127,7 +126,7 @@ class Message(Struct):
             else:
                 raw_bytes = lz4_decode(self.value)
         else:
-            raise Exception('This should be impossible') 
+            raise Exception('This should be impossible')

         return MessageSet.decode(raw_bytes, bytes_to_read=len(raw_bytes))
@@ -145,7 +144,7 @@ class MessageSet(AbstractType):
         ('offset', Int64),
         ('message', Bytes)
     )
-    HEADER_SIZE = 12 # offset + message_size
+    HEADER_SIZE = 12  # offset + message_size

     @classmethod
     def encode(cls, items):
diff --git a/kafka/protocol/struct.py b/kafka/protocol/struct.py
index 602cfb8..a3d28d7 100644
--- a/kafka/protocol/struct.py
+++ b/kafka/protocol/struct.py
@@ -1,6 +1,5 @@
 from __future__ import absolute_import

-#from collections import namedtuple
 from io import BytesIO

 from .abstract import AbstractType
@@ -23,7 +22,7 @@ class Struct(AbstractType):
             self.encode = self._encode_self

     @classmethod
-    def encode(cls, item): # pylint: disable=E0202
+    def encode(cls, item):  # pylint: disable=E0202
         bits = []
         for i, field in enumerate(cls.SCHEMA.fields):
             bits.append(field.encode(item[i]))
diff --git a/test/test_fetcher.py b/test/test_fetcher.py
index fea3f7d..984de88 100644
--- a/test/test_fetcher.py
+++ b/test/test_fetcher.py
@@ -6,11 +6,9 @@ import pytest
 from kafka.client_async import KafkaClient
 from kafka.consumer.fetcher import Fetcher
 from kafka.consumer.subscription_state import SubscriptionState
-import kafka.errors as Errors
-from kafka.future import Future
 from kafka.metrics import Metrics
 from kafka.protocol.fetch import FetchRequest
-from kafka.structs import TopicPartition, OffsetAndMetadata
+from kafka.structs import TopicPartition


 @pytest.fixture
@@ -51,7 +49,7 @@ def test_send_fetches(fetcher, mocker):
     ]

     mocker.patch.object(fetcher, '_create_fetch_requests',
-                        return_value = dict(enumerate(fetch_requests)))
+                        return_value=dict(enumerate(fetch_requests)))

     ret = fetcher.send_fetches()
     for node, request in enumerate(fetch_requests):
```
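For reference, violations like these can be caught mechanically. Below is a minimal sketch using pycodestyle's Python API, assuming pycodestyle is installed; it is not part of kafka-python itself. Unused imports such as those removed in this commit are flagged separately by pyflakes/flake8 (code F401), not by pycodestyle.

```python
# Hedged sketch: scan the tree for the spacing issues fixed in this commit.
import pycodestyle

# E251: spaces around keyword '='; E261/E262: inline comment spacing/format
style = pycodestyle.StyleGuide(select=['E251', 'E261', 'E262'])
report = style.check_files(['kafka/', 'test/'])
print('remaining violations:', report.total_errors)
```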