summaryrefslogtreecommitdiff
path: root/kafka/producer
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer')
-rw-r--r--kafka/producer/base.py4
-rw-r--r--kafka/producer/future.py2
-rw-r--r--kafka/producer/kafka.py10
-rw-r--r--kafka/producer/keyed.py2
-rw-r--r--kafka/producer/record_accumulator.py6
-rw-r--r--kafka/producer/simple.py2
6 files changed, 13 insertions, 13 deletions
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 1da74c8..b323966 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -316,7 +316,7 @@ class Producer(object):
if codec is None:
codec = CODEC_NONE
elif codec not in ALL_CODECS:
- raise UnsupportedCodecError("Codec 0x%02x unsupported" % codec)
+ raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,))
self.codec = codec
self.codec_compresslevel = codec_compresslevel
@@ -419,7 +419,7 @@ class Producer(object):
raise AsyncProducerQueueFull(
msg[idx:],
'Producer async queue overfilled. '
- 'Current queue size %d.' % self.queue.qsize())
+ 'Current queue size %d.' % (self.queue.qsize(),))
resp = []
else:
messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index 1c5d6d7..f67db09 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -59,7 +59,7 @@ class FutureRecordMetadata(Future):
def get(self, timeout=None):
if not self.is_done and not self._produce_future.wait(timeout):
raise Errors.KafkaTimeoutError(
- "Timeout after waiting for %s secs." % timeout)
+ "Timeout after waiting for %s secs." % (timeout,))
assert self.is_done
if self.failed():
raise self.exception # pylint: disable-msg=raising-bad-type
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 45bb058..685c3f9 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -340,11 +340,11 @@ class KafkaProducer(object):
self.config[key] = configs.pop(key)
# Only check for extra config keys in top-level class
- assert not configs, 'Unrecognized configs: %s' % configs
+ assert not configs, 'Unrecognized configs: %s' % (configs,)
if self.config['client_id'] is None:
self.config['client_id'] = 'kafka-python-producer-%s' % \
- PRODUCER_CLIENT_ID_SEQUENCE.increment()
+ (PRODUCER_CLIENT_ID_SEQUENCE.increment(),)
if self.config['acks'] == 'all':
self.config['acks'] = -1
@@ -633,12 +633,12 @@ class KafkaProducer(object):
raise Errors.MessageSizeTooLargeError(
"The message is %d bytes when serialized which is larger than"
" the maximum request size you have configured with the"
- " max_request_size configuration" % size)
+ " max_request_size configuration" % (size,))
if size > self.config['buffer_memory']:
raise Errors.MessageSizeTooLargeError(
"The message is %d bytes when serialized which is larger than"
" the total memory buffer you have configured with the"
- " buffer_memory configuration." % size)
+ " buffer_memory configuration." % (size,))
def _wait_on_metadata(self, topic, max_wait):
"""
@@ -679,7 +679,7 @@ class KafkaProducer(object):
elapsed = time.time() - begin
if not metadata_event.is_set():
raise Errors.KafkaTimeoutError(
- "Failed to update metadata after %.1f secs." % max_wait)
+ "Failed to update metadata after %.1f secs." % (max_wait,))
elif topic in self._metadata.unauthorized_topics:
raise Errors.TopicAuthorizationFailedError(topic)
else:
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 62bb733..3ba9216 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -46,4 +46,4 @@ class KeyedProducer(Producer):
return self.send_messages(topic, key, msg)
def __repr__(self):
- return '<KeyedProducer batch=%s>' % self.async_send
+ return '<KeyedProducer batch=%s>' % (self.async_send,)
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 728bf18..eeb928d 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -102,11 +102,11 @@ class ProducerBatch(object):
error = None
if not self.in_retry() and is_full and timeout < since_append:
- error = "%d seconds have passed since last append" % since_append
+ error = "%d seconds have passed since last append" % (since_append,)
elif not self.in_retry() and timeout < since_ready:
- error = "%d seconds have passed since batch creation plus linger time" % since_ready
+ error = "%d seconds have passed since batch creation plus linger time" % (since_ready,)
elif self.in_retry() and timeout < since_backoff:
- error = "%d seconds have passed since last attempt plus backoff time" % since_backoff
+ error = "%d seconds have passed since last attempt plus backoff time" % (since_backoff,)
if error:
self.records.close()
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index e06e659..f334a49 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -51,4 +51,4 @@ class SimpleProducer(Producer):
)
def __repr__(self):
- return '<SimpleProducer batch=%s>' % self.async_send
+ return '<SimpleProducer batch=%s>' % (self.async_send,)