summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2017-10-08 10:00:39 -0700
committerDana Powers <dana.powers@gmail.com>2017-10-08 10:00:39 -0700
commit87e5d1625968c214d2ad6c198ec526ef484f3688 (patch)
tree8a7ba1952b19a92c3abeb26cf65e7ebc6a4cd1e2
parent8f211805c40a446c74c62a8b3558c75a33eaa161 (diff)
downloadkafka-python-87e5d1625968c214d2ad6c198ec526ef484f3688.tar.gz
Expand metrics docs (#1243)
* Expand metrics docstrings * Document metrics interface in readme * Use six.iteritems(d) rather than d.items() * Use Sphinx warning syntax
-rw-r--r--README.rst10
-rw-r--r--kafka/consumer/group.py15
-rw-r--r--kafka/producer/kafka.py21
3 files changed, 33 insertions, 13 deletions
diff --git a/README.rst b/README.rst
index 6e9a507..d4fc1a9 100644
--- a/README.rst
+++ b/README.rst
@@ -70,6 +70,8 @@ that expose basic message attributes: topic, partition, offset, key, and value:
>>> for msg in consumer:
... assert isinstance(msg.value, dict)
+>>> # Get consumer metrics
+>>> metrics = consumer.metrics()
KafkaProducer
*************
@@ -110,6 +112,9 @@ for more details.
>>> for i in range(1000):
... producer.send('foobar', b'msg %d' % i)
+>>> # Get producer performance metrics
+>>> metrics = producer.metrics()
+
Thread safety
*************
@@ -122,8 +127,8 @@ multiprocessing is recommended.
Compression
***********
-kafka-python supports gzip compression/decompression natively. To produce or consume lz4
-compressed messages, you should install python-lz4 (pip install lz4).
+kafka-python supports gzip compression/decompression natively. To produce or consume lz4
+compressed messages, you should install python-lz4 (pip install lz4).
To enable snappy compression/decompression install python-snappy (also requires snappy library).
See <https://kafka-python.readthedocs.io/en/master/install.html#optional-snappy-install>
for more information.
@@ -138,7 +143,6 @@ leveraged to enable a KafkaClient.check_version() method that
probes a kafka broker and attempts to identify which version it is running
(0.8.0 to 0.11).
-
Low-level
*********
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index b7fbd83..a83d5da 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -846,13 +846,20 @@ class KafkaConsumer(six.Iterator):
log.debug("Unsubscribed all topics or patterns and assigned partitions")
def metrics(self, raw=False):
- """Warning: this is an unstable interface.
- It may change in future releases without warning"""
+ """Get metrics on consumer performance.
+
+ This is ported from the Java Consumer, for details see:
+ https://kafka.apache.org/documentation/#new_consumer_monitoring
+
+ Warning:
+ This is an unstable interface. It may change in future
+ releases without warning.
+ """
if raw:
return self._metrics.metrics
metrics = {}
- for k, v in self._metrics.metrics.items():
+ for k, v in six.iteritems(self._metrics.metrics):
if k.group not in metrics:
metrics[k.group] = {}
if k.name not in metrics[k.group]:
@@ -897,7 +904,7 @@ class KafkaConsumer(six.Iterator):
raise UnsupportedVersionError(
"offsets_for_times API not supported for cluster version {}"
.format(self.config['api_version']))
- for tp, ts in timestamps.items():
+ for tp, ts in six.iteritems(timestamps):
timestamps[tp] = int(ts)
if ts < 0:
raise ValueError(
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 09ca744..de9dcd2 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -8,6 +8,8 @@ import threading
import time
import weakref
+from ..vendor import six
+
from .. import errors as Errors
from ..client_async import KafkaClient, selectors
from ..metrics import MetricConfig, Metrics
@@ -566,10 +568,10 @@ class KafkaProducer(object):
Arguments:
timeout (float, optional): timeout in seconds to wait for completion.
-
+
Raises:
- KafkaTimeoutError: failure to flush buffered records within the
- provided timeout
+ KafkaTimeoutError: failure to flush buffered records within the
+ provided timeout
"""
log.debug("Flushing accumulated records in producer.") # trace
self._accumulator.begin_flush()
@@ -655,13 +657,20 @@ class KafkaProducer(object):
available)
def metrics(self, raw=False):
- """Warning: this is an unstable interface.
- It may change in future releases without warning"""
+ """Get metrics on producer performance.
+
+ This is ported from the Java Producer, for details see:
+ https://kafka.apache.org/documentation/#producer_monitoring
+
+ Warning:
+ This is an unstable interface. It may change in future
+ releases without warning.
+ """
if raw:
return self._metrics.metrics
metrics = {}
- for k, v in self._metrics.metrics.items():
+ for k, v in six.iteritems(self._metrics.metrics):
if k.group not in metrics:
metrics[k.group] = {}
if k.name not in metrics[k.group]: