author     Jeff Widman <jeff@jeffwidman.com>  2018-11-13 11:57:45 -0800
committer  Jeff Widman <jeff@jeffwidman.com>  2018-11-18 00:21:18 -0800
commit     1d443638e22c2d360086b8d7cee8b5d930741d12 (patch)
tree       9e60f53f4a791b306acd6f1a2223557ead1ed750
parent     f3105a434f3bd2fb3f8899e4861e187e786b03da (diff)
Be explicit with tuples for %s formatting
Fix #1633
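
Background on why the explicit one-element tuple matters: with old-style % formatting, a right-hand operand that happens to be a tuple is unpacked as the full argument list rather than substituted as a single value, so a lone %s either raises TypeError or formats the wrong thing. Wrapping the value as (value,) makes the intent explicit and behaves the same for tuples, sets, ints, and strings. A minimal sketch, using a made-up tuple value (not from the patch itself):

    tp = ("my-topic", 0)  # stand-in for a TopicPartition-style tuple

    # Passing the tuple directly makes %-formatting treat it as the whole
    # argument list, so one %s receives two arguments and this raises
    # "TypeError: not all arguments converted during string formatting".
    try:
        print("Assigned partition %s for non-subscribed topic." % tp)
    except TypeError as exc:
        print("old form fails:", exc)

    # Wrapping the value in a one-element tuple substitutes the whole
    # tuple for %s, which is what the changes below do throughout.
    print("Assigned partition %s for non-subscribed topic." % (tp,))
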
-rw-r--r--  kafka/admin/kafka.py                    2
-rw-r--r--  kafka/client.py                         2
-rw-r--r--  kafka/client_async.py                   2
-rw-r--r--  kafka/consumer/fetcher.py              14
-rw-r--r--  kafka/consumer/group.py                 2
-rw-r--r--  kafka/consumer/simple.py                2
-rw-r--r--  kafka/consumer/subscription_state.py    2
-rw-r--r--  kafka/coordinator/consumer.py           6
-rw-r--r--  kafka/metrics/metrics.py                2
-rw-r--r--  kafka/metrics/stats/percentiles.py      2
-rw-r--r--  kafka/metrics/stats/rate.py             2
-rw-r--r--  kafka/metrics/stats/sensor.py           2
-rw-r--r--  kafka/producer/base.py                  4
-rw-r--r--  kafka/producer/future.py                2
-rw-r--r--  kafka/producer/kafka.py                10
-rw-r--r--  kafka/producer/keyed.py                 2
-rw-r--r--  kafka/producer/record_accumulator.py    6
-rw-r--r--  kafka/producer/simple.py                2
-rw-r--r--  kafka/protocol/legacy.py                2
-rw-r--r--  kafka/protocol/message.py               4
-rw-r--r--  kafka/protocol/parser.py                2
-rw-r--r--  kafka/record/legacy_records.py          2
-rw-r--r--  test/fixtures.py                        8
-rw-r--r--  test/test_metrics.py                    2
-rw-r--r--  test/test_producer.py                   2
-rw-r--r--  test/testutil.py                        2
26 files changed, 45 insertions(+), 45 deletions(-)
diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py
index 01db6a9..7aceea1 100644
--- a/kafka/admin/kafka.py
+++ b/kafka/admin/kafka.py
@@ -169,7 +169,7 @@ class KafkaAdmin(object):
log.debug("Starting Kafka administration interface")
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
if extra_configs:
- raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs)
+ raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,))
self.config = copy.copy(self.DEFAULT_CONFIG)
self.config.update(configs)
diff --git a/kafka/client.py b/kafka/client.py
index 789d4da..148cae0 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -174,7 +174,7 @@ class SimpleClient(object):
return decoder_fn(future.value)
- raise KafkaUnavailableError('All servers failed to process request: %s' % hosts)
+ raise KafkaUnavailableError('All servers failed to process request: %s' % (hosts,))
def _payloads_by_broker(self, payloads):
payloads_by_broker = collections.defaultdict(list)
diff --git a/kafka/client_async.py b/kafka/client_async.py
index bf395c5..cf57ef9 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -355,7 +355,7 @@ class KafkaClient(object):
conn = self._conns.get(node_id)
if conn is None:
- assert broker, 'Broker id %s not in current metadata' % node_id
+ assert broker, 'Broker id %s not in current metadata' % (node_id,)
log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 7d58b7c..3638831 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -298,7 +298,7 @@ class Fetcher(six.Iterator):
remaining_ms = timeout_ms - elapsed_ms
raise Errors.KafkaTimeoutError(
- "Failed to get offsets by timestamps in %s ms" % timeout_ms)
+ "Failed to get offsets by timestamps in %s ms" % (timeout_ms,))
def fetched_records(self, max_records=None):
"""Returns previously fetched records and updates consumed offsets.
@@ -911,7 +911,7 @@ class FetchResponseMetricAggregator(object):
class FetchManagerMetrics(object):
def __init__(self, metrics, prefix):
self.metrics = metrics
- self.group_name = '%s-fetch-manager-metrics' % prefix
+ self.group_name = '%s-fetch-manager-metrics' % (prefix,)
self.bytes_fetched = metrics.sensor('bytes-fetched')
self.bytes_fetched.add(metrics.metric_name('fetch-size-avg', self.group_name,
@@ -955,15 +955,15 @@ class FetchManagerMetrics(object):
bytes_fetched = self.metrics.sensor(name)
bytes_fetched.add(self.metrics.metric_name('fetch-size-avg',
self.group_name,
- 'The average number of bytes fetched per request for topic %s' % topic,
+ 'The average number of bytes fetched per request for topic %s' % (topic,),
metric_tags), Avg())
bytes_fetched.add(self.metrics.metric_name('fetch-size-max',
self.group_name,
- 'The maximum number of bytes fetched per request for topic %s' % topic,
+ 'The maximum number of bytes fetched per request for topic %s' % (topic,),
metric_tags), Max())
bytes_fetched.add(self.metrics.metric_name('bytes-consumed-rate',
self.group_name,
- 'The average number of bytes consumed per second for topic %s' % topic,
+ 'The average number of bytes consumed per second for topic %s' % (topic,),
metric_tags), Rate())
bytes_fetched.record(num_bytes)
@@ -976,10 +976,10 @@ class FetchManagerMetrics(object):
records_fetched = self.metrics.sensor(name)
records_fetched.add(self.metrics.metric_name('records-per-request-avg',
self.group_name,
- 'The average number of records in each request for topic %s' % topic,
+ 'The average number of records in each request for topic %s' % (topic,),
metric_tags), Avg())
records_fetched.add(self.metrics.metric_name('records-consumed-rate',
self.group_name,
- 'The average number of records consumed per second for topic %s' % topic,
+ 'The average number of records consumed per second for topic %s' % (topic,),
metric_tags), Rate())
records_fetched.record(num_records)
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 279cce0..8727de7 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -309,7 +309,7 @@ class KafkaConsumer(six.Iterator):
# Only check for extra config keys in top-level class
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
if extra_configs:
- raise KafkaConfigurationError("Unrecognized configs: %s" % extra_configs)
+ raise KafkaConfigurationError("Unrecognized configs: %s" % (extra_configs,))
self.config = copy.copy(self.DEFAULT_CONFIG)
self.config.update(configs)
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
index b60a586..a6a64a5 100644
--- a/kafka/consumer/simple.py
+++ b/kafka/consumer/simple.py
@@ -247,7 +247,7 @@ class SimpleConsumer(Consumer):
self.offsets[resp.partition] = \
resp.offsets[0] + deltas[resp.partition]
else:
- raise ValueError('Unexpected value for `whence`, %d' % whence)
+ raise ValueError('Unexpected value for `whence`, %d' % (whence,))
# Reset queue and fetch offsets since they are invalid
self.fetch_offsets = self.offsets.copy()
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index 10d722e..4b0b275 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -247,7 +247,7 @@ class SubscriptionState(object):
for tp in assignments:
if tp.topic not in self.subscription:
- raise ValueError("Assigned partition %s for non-subscribed topic." % str(tp))
+ raise ValueError("Assigned partition %s for non-subscribed topic." % (tp,))
# after rebalancing, we always reinitialize the assignment state
self.assignment.clear()
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 647a6b5..14eee0f 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -216,7 +216,7 @@ class ConsumerCoordinator(BaseCoordinator):
self._assignment_snapshot = None
assignor = self._lookup_assignor(protocol)
- assert assignor, 'Coordinator selected invalid assignment protocol: %s' % protocol
+ assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,)
assignment = ConsumerProtocol.ASSIGNMENT.decode(member_assignment_bytes)
@@ -297,7 +297,7 @@ class ConsumerCoordinator(BaseCoordinator):
def _perform_assignment(self, leader_id, assignment_strategy, members):
assignor = self._lookup_assignor(assignment_strategy)
- assert assignor, 'Invalid assignment protocol: %s' % assignment_strategy
+ assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,)
member_metadata = {}
all_subscribed_topics = set()
for member_id, metadata_bytes in members:
@@ -804,7 +804,7 @@ class ConsumerCoordinator(BaseCoordinator):
class ConsumerCoordinatorMetrics(object):
def __init__(self, metrics, metric_group_prefix, subscription):
self.metrics = metrics
- self.metric_group_name = '%s-coordinator-metrics' % metric_group_prefix
+ self.metric_group_name = '%s-coordinator-metrics' % (metric_group_prefix,)
self.commit_latency = metrics.sensor('commit-latency')
self.commit_latency.add(metrics.metric_name(
diff --git a/kafka/metrics/metrics.py b/kafka/metrics/metrics.py
index f2e99ed..2c53488 100644
--- a/kafka/metrics/metrics.py
+++ b/kafka/metrics/metrics.py
@@ -225,7 +225,7 @@ class Metrics(object):
with self._lock:
if metric.metric_name in self.metrics:
raise ValueError('A metric named "%s" already exists, cannot'
- ' register another one.' % metric.metric_name)
+ ' register another one.' % (metric.metric_name,))
self.metrics[metric.metric_name] = metric
for reporter in self._reporters:
reporter.metric_change(metric)
diff --git a/kafka/metrics/stats/percentiles.py b/kafka/metrics/stats/percentiles.py
index b55c5ac..6d702e8 100644
--- a/kafka/metrics/stats/percentiles.py
+++ b/kafka/metrics/stats/percentiles.py
@@ -27,7 +27,7 @@ class Percentiles(AbstractSampledStat, AbstractCompoundStat):
' to be 0.0.')
self.bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val)
else:
- ValueError('Unknown bucket type: %s' % bucketing)
+ ValueError('Unknown bucket type: %s' % (bucketing,))
def stats(self):
measurables = []
diff --git a/kafka/metrics/stats/rate.py b/kafka/metrics/stats/rate.py
index 810c543..68393fb 100644
--- a/kafka/metrics/stats/rate.py
+++ b/kafka/metrics/stats/rate.py
@@ -101,7 +101,7 @@ class Rate(AbstractMeasurableStat):
elif self._unit == TimeUnit.DAYS:
return time_ms / (24.0 * 60.0 * 60.0 * 1000.0)
else:
- raise ValueError('Unknown unit: %s' % self._unit)
+ raise ValueError('Unknown unit: %s' % (self._unit,))
class SampledTotal(AbstractSampledStat):
diff --git a/kafka/metrics/stats/sensor.py b/kafka/metrics/stats/sensor.py
index 73a4665..571723f 100644
--- a/kafka/metrics/stats/sensor.py
+++ b/kafka/metrics/stats/sensor.py
@@ -35,7 +35,7 @@ class Sensor(object):
"""Validate that this sensor doesn't end up referencing itself."""
if self in sensors:
raise ValueError('Circular dependency in sensors: %s is its own'
- 'parent.' % self.name)
+ 'parent.' % (self.name,))
sensors.add(self)
for parent in self._parents:
parent._check_forest(sensors)
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 1da74c8..b323966 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -316,7 +316,7 @@ class Producer(object):
if codec is None:
codec = CODEC_NONE
elif codec not in ALL_CODECS:
- raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
+ raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,))
self.codec = codec
self.codec_compresslevel = codec_compresslevel
@@ -419,7 +419,7 @@ class Producer(object):
raise AsyncProducerQueueFull(
msg[idx:],
'Producer async queue overfilled. '
- 'Current queue size %d.' % self.queue.qsize())
+ 'Current queue size %d.' % (self.queue.qsize(),))
resp = []
else:
messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index 1c5d6d7..f67db09 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -59,7 +59,7 @@ class FutureRecordMetadata(Future):
def get(self, timeout=None):
if not self.is_done and not self._produce_future.wait(timeout):
raise Errors.KafkaTimeoutError(
- "Timeout after waiting for %s secs." % timeout)
+ "Timeout after waiting for %s secs." % (timeout,))
assert self.is_done
if self.failed():
raise self.exception # pylint: disable-msg=raising-bad-type
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 45bb058..685c3f9 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -340,11 +340,11 @@ class KafkaProducer(object):
self.config[key] = configs.pop(key)
# Only check for extra config keys in top-level class
- assert not configs, 'Unrecognized configs: %s' % configs
+ assert not configs, 'Unrecognized configs: %s' % (configs,)
if self.config['client_id'] is None:
self.config['client_id'] = 'kafka-python-producer-%s' % \
- PRODUCER_CLIENT_ID_SEQUENCE.increment()
+ (PRODUCER_CLIENT_ID_SEQUENCE.increment(),)
if self.config['acks'] == 'all':
self.config['acks'] = -1
@@ -633,12 +633,12 @@ class KafkaProducer(object):
raise Errors.MessageSizeTooLargeError(
"The message is %d bytes when serialized which is larger than"
" the maximum request size you have configured with the"
- " max_request_size configuration" % size)
+ " max_request_size configuration" % (size,))
if size > self.config['buffer_memory']:
raise Errors.MessageSizeTooLargeError(
"The message is %d bytes when serialized which is larger than"
" the total memory buffer you have configured with the"
- " buffer_memory configuration." % size)
+ " buffer_memory configuration." % (size,))
def _wait_on_metadata(self, topic, max_wait):
"""
@@ -679,7 +679,7 @@ class KafkaProducer(object):
elapsed = time.time() - begin
if not metadata_event.is_set():
raise Errors.KafkaTimeoutError(
- "Failed to update metadata after %.1f secs." % max_wait)
+ "Failed to update metadata after %.1f secs." % (max_wait,))
elif topic in self._metadata.unauthorized_topics:
raise Errors.TopicAuthorizationFailedError(topic)
else:
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 62bb733..3ba9216 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -46,4 +46,4 @@ class KeyedProducer(Producer):
return self.send_messages(topic, key, msg)
def __repr__(self):
- return '<KeyedProducer batch=%s>' % self.async_send
+ return '<KeyedProducer batch=%s>' % (self.async_send,)
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 728bf18..eeb928d 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -102,11 +102,11 @@ class ProducerBatch(object):
error = None
if not self.in_retry() and is_full and timeout < since_append:
- error = "%d seconds have passed since last append" % since_append
+ error = "%d seconds have passed since last append" % (since_append,)
elif not self.in_retry() and timeout < since_ready:
- error = "%d seconds have passed since batch creation plus linger time" % since_ready
+ error = "%d seconds have passed since batch creation plus linger time" % (since_ready,)
elif self.in_retry() and timeout < since_backoff:
- error = "%d seconds have passed since last attempt plus backoff time" % since_backoff
+ error = "%d seconds have passed since last attempt plus backoff time" % (since_backoff,)
if error:
self.records.close()
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index e06e659..f334a49 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -51,4 +51,4 @@ class SimpleProducer(Producer):
)
def __repr__(self):
- return '<SimpleProducer batch=%s>' % self.async_send
+ return '<SimpleProducer batch=%s>' % (self.async_send,)
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index 7dd2580..2e8f5bc 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -471,4 +471,4 @@ def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None)
elif codec == CODEC_SNAPPY:
return [create_snappy_message(messages, key)]
else:
- raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
+ raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,))
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 19dcbd9..31527bf 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -77,7 +77,7 @@ class Message(Struct):
elif version == 0:
fields = (self.crc, self.magic, self.attributes, self.key, self.value)
else:
- raise ValueError('Unrecognized message version: %s' % version)
+ raise ValueError('Unrecognized message version: %s' % (version,))
message = Message.SCHEMAS[version].encode(fields)
if not recalc_crc:
return message
@@ -143,7 +143,7 @@ class Message(Struct):
class PartialMessage(bytes):
def __repr__(self):
- return 'PartialMessage(%s)' % self
+ return 'PartialMessage(%s)' % (self,)
class MessageSet(AbstractType):
diff --git a/kafka/protocol/parser.py b/kafka/protocol/parser.py
index 4d77bb3..a99b3ae 100644
--- a/kafka/protocol/parser.py
+++ b/kafka/protocol/parser.py
@@ -136,7 +136,7 @@ class KafkaProtocol(object):
raise Errors.CorrelationIdError(
'No in-flight-request found for server response'
' with correlation ID %d'
- % recv_correlation_id)
+ % (recv_correlation_id,))
(correlation_id, request) = self.in_flight_requests.popleft()
diff --git a/kafka/record/legacy_records.py b/kafka/record/legacy_records.py
index 1bdba81..bb6c21c 100644
--- a/kafka/record/legacy_records.py
+++ b/kafka/record/legacy_records.py
@@ -254,7 +254,7 @@ class LegacyRecordBatch(ABCRecordBatch, LegacyRecordBase):
# There should only ever be a single layer of compression
assert not attrs & self.CODEC_MASK, (
'MessageSet at offset %d appears double-compressed. This '
- 'should not happen -- check your producers!' % offset)
+ 'should not happen -- check your producers!' % (offset,))
# When magic value is greater than 0, the timestamp
# of a compressed message depends on the
diff --git a/test/fixtures.py b/test/fixtures.py
index 76e3071..6f7fc3f 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -102,7 +102,7 @@ class Fixture(object):
def kafka_run_class_env(self):
env = os.environ.copy()
env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % \
- self.test_resource("log4j.properties")
+ (self.test_resource("log4j.properties"),)
return env
@classmethod
@@ -110,7 +110,7 @@ class Fixture(object):
log.info('Rendering %s from template %s', target_file.strpath, source_file)
with open(source_file, "r") as handle:
template = handle.read()
- assert len(template) > 0, 'Empty template %s' % source_file
+ assert len(template) > 0, 'Empty template %s' % (source_file,)
with open(target_file.strpath, "w") as handle:
handle.write(template.format(**binding))
handle.flush()
@@ -257,7 +257,7 @@ class KafkaFixture(Fixture):
# TODO: checking for port connection would be better than scanning logs
# until then, we need the pattern to work across all supported broker versions
# The logging format changed slightly in 1.0.0
- self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % broker_id
+ self.start_pattern = r"\[Kafka ?Server (id=)?%d\],? started" % (broker_id,)
self.zookeeper = zookeeper
self.zk_chroot = zk_chroot
@@ -291,7 +291,7 @@ class KafkaFixture(Fixture):
"%s:%d" % (self.zookeeper.host,
self.zookeeper.port),
"create",
- "/%s" % self.zk_chroot,
+ "/%s" % (self.zk_chroot,),
"kafka-python")
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
diff --git a/test/test_metrics.py b/test/test_metrics.py
index 8d35f55..308ea58 100644
--- a/test/test_metrics.py
+++ b/test/test_metrics.py
@@ -469,7 +469,7 @@ def test_reporter(metrics):
for key in list(expected.keys()):
metrics = expected.pop(key)
- expected['foo.%s' % key] = metrics
+ expected['foo.%s' % (key,)] = metrics
assert expected == foo_reporter.snapshot()
diff --git a/test/test_producer.py b/test/test_producer.py
index 16da618..d6b94e8 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -65,7 +65,7 @@ def test_end_to_end(kafka_broker, compression):
except StopIteration:
break
- assert msgs == set(['msg %d' % i for i in range(messages)])
+ assert msgs == set(['msg %d' % (i,) for i in range(messages)])
consumer.close()
diff --git a/test/testutil.py b/test/testutil.py
index 6f6cafb..a8227cf 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -32,7 +32,7 @@ def kafka_versions(*versions):
op_str = s[0:2] # >= <=
v_str = s[2:]
else:
- raise ValueError('Unrecognized kafka version / operator: %s' % s)
+ raise ValueError('Unrecognized kafka version / operator: %s' % (s,))
op_map = {
'=': operator.eq,