author     Jeff Widman <jeff@jeffwidman.com>  2019-10-11 12:03:22 -0700
committer  GitHub <noreply@github.com>  2019-10-11 12:03:22 -0700
commit     3631bfa009a28767a2057c9beee470acaa6597d5 (patch)
tree       e10b73861a33d83a95b6496ef3074ee3caeaae41
parent     6d3800ca9f45fd953689a1787fc90a5e566e34ea (diff)
download   kafka-python-3631bfa009a28767a2057c9beee470acaa6597d5.tar.gz
Remove SimpleClient, Producer, Consumer, Unittest (#1196)
In the 2.0 release, we're removing:

* `SimpleClient`
* `SimpleConsumer`
* `SimpleProducer`
* Old partitioners used by `SimpleProducer`; these are superseded by the `DefaultPartitioner`

These have been deprecated for several years in favor of `KafkaClient` / `KafkaConsumer` / `KafkaProducer`. Since 2.0 allows breaking changes, we are removing the deprecated classes.

Additionally, since the only usage of `unittest` was in the tests for these old Simple* clients, this also drops `unittest` from the library. All tests now run under `pytest`.
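For anyone still on the removed classes, the sketch below illustrates the replacement `KafkaProducer` / `KafkaConsumer` API that this release keeps. It is an editorial example, not part of the commit; the broker address, topic, and group names are placeholders.

```python
from kafka import KafkaConsumer, KafkaProducer

# Producing (replaces SimpleProducer / KeyedProducer).
# Keys and values are bytes unless serializers are configured.
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('my-topic', key=b'key1', value=b'some message')
producer.flush()   # block until buffered records are delivered
producer.close()

# Consuming (replaces SimpleConsumer / MultiProcessConsumer).
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers='localhost:9092')
for message in consumer:
    # message.value is a raw byte string -- decode if necessary
    print(message.topic, message.partition, message.offset, message.value)
```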
-rw-r--r--  README.rst  7
-rw-r--r--  docs/apidoc/SimpleProducer.rst  14
-rw-r--r--  docs/apidoc/kafka.consumer.rst  46
-rw-r--r--  docs/apidoc/kafka.coordinator.assignors.rst  30
-rw-r--r--  docs/apidoc/kafka.coordinator.rst  45
-rw-r--r--  docs/apidoc/kafka.partitioner.rst  38
-rw-r--r--  docs/apidoc/kafka.producer.rst  38
-rw-r--r--  docs/apidoc/kafka.protocol.rst  126
-rw-r--r--  docs/apidoc/kafka.rst  89
-rw-r--r--  docs/index.rst  8
-rw-r--r--  docs/simple.rst  162
-rw-r--r--  docs/tests.rst  2
-rw-r--r--  kafka/__init__.py  28
-rw-r--r--  kafka/client.py  719
-rw-r--r--  kafka/common.py  4
-rw-r--r--  kafka/consumer/__init__.py  4
-rw-r--r--  kafka/consumer/base.py  232
-rw-r--r--  kafka/consumer/group.py  25
-rw-r--r--  kafka/consumer/multiprocess.py  295
-rw-r--r--  kafka/consumer/simple.py  444
-rw-r--r--  kafka/context.py  178
-rw-r--r--  kafka/errors.py  16
-rw-r--r--  kafka/partitioner/__init__.py  8
-rw-r--r--  kafka/partitioner/base.py  27
-rw-r--r--  kafka/partitioner/default.py  72
-rw-r--r--  kafka/partitioner/hashed.py  118
-rw-r--r--  kafka/partitioner/roundrobin.py  70
-rw-r--r--  kafka/producer/__init__.py  5
-rw-r--r--  kafka/producer/base.py  482
-rw-r--r--  kafka/producer/keyed.py  49
-rw-r--r--  kafka/producer/simple.py  54
-rw-r--r--  kafka/protocol/__init__.py  6
-rw-r--r--  kafka/protocol/legacy.py  474
-rw-r--r--  kafka/structs.py  69
-rw-r--r--  kafka/util.py  108
-rw-r--r--  setup.py  2
-rw-r--r--  test/__init__.py  7
-rw-r--r--  test/conftest.py  9
-rw-r--r--  test/fixtures.py  7
-rw-r--r--  test/test_client.py  405
-rw-r--r--  test/test_client_integration.py  95
-rw-r--r--  test/test_consumer.py  135
-rw-r--r--  test/test_consumer_integration.py  498
-rw-r--r--  test/test_context.py  117
-rw-r--r--  test/test_failover_integration.py  240
-rw-r--r--  test/test_package.py  18
-rw-r--r--  test/test_partitioner.py  39
-rw-r--r--  test/test_producer_integration.py  529
-rw-r--r--  test/test_producer_legacy.py  257
-rw-r--r--  test/test_protocol_legacy.py  848
-rw-r--r--  test/test_util.py  85
-rw-r--r--  test/testutil.py  105
-rw-r--r--  tox.ini  2
53 files changed, 98 insertions, 7392 deletions
diff --git a/README.rst b/README.rst
index 40cd55c..f8947eb 100644
--- a/README.rst
+++ b/README.rst
@@ -151,10 +151,3 @@ testing, probing, and general experimentation. The protocol support is
leveraged to enable a KafkaClient.check_version() method that
probes a kafka broker and attempts to identify which version it is running
(0.8.0 to 2.3+).
-
-Low-level
-*********
-
-Legacy support is maintained for low-level consumer and producer classes,
-SimpleConsumer and SimpleProducer. See
-<https://kafka-python.readthedocs.io/en/master/simple.html?highlight=SimpleProducer> for API details.
diff --git a/docs/apidoc/SimpleProducer.rst b/docs/apidoc/SimpleProducer.rst
deleted file mode 100644
index a509858..0000000
--- a/docs/apidoc/SimpleProducer.rst
+++ /dev/null
@@ -1,14 +0,0 @@
-SimpleProducer
-==============
-
-.. autoclass:: kafka.producer.SimpleProducer
- :members:
- :show-inheritance:
-
-.. autoclass:: kafka.producer.KeyedProducer
- :members:
- :show-inheritance:
-
-.. automodule:: kafka.producer.base
- :members:
- :show-inheritance:
diff --git a/docs/apidoc/kafka.consumer.rst b/docs/apidoc/kafka.consumer.rst
deleted file mode 100644
index 8595f99..0000000
--- a/docs/apidoc/kafka.consumer.rst
+++ /dev/null
@@ -1,46 +0,0 @@
-kafka.consumer package
-======================
-
-Submodules
-----------
-
-kafka.consumer.base module
---------------------------
-
-.. automodule:: kafka.consumer.base
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.consumer.kafka module
----------------------------
-
-.. automodule:: kafka.consumer.kafka
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.consumer.multiprocess module
-----------------------------------
-
-.. automodule:: kafka.consumer.multiprocess
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.consumer.simple module
-----------------------------
-
-.. automodule:: kafka.consumer.simple
- :members:
- :undoc-members:
- :show-inheritance:
-
-
-Module contents
----------------
-
-.. automodule:: kafka.consumer
- :members:
- :undoc-members:
- :show-inheritance:
diff --git a/docs/apidoc/kafka.coordinator.assignors.rst b/docs/apidoc/kafka.coordinator.assignors.rst
deleted file mode 100644
index 87b9f84..0000000
--- a/docs/apidoc/kafka.coordinator.assignors.rst
+++ /dev/null
@@ -1,30 +0,0 @@
-kafka.coordinator.assignors package
-===================================
-
-Submodules
-----------
-
-kafka.coordinator.assignors.abstract module
--------------------------------------------
-
-.. automodule:: kafka.coordinator.assignors.abstract
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.coordinator.assignors.roundrobin module
----------------------------------------------
-
-.. automodule:: kafka.coordinator.assignors.roundrobin
- :members:
- :undoc-members:
- :show-inheritance:
-
-
-Module contents
----------------
-
-.. automodule:: kafka.coordinator.assignors
- :members:
- :undoc-members:
- :show-inheritance:
diff --git a/docs/apidoc/kafka.coordinator.rst b/docs/apidoc/kafka.coordinator.rst
deleted file mode 100644
index e15f638..0000000
--- a/docs/apidoc/kafka.coordinator.rst
+++ /dev/null
@@ -1,45 +0,0 @@
-kafka.coordinator package
-=========================
-
-Subpackages
------------
-
-.. toctree::
-
- kafka.coordinator.assignors
-
-Submodules
-----------
-
-kafka.coordinator.base module
------------------------------
-
-.. automodule:: kafka.coordinator.base
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.coordinator.consumer module
----------------------------------
-
-.. automodule:: kafka.coordinator.consumer
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.coordinator.heartbeat module
-----------------------------------
-
-.. automodule:: kafka.coordinator.heartbeat
- :members:
- :undoc-members:
- :show-inheritance:
-
-
-Module contents
----------------
-
-.. automodule:: kafka.coordinator
- :members:
- :undoc-members:
- :show-inheritance:
diff --git a/docs/apidoc/kafka.partitioner.rst b/docs/apidoc/kafka.partitioner.rst
deleted file mode 100644
index ea215f1..0000000
--- a/docs/apidoc/kafka.partitioner.rst
+++ /dev/null
@@ -1,38 +0,0 @@
-kafka.partitioner package
-=========================
-
-Submodules
-----------
-
-kafka.partitioner.base module
------------------------------
-
-.. automodule:: kafka.partitioner.base
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.partitioner.hashed module
--------------------------------
-
-.. automodule:: kafka.partitioner.hashed
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.partitioner.roundrobin module
------------------------------------
-
-.. automodule:: kafka.partitioner.roundrobin
- :members:
- :undoc-members:
- :show-inheritance:
-
-
-Module contents
----------------
-
-.. automodule:: kafka.partitioner
- :members:
- :undoc-members:
- :show-inheritance:
diff --git a/docs/apidoc/kafka.producer.rst b/docs/apidoc/kafka.producer.rst
deleted file mode 100644
index bd850bb..0000000
--- a/docs/apidoc/kafka.producer.rst
+++ /dev/null
@@ -1,38 +0,0 @@
-kafka.producer package
-======================
-
-Submodules
-----------
-
-kafka.producer.base module
---------------------------
-
-.. automodule:: kafka.producer.base
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.producer.keyed module
----------------------------
-
-.. automodule:: kafka.producer.keyed
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.producer.simple module
-----------------------------
-
-.. automodule:: kafka.producer.simple
- :members:
- :undoc-members:
- :show-inheritance:
-
-
-Module contents
----------------
-
-.. automodule:: kafka.producer
- :members:
- :undoc-members:
- :show-inheritance:
diff --git a/docs/apidoc/kafka.protocol.rst b/docs/apidoc/kafka.protocol.rst
deleted file mode 100644
index 4e69aaf..0000000
--- a/docs/apidoc/kafka.protocol.rst
+++ /dev/null
@@ -1,126 +0,0 @@
-kafka.protocol package
-======================
-
-Submodules
-----------
-
-kafka.protocol.abstract module
-------------------------------
-
-.. automodule:: kafka.protocol.abstract
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.protocol.admin module
----------------------------
-
-.. automodule:: kafka.protocol.admin
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.protocol.api module
--------------------------
-
-.. automodule:: kafka.protocol.api
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.protocol.commit module
-----------------------------
-
-.. automodule:: kafka.protocol.commit
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.protocol.fetch module
----------------------------
-
-.. automodule:: kafka.protocol.fetch
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.protocol.group module
----------------------------
-
-.. automodule:: kafka.protocol.group
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.protocol.legacy module
-----------------------------
-
-.. automodule:: kafka.protocol.legacy
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.protocol.message module
------------------------------
-
-.. automodule:: kafka.protocol.message
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.protocol.metadata module
-------------------------------
-
-.. automodule:: kafka.protocol.metadata
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.protocol.offset module
-----------------------------
-
-.. automodule:: kafka.protocol.offset
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.protocol.pickle module
-----------------------------
-
-.. automodule:: kafka.protocol.pickle
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.protocol.produce module
------------------------------
-
-.. automodule:: kafka.protocol.produce
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.protocol.struct module
-----------------------------
-
-.. automodule:: kafka.protocol.struct
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.protocol.types module
----------------------------
-
-.. automodule:: kafka.protocol.types
- :members:
- :undoc-members:
- :show-inheritance:
-
-
-Module contents
----------------
-
-.. automodule:: kafka.protocol
- :members:
- :undoc-members:
- :show-inheritance:
diff --git a/docs/apidoc/kafka.rst b/docs/apidoc/kafka.rst
deleted file mode 100644
index a29e063..0000000
--- a/docs/apidoc/kafka.rst
+++ /dev/null
@@ -1,89 +0,0 @@
-kafka package
-=============
-
-Subpackages
------------
-
-.. toctree::
-
- kafka.cluster
- kafka.consumer
- kafka.partitioner
- kafka.producer
-
-Submodules
-----------
-
-kafka.cluster module
---------------------
-
-.. automodule:: kafka.cluster
- :members:
- :undoc-members:
- :show-inheritance:
-
-
-kafka.client module
--------------------
-
-.. automodule:: kafka.client
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.codec module
-------------------
-
-.. automodule:: kafka.codec
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.common module
--------------------
-
-.. automodule:: kafka.common
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.conn module
------------------
-
-.. automodule:: kafka.conn
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.context module
---------------------
-
-.. automodule:: kafka.context
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.protocol module
----------------------
-
-.. automodule:: kafka.protocol
- :members:
- :undoc-members:
- :show-inheritance:
-
-kafka.util module
------------------
-
-.. automodule:: kafka.util
- :members:
- :undoc-members:
- :show-inheritance:
-
-
-Module contents
----------------
-
-.. automodule:: kafka
- :members:
- :undoc-members:
- :show-inheritance:
diff --git a/docs/index.rst b/docs/index.rst
index 6fa9a0c..2322471 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -139,20 +139,12 @@ method that probes a kafka broker and
attempts to identify which version it is running (0.8.0 to 2.3+).
-Low-level
-*********
-
-Legacy support is maintained for low-level consumer and producer classes,
-SimpleConsumer and SimpleProducer.
-
-
.. toctree::
:hidden:
:maxdepth: 2
Usage Overview <usage>
API </apidoc/modules>
- Simple Clients [deprecated] <simple>
install
tests
compatibility
diff --git a/docs/simple.rst b/docs/simple.rst
deleted file mode 100644
index afdb975..0000000
--- a/docs/simple.rst
+++ /dev/null
@@ -1,162 +0,0 @@
-Simple APIs (DEPRECATED)
-************************
-
-
-SimpleConsumer (DEPRECATED)
-===========================
-
-.. code:: python
-
- from kafka import SimpleProducer, SimpleClient
-
- # To consume messages
- client = SimpleClient('localhost:9092')
- consumer = SimpleConsumer(client, "my-group", "my-topic")
- for message in consumer:
- # message is raw byte string -- decode if necessary!
- # e.g., for unicode: `message.decode('utf-8')`
- print(message)
-
-
- # Use multiprocessing for parallel consumers
- from kafka import MultiProcessConsumer
-
- # This will split the number of partitions among two processes
- consumer = MultiProcessConsumer(client, "my-group", "my-topic", num_procs=2)
-
- # This will spawn processes such that each handles 2 partitions max
- consumer = MultiProcessConsumer(client, "my-group", "my-topic",
- partitions_per_proc=2)
-
- for message in consumer:
- print(message)
-
- for message in consumer.get_messages(count=5, block=True, timeout=4):
- print(message)
-
- client.close()
-
-
-SimpleProducer (DEPRECATED)
-===========================
-
-Asynchronous Mode
------------------
-
-.. code:: python
-
- from kafka import SimpleProducer, SimpleClient
-
- # To send messages asynchronously
- client = SimpleClient('localhost:9092')
- producer = SimpleProducer(client, async_send=True)
- producer.send_messages('my-topic', b'async message')
-
- # To send messages in batch. You can use any of the available
- # producers for doing this. The following producer will collect
- # messages in batch and send them to Kafka after 20 messages are
- # collected or every 60 seconds
- # Notes:
- # * If the producer dies before the messages are sent, there will be losses
- # * Call producer.stop() to send the messages and cleanup
- producer = SimpleProducer(client,
- async_send=True,
- batch_send_every_n=20,
- batch_send_every_t=60)
-
-Synchronous Mode
-----------------
-
-.. code:: python
-
- from kafka import SimpleProducer, SimpleClient
-
- # To send messages synchronously
- client = SimpleClient('localhost:9092')
- producer = SimpleProducer(client, async_send=False)
-
- # Note that the application is responsible for encoding messages to type bytes
- producer.send_messages('my-topic', b'some message')
- producer.send_messages('my-topic', b'this method', b'is variadic')
-
- # Send unicode message
- producer.send_messages('my-topic', u'你怎么样?'.encode('utf-8'))
-
- # To wait for acknowledgements
- # ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
- # a local log before sending response
- # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
- # by all in sync replicas before sending a response
- producer = SimpleProducer(client,
- async_send=False,
- req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
- ack_timeout=2000,
- sync_fail_on_error=False)
-
- responses = producer.send_messages('my-topic', b'another message')
- for r in responses:
- logging.info(r.offset)
-
-
-KeyedProducer (DEPRECATED)
-==========================
-
-.. code:: python
-
- from kafka import (
- SimpleClient, KeyedProducer,
- Murmur2Partitioner, RoundRobinPartitioner)
-
- kafka = SimpleClient('localhost:9092')
-
- # HashedPartitioner is default (currently uses python hash())
- producer = KeyedProducer(kafka)
- producer.send_messages(b'my-topic', b'key1', b'some message')
- producer.send_messages(b'my-topic', b'key2', b'this methode')
-
- # Murmur2Partitioner attempts to mirror the java client hashing
- producer = KeyedProducer(kafka, partitioner=Murmur2Partitioner)
-
- # Or just produce round-robin (or just use SimpleProducer)
- producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
-
-
-SimpleClient (DEPRECATED)
-=========================
-
-
-.. code:: python
-
- import time
- from kafka import SimpleClient
- from kafka.errors import LeaderNotAvailableError, NotLeaderForPartitionError
- from kafka.protocol import create_message
- from kafka.structs import ProduceRequestPayload
-
- kafka = SimpleClient('localhost:9092')
- payload = ProduceRequestPayload(topic='my-topic', partition=0,
- messages=[create_message("some message")])
-
- retries = 5
- resps = []
- while retries and not resps:
- retries -= 1
- try:
- resps = kafka.send_produce_request(
- payloads=[payload], fail_on_error=True)
- except LeaderNotAvailableError, NotLeaderForPartitionError:
- kafka.load_metadata_for_topics()
- time.sleep(1)
-
- # Other exceptions you might consider handling:
- # UnknownTopicOrPartitionError, TopicAuthorizationFailedError,
- # RequestTimedOutError, MessageSizeTooLargeError, InvalidTopicError,
- # RecordListTooLargeError, InvalidRequiredAcksError,
- # NotEnoughReplicasError, NotEnoughReplicasAfterAppendError
-
- kafka.close()
-
- resps[0].topic # 'my-topic'
- resps[0].partition # 0
- resps[0].error # 0
- resps[0].offset # offset of the first message sent in this request
diff --git a/docs/tests.rst b/docs/tests.rst
index 5983475..561179c 100644
--- a/docs/tests.rst
+++ b/docs/tests.rst
@@ -7,8 +7,6 @@ Tests
:target: https://travis-ci.org/dpkp/kafka-python
Test environments are managed via tox. The test suite is run via pytest.
-Individual tests are written using unittest, pytest, and in some cases,
-doctest.
Linting is run via pylint, but is generally skipped on pypy due to pylint
compatibility / performance issues.
diff --git a/kafka/__init__.py b/kafka/__init__.py
index cafa043..d5e30af 100644
--- a/kafka/__init__.py
+++ b/kafka/__init__.py
@@ -19,38 +19,16 @@ logging.getLogger(__name__).addHandler(NullHandler())
from kafka.admin import KafkaAdminClient
+from kafka.client_async import KafkaClient
from kafka.consumer import KafkaConsumer
from kafka.consumer.subscription_state import ConsumerRebalanceListener
from kafka.producer import KafkaProducer
from kafka.conn import BrokerConnection
-from kafka.protocol import (
- create_message, create_gzip_message, create_snappy_message)
-from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner, Murmur2Partitioner
from kafka.serializer import Serializer, Deserializer
from kafka.structs import TopicPartition, OffsetAndMetadata
-# To be deprecated when KafkaProducer interface is released
-from kafka.client import SimpleClient
-from kafka.producer import SimpleProducer, KeyedProducer
-
-# deprecated in favor of KafkaConsumer
-from kafka.consumer import SimpleConsumer, MultiProcessConsumer
-
-
-import warnings
-class KafkaClient(SimpleClient):
- def __init__(self, *args, **kwargs):
- warnings.warn('The legacy KafkaClient interface has been moved to'
- ' kafka.SimpleClient - this import will break in a'
- ' future release', DeprecationWarning)
- super(KafkaClient, self).__init__(*args, **kwargs)
-
__all__ = [
- 'KafkaAdminClient',
- 'KafkaConsumer', 'KafkaProducer', 'KafkaClient', 'BrokerConnection',
- 'SimpleClient', 'SimpleProducer', 'KeyedProducer',
- 'RoundRobinPartitioner', 'HashedPartitioner',
- 'create_message', 'create_gzip_message', 'create_snappy_message',
- 'SimpleConsumer', 'MultiProcessConsumer', 'ConsumerRebalanceListener',
+ 'BrokerConnection', 'ConsumerRebalanceListener', 'KafkaAdminClient',
+ 'KafkaClient', 'KafkaConsumer', 'KafkaProducer',
]
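As a quick illustration of the trimmed import surface above (an editorial sketch, not part of this commit): the six names left in `__all__` still import from the package root, while the legacy Simple* names now fail at import time.

```python
# Still exported from the package root after this change:
from kafka import (
    BrokerConnection, ConsumerRebalanceListener, KafkaAdminClient,
    KafkaClient, KafkaConsumer, KafkaProducer,
)

# The legacy classes are gone entirely, so this now raises ImportError:
try:
    from kafka import SimpleClient, SimpleConsumer, SimpleProducer
except ImportError:
    print('Simple* clients were removed in 2.0; use KafkaClient / KafkaConsumer / KafkaProducer')
```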
diff --git a/kafka/client.py b/kafka/client.py
deleted file mode 100644
index 148cae0..0000000
--- a/kafka/client.py
+++ /dev/null
@@ -1,719 +0,0 @@
-from __future__ import absolute_import
-
-import collections
-import copy
-import functools
-import logging
-import random
-import time
-import select
-
-from kafka.vendor import six
-
-import kafka.errors
-from kafka.errors import (UnknownError, KafkaConnectionError, FailedPayloadsError,
- KafkaTimeoutError, KafkaUnavailableError,
- LeaderNotAvailableError, UnknownTopicOrPartitionError,
- NotLeaderForPartitionError, ReplicaNotAvailableError)
-from kafka.structs import TopicPartition, BrokerMetadata
-
-from kafka.conn import (
- collect_hosts, BrokerConnection,
- ConnectionStates, get_ip_port_afi)
-from kafka.protocol import KafkaProtocol
-
-# New KafkaClient
-# this is not exposed in top-level imports yet,
-# due to conflicts with legacy SimpleConsumer / SimpleProducer usage
-from kafka.client_async import KafkaClient
-
-
-log = logging.getLogger(__name__)
-
-
-# Legacy KafkaClient interface -- will be deprecated soon
-class SimpleClient(object):
-
- CLIENT_ID = b'kafka-python'
- DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
-
- # NOTE: The timeout given to the client should always be greater than the
- # one passed to SimpleConsumer.get_message(), otherwise you can get a
- # socket timeout.
- def __init__(self, hosts, client_id=CLIENT_ID,
- timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
- correlation_id=0):
- # We need one connection to bootstrap
- self.client_id = client_id
- self.timeout = timeout
- self.hosts = collect_hosts(hosts)
- self.correlation_id = correlation_id
-
- self._conns = {}
- self.brokers = {} # broker_id -> BrokerMetadata
- self.topics_to_brokers = {} # TopicPartition -> BrokerMetadata
- self.topic_partitions = {} # topic -> partition -> leader
-
- self.load_metadata_for_topics() # bootstrap with all metadata
-
- ##################
- # Private API #
- ##################
-
- def _get_conn(self, host, port, afi):
- """Get or create a connection to a broker using host and port"""
- host_key = (host, port)
- if host_key not in self._conns:
- self._conns[host_key] = BrokerConnection(
- host, port, afi,
- request_timeout_ms=self.timeout * 1000,
- client_id=self.client_id
- )
-
- conn = self._conns[host_key]
- if not conn.connect_blocking(self.timeout):
- conn.close()
- raise KafkaConnectionError("%s:%s (%s)" % (host, port, afi))
- return conn
-
- def _get_leader_for_partition(self, topic, partition):
- """
- Returns the leader for a partition or None if the partition exists
- but has no leader.
-
- Raises:
- UnknownTopicOrPartitionError: If the topic or partition is not part
- of the metadata.
- LeaderNotAvailableError: If the server has metadata, but there is no
- current leader.
- """
-
- key = TopicPartition(topic, partition)
-
- # Use cached metadata if it is there
- if self.topics_to_brokers.get(key) is not None:
- return self.topics_to_brokers[key]
-
- # Otherwise refresh metadata
-
- # If topic does not already exist, this will raise
- # UnknownTopicOrPartitionError if not auto-creating
- # LeaderNotAvailableError otherwise until partitions are created
- self.load_metadata_for_topics(topic)
-
- # If the partition doesn't actually exist, raise
- if partition not in self.topic_partitions.get(topic, []):
- raise UnknownTopicOrPartitionError(key)
-
- # If there's no leader for the partition, raise
- leader = self.topic_partitions[topic][partition]
- if leader == -1:
- raise LeaderNotAvailableError((topic, partition))
-
- # Otherwise return the BrokerMetadata
- return self.brokers[leader]
-
- def _get_coordinator_for_group(self, group):
- """
- Returns the coordinator broker for a consumer group.
-
- GroupCoordinatorNotAvailableError will be raised if the coordinator
- does not currently exist for the group.
-
- GroupLoadInProgressError is raised if the coordinator is available
- but is still loading offsets from the internal topic
- """
-
- resp = self.send_consumer_metadata_request(group)
-
- # If there's a problem with finding the coordinator, raise the
- # provided error
- kafka.errors.check_error(resp)
-
- # Otherwise return the BrokerMetadata
- return BrokerMetadata(resp.nodeId, resp.host, resp.port, None)
-
- def _next_id(self):
- """Generate a new correlation id"""
- # modulo to keep w/i int32
- self.correlation_id = (self.correlation_id + 1) % 2**31
- return self.correlation_id
-
- def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
- """
- Attempt to send a broker-agnostic request to one of the available
- brokers. Keep trying until you succeed.
- """
- hosts = set()
- for broker in self.brokers.values():
- host, port, afi = get_ip_port_afi(broker.host)
- hosts.add((host, broker.port, afi))
-
- hosts.update(self.hosts)
- hosts = list(hosts)
- random.shuffle(hosts)
-
- for (host, port, afi) in hosts:
- try:
- conn = self._get_conn(host, port, afi)
- except KafkaConnectionError:
- log.warning("Skipping unconnected connection: %s:%s (AFI %s)",
- host, port, afi)
- continue
- request = encoder_fn(payloads=payloads)
- future = conn.send(request)
-
- # Block
- while not future.is_done:
- for r, f in conn.recv():
- f.success(r)
-
- if future.failed():
- log.error("Request failed: %s", future.exception)
- continue
-
- return decoder_fn(future.value)
-
- raise KafkaUnavailableError('All servers failed to process request: %s' % (hosts,))
-
- def _payloads_by_broker(self, payloads):
- payloads_by_broker = collections.defaultdict(list)
- for payload in payloads:
- try:
- leader = self._get_leader_for_partition(payload.topic, payload.partition)
- except (KafkaUnavailableError, LeaderNotAvailableError,
- UnknownTopicOrPartitionError):
- leader = None
- payloads_by_broker[leader].append(payload)
- return dict(payloads_by_broker)
-
- def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
- """
- Group a list of request payloads by topic+partition and send them to
- the leader broker for that partition using the supplied encode/decode
- functions
-
- Arguments:
-
- payloads: list of object-like entities with a topic (str) and
- partition (int) attribute; payloads with duplicate topic-partitions
- are not supported.
-
- encode_fn: a method to encode the list of payloads to a request body,
- must accept client_id, correlation_id, and payloads as
- keyword arguments
-
- decode_fn: a method to decode a response body into response objects.
- The response objects must be object-like and have topic
- and partition attributes
-
- Returns:
-
- List of response objects in the same order as the supplied payloads
- """
- # encoders / decoders do not maintain ordering currently
- # so we need to keep this so we can rebuild order before returning
- original_ordering = [(p.topic, p.partition) for p in payloads]
-
- # Connection errors generally mean stale metadata
- # although sometimes it means incorrect api request
- # Unfortunately there is no good way to tell the difference
- # so we'll just reset metadata on all errors to be safe
- refresh_metadata = False
-
- # For each broker, send the list of request payloads
- # and collect the responses and errors
- payloads_by_broker = self._payloads_by_broker(payloads)
- responses = {}
-
- def failed_payloads(payloads):
- for payload in payloads:
- topic_partition = (str(payload.topic), payload.partition)
- responses[(topic_partition)] = FailedPayloadsError(payload)
-
- # For each BrokerConnection keep the real socket so that we can use
- # a select to perform unblocking I/O
- connections_by_future = {}
- for broker, broker_payloads in six.iteritems(payloads_by_broker):
- if broker is None:
- failed_payloads(broker_payloads)
- continue
-
- host, port, afi = get_ip_port_afi(broker.host)
- try:
- conn = self._get_conn(host, broker.port, afi)
- except KafkaConnectionError:
- refresh_metadata = True
- failed_payloads(broker_payloads)
- continue
-
- request = encoder_fn(payloads=broker_payloads)
- future = conn.send(request)
-
- if future.failed():
- refresh_metadata = True
- failed_payloads(broker_payloads)
- continue
-
- if not request.expect_response():
- for payload in broker_payloads:
- topic_partition = (str(payload.topic), payload.partition)
- responses[topic_partition] = None
- continue
-
- connections_by_future[future] = (conn, broker)
-
- conn = None
- while connections_by_future:
- futures = list(connections_by_future.keys())
-
- # block until a socket is ready to be read
- sockets = [
- conn._sock
- for future, (conn, _) in six.iteritems(connections_by_future)
- if not future.is_done and conn._sock is not None]
- if sockets:
- read_socks, _, _ = select.select(sockets, [], [])
-
- for future in futures:
-
- if not future.is_done:
- conn, _ = connections_by_future[future]
- for r, f in conn.recv():
- f.success(r)
- continue
-
- _, broker = connections_by_future.pop(future)
- if future.failed():
- refresh_metadata = True
- failed_payloads(payloads_by_broker[broker])
-
- else:
- for payload_response in decoder_fn(future.value):
- topic_partition = (str(payload_response.topic),
- payload_response.partition)
- responses[topic_partition] = payload_response
-
- if refresh_metadata:
- self.reset_all_metadata()
-
- # Return responses in the same order as provided
- return [responses[tp] for tp in original_ordering]
-
- def _send_consumer_aware_request(self, group, payloads, encoder_fn, decoder_fn):
- """
- Send a list of requests to the consumer coordinator for the group
- specified using the supplied encode/decode functions. As the payloads
- that use consumer-aware requests do not contain the group (e.g.
- OffsetFetchRequest), all payloads must be for a single group.
-
- Arguments:
-
- group: the name of the consumer group (str) the payloads are for
- payloads: list of object-like entities with topic (str) and
- partition (int) attributes; payloads with duplicate
- topic+partition are not supported.
-
- encode_fn: a method to encode the list of payloads to a request body,
- must accept client_id, correlation_id, and payloads as
- keyword arguments
-
- decode_fn: a method to decode a response body into response objects.
- The response objects must be object-like and have topic
- and partition attributes
-
- Returns:
-
- List of response objects in the same order as the supplied payloads
- """
- # encoders / decoders do not maintain ordering currently
- # so we need to keep this so we can rebuild order before returning
- original_ordering = [(p.topic, p.partition) for p in payloads]
-
- broker = self._get_coordinator_for_group(group)
-
- # Send the list of request payloads and collect the responses and
- # errors
- responses = {}
- request_id = self._next_id()
- log.debug('Request %s to %s: %s', request_id, broker, payloads)
- request = encoder_fn(client_id=self.client_id,
- correlation_id=request_id, payloads=payloads)
-
- # Send the request, recv the response
- try:
- host, port, afi = get_ip_port_afi(broker.host)
- conn = self._get_conn(host, broker.port, afi)
- except KafkaConnectionError as e:
- log.warning('KafkaConnectionError attempting to send request %s '
- 'to server %s: %s', request_id, broker, e)
-
- for payload in payloads:
- topic_partition = (payload.topic, payload.partition)
- responses[topic_partition] = FailedPayloadsError(payload)
-
- # No exception, try to get response
- else:
-
- future = conn.send(request_id, request)
- while not future.is_done:
- for r, f in conn.recv():
- f.success(r)
-
- # decoder_fn=None signal that the server is expected to not
- # send a response. This probably only applies to
- # ProduceRequest w/ acks = 0
- if decoder_fn is None:
- log.debug('Request %s does not expect a response '
- '(skipping conn.recv)', request_id)
- for payload in payloads:
- topic_partition = (payload.topic, payload.partition)
- responses[topic_partition] = None
- return []
-
- if future.failed():
- log.warning('Error attempting to receive a '
- 'response to request %s from server %s: %s',
- request_id, broker, future.exception)
-
- for payload in payloads:
- topic_partition = (payload.topic, payload.partition)
- responses[topic_partition] = FailedPayloadsError(payload)
-
- else:
- response = future.value
- _resps = []
- for payload_response in decoder_fn(response):
- topic_partition = (payload_response.topic,
- payload_response.partition)
- responses[topic_partition] = payload_response
- _resps.append(payload_response)
- log.debug('Response %s: %s', request_id, _resps)
-
- # Return responses in the same order as provided
- return [responses[tp] for tp in original_ordering]
-
- def __repr__(self):
- return '<SimpleClient client_id=%s>' % (self.client_id)
-
- def _raise_on_response_error(self, resp):
-
- # Response can be an unraised exception object (FailedPayloadsError)
- if isinstance(resp, Exception):
- raise resp
-
- # Or a server api error response
- try:
- kafka.errors.check_error(resp)
- except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
- self.reset_topic_metadata(resp.topic)
- raise
-
- # Return False if no error to enable list comprehensions
- return False
-
- #################
- # Public API #
- #################
- def close(self):
- for conn in self._conns.values():
- conn.close()
-
- def copy(self):
- """
- Create an inactive copy of the client object, suitable for passing
- to a separate thread.
-
- Note that the copied connections are not initialized, so :meth:`.reinit`
- must be called on the returned copy.
- """
- _conns = self._conns
- self._conns = {}
- c = copy.deepcopy(self)
- self._conns = _conns
- return c
-
- def reinit(self):
- timeout = time.time() + self.timeout
- conns = set(self._conns.values())
- for conn in conns:
- conn.close()
- conn.connect()
-
- while time.time() < timeout:
- for conn in list(conns):
- conn.connect()
- if conn.connected():
- conns.remove(conn)
- if not conns:
- break
-
- def reset_topic_metadata(self, *topics):
- for topic in topics:
- for topic_partition in list(self.topics_to_brokers.keys()):
- if topic_partition.topic == topic:
- del self.topics_to_brokers[topic_partition]
- if topic in self.topic_partitions:
- del self.topic_partitions[topic]
-
- def reset_all_metadata(self):
- self.topics_to_brokers.clear()
- self.topic_partitions.clear()
-
- def has_metadata_for_topic(self, topic):
- return (
- topic in self.topic_partitions
- and len(self.topic_partitions[topic]) > 0
- )
-
- def get_partition_ids_for_topic(self, topic):
- if topic not in self.topic_partitions:
- return []
-
- return sorted(list(self.topic_partitions[topic]))
-
- @property
- def topics(self):
- return list(self.topic_partitions.keys())
-
- def ensure_topic_exists(self, topic, timeout=30):
- start_time = time.time()
-
- while not self.has_metadata_for_topic(topic):
- if time.time() > start_time + timeout:
- raise KafkaTimeoutError('Unable to create topic {0}'.format(topic))
- self.load_metadata_for_topics(topic, ignore_leadernotavailable=True)
- time.sleep(.5)
-
- def load_metadata_for_topics(self, *topics, **kwargs):
- """Fetch broker and topic-partition metadata from the server.
-
- Updates internal data: broker list, topic/partition list, and
- topic/partition -> broker map. This method should be called after
- receiving any error.
-
- Note: Exceptions *will not* be raised in a full refresh (i.e. no topic
- list). In this case, error codes will be logged as errors.
- Partition-level errors will also not be raised here (a single partition
- w/o a leader, for example).
-
- Arguments:
- *topics (optional): If a list of topics is provided,
- the metadata refresh will be limited to the specified topics
- only.
- ignore_leadernotavailable (bool): suppress LeaderNotAvailableError
- so that metadata is loaded correctly during auto-create.
- Default: False.
-
- Raises:
- UnknownTopicOrPartitionError: Raised for topics that do not exist,
- unless the broker is configured to auto-create topics.
- LeaderNotAvailableError: Raised for topics that do not exist yet,
- when the broker is configured to auto-create topics. Retry
- after a short backoff (topics/partitions are initializing).
- """
- if 'ignore_leadernotavailable' in kwargs:
- ignore_leadernotavailable = kwargs['ignore_leadernotavailable']
- else:
- ignore_leadernotavailable = False
-
- if topics:
- self.reset_topic_metadata(*topics)
- else:
- self.reset_all_metadata()
-
- resp = self.send_metadata_request(topics)
-
- log.debug('Updating broker metadata: %s', resp.brokers)
- log.debug('Updating topic metadata: %s', [topic for _, topic, _ in resp.topics])
-
- self.brokers = dict([(nodeId, BrokerMetadata(nodeId, host, port, None))
- for nodeId, host, port in resp.brokers])
-
- for error, topic, partitions in resp.topics:
- # Errors expected for new topics
- if error:
- error_type = kafka.errors.kafka_errors.get(error, UnknownError)
- if error_type in (UnknownTopicOrPartitionError, LeaderNotAvailableError):
- log.error('Error loading topic metadata for %s: %s (%s)',
- topic, error_type, error)
- if topic not in topics:
- continue
- elif (error_type is LeaderNotAvailableError and
- ignore_leadernotavailable):
- continue
- raise error_type(topic)
-
- self.topic_partitions[topic] = {}
- for error, partition, leader, _, _ in partitions:
-
- self.topic_partitions[topic][partition] = leader
-
- # Populate topics_to_brokers dict
- topic_part = TopicPartition(topic, partition)
-
- # Check for partition errors
- if error:
- error_type = kafka.errors.kafka_errors.get(error, UnknownError)
-
- # If No Leader, topics_to_brokers topic_partition -> None
- if error_type is LeaderNotAvailableError:
- log.error('No leader for topic %s partition %d', topic, partition)
- self.topics_to_brokers[topic_part] = None
- continue
-
- # If one of the replicas is unavailable -- ignore
- # this error code is provided for admin purposes only
- # we never talk to replicas, only the leader
- elif error_type is ReplicaNotAvailableError:
- log.debug('Some (non-leader) replicas not available for topic %s partition %d', topic, partition)
-
- else:
- raise error_type(topic_part)
-
- # If Known Broker, topic_partition -> BrokerMetadata
- if leader in self.brokers:
- self.topics_to_brokers[topic_part] = self.brokers[leader]
-
- # If Unknown Broker, fake BrokerMetadata so we don't lose the id
- # (not sure how this could happen. server could be in bad state)
- else:
- self.topics_to_brokers[topic_part] = BrokerMetadata(
- leader, None, None, None
- )
-
- def send_metadata_request(self, payloads=(), fail_on_error=True,
- callback=None):
- encoder = KafkaProtocol.encode_metadata_request
- decoder = KafkaProtocol.decode_metadata_response
-
- return self._send_broker_unaware_request(payloads, encoder, decoder)
-
- def send_consumer_metadata_request(self, payloads=(), fail_on_error=True,
- callback=None):
- encoder = KafkaProtocol.encode_consumer_metadata_request
- decoder = KafkaProtocol.decode_consumer_metadata_response
-
- return self._send_broker_unaware_request(payloads, encoder, decoder)
-
- def send_produce_request(self, payloads=(), acks=1, timeout=1000,
- fail_on_error=True, callback=None):
- """
- Encode and send some ProduceRequests
-
- ProduceRequests will be grouped by (topic, partition) and then
- sent to a specific broker. Output is a list of responses in the
- same order as the list of payloads specified
-
- Arguments:
- payloads (list of ProduceRequest): produce requests to send to kafka
- ProduceRequest payloads must not contain duplicates for any
- topic-partition.
- acks (int, optional): how many acks the servers should receive from replica
- brokers before responding to the request. If it is 0, the server
- will not send any response. If it is 1, the server will wait
- until the data is written to the local log before sending a
- response. If it is -1, the server will wait until the message
- is committed by all in-sync replicas before sending a response.
- For any value > 1, the server will wait for this number of acks to
- occur (but the server will never wait for more acknowledgements than
- there are in-sync replicas). defaults to 1.
- timeout (int, optional): maximum time in milliseconds the server can
- await the receipt of the number of acks, defaults to 1000.
- fail_on_error (bool, optional): raise exceptions on connection and
- server response errors, defaults to True.
- callback (function, optional): instead of returning the ProduceResponse,
- first pass it through this function, defaults to None.
-
- Returns:
- list of ProduceResponses, or callback results if supplied, in the
- order of input payloads
- """
-
- encoder = functools.partial(
- KafkaProtocol.encode_produce_request,
- acks=acks,
- timeout=timeout)
-
- if acks == 0:
- decoder = None
- else:
- decoder = KafkaProtocol.decode_produce_response
-
- resps = self._send_broker_aware_request(payloads, encoder, decoder)
-
- return [resp if not callback else callback(resp) for resp in resps
- if resp is not None and
- (not fail_on_error or not self._raise_on_response_error(resp))]
-
- def send_fetch_request(self, payloads=(), fail_on_error=True,
- callback=None, max_wait_time=100, min_bytes=4096):
- """
- Encode and send a FetchRequest
-
- Payloads are grouped by topic and partition so they can be pipelined
- to the same brokers.
- """
-
- encoder = functools.partial(KafkaProtocol.encode_fetch_request,
- max_wait_time=max_wait_time,
- min_bytes=min_bytes)
-
- resps = self._send_broker_aware_request(
- payloads, encoder,
- KafkaProtocol.decode_fetch_response)
-
- return [resp if not callback else callback(resp) for resp in resps
- if not fail_on_error or not self._raise_on_response_error(resp)]
-
- def send_offset_request(self, payloads=(), fail_on_error=True,
- callback=None):
- resps = self._send_broker_aware_request(
- payloads,
- KafkaProtocol.encode_offset_request,
- KafkaProtocol.decode_offset_response)
-
- return [resp if not callback else callback(resp) for resp in resps
- if not fail_on_error or not self._raise_on_response_error(resp)]
-
- def send_list_offset_request(self, payloads=(), fail_on_error=True,
- callback=None):
- resps = self._send_broker_aware_request(
- payloads,
- KafkaProtocol.encode_list_offset_request,
- KafkaProtocol.decode_list_offset_response)
-
- return [resp if not callback else callback(resp) for resp in resps
- if not fail_on_error or not self._raise_on_response_error(resp)]
-
- def send_offset_commit_request(self, group, payloads=(),
- fail_on_error=True, callback=None):
- encoder = functools.partial(KafkaProtocol.encode_offset_commit_request,
- group=group)
- decoder = KafkaProtocol.decode_offset_commit_response
- resps = self._send_broker_aware_request(payloads, encoder, decoder)
-
- return [resp if not callback else callback(resp) for resp in resps
- if not fail_on_error or not self._raise_on_response_error(resp)]
-
- def send_offset_fetch_request(self, group, payloads=(),
- fail_on_error=True, callback=None):
-
- encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
- group=group)
- decoder = KafkaProtocol.decode_offset_fetch_response
- resps = self._send_broker_aware_request(payloads, encoder, decoder)
-
- return [resp if not callback else callback(resp) for resp in resps
- if not fail_on_error or not self._raise_on_response_error(resp)]
-
- def send_offset_fetch_request_kafka(self, group, payloads=(),
- fail_on_error=True, callback=None):
-
- encoder = functools.partial(KafkaProtocol.encode_offset_fetch_request,
- group=group, from_kafka=True)
- decoder = KafkaProtocol.decode_offset_fetch_response
- resps = self._send_consumer_aware_request(group, payloads, encoder, decoder)
-
- return [resp if not callback else callback(resp) for resp in resps
- if not fail_on_error or not self._raise_on_response_error(resp)]
diff --git a/kafka/common.py b/kafka/common.py
deleted file mode 100644
index 15e88eb..0000000
--- a/kafka/common.py
+++ /dev/null
@@ -1,4 +0,0 @@
-from __future__ import absolute_import
-
-from kafka.structs import *
-from kafka.errors import *
diff --git a/kafka/consumer/__init__.py b/kafka/consumer/__init__.py
index 4b900ac..e09bcc1 100644
--- a/kafka/consumer/__init__.py
+++ b/kafka/consumer/__init__.py
@@ -1,9 +1,7 @@
from __future__ import absolute_import
-from kafka.consumer.simple import SimpleConsumer
-from kafka.consumer.multiprocess import MultiProcessConsumer
from kafka.consumer.group import KafkaConsumer
__all__ = [
- 'SimpleConsumer', 'MultiProcessConsumer', 'KafkaConsumer'
+ 'KafkaConsumer'
]
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
deleted file mode 100644
index a77ce7e..0000000
--- a/kafka/consumer/base.py
+++ /dev/null
@@ -1,232 +0,0 @@
-from __future__ import absolute_import
-
-import atexit
-import logging
-import numbers
-from threading import Lock
-import warnings
-
-from kafka.errors import (
- UnknownTopicOrPartitionError, check_error, KafkaError)
-from kafka.structs import (
- OffsetRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload)
-from kafka.util import ReentrantTimer
-
-
-log = logging.getLogger('kafka.consumer')
-
-AUTO_COMMIT_MSG_COUNT = 100
-AUTO_COMMIT_INTERVAL = 5000
-
-FETCH_DEFAULT_BLOCK_TIMEOUT = 1
-FETCH_MAX_WAIT_TIME = 100
-FETCH_MIN_BYTES = 4096
-FETCH_BUFFER_SIZE_BYTES = 4096
-MAX_FETCH_BUFFER_SIZE_BYTES = FETCH_BUFFER_SIZE_BYTES * 8
-
-ITER_TIMEOUT_SECONDS = 60
-NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
-FULL_QUEUE_WAIT_TIME_SECONDS = 0.1
-
-MAX_BACKOFF_SECONDS = 60
-
-class Consumer(object):
- """
- Base class to be used by other consumers. Not to be used directly
-
- This base class provides logic for
-
- * initialization and fetching metadata of partitions
- * Auto-commit logic
- * APIs for fetching pending message count
-
- """
- def __init__(self, client, group, topic, partitions=None, auto_commit=True,
- auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
- auto_commit_every_t=AUTO_COMMIT_INTERVAL):
-
- warnings.warn('deprecated -- this class will be removed in a future'
- ' release. Use KafkaConsumer instead.',
- DeprecationWarning)
- self.client = client
- self.topic = topic
- self.group = group
- self.client.load_metadata_for_topics(topic, ignore_leadernotavailable=True)
- self.offsets = {}
-
- if partitions is None:
- partitions = self.client.get_partition_ids_for_topic(topic)
- else:
- assert all(isinstance(x, numbers.Integral) for x in partitions)
-
- # Variables for handling offset commits
- self.commit_lock = Lock()
- self.commit_timer = None
- self.count_since_commit = 0
- self.auto_commit = auto_commit
- self.auto_commit_every_n = auto_commit_every_n
- self.auto_commit_every_t = auto_commit_every_t
-
- # Set up the auto-commit timer
- if auto_commit is True and auto_commit_every_t is not None:
- self.commit_timer = ReentrantTimer(auto_commit_every_t,
- self.commit)
- self.commit_timer.start()
-
- # Set initial offsets
- if self.group is not None:
- self.fetch_last_known_offsets(partitions)
- else:
- for partition in partitions:
- self.offsets[partition] = 0
-
- # Register a cleanup handler
- def cleanup(obj):
- obj.stop()
- self._cleanup_func = cleanup
- atexit.register(cleanup, self)
-
- self.partition_info = False # Do not return partition info in msgs
-
- def provide_partition_info(self):
- """
- Indicates that partition info must be returned by the consumer
- """
- self.partition_info = True
-
- def fetch_last_known_offsets(self, partitions=None):
- if self.group is None:
- raise ValueError('SimpleClient.group must not be None')
-
- if partitions is None:
- partitions = self.client.get_partition_ids_for_topic(self.topic)
-
- responses = self.client.send_offset_fetch_request(
- self.group,
- [OffsetFetchRequestPayload(self.topic, p) for p in partitions],
- fail_on_error=False
- )
-
- for resp in responses:
- try:
- check_error(resp)
- # API spec says server won't set an error here
- # but 0.8.1.1 does actually...
- except UnknownTopicOrPartitionError:
- pass
-
- # -1 offset signals no commit is currently stored
- if resp.offset == -1:
- self.offsets[resp.partition] = 0
-
- # Otherwise we committed the stored offset
- # and need to fetch the next one
- else:
- self.offsets[resp.partition] = resp.offset
-
- def commit(self, partitions=None):
- """Commit stored offsets to Kafka via OffsetCommitRequest (v0)
-
- Keyword Arguments:
- partitions (list): list of partitions to commit, default is to commit
- all of them
-
- Returns: True on success, False on failure
- """
-
- # short circuit if nothing happened. This check is kept outside
- # to prevent un-necessarily acquiring a lock for checking the state
- if self.count_since_commit == 0:
- return
-
- with self.commit_lock:
- # Do this check again, just in case the state has changed
- # during the lock acquiring timeout
- if self.count_since_commit == 0:
- return
-
- reqs = []
- if partitions is None: # commit all partitions
- partitions = list(self.offsets.keys())
-
- log.debug('Committing new offsets for %s, partitions %s',
- self.topic, partitions)
- for partition in partitions:
- offset = self.offsets[partition]
- log.debug('Commit offset %d in SimpleConsumer: '
- 'group=%s, topic=%s, partition=%s',
- offset, self.group, self.topic, partition)
-
- reqs.append(OffsetCommitRequestPayload(self.topic, partition,
- offset, None))
-
- try:
- self.client.send_offset_commit_request(self.group, reqs)
- except KafkaError as e:
- log.error('%s saving offsets: %s', e.__class__.__name__, e)
- return False
- else:
- self.count_since_commit = 0
- return True
-
- def _auto_commit(self):
- """
- Check if we have to commit based on number of messages and commit
- """
-
- # Check if we are supposed to do an auto-commit
- if not self.auto_commit or self.auto_commit_every_n is None:
- return
-
- if self.count_since_commit >= self.auto_commit_every_n:
- self.commit()
-
- def stop(self):
- if self.commit_timer is not None:
- self.commit_timer.stop()
- self.commit()
-
- if hasattr(self, '_cleanup_func'):
- # Remove cleanup handler now that we've stopped
-
- # py3 supports unregistering
- if hasattr(atexit, 'unregister'):
- atexit.unregister(self._cleanup_func) # pylint: disable=no-member
-
- # py2 requires removing from private attribute...
- else:
-
- # ValueError on list.remove() if the exithandler no longer
- # exists is fine here
- try:
- atexit._exithandlers.remove( # pylint: disable=no-member
- (self._cleanup_func, (self,), {}))
- except ValueError:
- pass
-
- del self._cleanup_func
-
- def pending(self, partitions=None):
- """
- Gets the pending message count
-
- Keyword Arguments:
- partitions (list): list of partitions to check for, default is to check all
- """
- if partitions is None:
- partitions = self.offsets.keys()
-
- total = 0
- reqs = []
-
- for partition in partitions:
- reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1))
-
- resps = self.client.send_offset_request(reqs)
- for resp in resps:
- partition = resp.partition
- pending = resp.offsets[0]
- offset = self.offsets[partition]
- total += pending - offset
-
- return total
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 15c2905..e9fd44c 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -1207,28 +1207,3 @@ class KafkaConsumer(six.Iterator):
if self.config['consumer_timeout_ms'] >= 0:
self._consumer_timeout = time.time() + (
self.config['consumer_timeout_ms'] / 1000.0)
-
- # Old KafkaConsumer methods are deprecated
- def configure(self, **configs):
- raise NotImplementedError(
- 'deprecated -- initialize a new consumer')
-
- def set_topic_partitions(self, *topics):
- raise NotImplementedError(
- 'deprecated -- use subscribe() or assign()')
-
- def fetch_messages(self):
- raise NotImplementedError(
- 'deprecated -- use poll() or iterator interface')
-
- def get_partition_offsets(self, topic, partition,
- request_time_ms, max_num_offsets):
- raise NotImplementedError(
- 'deprecated -- send an OffsetRequest with KafkaClient')
-
- def offsets(self, group=None):
- raise NotImplementedError('deprecated -- use committed(partition)')
-
- def task_done(self, message):
- raise NotImplementedError(
- 'deprecated -- commit offsets manually if needed')
diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py
deleted file mode 100644
index 758bb92..0000000
--- a/kafka/consumer/multiprocess.py
+++ /dev/null
@@ -1,295 +0,0 @@
-from __future__ import absolute_import
-
-from collections import namedtuple
-import logging
-from multiprocessing import Process, Manager as MPManager
-import time
-import warnings
-
-from kafka.vendor.six.moves import queue # pylint: disable=import-error
-
-from kafka.errors import KafkaError
-from kafka.consumer.base import (
- Consumer,
- AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
- NO_MESSAGES_WAIT_TIME_SECONDS,
- FULL_QUEUE_WAIT_TIME_SECONDS,
- MAX_BACKOFF_SECONDS,
-)
-from kafka.consumer.simple import SimpleConsumer
-
-
-log = logging.getLogger(__name__)
-
-Events = namedtuple("Events", ["start", "pause", "exit"])
-
-
-def _mp_consume(client, group, topic, message_queue, size, events, **consumer_options):
- """
- A child process worker which consumes messages based on the
- notifications given by the controller process
-
- NOTE: Ideally, this should have been a method inside the Consumer
- class. However, multiprocessing module has issues in windows. The
- functionality breaks unless this function is kept outside of a class
- """
-
- # Initial interval for retries in seconds.
- interval = 1
- while not events.exit.is_set():
- try:
- # Make the child processes open separate socket connections
- client.reinit()
-
- # We will start consumers without auto-commit. Auto-commit will be
- # done by the master controller process.
- consumer = SimpleConsumer(client, group, topic,
- auto_commit=False,
- auto_commit_every_n=None,
- auto_commit_every_t=None,
- **consumer_options)
-
- # Ensure that the consumer provides the partition information
- consumer.provide_partition_info()
-
- while True:
- # Wait till the controller indicates us to start consumption
- events.start.wait()
-
- # If we are asked to quit, do so
- if events.exit.is_set():
- break
-
- # Consume messages and add them to the queue. If the controller
- # indicates a specific number of messages, follow that advice
- count = 0
-
- message = consumer.get_message()
- if message:
- while True:
- try:
- message_queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
- break
- except queue.Full:
- if events.exit.is_set(): break
-
- count += 1
-
- # We have reached the required size. The controller might have
- # more than what he needs. Wait for a while.
- # Without this logic, it is possible that we run into a big
- # loop consuming all available messages before the controller
- # can reset the 'start' event
- if count == size.value:
- events.pause.wait()
-
- else:
- # In case we did not receive any message, give up the CPU for
- # a while before we try again
- time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
-
- consumer.stop()
-
- except KafkaError as e:
- # Retry with exponential backoff
- log.exception("Problem communicating with Kafka, retrying in %d seconds...", interval)
- time.sleep(interval)
- interval = interval*2 if interval*2 < MAX_BACKOFF_SECONDS else MAX_BACKOFF_SECONDS
-
-
-class MultiProcessConsumer(Consumer):
- """
- A consumer implementation that consumes partitions for a topic in
- parallel using multiple processes
-
- Arguments:
- client: a connected SimpleClient
- group: a name for this consumer, used for offset storage and must be unique
- If you are connecting to a server that does not support offset
- commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
- topic: the topic to consume
-
- Keyword Arguments:
- partitions: An optional list of partitions to consume the data from
- auto_commit: default True. Whether or not to auto commit the offsets
- auto_commit_every_n: default 100. How many messages to consume
- before a commit
- auto_commit_every_t: default 5000. How much time (in milliseconds) to
- wait before commit
- num_procs: Number of processes to start for consuming messages.
- The available partitions will be divided among these processes
- partitions_per_proc: Number of partitions to be allocated per process
- (overrides num_procs)
-
- Auto commit details:
- If both auto_commit_every_n and auto_commit_every_t are set, they will
- reset one another when one is triggered. These triggers simply call the
- commit method on this class. A manual call to commit will also reset
- these triggers
- """
- def __init__(self, client, group, topic,
- partitions=None,
- auto_commit=True,
- auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
- auto_commit_every_t=AUTO_COMMIT_INTERVAL,
- num_procs=1,
- partitions_per_proc=0,
- **simple_consumer_options):
-
- warnings.warn('This class has been deprecated and will be removed in a'
- ' future release. Use KafkaConsumer instead',
- DeprecationWarning)
-
- # Initiate the base consumer class
- super(MultiProcessConsumer, self).__init__(
- client, group, topic,
- partitions=partitions,
- auto_commit=auto_commit,
- auto_commit_every_n=auto_commit_every_n,
- auto_commit_every_t=auto_commit_every_t)
-
- # Variables for managing and controlling the data flow from
- # consumer child process to master
- manager = MPManager()
- self.queue = manager.Queue(1024) # Child consumers dump messages into this
- self.events = Events(
- start = manager.Event(), # Indicates the consumers to start fetch
- exit = manager.Event(), # Requests the consumers to shutdown
- pause = manager.Event()) # Requests the consumers to pause fetch
- self.size = manager.Value('i', 0) # Indicator of number of messages to fetch
-
- # dict.keys() returns a view in py3 + it's not a thread-safe operation
- # http://blog.labix.org/2008/06/27/watch-out-for-listdictkeys-in-python-3
- # It's safer to copy dict as it only runs during the init.
- partitions = list(self.offsets.copy().keys())
-
- # By default, start one consumer process for all partitions
- # The logic below ensures that
- # * we do not cross the num_procs limit
- # * we have an even distribution of partitions among processes
-
- if partitions_per_proc:
- num_procs = len(partitions) // partitions_per_proc  # floor division keeps num_procs an int on py3
- if num_procs * partitions_per_proc < len(partitions):
- num_procs += 1
-
- # The final set of chunks
- chunks = [partitions[proc::num_procs] for proc in range(num_procs)]
-
- self.procs = []
- for chunk in chunks:
- options = {'partitions': list(chunk)}
- if simple_consumer_options:
- simple_consumer_options.pop('partitions', None)
- options.update(simple_consumer_options)
-
- args = (client.copy(), self.group, self.topic, self.queue,
- self.size, self.events)
- proc = Process(target=_mp_consume, args=args, kwargs=options)
- proc.daemon = True
- proc.start()
- self.procs.append(proc)
-
- def __repr__(self):
- return '<MultiProcessConsumer group=%s, topic=%s, consumers=%d>' % \
- (self.group, self.topic, len(self.procs))
-
- def stop(self):
- # Set exit and start off all waiting consumers
- self.events.exit.set()
- self.events.pause.set()
- self.events.start.set()
-
- for proc in self.procs:
- proc.join()
- proc.terminate()
-
- super(MultiProcessConsumer, self).stop()
-
- def __iter__(self):
- """
- Iterator to consume the messages available on this consumer
- """
- # Trigger the consumer procs to start off.
- # We will iterate till there are no more messages available
- self.size.value = 0
- self.events.pause.set()
-
- while True:
- self.events.start.set()
- try:
- # We will block for a small while so that the consumers get
- # a chance to run and put some messages in the queue
- # TODO: This is a hack and will make the consumer block for
- # at least one second. Need to find a better way of doing this
- partition, message = self.queue.get(block=True, timeout=1)
- except queue.Empty:
- break
-
- # Count, check and commit messages if necessary
- self.offsets[partition] = message.offset + 1
- self.events.start.clear()
- self.count_since_commit += 1
- self._auto_commit()
- yield message
-
- self.events.start.clear()
-
- def get_messages(self, count=1, block=True, timeout=10):
- """
- Fetch the specified number of messages
-
- Keyword Arguments:
- count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block until all messages are fetched.
- If block is a positive integer, the API will block until that
- many messages are fetched.
- timeout: When blocking is requested, the function will block for
- the specified time (in seconds) until count messages are
- fetched. If None, it will block forever.
- """
- messages = []
-
- # Give a size hint to the consumers. Each consumer process will fetch
- # a maximum of "count" messages. This will fetch more messages than
- # necessary, but these will not be committed to kafka. Also, the extra
- # messages can be provided in subsequent runs
- self.size.value = count
- self.events.pause.clear()
-
- if timeout is not None:
- max_time = time.time() + timeout
-
- new_offsets = {}
- while count > 0 and (timeout is None or timeout > 0):
- # Trigger consumption only if the queue is empty
- # By doing this, we will ensure that consumers do not
- # go into overdrive and keep consuming thousands of
- # messages when the user might need only a few
- if self.queue.empty():
- self.events.start.set()
-
- block_next_call = block is True or block > len(messages)
- try:
- partition, message = self.queue.get(block_next_call,
- timeout)
- except queue.Empty:
- break
-
- _msg = (partition, message) if self.partition_info else message
- messages.append(_msg)
- new_offsets[partition] = message.offset + 1
- count -= 1
- if timeout is not None:
- timeout = max_time - time.time()
-
- self.size.value = 0
- self.events.start.clear()
- self.events.pause.set()
-
- # Update and commit offsets if necessary
- self.offsets.update(new_offsets)
- self.count_since_commit += len(messages)
- self._auto_commit()
-
- return messages
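With MultiProcessConsumer removed, the closest replacement is to run one KafkaConsumer per worker process and share a group_id so the broker-side group coordinator divides the partitions among them. A minimal sketch under that assumption; the topic name, group id, broker address, and worker count below are placeholders:

.. code:: python

    # Sketch: one KafkaConsumer per process, coordinated via a shared group_id.
    from multiprocessing import Process

    from kafka import KafkaConsumer

    def worker():
        # Create the consumer inside the child process (sockets are not shared).
        consumer = KafkaConsumer(
            'my-topic',                          # placeholder topic
            group_id='my-group',                 # shared group -> partitions are divided
            bootstrap_servers='localhost:9092',  # placeholder broker
        )
        for record in consumer:
            print(record.partition, record.offset, record.value)

    if __name__ == '__main__':
        procs = [Process(target=worker) for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()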
diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py
deleted file mode 100644
index a6a64a5..0000000
--- a/kafka/consumer/simple.py
+++ /dev/null
@@ -1,444 +0,0 @@
-from __future__ import absolute_import
-
-try:
- from itertools import zip_longest as izip_longest, repeat # pylint: disable=E0611
-except ImportError:
- from itertools import izip_longest as izip_longest, repeat # pylint: disable=E0611
-import logging
-import sys
-import time
-import warnings
-
-from kafka.vendor import six
-from kafka.vendor.six.moves import queue # pylint: disable=import-error
-
-from kafka.consumer.base import (
- Consumer,
- FETCH_DEFAULT_BLOCK_TIMEOUT,
- AUTO_COMMIT_MSG_COUNT,
- AUTO_COMMIT_INTERVAL,
- FETCH_MIN_BYTES,
- FETCH_BUFFER_SIZE_BYTES,
- MAX_FETCH_BUFFER_SIZE_BYTES,
- FETCH_MAX_WAIT_TIME,
- ITER_TIMEOUT_SECONDS,
- NO_MESSAGES_WAIT_TIME_SECONDS
-)
-from kafka.errors import (
- KafkaError, ConsumerFetchSizeTooSmall,
- UnknownTopicOrPartitionError, NotLeaderForPartitionError,
- OffsetOutOfRangeError, FailedPayloadsError, check_error
-)
-from kafka.protocol.message import PartialMessage
-from kafka.structs import FetchRequestPayload, OffsetRequestPayload
-
-
-log = logging.getLogger(__name__)
-
-
-class FetchContext(object):
- """
- Class for managing the state of a consumer during fetch
- """
- def __init__(self, consumer, block, timeout):
- warnings.warn('deprecated - this class will be removed in a future'
- ' release', DeprecationWarning)
- self.consumer = consumer
- self.block = block
-
- if block:
- if not timeout:
- timeout = FETCH_DEFAULT_BLOCK_TIMEOUT
- self.timeout = timeout * 1000
-
- def __enter__(self):
- """Set fetch values based on blocking status"""
- self.orig_fetch_max_wait_time = self.consumer.fetch_max_wait_time
- self.orig_fetch_min_bytes = self.consumer.fetch_min_bytes
- if self.block:
- self.consumer.fetch_max_wait_time = self.timeout
- self.consumer.fetch_min_bytes = 1
- else:
- self.consumer.fetch_min_bytes = 0
-
- def __exit__(self, type, value, traceback):
- """Reset values"""
- self.consumer.fetch_max_wait_time = self.orig_fetch_max_wait_time
- self.consumer.fetch_min_bytes = self.orig_fetch_min_bytes
-
-
-class SimpleConsumer(Consumer):
- """
- A simple consumer implementation that consumes all/specified partitions
- for a topic
-
- Arguments:
- client: a connected SimpleClient
- group: a name for this consumer, used for offset storage; must be unique.
- If you are connecting to a server that does not support offset
- commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
- topic: the topic to consume
-
- Keyword Arguments:
- partitions: An optional list of partitions to consume the data from
-
- auto_commit: default True. Whether or not to auto commit the offsets
-
- auto_commit_every_n: default 100. How many messages to consume
- before a commit
-
- auto_commit_every_t: default 5000. How much time (in milliseconds) to
- wait before commit
- fetch_size_bytes: number of bytes to request in a FetchRequest
-
- buffer_size: default 4K. Initial number of bytes to tell kafka we
- have available. This will double as needed.
-
- max_buffer_size: default 16K. Max number of bytes to tell kafka we have
- available. None means no limit.
-
- iter_timeout: default None. How much time (in seconds) to wait for a
- message in the iterator before exiting. None means no
- timeout, so it will wait forever.
-
- auto_offset_reset: default largest. Reset partition offsets upon
- OffsetOutOfRangeError. Valid values are largest and smallest.
- Otherwise, do not reset the offsets and raise OffsetOutOfRangeError.
-
- Auto commit details:
- If both auto_commit_every_n and auto_commit_every_t are set, they will
- reset one another when one is triggered. These triggers simply call the
- commit method on this class. A manual call to commit will also reset
- these triggers
- """
- def __init__(self, client, group, topic, auto_commit=True, partitions=None,
- auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
- auto_commit_every_t=AUTO_COMMIT_INTERVAL,
- fetch_size_bytes=FETCH_MIN_BYTES,
- buffer_size=FETCH_BUFFER_SIZE_BYTES,
- max_buffer_size=MAX_FETCH_BUFFER_SIZE_BYTES,
- iter_timeout=None,
- auto_offset_reset='largest'):
- warnings.warn('deprecated - this class will be removed in a future'
- ' release. Use KafkaConsumer instead.',
- DeprecationWarning)
- super(SimpleConsumer, self).__init__(
- client, group, topic,
- partitions=partitions,
- auto_commit=auto_commit,
- auto_commit_every_n=auto_commit_every_n,
- auto_commit_every_t=auto_commit_every_t)
-
- if max_buffer_size is not None and buffer_size > max_buffer_size:
- raise ValueError('buffer_size (%d) is greater than '
- 'max_buffer_size (%d)' %
- (buffer_size, max_buffer_size))
- self.buffer_size = buffer_size
- self.max_buffer_size = max_buffer_size
- self.fetch_max_wait_time = FETCH_MAX_WAIT_TIME
- self.fetch_min_bytes = fetch_size_bytes
- self.fetch_offsets = self.offsets.copy()
- self.iter_timeout = iter_timeout
- self.auto_offset_reset = auto_offset_reset
- self.queue = queue.Queue()
-
- def __repr__(self):
- return '<SimpleConsumer group=%s, topic=%s, partitions=%s>' % \
- (self.group, self.topic, str(self.offsets.keys()))
-
- def reset_partition_offset(self, partition):
- """Update offsets using auto_offset_reset policy (smallest|largest)
-
- Arguments:
- partition (int): the partition for which offsets should be updated
-
- Returns: Updated offset on success, None on failure
- """
- LATEST = -1
- EARLIEST = -2
- if self.auto_offset_reset == 'largest':
- reqs = [OffsetRequestPayload(self.topic, partition, LATEST, 1)]
- elif self.auto_offset_reset == 'smallest':
- reqs = [OffsetRequestPayload(self.topic, partition, EARLIEST, 1)]
- else:
- # Let's raise a reasonable exception type if the user calls
- # outside of an exception context
- if sys.exc_info() == (None, None, None):
- raise OffsetOutOfRangeError('Cannot reset partition offsets without a '
- 'valid auto_offset_reset setting '
- '(largest|smallest)')
- # Otherwise we should re-raise the upstream exception
- # b/c it typically includes additional data about
- # the request that triggered it, and we do not want to drop that
- raise # pylint: disable=E0704
-
- # send_offset_request
- log.info('Resetting topic-partition offset to %s for %s:%d',
- self.auto_offset_reset, self.topic, partition)
- try:
- (resp, ) = self.client.send_offset_request(reqs)
- except KafkaError as e:
- log.error('%s sending offset request for %s:%d',
- e.__class__.__name__, self.topic, partition)
- else:
- self.offsets[partition] = resp.offsets[0]
- self.fetch_offsets[partition] = resp.offsets[0]
- return resp.offsets[0]
-
- def seek(self, offset, whence=None, partition=None):
- """
- Alter the current offset in the consumer, similar to fseek
-
- Arguments:
- offset: how much to modify the offset
- whence: where to modify it from, default is None
-
- * None is an absolute offset
- * 0 is relative to the earliest available offset (head)
- * 1 is relative to the current offset
- * 2 is relative to the latest known offset (tail)
-
- partition: modify which partition, default is None.
- If partition is None, would modify all partitions.
- """
-
- if whence is None: # set an absolute offset
- if partition is None:
- for tmp_partition in self.offsets:
- self.offsets[tmp_partition] = offset
- else:
- self.offsets[partition] = offset
- elif whence == 1: # relative to current position
- if partition is None:
- for tmp_partition, _offset in self.offsets.items():
- self.offsets[tmp_partition] = _offset + offset
- else:
- self.offsets[partition] += offset
- elif whence in (0, 2): # relative to beginning or end
- reqs = []
- deltas = {}
- if partition is None:
- # divide the request offset by the number of partitions,
- # distribute the remainder evenly
- (delta, rem) = divmod(offset, len(self.offsets))
- for tmp_partition, r in izip_longest(self.offsets.keys(),
- repeat(1, rem),
- fillvalue=0):
- deltas[tmp_partition] = delta + r
-
- for tmp_partition in self.offsets.keys():
- if whence == 0:
- reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -2, 1))
- elif whence == 2:
- reqs.append(OffsetRequestPayload(self.topic, tmp_partition, -1, 1))
- else:
- pass
- else:
- deltas[partition] = offset
- if whence == 0:
- reqs.append(OffsetRequestPayload(self.topic, partition, -2, 1))
- elif whence == 2:
- reqs.append(OffsetRequestPayload(self.topic, partition, -1, 1))
- else:
- pass
-
- resps = self.client.send_offset_request(reqs)
- for resp in resps:
- self.offsets[resp.partition] = \
- resp.offsets[0] + deltas[resp.partition]
- else:
- raise ValueError('Unexpected value for `whence`, %d' % (whence,))
-
- # Reset queue and fetch offsets since they are invalid
- self.fetch_offsets = self.offsets.copy()
- self.count_since_commit += 1
- if self.auto_commit:
- self.commit()
-
- self.queue = queue.Queue()
-
- def get_messages(self, count=1, block=True, timeout=0.1):
- """
- Fetch the specified number of messages
-
- Keyword Arguments:
- count: Indicates the maximum number of messages to be fetched
- block: If True, the API will block until all messages are fetched.
- If block is a positive integer, the API will block until that
- many messages are fetched.
- timeout: When blocking is requested, the function will block for
- the specified time (in seconds) until count messages are
- fetched. If None, it will block forever.
- """
- messages = []
- if timeout is not None:
- timeout += time.time()
-
- new_offsets = {}
- log.debug('getting %d messages', count)
- while len(messages) < count:
- block_time = timeout - time.time()
- log.debug('calling _get_message block=%s timeout=%s', block, block_time)
- block_next_call = block is True or block > len(messages)
- result = self._get_message(block_next_call, block_time,
- get_partition_info=True,
- update_offset=False)
- log.debug('got %s from _get_messages', result)
- if not result:
- if block_next_call and (timeout is None or time.time() <= timeout):
- continue
- break
-
- partition, message = result
- _msg = (partition, message) if self.partition_info else message
- messages.append(_msg)
- new_offsets[partition] = message.offset + 1
-
- # Update and commit offsets if necessary
- self.offsets.update(new_offsets)
- self.count_since_commit += len(messages)
- self._auto_commit()
- log.debug('got %d messages: %s', len(messages), messages)
- return messages
-
- def get_message(self, block=True, timeout=0.1, get_partition_info=None):
- return self._get_message(block, timeout, get_partition_info)
-
- def _get_message(self, block=True, timeout=0.1, get_partition_info=None,
- update_offset=True):
- """
- If no messages can be fetched, returns None.
- If get_partition_info is None, it defaults to self.partition_info
- If get_partition_info is True, returns (partition, message)
- If get_partition_info is False, returns message
- """
- start_at = time.time()
- while self.queue.empty():
- # We're out of messages, go grab some more.
- log.debug('internal queue empty, fetching more messages')
- with FetchContext(self, block, timeout):
- self._fetch()
-
- if not block or time.time() > (start_at + timeout):
- break
-
- try:
- partition, message = self.queue.get_nowait()
-
- if update_offset:
- # Update partition offset
- self.offsets[partition] = message.offset + 1
-
- # Count, check and commit messages if necessary
- self.count_since_commit += 1
- self._auto_commit()
-
- if get_partition_info is None:
- get_partition_info = self.partition_info
- if get_partition_info:
- return partition, message
- else:
- return message
- except queue.Empty:
- log.debug('internal queue empty after fetch - returning None')
- return None
-
- def __iter__(self):
- if self.iter_timeout is None:
- timeout = ITER_TIMEOUT_SECONDS
- else:
- timeout = self.iter_timeout
-
- while True:
- message = self.get_message(True, timeout)
- if message:
- yield message
- elif self.iter_timeout is None:
- # We did not receive any message yet but we don't have a
- # timeout, so give up the CPU for a while before trying again
- time.sleep(NO_MESSAGES_WAIT_TIME_SECONDS)
- else:
- # Timed out waiting for a message
- break
-
- def _fetch(self):
- # Create fetch request payloads for all the partitions
- partitions = dict((p, self.buffer_size)
- for p in self.fetch_offsets.keys())
- while partitions:
- requests = []
- for partition, buffer_size in six.iteritems(partitions):
- requests.append(FetchRequestPayload(self.topic, partition,
- self.fetch_offsets[partition],
- buffer_size))
- # Send request
- responses = self.client.send_fetch_request(
- requests,
- max_wait_time=int(self.fetch_max_wait_time),
- min_bytes=self.fetch_min_bytes,
- fail_on_error=False
- )
-
- retry_partitions = {}
- for resp in responses:
-
- try:
- check_error(resp)
- except UnknownTopicOrPartitionError:
- log.error('UnknownTopicOrPartitionError for %s:%d',
- resp.topic, resp.partition)
- self.client.reset_topic_metadata(resp.topic)
- raise
- except NotLeaderForPartitionError:
- log.error('NotLeaderForPartitionError for %s:%d',
- resp.topic, resp.partition)
- self.client.reset_topic_metadata(resp.topic)
- continue
- except OffsetOutOfRangeError:
- log.warning('OffsetOutOfRangeError for %s:%d. '
- 'Resetting partition offset...',
- resp.topic, resp.partition)
- self.reset_partition_offset(resp.partition)
- # Retry this partition
- retry_partitions[resp.partition] = partitions[resp.partition]
- continue
- except FailedPayloadsError as e:
- log.warning('FailedPayloadsError for %s:%d',
- e.payload.topic, e.payload.partition)
- # Retry this partition
- retry_partitions[e.payload.partition] = partitions[e.payload.partition]
- continue
-
- partition = resp.partition
- buffer_size = partitions[partition]
-
- # Check for partial message
- if resp.messages and isinstance(resp.messages[-1].message, PartialMessage):
-
- # If buffer is at max and all we got was a partial message
- # raise ConsumerFetchSizeTooSmall
- if (self.max_buffer_size is not None and
- buffer_size == self.max_buffer_size and
- len(resp.messages) == 1):
-
- log.error('Max fetch size %d too small', self.max_buffer_size)
- raise ConsumerFetchSizeTooSmall()
-
- if self.max_buffer_size is None:
- buffer_size *= 2
- else:
- buffer_size = min(buffer_size * 2, self.max_buffer_size)
- log.warning('Fetch size too small, increase to %d (2x) '
- 'and retry', buffer_size)
- retry_partitions[partition] = buffer_size
- resp.messages.pop()
-
- for message in resp.messages:
- if message.offset < self.fetch_offsets[partition]:
- log.debug('Skipping message %s because its offset is less than the consumer offset',
- message)
- continue
- # Put the message in our queue
- self.queue.put((partition, message))
- self.fetch_offsets[partition] = message.offset + 1
- partitions = retry_partitions
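SimpleConsumer's seek/get_messages workflow maps roughly onto KafkaConsumer's assign/seek/poll APIs. A hedged sketch of the equivalents; the topic, partition number, and broker address are placeholders, and auto_offset_reset now takes 'earliest'/'latest' rather than 'smallest'/'largest':

.. code:: python

    # Sketch: KafkaConsumer equivalents for the removed SimpleConsumer calls.
    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers='localhost:9092',   # placeholder broker
        auto_offset_reset='earliest',         # replaces 'smallest'/'largest'
        enable_auto_commit=False,
    )
    tp = TopicPartition('my-topic', 0)        # placeholder topic/partition
    consumer.assign([tp])

    consumer.seek_to_beginning(tp)            # ~ SimpleConsumer.seek(0, whence=0)
    consumer.seek(tp, 42)                     # ~ absolute seek(42, whence=None)

    records = consumer.poll(timeout_ms=100)   # ~ get_messages(block=True, timeout=0.1)
    for partition, msgs in records.items():
        for msg in msgs:
            print(msg.offset, msg.value)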
diff --git a/kafka/context.py b/kafka/context.py
deleted file mode 100644
index 1ebc71d..0000000
--- a/kafka/context.py
+++ /dev/null
@@ -1,178 +0,0 @@
-"""
-Context manager to commit/rollback consumer offsets.
-"""
-from __future__ import absolute_import
-
-from logging import getLogger
-
-from kafka.errors import check_error, OffsetOutOfRangeError
-from kafka.structs import OffsetCommitRequestPayload
-
-
-class OffsetCommitContext(object):
- """
- Provides commit/rollback semantics around a `SimpleConsumer`.
-
- Usage assumes that `auto_commit` is disabled, that messages are consumed in
- batches, and that the consuming process will record its own successful
- processing of each message. Both the commit and rollback operations respect
- a "high-water mark" to ensure that last unsuccessfully processed message
- will be retried.
-
- Example:
-
- .. code:: python
-
- consumer = SimpleConsumer(client, group, topic, auto_commit=False)
- consumer.provide_partition_info()
- consumer.fetch_last_known_offsets()
-
- while some_condition:
- with OffsetCommitContext(consumer) as context:
- messages = consumer.get_messages(count, block=False)
-
- for partition, message in messages:
- if can_process(message):
- context.mark(partition, message.offset)
- else:
- break
-
- if not context:
- sleep(delay)
-
-
- These semantics allow for deferred message processing (e.g. if `can_process`
- compares message time to clock time) and for repeated processing of the last
- unsuccessful message (until some external error is resolved).
- """
-
- def __init__(self, consumer):
- """
- :param consumer: an instance of `SimpleConsumer`
- """
- self.consumer = consumer
- self.initial_offsets = None
- self.high_water_mark = None
- self.logger = getLogger("kafka.context")
-
- def mark(self, partition, offset):
- """
- Set the high-water mark in the current context.
-
- In order to know the current partition, it is helpful to initialize
- the consumer to provide partition info via:
-
- .. code:: python
-
- consumer.provide_partition_info()
-
- """
- max_offset = max(offset + 1, self.high_water_mark.get(partition, 0))
-
- self.logger.debug("Setting high-water mark to: %s",
- {partition: max_offset})
-
- self.high_water_mark[partition] = max_offset
-
- def __nonzero__(self):
- """
- Return whether any operations were marked in the context.
- """
- return bool(self.high_water_mark)
-
- def __enter__(self):
- """
- Start a new context:
-
- - Record the initial offsets for rollback
- - Reset the high-water mark
- """
- self.initial_offsets = dict(self.consumer.offsets)
- self.high_water_mark = dict()
-
- self.logger.debug("Starting context at: %s", self.initial_offsets)
-
- return self
-
- def __exit__(self, exc_type, exc_value, traceback):
- """
- End a context.
-
- - If there was no exception, commit up to the current high-water mark.
- - If there was an offset out of range error, attempt to find the correct
- initial offset.
- - If there was any other error, roll back to the initial offsets.
- """
- if exc_type is None:
- self.commit()
- elif isinstance(exc_value, OffsetOutOfRangeError):
- self.handle_out_of_range()
- return True
- else:
- self.rollback()
-
- def commit(self):
- """
- Commit this context's offsets:
-
- - If the high-water mark has moved, commit up to and position the
- consumer at the high-water mark.
- - Otherwise, reset the consumer to the initial offsets.
- """
- if self.high_water_mark:
- self.logger.info("Committing offsets: %s", self.high_water_mark)
- self.commit_partition_offsets(self.high_water_mark)
- self.update_consumer_offsets(self.high_water_mark)
- else:
- self.update_consumer_offsets(self.initial_offsets)
-
- def rollback(self):
- """
- Rollback this context:
-
- - Position the consumer at the initial offsets.
- """
- self.logger.info("Rolling back context: %s", self.initial_offsets)
- self.update_consumer_offsets(self.initial_offsets)
-
- def commit_partition_offsets(self, partition_offsets):
- """
- Commit explicit partition/offset pairs.
- """
- self.logger.debug("Committing partition offsets: %s", partition_offsets)
-
- commit_requests = [
- OffsetCommitRequestPayload(self.consumer.topic, partition, offset, None)
- for partition, offset in partition_offsets.items()
- ]
- commit_responses = self.consumer.client.send_offset_commit_request(
- self.consumer.group,
- commit_requests,
- )
- for commit_response in commit_responses:
- check_error(commit_response)
-
- def update_consumer_offsets(self, partition_offsets):
- """
- Update consumer offsets to explicit positions.
- """
- self.logger.debug("Updating consumer offsets to: %s", partition_offsets)
-
- for partition, offset in partition_offsets.items():
- self.consumer.offsets[partition] = offset
-
- # consumer keeps other offset states beyond its `offsets` dictionary,
- # a relative seek with zero delta forces the consumer to reset to the
- # current value of the `offsets` dictionary
- self.consumer.seek(0, 1)
-
- def handle_out_of_range(self):
- """
- Handle out of range condition by seeking to the beginning of valid
- ranges.
-
- This assumes that an out-of-range error doesn't happen by seeking past
- the end of valid ranges -- which is far less likely.
- """
- self.logger.info("Seeking beginning of partition on out of range error")
- self.consumer.seek(0, 0)
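The commit/rollback pattern that OffsetCommitContext provided around SimpleConsumer can be approximated with KafkaConsumer by disabling auto-commit and committing an explicit offset only after a record has been processed successfully. A sketch under those assumptions; process() is a hypothetical handler and the topic/group/broker names are placeholders:

.. code:: python

    # Sketch: commit only what was actually processed, so failures are retried.
    from kafka import KafkaConsumer
    from kafka.structs import OffsetAndMetadata, TopicPartition

    consumer = KafkaConsumer(
        'my-topic',                            # placeholder topic
        group_id='my-group',                   # placeholder group
        bootstrap_servers='localhost:9092',    # placeholder broker
        enable_auto_commit=False,
    )

    for record in consumer:
        try:
            process(record)                    # hypothetical processing function
        except Exception:
            break                              # nothing committed -> record is re-read later
        tp = TopicPartition(record.topic, record.partition)
        # Commit the offset *after* the processed record (the high-water mark).
        consumer.commit({tp: OffsetAndMetadata(record.offset + 1, None)})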
diff --git a/kafka/errors.py b/kafka/errors.py
index abef2c5..6da2908 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -472,22 +472,6 @@ class ConnectionError(KafkaConnectionError):
"""Deprecated"""
-class BufferUnderflowError(KafkaError):
- pass
-
-
-class ChecksumError(KafkaError):
- pass
-
-
-class ConsumerFetchSizeTooSmall(KafkaError):
- pass
-
-
-class ConsumerNoMoreData(KafkaError):
- pass
-
-
class ProtocolError(KafkaError):
pass
diff --git a/kafka/partitioner/__init__.py b/kafka/partitioner/__init__.py
index a9dbbdc..21a3bbb 100644
--- a/kafka/partitioner/__init__.py
+++ b/kafka/partitioner/__init__.py
@@ -1,10 +1,8 @@
from __future__ import absolute_import
-from kafka.partitioner.default import DefaultPartitioner
-from kafka.partitioner.hashed import HashedPartitioner, Murmur2Partitioner, LegacyPartitioner
-from kafka.partitioner.roundrobin import RoundRobinPartitioner
+from kafka.partitioner.default import DefaultPartitioner, murmur2
+
__all__ = [
- 'DefaultPartitioner', 'RoundRobinPartitioner', 'HashedPartitioner',
- 'Murmur2Partitioner', 'LegacyPartitioner'
+ 'DefaultPartitioner', 'murmur2'
]
diff --git a/kafka/partitioner/base.py b/kafka/partitioner/base.py
deleted file mode 100644
index 0e36253..0000000
--- a/kafka/partitioner/base.py
+++ /dev/null
@@ -1,27 +0,0 @@
-from __future__ import absolute_import
-
-
-class Partitioner(object):
- """
- Base class for a partitioner
- """
- def __init__(self, partitions=None):
- """
- Initialize the partitioner
-
- Arguments:
- partitions: A list of available partitions (during startup) OPTIONAL.
- """
- self.partitions = partitions
-
- def __call__(self, key, all_partitions=None, available_partitions=None):
- """
- Takes a key plus all_partitions and available_partitions as arguments and
- returns a partition to be used for the message.
-
- Arguments:
- key: the key to use for partitioning.
- all_partitions: a list of the topic's partitions.
- available_partitions: a list of the broker's currently available partitions (optional).
- """
- raise NotImplementedError('partition function has to be implemented')
diff --git a/kafka/partitioner/default.py b/kafka/partitioner/default.py
index e4d9df5..d0914c6 100644
--- a/kafka/partitioner/default.py
+++ b/kafka/partitioner/default.py
@@ -2,7 +2,7 @@ from __future__ import absolute_import
import random
-from kafka.partitioner.hashed import murmur2
+from kafka.vendor import six
class DefaultPartitioner(object):
@@ -30,3 +30,73 @@ class DefaultPartitioner(object):
idx &= 0x7fffffff
idx %= len(all_partitions)
return all_partitions[idx]
+
+
+# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
+def murmur2(data):
+ """Pure-python Murmur2 implementation.
+
+ Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
+
+ Args:
+ data (bytes): opaque bytes
+
+ Returns: MurmurHash2 of data
+ """
+ # Python2 bytes is really a str, causing the bitwise operations below to fail
+ # so convert to bytearray.
+ if six.PY2:
+ data = bytearray(bytes(data))
+
+ length = len(data)
+ seed = 0x9747b28c
+ # 'm' and 'r' are mixing constants generated offline.
+ # They're not really 'magic', they just happen to work well.
+ m = 0x5bd1e995
+ r = 24
+
+ # Initialize the hash to a random value
+ h = seed ^ length
+ length4 = length // 4
+
+ for i in range(length4):
+ i4 = i * 4
+ k = ((data[i4 + 0] & 0xff) +
+ ((data[i4 + 1] & 0xff) << 8) +
+ ((data[i4 + 2] & 0xff) << 16) +
+ ((data[i4 + 3] & 0xff) << 24))
+ k &= 0xffffffff
+ k *= m
+ k &= 0xffffffff
+ k ^= (k % 0x100000000) >> r # k ^= k >>> r
+ k &= 0xffffffff
+ k *= m
+ k &= 0xffffffff
+
+ h *= m
+ h &= 0xffffffff
+ h ^= k
+ h &= 0xffffffff
+
+ # Handle the last few bytes of the input array
+ extra_bytes = length % 4
+ if extra_bytes >= 3:
+ h ^= (data[(length & ~3) + 2] & 0xff) << 16
+ h &= 0xffffffff
+ if extra_bytes >= 2:
+ h ^= (data[(length & ~3) + 1] & 0xff) << 8
+ h &= 0xffffffff
+ if extra_bytes >= 1:
+ h ^= (data[length & ~3] & 0xff)
+ h &= 0xffffffff
+ h *= m
+ h &= 0xffffffff
+
+ h ^= (h % 0x100000000) >> 13 # h >>> 13;
+ h &= 0xffffffff
+ h *= m
+ h &= 0xffffffff
+ h ^= (h % 0x100000000) >> 15 # h >>> 15;
+ h &= 0xffffffff
+
+ return h
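murmur2 is kept (now in kafka/partitioner/default.py) because DefaultPartitioner uses it to map a message key to a partition the same way the Java client does. A small usage sketch of that mapping; the partition count and key below are placeholders:

.. code:: python

    # Sketch: how DefaultPartitioner turns a key into a partition index.
    from kafka.partitioner import murmur2

    num_partitions = 6                 # placeholder partition count
    key = b'user-42'                   # keys must be bytes

    idx = murmur2(key)
    idx &= 0x7fffffff                  # clear the sign bit, as in the Java client
    idx %= num_partitions
    print('key %r -> partition %d' % (key, idx))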
diff --git a/kafka/partitioner/hashed.py b/kafka/partitioner/hashed.py
deleted file mode 100644
index be92daf..0000000
--- a/kafka/partitioner/hashed.py
+++ /dev/null
@@ -1,118 +0,0 @@
-from __future__ import absolute_import
-
-from kafka.vendor import six
-
-from kafka.partitioner.base import Partitioner
-
-
-class Murmur2Partitioner(Partitioner):
- """
- Implements a partitioner which selects the target partition based on
- the hash of the key. Attempts to apply the same hashing
- function as mainline java client.
- """
- def __call__(self, key, partitions=None, available=None):
- if available:
- return self.partition(key, available)
- return self.partition(key, partitions)
-
- def partition(self, key, partitions=None):
- if not partitions:
- partitions = self.partitions
-
- # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69
- idx = (murmur2(key) & 0x7fffffff) % len(partitions)
-
- return partitions[idx]
-
-
-class LegacyPartitioner(object):
- """DEPRECATED -- See Issue 374
-
- Implements a partitioner which selects the target partition based on
- the hash of the key
- """
- def __init__(self, partitions):
- self.partitions = partitions
-
- def partition(self, key, partitions=None):
- if not partitions:
- partitions = self.partitions
- size = len(partitions)
- idx = hash(key) % size
-
- return partitions[idx]
-
-
-# Default will change to Murmur2 in 0.10 release
-HashedPartitioner = LegacyPartitioner
-
-
-# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
-def murmur2(data):
- """Pure-python Murmur2 implementation.
-
- Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
-
- Args:
- data (bytes): opaque bytes
-
- Returns: MurmurHash2 of data
- """
- # Python2 bytes is really a str, causing the bitwise operations below to fail
- # so convert to bytearray.
- if six.PY2:
- data = bytearray(bytes(data))
-
- length = len(data)
- seed = 0x9747b28c
- # 'm' and 'r' are mixing constants generated offline.
- # They're not really 'magic', they just happen to work well.
- m = 0x5bd1e995
- r = 24
-
- # Initialize the hash to a random value
- h = seed ^ length
- length4 = length // 4
-
- for i in range(length4):
- i4 = i * 4
- k = ((data[i4 + 0] & 0xff) +
- ((data[i4 + 1] & 0xff) << 8) +
- ((data[i4 + 2] & 0xff) << 16) +
- ((data[i4 + 3] & 0xff) << 24))
- k &= 0xffffffff
- k *= m
- k &= 0xffffffff
- k ^= (k % 0x100000000) >> r # k ^= k >>> r
- k &= 0xffffffff
- k *= m
- k &= 0xffffffff
-
- h *= m
- h &= 0xffffffff
- h ^= k
- h &= 0xffffffff
-
- # Handle the last few bytes of the input array
- extra_bytes = length % 4
- if extra_bytes >= 3:
- h ^= (data[(length & ~3) + 2] & 0xff) << 16
- h &= 0xffffffff
- if extra_bytes >= 2:
- h ^= (data[(length & ~3) + 1] & 0xff) << 8
- h &= 0xffffffff
- if extra_bytes >= 1:
- h ^= (data[length & ~3] & 0xff)
- h &= 0xffffffff
- h *= m
- h &= 0xffffffff
-
- h ^= (h % 0x100000000) >> 13 # h >>> 13;
- h &= 0xffffffff
- h *= m
- h &= 0xffffffff
- h ^= (h % 0x100000000) >> 15 # h >>> 15;
- h &= 0xffffffff
-
- return h
diff --git a/kafka/partitioner/roundrobin.py b/kafka/partitioner/roundrobin.py
deleted file mode 100644
index e68c372..0000000
--- a/kafka/partitioner/roundrobin.py
+++ /dev/null
@@ -1,70 +0,0 @@
-from __future__ import absolute_import
-
-from kafka.partitioner.base import Partitioner
-
-
-class RoundRobinPartitioner(Partitioner):
- def __init__(self, partitions=None):
- self.partitions_iterable = CachedPartitionCycler(partitions)
- if partitions:
- self._set_partitions(partitions)
- else:
- self.partitions = None
-
- def __call__(self, key, all_partitions=None, available_partitions=None):
- if available_partitions:
- cur_partitions = available_partitions
- else:
- cur_partitions = all_partitions
- if not self.partitions:
- self._set_partitions(cur_partitions)
- elif cur_partitions != self.partitions_iterable.partitions and cur_partitions is not None:
- self._set_partitions(cur_partitions)
- return next(self.partitions_iterable)
-
- def _set_partitions(self, available_partitions):
- self.partitions = available_partitions
- self.partitions_iterable.set_partitions(available_partitions)
-
- def partition(self, key, all_partitions=None, available_partitions=None):
- return self.__call__(key, all_partitions, available_partitions)
-
-
-class CachedPartitionCycler(object):
- def __init__(self, partitions=None):
- self.partitions = partitions
- if partitions:
- assert type(partitions) is list
- self.cur_pos = None
-
- def __next__(self):
- return self.next()
-
- @staticmethod
- def _index_available(cur_pos, partitions):
- return cur_pos < len(partitions)
-
- def set_partitions(self, partitions):
- if self.cur_pos:
- if not self._index_available(self.cur_pos, partitions):
- self.cur_pos = 0
- self.partitions = partitions
- return None
-
- self.partitions = partitions
- next_item = self.partitions[self.cur_pos]
- if next_item in partitions:
- self.cur_pos = partitions.index(next_item)
- else:
- self.cur_pos = 0
- return None
- self.partitions = partitions
-
- def next(self):
- assert self.partitions is not None
- if self.cur_pos is None or not self._index_available(self.cur_pos, self.partitions):
- self.cur_pos = 1
- return self.partitions[0]
- cur_item = self.partitions[self.cur_pos]
- self.cur_pos += 1
- return cur_item
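If strict round-robin assignment is still wanted after RoundRobinPartitioner's removal, a small callable can be supplied as KafkaProducer's partitioner config. This is only a sketch: it assumes the partition set is stable and the producer is used from a single thread. It could then be wired in with KafkaProducer(partitioner=RoundRobin(), ...).

.. code:: python

    # Sketch: a minimal round-robin callable for KafkaProducer's `partitioner` config.
    from itertools import cycle

    class RoundRobin(object):
        def __init__(self):
            self._cycles = {}  # partition tuple -> cycle iterator

        def __call__(self, key_bytes, all_partitions, available_partitions):
            partitions = tuple(available_partitions or all_partitions)
            if partitions not in self._cycles:
                self._cycles[partitions] = cycle(partitions)
            return next(self._cycles[partitions])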
diff --git a/kafka/producer/__init__.py b/kafka/producer/__init__.py
index 54fd8d2..576c772 100644
--- a/kafka/producer/__init__.py
+++ b/kafka/producer/__init__.py
@@ -1,10 +1,7 @@
from __future__ import absolute_import
from kafka.producer.kafka import KafkaProducer
-from kafka.producer.simple import SimpleProducer
-from kafka.producer.keyed import KeyedProducer
__all__ = [
- 'KafkaProducer',
- 'SimpleProducer', 'KeyedProducer' # deprecated
+ 'KafkaProducer'
]
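With SimpleProducer and KeyedProducer gone, KafkaProducer is the single produce API. A minimal sketch of the common calls; the broker address and topic are placeholders, and payloads must still be bytes:

.. code:: python

    # Sketch: basic KafkaProducer usage replacing the removed Simple*/Keyed* producers.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')  # placeholder broker

    # Fire-and-forget send; values must be bytes.
    producer.send('my-topic', b'some message bytes')

    # Keyed send: the key is fed to DefaultPartitioner (murmur2) to pick the partition.
    producer.send('my-topic', value=b'payload', key=b'user-42')

    producer.flush()   # block until buffered records have been sent
    producer.close()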
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
deleted file mode 100644
index b323966..0000000
--- a/kafka/producer/base.py
+++ /dev/null
@@ -1,482 +0,0 @@
-from __future__ import absolute_import
-
-import atexit
-import logging
-import time
-
-try:
- from queue import Empty, Full, Queue # pylint: disable=import-error
-except ImportError:
- from Queue import Empty, Full, Queue # pylint: disable=import-error
-from collections import defaultdict
-
-from threading import Thread, Event
-
-from kafka.vendor import six
-
-from kafka.errors import (
- kafka_errors, UnsupportedCodecError, FailedPayloadsError,
- RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
- RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
-from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
-from kafka.structs import (
- ProduceRequestPayload, ProduceResponsePayload, TopicPartition, RetryOptions)
-
-log = logging.getLogger('kafka.producer')
-
-BATCH_SEND_DEFAULT_INTERVAL = 20
-BATCH_SEND_MSG_COUNT = 20
-
-# unlimited
-ASYNC_QUEUE_MAXSIZE = 0
-ASYNC_QUEUE_PUT_TIMEOUT = 0
-# unlimited retries by default
-ASYNC_RETRY_LIMIT = None
-ASYNC_RETRY_BACKOFF_MS = 100
-ASYNC_RETRY_ON_TIMEOUTS = True
-ASYNC_LOG_MESSAGES_ON_ERROR = True
-
-STOP_ASYNC_PRODUCER = -1
-ASYNC_STOP_TIMEOUT_SECS = 30
-
-SYNC_FAIL_ON_ERROR_DEFAULT = True
-
-
-def _send_upstream(queue, client, codec, batch_time, batch_size,
- req_acks, ack_timeout, retry_options, stop_event,
- log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
- stop_timeout=ASYNC_STOP_TIMEOUT_SECS,
- codec_compresslevel=None):
- """Private method to manage producing messages asynchronously
-
- Listens on the queue for a specified number of messages or until
- a specified timeout and then sends messages to the brokers in grouped
- requests (one per broker).
-
- Messages placed on the queue should be tuples that conform to this format:
- ((topic, partition), message, key)
-
- Currently does not mark messages with task_done. Do not attempt to
- :meth:`join`!
-
- Arguments:
- queue (threading.Queue): the queue from which to get messages
- client (kafka.SimpleClient): instance to use for communicating
- with brokers
- codec (kafka.protocol.ALL_CODECS): compression codec to use
- batch_time (int): interval in seconds to send message batches
- batch_size (int): count of messages that will trigger an immediate send
- req_acks: required acks to use with ProduceRequests. see server protocol
- ack_timeout: timeout to wait for required acks. see server protocol
- retry_options (RetryOptions): settings for retry limits, backoff etc
- stop_event (threading.Event): event to monitor for shutdown signal.
- when this event is 'set', the producer will stop sending messages.
- log_messages_on_error (bool, optional): log stringified message-contents
- on any produce error, otherwise only log a hash() of the contents,
- defaults to True.
- stop_timeout (int or float, optional): number of seconds to continue
- retrying messages after stop_event is set, defaults to 30.
- """
- request_tries = {}
-
- while not stop_event.is_set():
- try:
- client.reinit()
- except Exception as e:
- log.warning('Async producer failed to connect to brokers; backoff for %s(ms) before retrying', retry_options.backoff_ms)
- time.sleep(float(retry_options.backoff_ms) / 1000)
- else:
- break
-
- stop_at = None
- while not (stop_event.is_set() and queue.empty() and not request_tries):
-
- # Handle stop_timeout
- if stop_event.is_set():
- if not stop_at:
- stop_at = stop_timeout + time.time()
- if time.time() > stop_at:
- log.debug('Async producer stopping due to stop_timeout')
- break
-
- timeout = batch_time
- count = batch_size
- send_at = time.time() + timeout
- msgset = defaultdict(list)
-
- # Merging messages will require a bit more work to manage correctly
- # for now, don't look for new batches if we have old ones to retry
- if request_tries:
- count = 0
- log.debug('Skipping new batch collection to handle retries')
- else:
- log.debug('Batching size: %s, timeout: %s', count, timeout)
-
- # Keep fetching till we gather enough messages or a
- # timeout is reached
- while count > 0 and timeout >= 0:
- try:
- topic_partition, msg, key = queue.get(timeout=timeout)
- except Empty:
- break
-
- # Check if the controller has requested us to stop
- if topic_partition == STOP_ASYNC_PRODUCER:
- stop_event.set()
- break
-
- # Adjust the timeout to match the remaining period
- count -= 1
- timeout = send_at - time.time()
- msgset[topic_partition].append((msg, key))
-
- # Send collected requests upstream
- for topic_partition, msg in msgset.items():
- messages = create_message_set(msg, codec, key, codec_compresslevel)
- req = ProduceRequestPayload(
- topic_partition.topic,
- topic_partition.partition,
- tuple(messages))
- request_tries[req] = 0
-
- if not request_tries:
- continue
-
- reqs_to_retry, error_cls = [], None
- retry_state = {
- 'do_backoff': False,
- 'do_refresh': False
- }
-
- def _handle_error(error_cls, request):
- if issubclass(error_cls, RETRY_ERROR_TYPES) or (retry_options.retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)):
- reqs_to_retry.append(request)
- if issubclass(error_cls, RETRY_BACKOFF_ERROR_TYPES):
- retry_state['do_backoff'] |= True
- if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES):
- retry_state['do_refresh'] |= True
-
- requests = list(request_tries.keys())
- log.debug('Sending: %s', requests)
- responses = client.send_produce_request(requests,
- acks=req_acks,
- timeout=ack_timeout,
- fail_on_error=False)
-
- log.debug('Received: %s', responses)
- for i, response in enumerate(responses):
- error_cls = None
- if isinstance(response, FailedPayloadsError):
- error_cls = response.__class__
- orig_req = response.payload
-
- elif isinstance(response, ProduceResponsePayload) and response.error:
- error_cls = kafka_errors.get(response.error, UnknownError)
- orig_req = requests[i]
-
- if error_cls:
- _handle_error(error_cls, orig_req)
- log.error('%s sending ProduceRequestPayload (#%d of %d) '
- 'to %s:%d with msgs %s',
- error_cls.__name__, (i + 1), len(requests),
- orig_req.topic, orig_req.partition,
- orig_req.messages if log_messages_on_error
- else hash(orig_req.messages))
-
- if not reqs_to_retry:
- request_tries = {}
- continue
-
- # doing backoff before next retry
- if retry_state['do_backoff'] and retry_options.backoff_ms:
- log.warning('Async producer backoff for %s(ms) before retrying', retry_options.backoff_ms)
- time.sleep(float(retry_options.backoff_ms) / 1000)
-
- # refresh topic metadata before next retry
- if retry_state['do_refresh']:
- log.warning('Async producer forcing metadata refresh before retrying')
- try:
- client.load_metadata_for_topics()
- except Exception:
- log.exception("Async producer couldn't reload topic metadata.")
-
- # Apply retry limit, dropping messages that are over
- request_tries = dict(
- (key, count + 1)
- for (key, count) in request_tries.items()
- if key in reqs_to_retry
- and (retry_options.limit is None
- or (count < retry_options.limit))
- )
-
- # Log messages we are going to retry
- for orig_req in request_tries.keys():
- log.info('Retrying ProduceRequestPayload to %s:%d with msgs %s',
- orig_req.topic, orig_req.partition,
- orig_req.messages if log_messages_on_error
- else hash(orig_req.messages))
-
- if request_tries or not queue.empty():
- log.error('Stopped producer with %d unsent messages', len(request_tries) + queue.qsize())
-
-
-class Producer(object):
- """
- Base class to be used by producers
-
- Arguments:
- client (kafka.SimpleClient): instance to use for broker
- communications. If async_send=True, the background thread will use
- :meth:`client.copy`, which is expected to return a thread-safe
- object.
- codec (kafka.protocol.ALL_CODECS): compression codec to use.
- req_acks (int, optional): A value indicating the acknowledgements that
- the server must receive before responding to the request,
- defaults to 1 (local ack).
- ack_timeout (int, optional): millisecond timeout to wait for the
- configured req_acks, defaults to 1000.
- sync_fail_on_error (bool, optional): whether sync producer should
- raise exceptions (True), or just return errors (False),
- defaults to True.
- async_send (bool, optional): send message using a background thread,
- defaults to False.
- batch_send_every_n (int, optional): If async_send is True, messages are
- sent in batches of this size, defaults to 20.
- batch_send_every_t (int or float, optional): If async_send is True,
- messages are sent immediately after this timeout in seconds, even
- if there are fewer than batch_send_every_n, defaults to 20.
- async_retry_limit (int, optional): number of retries for failed messages
- or None for unlimited, defaults to None / unlimited.
- async_retry_backoff_ms (int, optional): milliseconds to backoff on
- failed messages, defaults to 100.
- async_retry_on_timeouts (bool, optional): whether to retry on
- RequestTimedOutError, defaults to True.
- async_queue_maxsize (int, optional): limit to the size of the
- internal message queue in number of messages (not size), defaults
- to 0 (no limit).
- async_queue_put_timeout (int or float, optional): timeout seconds
- for queue.put in send_messages for async producers -- will only
- apply if async_queue_maxsize > 0 and the queue is Full,
- defaults to 0 (fail immediately on full queue).
- async_log_messages_on_error (bool, optional): set to False and the
- async producer will only log hash() contents on failed produce
- requests, defaults to True (log full messages). Hash logging
- will not allow you to identify the specific message that failed,
- but it will allow you to match failures with retries.
- async_stop_timeout (int or float, optional): seconds to continue
- attempting to send queued messages after :meth:`producer.stop`,
- defaults to 30.
-
- Deprecated Arguments:
- async (bool, optional): send message using a background thread,
- defaults to False. Deprecated, use 'async_send'
- batch_send (bool, optional): If True, messages are sent by a background
- thread in batches, defaults to False. Deprecated, use 'async_send'
- """
- ACK_NOT_REQUIRED = 0 # No ack is required
- ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
- ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed
- DEFAULT_ACK_TIMEOUT = 1000
-
- def __init__(self, client,
- req_acks=ACK_AFTER_LOCAL_WRITE,
- ack_timeout=DEFAULT_ACK_TIMEOUT,
- codec=None,
- codec_compresslevel=None,
- sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
- async_send=False,
- batch_send=False, # deprecated, use async_send
- batch_send_every_n=BATCH_SEND_MSG_COUNT,
- batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
- async_retry_limit=ASYNC_RETRY_LIMIT,
- async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
- async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
- async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
- async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
- async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
- async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS,
- **kwargs):
-
- # async renamed async_send for python3.7 support
- if 'async' in kwargs:
- log.warning('Deprecated async option found -- use async_send')
- async_send = kwargs['async']
-
- if async_send:
- assert batch_send_every_n > 0
- assert batch_send_every_t > 0
- assert async_queue_maxsize >= 0
-
- self.client = client
- self.async_send = async_send
- self.req_acks = req_acks
- self.ack_timeout = ack_timeout
- self.stopped = False
-
- if codec is None:
- codec = CODEC_NONE
- elif codec not in ALL_CODECS:
- raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,))
-
- self.codec = codec
- self.codec_compresslevel = codec_compresslevel
-
- if self.async_send:
- # Messages are sent through this queue
- self.queue = Queue(async_queue_maxsize)
- self.async_queue_put_timeout = async_queue_put_timeout
- async_retry_options = RetryOptions(
- limit=async_retry_limit,
- backoff_ms=async_retry_backoff_ms,
- retry_on_timeouts=async_retry_on_timeouts)
- self.thread_stop_event = Event()
- self.thread = Thread(
- target=_send_upstream,
- args=(self.queue, self.client.copy(), self.codec,
- batch_send_every_t, batch_send_every_n,
- self.req_acks, self.ack_timeout,
- async_retry_options, self.thread_stop_event),
- kwargs={'log_messages_on_error': async_log_messages_on_error,
- 'stop_timeout': async_stop_timeout,
- 'codec_compresslevel': self.codec_compresslevel}
- )
-
- # Thread will die if main thread exits
- self.thread.daemon = True
- self.thread.start()
-
- def cleanup(obj):
- if not obj.stopped:
- obj.stop()
- self._cleanup_func = cleanup
- atexit.register(cleanup, self)
- else:
- self.sync_fail_on_error = sync_fail_on_error
-
- def send_messages(self, topic, partition, *msg):
- """Helper method to send produce requests.
-
- Note that each msg *must* be encoded to bytes by the user. Passing a unicode
- message will not work; for example, you should encode before calling
- send_messages via something like `unicode_message.encode('utf-8')`.
- All messages will set the message 'key' to None.
-
- Arguments:
- topic (str): name of topic for produce request
- partition (int): partition number for produce request
- *msg (bytes): one or more message payloads
-
- Returns:
- ResponseRequest returned by server
-
- Raises:
- FailedPayloadsError: low-level connection error, can be caused by
- networking failures, or a malformed request.
- KafkaUnavailableError: all known brokers are down when attempting
- to refresh metadata.
- LeaderNotAvailableError: topic or partition is initializing or
- a broker failed and leadership election is in progress.
- NotLeaderForPartitionError: metadata is out of sync; the broker
- that the request was sent to is not the leader for the topic
- or partition.
- UnknownTopicOrPartitionError: the topic or partition has not
- been created yet and auto-creation is not available.
- AsyncProducerQueueFull: in async mode, if too many messages are
- unsent and remain in the internal queue.
- """
- return self._send_messages(topic, partition, *msg)
-
- def _send_messages(self, topic, partition, *msg, **kwargs):
- key = kwargs.pop('key', None)
-
- # Guarantee that msg is actually a list or tuple (should always be true)
- if not isinstance(msg, (list, tuple)):
- raise TypeError("msg is not a list or tuple!")
-
- for m in msg:
- # The protocol allows both the key and the payload to be null
- # (https://goo.gl/o694yN), but a (null, null) pair doesn't make sense.
- if m is None:
- if key is None:
- raise TypeError("key and payload can't be null in one")
- # Raise TypeError if any non-null message is not encoded as bytes
- elif not isinstance(m, six.binary_type):
- raise TypeError("all produce message payloads must be null or type bytes")
-
- # Raise TypeError if the key is not encoded as bytes
- if key is not None and not isinstance(key, six.binary_type):
- raise TypeError("the key must be type bytes")
-
- if self.async_send:
- for idx, m in enumerate(msg):
- try:
- item = (TopicPartition(topic, partition), m, key)
- if self.async_queue_put_timeout == 0:
- self.queue.put_nowait(item)
- else:
- self.queue.put(item, True, self.async_queue_put_timeout)
- except Full:
- raise AsyncProducerQueueFull(
- msg[idx:],
- 'Producer async queue overfilled. '
- 'Current queue size %d.' % (self.queue.qsize(),))
- resp = []
- else:
- messages = create_message_set([(m, key) for m in msg], self.codec, key, self.codec_compresslevel)
- req = ProduceRequestPayload(topic, partition, messages)
- try:
- resp = self.client.send_produce_request(
- [req], acks=self.req_acks, timeout=self.ack_timeout,
- fail_on_error=self.sync_fail_on_error
- )
- except Exception:
- log.exception("Unable to send messages")
- raise
- return resp
-
- def stop(self, timeout=None):
- """
- Stop the producer (async mode). Blocks until async thread completes.
- """
- if timeout is not None:
- log.warning('timeout argument to stop() is deprecated - '
- 'it will be removed in future release')
-
- if not self.async_send:
- log.warning('producer.stop() called, but producer is not async')
- return
-
- if self.stopped:
- log.warning('producer.stop() called, but producer is already stopped')
- return
-
- if self.async_send:
- self.queue.put((STOP_ASYNC_PRODUCER, None, None))
- self.thread_stop_event.set()
- self.thread.join()
-
- if hasattr(self, '_cleanup_func'):
- # Remove cleanup handler now that we've stopped
-
- # py3 supports unregistering
- if hasattr(atexit, 'unregister'):
- atexit.unregister(self._cleanup_func) # pylint: disable=no-member
-
- # py2 requires removing from private attribute...
- else:
-
- # ValueError on list.remove() if the exithandler no longer exists
- # but that is fine here
- try:
- atexit._exithandlers.remove( # pylint: disable=no-member
- (self._cleanup_func, (self,), {}))
- except ValueError:
- pass
-
- del self._cleanup_func
-
- self.stopped = True
-
- def __del__(self):
- if self.async_send and not self.stopped:
- self.stop()
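The hand-rolled background thread, queue, and retry loop in the removed base Producer are built into KafkaProducer: send() buffers the record and returns a future, while batching and retries are driven by configuration. A hedged sketch of roughly corresponding settings; the values shown are only illustrative:

.. code:: python

    # Sketch: KafkaProducer settings that roughly correspond to the removed
    # async_* constructor arguments (values are illustrative, not recommendations).
    from kafka import KafkaProducer
    from kafka.errors import KafkaError

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',  # placeholder broker
        acks=1,                # ~ req_acks=ACK_AFTER_LOCAL_WRITE
        retries=5,             # ~ async_retry_limit
        retry_backoff_ms=100,  # ~ async_retry_backoff_ms
        linger_ms=20,          # ~ batch_send_every_t, but in milliseconds
        batch_size=16384,      # batching is byte-based, not a message count
    )

    future = producer.send('my-topic', b'payload')  # placeholder topic
    try:
        metadata = future.get(timeout=10)           # block for the broker ack
        print(metadata.topic, metadata.partition, metadata.offset)
    except KafkaError:
        pass                                        # delivery failed after retries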
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
deleted file mode 100644
index 3ba9216..0000000
--- a/kafka/producer/keyed.py
+++ /dev/null
@@ -1,49 +0,0 @@
-from __future__ import absolute_import
-
-import logging
-import warnings
-
-from kafka.producer.base import Producer
-from kafka.partitioner import HashedPartitioner
-
-
-log = logging.getLogger(__name__)
-
-
-class KeyedProducer(Producer):
- """
- A producer which distributes messages to partitions based on the key
-
- See Producer class for Arguments
-
- Additional Arguments:
- partitioner: A partitioner class that will be used to get the partition
- to send the message to. Must be derived from Partitioner.
- Defaults to HashedPartitioner.
- """
- def __init__(self, *args, **kwargs):
- self.partitioner_class = kwargs.pop('partitioner', HashedPartitioner)
- self.partitioners = {}
- super(KeyedProducer, self).__init__(*args, **kwargs)
-
- def _next_partition(self, topic, key):
- if topic not in self.partitioners:
- if not self.client.has_metadata_for_topic(topic):
- self.client.load_metadata_for_topics(topic, ignore_leadernotavailable=True)
-
- self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))
-
- partitioner = self.partitioners[topic]
- return partitioner.partition(key)
-
- def send_messages(self, topic, key, *msg):
- partition = self._next_partition(topic, key)
- return self._send_messages(topic, partition, *msg, key=key)
-
- # DEPRECATED
- def send(self, topic, key, msg):
- warnings.warn("KeyedProducer.send is deprecated in favor of send_messages", DeprecationWarning)
- return self.send_messages(topic, key, msg)
-
- def __repr__(self):
- return '<KeyedProducer batch=%s>' % (self.async_send,)
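KeyedProducer's pluggable partitioner corresponds to KafkaProducer's partitioner config, which is expected to be called with the serialized key plus the full and available partition lists. A sketch of supplying a custom callable; the hash()-based scheme mirrors the removed LegacyPartitioner and is only an example:

.. code:: python

    # Sketch: custom partitioning with KafkaProducer instead of KeyedProducer.
    from kafka import KafkaProducer

    def legacy_style_partitioner(key_bytes, all_partitions, available_partitions):
        # Example only: plain hash() of the key, like the removed LegacyPartitioner.
        partitions = available_partitions or all_partitions
        return partitions[hash(key_bytes) % len(partitions)]

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',       # placeholder broker
        partitioner=legacy_style_partitioner,
    )
    producer.send('my-topic', value=b'payload', key=b'user-42')
    producer.flush()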
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
deleted file mode 100644
index f334a49..0000000
--- a/kafka/producer/simple.py
+++ /dev/null
@@ -1,54 +0,0 @@
-from __future__ import absolute_import
-
-from itertools import cycle
-import logging
-import random
-
-from kafka.vendor.six.moves import range
-
-from kafka.producer.base import Producer
-
-
-log = logging.getLogger(__name__)
-
-
-class SimpleProducer(Producer):
- """A simple, round-robin producer.
-
- See Producer class for Base Arguments
-
- Additional Arguments:
- random_start (bool, optional): randomize the initial partition which
- the first message block will be published to, otherwise
- if false, the first message block will always publish
- to partition 0 before cycling through each partition,
- defaults to True.
- """
- def __init__(self, *args, **kwargs):
- self.partition_cycles = {}
- self.random_start = kwargs.pop('random_start', True)
- super(SimpleProducer, self).__init__(*args, **kwargs)
-
- def _next_partition(self, topic):
- if topic not in self.partition_cycles:
- if not self.client.has_metadata_for_topic(topic):
- self.client.ensure_topic_exists(topic)
-
- self.partition_cycles[topic] = cycle(self.client.get_partition_ids_for_topic(topic))
-
- # Randomize the initial partition that is returned
- if self.random_start:
- num_partitions = len(self.client.get_partition_ids_for_topic(topic))
- for _ in range(random.randint(0, num_partitions-1)):
- next(self.partition_cycles[topic])
-
- return next(self.partition_cycles[topic])
-
- def send_messages(self, topic, *msg):
- partition = self._next_partition(topic)
- return super(SimpleProducer, self).send_messages(
- topic, partition, *msg
- )
-
- def __repr__(self):
- return '<SimpleProducer batch=%s>' % (self.async_send,)
diff --git a/kafka/protocol/__init__.py b/kafka/protocol/__init__.py
index 8cf5640..26dcc78 100644
--- a/kafka/protocol/__init__.py
+++ b/kafka/protocol/__init__.py
@@ -1,11 +1,5 @@
from __future__ import absolute_import
-from kafka.protocol.legacy import (
- create_message, create_gzip_message,
- create_snappy_message, create_message_set,
- CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, ALL_CODECS,
- ATTRIBUTE_CODEC_MASK, KafkaProtocol,
-)
API_KEYS = {
0: 'Produce',
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
deleted file mode 100644
index 2e8f5bc..0000000
--- a/kafka/protocol/legacy.py
+++ /dev/null
@@ -1,474 +0,0 @@
-from __future__ import absolute_import
-
-import logging
-import struct
-
-from kafka.vendor import six # pylint: disable=import-error
-
-import kafka.protocol.commit
-import kafka.protocol.fetch
-import kafka.protocol.message
-import kafka.protocol.metadata
-import kafka.protocol.offset
-import kafka.protocol.produce
-import kafka.structs
-
-from kafka.codec import gzip_encode, snappy_encode
-from kafka.errors import ProtocolError, UnsupportedCodecError
-from kafka.util import (
- crc32, read_short_string, relative_unpack,
- write_int_string, group_by_topic_and_partition)
-from kafka.protocol.message import MessageSet
-
-
-log = logging.getLogger(__name__)
-
-ATTRIBUTE_CODEC_MASK = 0x03
-CODEC_NONE = 0x00
-CODEC_GZIP = 0x01
-CODEC_SNAPPY = 0x02
-ALL_CODECS = (CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY)
-
-
-class KafkaProtocol(object):
- """
- Class to encapsulate all of the protocol encoding/decoding.
- This class does not have any state associated with it, it is purely
- for organization.
- """
- PRODUCE_KEY = 0
- FETCH_KEY = 1
- OFFSET_KEY = 2
- METADATA_KEY = 3
- OFFSET_COMMIT_KEY = 8
- OFFSET_FETCH_KEY = 9
- CONSUMER_METADATA_KEY = 10
-
- ###################
- # Private API #
- ###################
-
- @classmethod
- def _encode_message_header(cls, client_id, correlation_id, request_key,
- version=0):
- """
- Encode the common request envelope
- """
- return struct.pack('>hhih%ds' % len(client_id),
- request_key, # ApiKey
- version, # ApiVersion
- correlation_id, # CorrelationId
- len(client_id), # ClientId size
- client_id) # ClientId
-
- @classmethod
- def _encode_message_set(cls, messages):
- """
- Encode a MessageSet. Unlike other arrays in the protocol,
- MessageSets are not length-prefixed
-
- Format
- ======
- MessageSet => [Offset MessageSize Message]
- Offset => int64
- MessageSize => int32
- """
- message_set = []
- for message in messages:
- encoded_message = KafkaProtocol._encode_message(message)
- message_set.append(struct.pack('>qi%ds' % len(encoded_message), 0,
- len(encoded_message),
- encoded_message))
- return b''.join(message_set)
-
- @classmethod
- def _encode_message(cls, message):
- """
- Encode a single message.
-
- The magic number of a message is a format version number.
- The only supported magic number right now is zero
-
- Format
- ======
- Message => Crc MagicByte Attributes Key Value
- Crc => int32
- MagicByte => int8
- Attributes => int8
- Key => bytes
- Value => bytes
- """
- if message.magic == 0:
- msg = b''.join([
- struct.pack('>BB', message.magic, message.attributes),
- write_int_string(message.key),
- write_int_string(message.value)
- ])
- crc = crc32(msg)
- msg = struct.pack('>i%ds' % len(msg), crc, msg)
- else:
- raise ProtocolError("Unexpected magic number: %d" % message.magic)
- return msg
-
- ##################
- # Public API #
- ##################
-
- @classmethod
- def encode_produce_request(cls, payloads=(), acks=1, timeout=1000):
- """
- Encode a ProduceRequest struct
-
- Arguments:
- payloads: list of ProduceRequestPayload
- acks: How "acky" you want the request to be
- 1: written to disk by the leader
- 0: immediate response
- -1: waits for all replicas to be in sync
- timeout: Maximum time (in ms) the server will wait for replica acks.
- This is _not_ a socket timeout
-
- Returns: ProduceRequest
- """
- if acks not in (1, 0, -1):
- raise ValueError('ProduceRequest acks (%s) must be 1, 0, -1' % acks)
-
- topics = []
- for topic, topic_payloads in group_by_topic_and_partition(payloads).items():
- topic_msgs = []
- for partition, payload in topic_payloads.items():
- partition_msgs = []
- for msg in payload.messages:
- m = kafka.protocol.message.Message(
- msg.value, key=msg.key,
- magic=msg.magic, attributes=msg.attributes
- )
- partition_msgs.append((0, m.encode()))
- topic_msgs.append((partition, MessageSet.encode(partition_msgs, prepend_size=False)))
- topics.append((topic, topic_msgs))
-
-
- return kafka.protocol.produce.ProduceRequest[0](
- required_acks=acks,
- timeout=timeout,
- topics=topics
- )
-
- @classmethod
- def decode_produce_response(cls, response):
- """
- Decode ProduceResponse to ProduceResponsePayload
-
- Arguments:
- response: ProduceResponse
-
- Return: list of ProduceResponsePayload
- """
- return [
- kafka.structs.ProduceResponsePayload(topic, partition, error, offset)
- for topic, partitions in response.topics
- for partition, error, offset in partitions
- ]
-
- @classmethod
- def encode_fetch_request(cls, payloads=(), max_wait_time=100, min_bytes=4096):
- """
- Encodes a FetchRequest struct
-
- Arguments:
- payloads: list of FetchRequestPayload
- max_wait_time (int, optional): ms to block waiting for min_bytes
- data. Defaults to 100.
- min_bytes (int, optional): minimum bytes required to return before
- max_wait_time. Defaults to 4096.
-
- Return: FetchRequest
- """
- return kafka.protocol.fetch.FetchRequest[0](
- replica_id=-1,
- max_wait_time=max_wait_time,
- min_bytes=min_bytes,
- topics=[(
- topic,
- [(
- partition,
- payload.offset,
- payload.max_bytes)
- for partition, payload in topic_payloads.items()])
- for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
-
- @classmethod
- def decode_fetch_response(cls, response):
- """
- Decode FetchResponse struct to FetchResponsePayloads
-
- Arguments:
- response: FetchResponse
- """
- return [
- kafka.structs.FetchResponsePayload(
- topic, partition, error, highwater_offset, [
- offset_and_msg
- for offset_and_msg in cls.decode_message_set(messages)])
- for topic, partitions in response.topics
- for partition, error, highwater_offset, messages in partitions
- ]
-
- @classmethod
- def decode_message_set(cls, raw_data):
- messages = MessageSet.decode(raw_data, bytes_to_read=len(raw_data))
- for offset, _, message in messages:
- if isinstance(message, kafka.protocol.message.Message) and message.is_compressed():
- inner_messages = message.decompress()
- for (inner_offset, _msg_size, inner_msg) in inner_messages:
- yield kafka.structs.OffsetAndMessage(inner_offset, inner_msg)
- else:
- yield kafka.structs.OffsetAndMessage(offset, message)
-
- @classmethod
- def encode_offset_request(cls, payloads=()):
- return kafka.protocol.offset.OffsetRequest[0](
- replica_id=-1,
- topics=[(
- topic,
- [(
- partition,
- payload.time,
- payload.max_offsets)
- for partition, payload in six.iteritems(topic_payloads)])
- for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
-
- @classmethod
- def decode_offset_response(cls, response):
- """
- Decode OffsetResponse into OffsetResponsePayloads
-
- Arguments:
- response: OffsetResponse
-
- Returns: list of OffsetResponsePayloads
- """
- return [
- kafka.structs.OffsetResponsePayload(topic, partition, error, tuple(offsets))
- for topic, partitions in response.topics
- for partition, error, offsets in partitions
- ]
-
- @classmethod
- def encode_list_offset_request(cls, payloads=()):
- return kafka.protocol.offset.OffsetRequest[1](
- replica_id=-1,
- topics=[(
- topic,
- [(
- partition,
- payload.time)
- for partition, payload in six.iteritems(topic_payloads)])
- for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
-
- @classmethod
- def decode_list_offset_response(cls, response):
- """
- Decode OffsetResponse_v2 into ListOffsetResponsePayloads
-
- Arguments:
- response: OffsetResponse_v2
-
- Returns: list of ListOffsetResponsePayloads
- """
- return [
- kafka.structs.ListOffsetResponsePayload(topic, partition, error, timestamp, offset)
- for topic, partitions in response.topics
- for partition, error, timestamp, offset in partitions
- ]
-
-
- @classmethod
- def encode_metadata_request(cls, topics=(), payloads=None):
- """
- Encode a MetadataRequest
-
- Arguments:
- topics: list of strings
- """
- if payloads is not None:
- topics = payloads
-
- return kafka.protocol.metadata.MetadataRequest[0](topics)
-
- @classmethod
- def decode_metadata_response(cls, response):
- return response
-
- @classmethod
- def encode_consumer_metadata_request(cls, client_id, correlation_id, payloads):
- """
- Encode a ConsumerMetadataRequest
-
- Arguments:
- client_id: string
- correlation_id: int
- payloads: string (consumer group)
- """
- message = []
- message.append(cls._encode_message_header(client_id, correlation_id,
- KafkaProtocol.CONSUMER_METADATA_KEY))
- message.append(struct.pack('>h%ds' % len(payloads), len(payloads), payloads))
-
- msg = b''.join(message)
- return write_int_string(msg)
-
- @classmethod
- def decode_consumer_metadata_response(cls, data):
- """
- Decode bytes to a kafka.structs.ConsumerMetadataResponse
-
- Arguments:
- data: bytes to decode
- """
- ((correlation_id, error, nodeId), cur) = relative_unpack('>ihi', data, 0)
- (host, cur) = read_short_string(data, cur)
- ((port,), cur) = relative_unpack('>i', data, cur)
-
- return kafka.structs.ConsumerMetadataResponse(error, nodeId, host, port)
-
- @classmethod
- def encode_offset_commit_request(cls, group, payloads):
- """
- Encode an OffsetCommitRequest struct
-
- Arguments:
- group: string, the consumer group you are committing offsets for
- payloads: list of OffsetCommitRequestPayload
- """
- return kafka.protocol.commit.OffsetCommitRequest[0](
- consumer_group=group,
- topics=[(
- topic,
- [(
- partition,
- payload.offset,
- payload.metadata)
- for partition, payload in six.iteritems(topic_payloads)])
- for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
-
- @classmethod
- def decode_offset_commit_response(cls, response):
- """
- Decode OffsetCommitResponse to an OffsetCommitResponsePayload
-
- Arguments:
- response: OffsetCommitResponse
- """
- return [
- kafka.structs.OffsetCommitResponsePayload(topic, partition, error)
- for topic, partitions in response.topics
- for partition, error in partitions
- ]
-
- @classmethod
- def encode_offset_fetch_request(cls, group, payloads, from_kafka=False):
- """
- Encode an OffsetFetchRequest struct. The request is encoded using
- version 0 if from_kafka is false, indicating a request for Zookeeper
- offsets. It is encoded using version 1 otherwise, indicating a request
- for Kafka offsets.
-
- Arguments:
- group: string, the consumer group you are fetching offsets for
- payloads: list of OffsetFetchRequestPayload
- from_kafka: bool, default False, set True for Kafka-committed offsets
- """
- version = 1 if from_kafka else 0
- return kafka.protocol.commit.OffsetFetchRequest[version](
- consumer_group=group,
- topics=[(
- topic,
- list(topic_payloads.keys()))
- for topic, topic_payloads in six.iteritems(group_by_topic_and_partition(payloads))])
-
- @classmethod
- def decode_offset_fetch_response(cls, response):
- """
- Decode OffsetFetchResponse to OffsetFetchResponsePayloads
-
- Arguments:
- response: OffsetFetchResponse
- """
- return [
- kafka.structs.OffsetFetchResponsePayload(
- topic, partition, offset, metadata, error
- )
- for topic, partitions in response.topics
- for partition, offset, metadata, error in partitions
- ]
-
-
-def create_message(payload, key=None):
- """
- Construct a Message
-
- Arguments:
- payload: bytes, the payload to send to Kafka
- key: bytes, a key used for partition routing (optional)
-
- """
- return kafka.structs.Message(0, 0, key, payload)
-
-
-def create_gzip_message(payloads, key=None, compresslevel=None):
- """
- Construct a Gzipped Message containing multiple Messages
-
- The given payloads will be encoded, compressed, and sent as a single atomic
- message to Kafka.
-
- Arguments:
- payloads: list(bytes), a list of payloads to be sent to Kafka
- key: bytes, a key used for partition routing (optional)
-
- """
- message_set = KafkaProtocol._encode_message_set(
- [create_message(payload, pl_key) for payload, pl_key in payloads])
-
- gzipped = gzip_encode(message_set, compresslevel=compresslevel)
- codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
-
- return kafka.structs.Message(0, 0x00 | codec, key, gzipped)
-
-
-def create_snappy_message(payloads, key=None):
- """
- Construct a Snappy Message containing multiple Messages
-
- The given payloads will be encoded, compressed, and sent as a single atomic
- message to Kafka.
-
- Arguments:
- payloads: list(bytes), a list of payloads to be sent to Kafka
- key: bytes, a key used for partition routing (optional)
-
- """
- message_set = KafkaProtocol._encode_message_set(
- [create_message(payload, pl_key) for payload, pl_key in payloads])
-
- snapped = snappy_encode(message_set)
- codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
-
- return kafka.structs.Message(0, 0x00 | codec, key, snapped)
-
-
-def create_message_set(messages, codec=CODEC_NONE, key=None, compresslevel=None):
- """Create a message set using the given codec.
-
- If codec is CODEC_NONE, return a list of raw Kafka messages. Otherwise,
- return a list containing a single codec-encoded message.
- """
- if codec == CODEC_NONE:
- return [create_message(m, k) for m, k in messages]
- elif codec == CODEC_GZIP:
- return [create_gzip_message(messages, key, compresslevel)]
- elif codec == CODEC_SNAPPY:
- return [create_snappy_message(messages, key)]
- else:
- raise UnsupportedCodecError("Codec 0x%02x unsupported" % (codec,))
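The docstrings above spell out the magic-0 wire format (Crc, MagicByte, Attributes, Key, Value, with key and value as 32-bit length-prefixed byte strings). A self-contained reconstruction of that encoding, equivalent in output to the removed `_encode_message` / `write_int_string` pair (it packs the CRC unsigned, which yields the same bytes as the signed pack used above)::

    import struct
    from binascii import crc32

    def write_int_string(s):
        # None encodes as length -1; otherwise a 32-bit length prefix plus the bytes.
        if s is None:
            return struct.pack('>i', -1)
        return struct.pack('>i%ds' % len(s), len(s), s)

    def encode_message_v0(key, value, attributes=0):
        body = struct.pack('>BB', 0, attributes)      # MagicByte=0, Attributes
        body += write_int_string(key) + write_int_string(value)
        crc = crc32(body) & 0xffffffff
        return struct.pack('>I', crc) + body

    print(encode_message_v0(None, b'hello').hex())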
diff --git a/kafka/structs.py b/kafka/structs.py
index baacbcd..9ab4f8b 100644
--- a/kafka/structs.py
+++ b/kafka/structs.py
@@ -3,64 +3,6 @@ from __future__ import absolute_import
from collections import namedtuple
-# SimpleClient Payload Structs - Deprecated
-
-# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
-MetadataRequest = namedtuple("MetadataRequest",
- ["topics"])
-
-MetadataResponse = namedtuple("MetadataResponse",
- ["brokers", "topics"])
-
-# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ConsumerMetadataRequest
-ConsumerMetadataRequest = namedtuple("ConsumerMetadataRequest",
- ["groups"])
-
-ConsumerMetadataResponse = namedtuple("ConsumerMetadataResponse",
- ["error", "nodeId", "host", "port"])
-
-# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
-ProduceRequestPayload = namedtuple("ProduceRequestPayload",
- ["topic", "partition", "messages"])
-
-ProduceResponsePayload = namedtuple("ProduceResponsePayload",
- ["topic", "partition", "error", "offset"])
-
-# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchAPI
-FetchRequestPayload = namedtuple("FetchRequestPayload",
- ["topic", "partition", "offset", "max_bytes"])
-
-FetchResponsePayload = namedtuple("FetchResponsePayload",
- ["topic", "partition", "error", "highwaterMark", "messages"])
-
-# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
-OffsetRequestPayload = namedtuple("OffsetRequestPayload",
- ["topic", "partition", "time", "max_offsets"])
-
-ListOffsetRequestPayload = namedtuple("ListOffsetRequestPayload",
- ["topic", "partition", "time"])
-
-OffsetResponsePayload = namedtuple("OffsetResponsePayload",
- ["topic", "partition", "error", "offsets"])
-
-ListOffsetResponsePayload = namedtuple("ListOffsetResponsePayload",
- ["topic", "partition", "error", "timestamp", "offset"])
-
-# https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
-OffsetCommitRequestPayload = namedtuple("OffsetCommitRequestPayload",
- ["topic", "partition", "offset", "metadata"])
-
-OffsetCommitResponsePayload = namedtuple("OffsetCommitResponsePayload",
- ["topic", "partition", "error"])
-
-OffsetFetchRequestPayload = namedtuple("OffsetFetchRequestPayload",
- ["topic", "partition"])
-
-OffsetFetchResponsePayload = namedtuple("OffsetFetchResponsePayload",
- ["topic", "partition", "offset", "metadata", "error"])
-
-
-
# Other useful structs
TopicPartition = namedtuple("TopicPartition",
["topic", "partition"])
@@ -79,17 +21,6 @@ OffsetAndTimestamp = namedtuple("OffsetAndTimestamp",
["offset", "timestamp"])
-# Deprecated structs
-OffsetAndMessage = namedtuple("OffsetAndMessage",
- ["offset", "message"])
-
-Message = namedtuple("Message",
- ["magic", "attributes", "key", "value"])
-
-KafkaMessage = namedtuple("KafkaMessage",
- ["topic", "partition", "offset", "key", "value"])
-
-
# Define retry policy for async producer
# Limit value: int >= 0, 0 means no retries
RetryOptions = namedtuple("RetryOptions",
diff --git a/kafka/util.py b/kafka/util.py
index 9354bd9..9f65b81 100644
--- a/kafka/util.py
+++ b/kafka/util.py
@@ -2,15 +2,10 @@ from __future__ import absolute_import
import atexit
import binascii
-import collections
-import struct
-from threading import Thread, Event
import weakref
from kafka.vendor import six
-from kafka.errors import BufferUnderflowError
-
if six.PY3:
MAX_INT = 2 ** 31
@@ -28,109 +23,6 @@ else:
from binascii import crc32
-def write_int_string(s):
- if s is not None and not isinstance(s, six.binary_type):
- raise TypeError('Expected "%s" to be bytes\n'
- 'data=%s' % (type(s), repr(s)))
- if s is None:
- return struct.pack('>i', -1)
- else:
- return struct.pack('>i%ds' % len(s), len(s), s)
-
-
-def read_short_string(data, cur):
- if len(data) < cur + 2:
- raise BufferUnderflowError("Not enough data left")
-
- (strlen,) = struct.unpack('>h', data[cur:cur + 2])
- if strlen == -1:
- return None, cur + 2
-
- cur += 2
- if len(data) < cur + strlen:
- raise BufferUnderflowError("Not enough data left")
-
- out = data[cur:cur + strlen]
- return out, cur + strlen
-
-
-def relative_unpack(fmt, data, cur):
- size = struct.calcsize(fmt)
- if len(data) < cur + size:
- raise BufferUnderflowError("Not enough data left")
-
- out = struct.unpack(fmt, data[cur:cur + size])
- return out, cur + size
-
-
-def group_by_topic_and_partition(tuples):
- out = collections.defaultdict(dict)
- for t in tuples:
- assert t.topic not in out or t.partition not in out[t.topic], \
- 'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__,
- t.topic, t.partition)
- out[t.topic][t.partition] = t
- return out
-
-
-class ReentrantTimer(object):
- """
- A timer that can be restarted, unlike threading.Timer
- (although this uses threading.Timer)
-
- Arguments:
-
- t: timer interval in milliseconds
- fn: a callable to invoke
- args: tuple of args to be passed to function
- kwargs: keyword arguments to be passed to function
- """
- def __init__(self, t, fn, *args, **kwargs):
-
- if t <= 0:
- raise ValueError('Invalid timeout value')
-
- if not callable(fn):
- raise ValueError('fn must be callable')
-
- self.thread = None
- self.t = t / 1000.0
- self.fn = fn
- self.args = args
- self.kwargs = kwargs
- self.active = None
-
- def _timer(self, active):
- # python2.6 Event.wait() always returns None
- # python2.7 and greater returns the flag value (true/false)
- # we want the flag value, so add an 'or' here for python2.6
- # this is redundant for later python versions (FLAG OR FLAG == FLAG)
- while not (active.wait(self.t) or active.is_set()):
- self.fn(*self.args, **self.kwargs)
-
- def start(self):
- if self.thread is not None:
- self.stop()
-
- self.active = Event()
- self.thread = Thread(target=self._timer, args=(self.active,))
- self.thread.daemon = True # So the app exits when main thread exits
- self.thread.start()
-
- def stop(self):
- if self.thread is None:
- return
-
- self.active.set()
- self.thread.join(self.t + 1)
- # noinspection PyAttributeOutsideInit
- self.timer = None
- self.fn = None
-
- def __del__(self):
- self.stop()
-
-
class WeakMethod(object):
"""
Callable that weakly references a method and the object it is bound to. It
diff --git a/setup.py b/setup.py
index 779adb9..8bc484c 100644
--- a/setup.py
+++ b/setup.py
@@ -24,8 +24,6 @@ class Tox(Command):
test_require = ['tox', 'mock']
-if sys.version_info < (2, 7):
- test_require.append('unittest2')
here = os.path.abspath(os.path.dirname(__file__))
diff --git a/test/__init__.py b/test/__init__.py
index 3d2ba3d..71f667d 100644
--- a/test/__init__.py
+++ b/test/__init__.py
@@ -1,12 +1,5 @@
from __future__ import absolute_import
-import sys
-
-if sys.version_info < (2, 7):
- import unittest2 as unittest # pylint: disable=import-error
-else:
- import unittest
-
# Set default logging handler to avoid "No handler found" warnings.
import logging
try: # Python 2.7+
diff --git a/test/conftest.py b/test/conftest.py
index bbe4048..3fa0262 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -43,15 +43,6 @@ def kafka_broker_factory(zookeeper):
@pytest.fixture
-def simple_client(kafka_broker, request, topic):
- """Return a SimpleClient fixture"""
- client = kafka_broker.get_simple_client(client_id='%s_client' % (request.node.name,))
- client.ensure_topic_exists(topic)
- yield client
- client.close()
-
-
-@pytest.fixture
def kafka_client(kafka_broker, request):
"""Return a KafkaClient fixture"""
(client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
diff --git a/test/fixtures.py b/test/fixtures.py
index 68572b5..557fca6 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -13,8 +13,7 @@ import py
from kafka.vendor.six.moves import urllib, range
from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
-from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient, KafkaAdminClient
-from kafka.client_async import KafkaClient
+from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
from kafka.protocol.admin import CreateTopicsRequest
from kafka.protocol.metadata import MetadataRequest
from test.testutil import env_kafka_version, random_string
@@ -524,7 +523,3 @@ class KafkaFixture(Fixture):
for x in range(cnt):
params['client_id'] = '%s_%s' % (client_id, random_string(4))
yield KafkaProducer(**params)
-
- def get_simple_client(self, **params):
- params.setdefault('client_id', 'simple_client')
- return SimpleClient(self.bootstrap_server(), **params)
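With `get_simple_client` (and `SimpleClient.ensure_topic_exists`) gone, tests and applications that need to create a topic up front can go through the admin client instead. A hedged sketch, assuming a local broker and placeholder topic settings::

    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
    admin.create_topics([NewTopic(name='my-topic',
                                  num_partitions=2,
                                  replication_factor=1)])
    admin.close()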
diff --git a/test/test_client.py b/test/test_client.py
deleted file mode 100644
index 1c68978..0000000
--- a/test/test_client.py
+++ /dev/null
@@ -1,405 +0,0 @@
-import socket
-
-from mock import ANY, MagicMock, patch
-from operator import itemgetter
-from kafka.vendor import six
-from . import unittest
-
-from kafka import SimpleClient
-from kafka.errors import (
- KafkaUnavailableError, LeaderNotAvailableError, KafkaTimeoutError,
- UnknownTopicOrPartitionError, FailedPayloadsError)
-from kafka.future import Future
-from kafka.protocol import KafkaProtocol, create_message
-from kafka.protocol.metadata import MetadataResponse
-from kafka.structs import ProduceRequestPayload, BrokerMetadata, TopicPartition
-
-
-NO_ERROR = 0
-UNKNOWN_TOPIC_OR_PARTITION = 3
-NO_LEADER = 5
-
-
-def mock_conn(conn, success=True):
- mocked = MagicMock()
- mocked.connected.return_value = True
- if success:
- mocked.send.return_value = Future().success(True)
- else:
- mocked.send.return_value = Future().failure(Exception())
- conn.return_value = mocked
- conn.recv.return_value = []
-
-
-class TestSimpleClient(unittest.TestCase):
- def test_init_with_list(self):
- with patch.object(SimpleClient, 'load_metadata_for_topics'):
- client = SimpleClient(hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
-
- self.assertEqual(
- sorted([('kafka01', 9092, socket.AF_UNSPEC), ('kafka02', 9092, socket.AF_UNSPEC),
- ('kafka03', 9092, socket.AF_UNSPEC)]),
- sorted(client.hosts))
-
- def test_init_with_csv(self):
- with patch.object(SimpleClient, 'load_metadata_for_topics'):
- client = SimpleClient(hosts='kafka01:9092,kafka02:9092,kafka03:9092')
-
- self.assertEqual(
- sorted([('kafka01', 9092, socket.AF_UNSPEC), ('kafka02', 9092, socket.AF_UNSPEC),
- ('kafka03', 9092, socket.AF_UNSPEC)]),
- sorted(client.hosts))
-
- def test_init_with_unicode_csv(self):
- with patch.object(SimpleClient, 'load_metadata_for_topics'):
- client = SimpleClient(hosts=u'kafka01:9092,kafka02:9092,kafka03:9092')
-
- self.assertEqual(
- sorted([('kafka01', 9092, socket.AF_UNSPEC), ('kafka02', 9092, socket.AF_UNSPEC),
- ('kafka03', 9092, socket.AF_UNSPEC)]),
- sorted(client.hosts))
-
- @patch.object(SimpleClient, '_get_conn')
- @patch.object(SimpleClient, 'load_metadata_for_topics')
- def test_send_broker_unaware_request_fail(self, load_metadata, conn):
- mocked_conns = {
- ('kafka01', 9092): MagicMock(),
- ('kafka02', 9092): MagicMock()
- }
- for val in mocked_conns.values():
- mock_conn(val, success=False)
-
- def mock_get_conn(host, port, afi):
- return mocked_conns[(host, port)]
- conn.side_effect = mock_get_conn
-
- client = SimpleClient(hosts=['kafka01:9092', 'kafka02:9092'])
-
- req = KafkaProtocol.encode_metadata_request()
- with self.assertRaises(KafkaUnavailableError):
- client._send_broker_unaware_request(payloads=['fake request'],
- encoder_fn=MagicMock(return_value='fake encoded message'),
- decoder_fn=lambda x: x)
-
- for key, conn in six.iteritems(mocked_conns):
- conn.send.assert_called_with('fake encoded message')
-
- def test_send_broker_unaware_request(self):
- mocked_conns = {
- ('kafka01', 9092): MagicMock(),
- ('kafka02', 9092): MagicMock(),
- ('kafka03', 9092): MagicMock()
- }
- # inject BrokerConnection side effects
- mock_conn(mocked_conns[('kafka01', 9092)], success=False)
- mock_conn(mocked_conns[('kafka03', 9092)], success=False)
- future = Future()
- mocked_conns[('kafka02', 9092)].send.return_value = future
- mocked_conns[('kafka02', 9092)].recv.return_value = [('valid response', future)]
-
- def mock_get_conn(host, port, afi):
- return mocked_conns[(host, port)]
-
- # patch to avoid making requests before we want it
- with patch.object(SimpleClient, 'load_metadata_for_topics'):
- with patch.object(SimpleClient, '_get_conn', side_effect=mock_get_conn):
-
- client = SimpleClient(hosts='kafka01:9092,kafka02:9092')
- resp = client._send_broker_unaware_request(payloads=['fake request'],
- encoder_fn=MagicMock(),
- decoder_fn=lambda x: x)
-
- self.assertEqual('valid response', resp)
- mocked_conns[('kafka02', 9092)].recv.assert_called_once_with()
-
- @patch('kafka.SimpleClient._get_conn')
- @patch('kafka.client.KafkaProtocol')
- def test_load_metadata(self, protocol, conn):
-
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (NO_ERROR, 'topic_1', [
- (NO_ERROR, 0, 1, [1, 2], [1, 2])
- ]),
- (NO_ERROR, 'topic_noleader', [
- (NO_LEADER, 0, -1, [], []),
- (NO_LEADER, 1, -1, [], []),
- ]),
- (NO_LEADER, 'topic_no_partitions', []),
- (UNKNOWN_TOPIC_OR_PARTITION, 'topic_unknown', []),
- (NO_ERROR, 'topic_3', [
- (NO_ERROR, 0, 0, [0, 1], [0, 1]),
- (NO_ERROR, 1, 1, [1, 0], [1, 0]),
- (NO_ERROR, 2, 0, [0, 1], [0, 1])
- ])
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- # client loads metadata at init
- client = SimpleClient(hosts=['broker_1:4567'])
- self.assertDictEqual({
- TopicPartition('topic_1', 0): brokers[1],
- TopicPartition('topic_noleader', 0): None,
- TopicPartition('topic_noleader', 1): None,
- TopicPartition('topic_3', 0): brokers[0],
- TopicPartition('topic_3', 1): brokers[1],
- TopicPartition('topic_3', 2): brokers[0]},
- client.topics_to_brokers)
-
- # if we ask for metadata explicitly, it should raise errors
- with self.assertRaises(LeaderNotAvailableError):
- client.load_metadata_for_topics('topic_no_partitions')
-
- with self.assertRaises(UnknownTopicOrPartitionError):
- client.load_metadata_for_topics('topic_unknown')
-
- # This should not raise
- client.load_metadata_for_topics('topic_no_leader')
-
- @patch('kafka.SimpleClient._get_conn')
- @patch('kafka.client.KafkaProtocol')
- def test_has_metadata_for_topic(self, protocol, conn):
-
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (NO_LEADER, 'topic_still_creating', []),
- (UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []),
- (NO_ERROR, 'topic_noleaders', [
- (NO_LEADER, 0, -1, [], []),
- (NO_LEADER, 1, -1, [], []),
- ]),
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- client = SimpleClient(hosts=['broker_1:4567'])
-
- # Topics with no partitions return False
- self.assertFalse(client.has_metadata_for_topic('topic_still_creating'))
- self.assertFalse(client.has_metadata_for_topic('topic_doesnt_exist'))
-
- # Topic with partition metadata, but no leaders return True
- self.assertTrue(client.has_metadata_for_topic('topic_noleaders'))
-
- @patch('kafka.SimpleClient._get_conn')
- @patch('kafka.client.KafkaProtocol.decode_metadata_response')
- def test_ensure_topic_exists(self, decode_metadata_response, conn):
-
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (NO_LEADER, 'topic_still_creating', []),
- (UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []),
- (NO_ERROR, 'topic_noleaders', [
- (NO_LEADER, 0, -1, [], []),
- (NO_LEADER, 1, -1, [], []),
- ]),
- ]
- decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- client = SimpleClient(hosts=['broker_1:4567'])
-
- with self.assertRaises(UnknownTopicOrPartitionError):
- client.ensure_topic_exists('topic_doesnt_exist', timeout=1)
-
- with self.assertRaises(KafkaTimeoutError):
- client.ensure_topic_exists('topic_still_creating', timeout=1)
-
- # This should not raise
- client.ensure_topic_exists('topic_noleaders', timeout=1)
-
- @patch('kafka.SimpleClient._get_conn')
- @patch('kafka.client.KafkaProtocol')
- def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn):
- "Get leader for partitions reload metadata if it is not available"
-
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (NO_LEADER, 'topic_no_partitions', [])
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- client = SimpleClient(hosts=['broker_1:4567'])
-
- # topic metadata is loaded but empty
- self.assertDictEqual({}, client.topics_to_brokers)
-
- topics = [
- (NO_ERROR, 'topic_one_partition', [
- (NO_ERROR, 0, 0, [0, 1], [0, 1])
- ])
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- # calling _get_leader_for_partition (from any broker aware request)
- # will try loading metadata again for the same topic
- leader = client._get_leader_for_partition('topic_one_partition', 0)
-
- self.assertEqual(brokers[0], leader)
- self.assertDictEqual({
- TopicPartition('topic_one_partition', 0): brokers[0]},
- client.topics_to_brokers)
-
- @patch('kafka.SimpleClient._get_conn')
- @patch('kafka.client.KafkaProtocol')
- def test_get_leader_for_unassigned_partitions(self, protocol, conn):
-
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (NO_LEADER, 'topic_no_partitions', []),
- (UNKNOWN_TOPIC_OR_PARTITION, 'topic_unknown', []),
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- client = SimpleClient(hosts=['broker_1:4567'])
-
- self.assertDictEqual({}, client.topics_to_brokers)
-
- with self.assertRaises(LeaderNotAvailableError):
- client._get_leader_for_partition('topic_no_partitions', 0)
-
- with self.assertRaises(UnknownTopicOrPartitionError):
- client._get_leader_for_partition('topic_unknown', 0)
-
- @patch('kafka.SimpleClient._get_conn')
- @patch('kafka.client.KafkaProtocol')
- def test_get_leader_exceptions_when_noleader(self, protocol, conn):
-
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (NO_ERROR, 'topic_noleader', [
- (NO_LEADER, 0, -1, [], []),
- (NO_LEADER, 1, -1, [], []),
- ]),
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- client = SimpleClient(hosts=['broker_1:4567'])
- self.assertDictEqual(
- {
- TopicPartition('topic_noleader', 0): None,
- TopicPartition('topic_noleader', 1): None
- },
- client.topics_to_brokers)
-
- # No leader partitions -- raise LeaderNotAvailableError
- with self.assertRaises(LeaderNotAvailableError):
- self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
- with self.assertRaises(LeaderNotAvailableError):
- self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
-
- # Unknown partitions -- raise UnknownTopicOrPartitionError
- with self.assertRaises(UnknownTopicOrPartitionError):
- self.assertIsNone(client._get_leader_for_partition('topic_noleader', 2))
-
- topics = [
- (NO_ERROR, 'topic_noleader', [
- (NO_ERROR, 0, 0, [0, 1], [0, 1]),
- (NO_ERROR, 1, 1, [1, 0], [1, 0])
- ]),
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
- self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
- self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))
-
- @patch.object(SimpleClient, '_get_conn')
- @patch('kafka.client.KafkaProtocol')
- def test_send_produce_request_raises_when_noleader(self, protocol, conn):
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (NO_ERROR, 'topic_noleader', [
- (NO_LEADER, 0, -1, [], []),
- (NO_LEADER, 1, -1, [], []),
- ]),
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- client = SimpleClient(hosts=['broker_1:4567'])
-
- requests = [ProduceRequestPayload(
- "topic_noleader", 0,
- [create_message("a"), create_message("b")])]
-
- with self.assertRaises(FailedPayloadsError):
- client.send_produce_request(requests)
-
- @patch('kafka.SimpleClient._get_conn')
- @patch('kafka.client.KafkaProtocol')
- def test_send_produce_request_raises_when_topic_unknown(self, protocol, conn):
-
- mock_conn(conn)
-
- brokers = [
- BrokerMetadata(0, 'broker_1', 4567, None),
- BrokerMetadata(1, 'broker_2', 5678, None)
- ]
- resp0_brokers = list(map(itemgetter(0, 1, 2), brokers))
-
- topics = [
- (UNKNOWN_TOPIC_OR_PARTITION, 'topic_doesnt_exist', []),
- ]
- protocol.decode_metadata_response.return_value = MetadataResponse[0](resp0_brokers, topics)
-
- client = SimpleClient(hosts=['broker_1:4567'])
-
- requests = [ProduceRequestPayload(
- "topic_doesnt_exist", 0,
- [create_message("a"), create_message("b")])]
-
- with self.assertRaises(FailedPayloadsError):
- client.send_produce_request(requests)
-
- def test_correlation_rollover(self):
- with patch.object(SimpleClient, 'load_metadata_for_topics'):
- big_num = 2**31 - 3
- client = SimpleClient(hosts=(), correlation_id=big_num)
- self.assertEqual(big_num + 1, client._next_id())
- self.assertEqual(big_num + 2, client._next_id())
- self.assertEqual(0, client._next_id())
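The final test above pinned down the correlation-id rollover: ids grow monotonically and wrap back to 0 after 2**31 - 1. A tiny sketch of that arithmetic, covering only the behaviour the test asserted rather than the removed implementation::

    def next_correlation_id(current):
        # Wrap to 0 once the 31-bit ceiling is reached.
        return (current + 1) % 2**31

    current = 2**31 - 3
    observed = []
    for _ in range(3):
        current = next_correlation_id(current)
        observed.append(current)
    assert observed == [2**31 - 2, 2**31 - 1, 0]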
diff --git a/test/test_client_integration.py b/test/test_client_integration.py
deleted file mode 100644
index a983ce1..0000000
--- a/test/test_client_integration.py
+++ /dev/null
@@ -1,95 +0,0 @@
-import os
-
-import pytest
-
-from kafka.errors import KafkaTimeoutError
-from kafka.protocol import create_message
-from kafka.structs import (
- FetchRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
- ProduceRequestPayload)
-
-from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, env_kafka_version
-
-
-class TestKafkaClientIntegration(KafkaIntegrationTestCase):
- @classmethod
- def setUpClass(cls): # noqa
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- cls.zk = ZookeeperFixture.instance()
- cls.server = KafkaFixture.instance(0, cls.zk)
-
- @classmethod
- def tearDownClass(cls): # noqa
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- cls.server.close()
- cls.zk.close()
-
- def test_consume_none(self):
- fetch = FetchRequestPayload(self.topic, 0, 0, 1024)
-
- fetch_resp, = self.client.send_fetch_request([fetch])
- self.assertEqual(fetch_resp.error, 0)
- self.assertEqual(fetch_resp.topic, self.topic)
- self.assertEqual(fetch_resp.partition, 0)
-
- messages = list(fetch_resp.messages)
- self.assertEqual(len(messages), 0)
-
- def test_ensure_topic_exists(self):
-
- # assume that self.topic was created by setUp
- # if so, this should succeed
- self.client.ensure_topic_exists(self.topic, timeout=1)
-
- # ensure_topic_exists should fail with KafkaTimeoutError
- with self.assertRaises(KafkaTimeoutError):
- self.client.ensure_topic_exists('this_topic_doesnt_exist', timeout=0)
-
- def test_send_produce_request_maintains_request_response_order(self):
-
- self.client.ensure_topic_exists('foo')
- self.client.ensure_topic_exists('bar')
-
- requests = [
- ProduceRequestPayload(
- 'foo', 0,
- [create_message(b'a'), create_message(b'b')]),
- ProduceRequestPayload(
- 'bar', 1,
- [create_message(b'a'), create_message(b'b')]),
- ProduceRequestPayload(
- 'foo', 1,
- [create_message(b'a'), create_message(b'b')]),
- ProduceRequestPayload(
- 'bar', 0,
- [create_message(b'a'), create_message(b'b')]),
- ]
-
- responses = self.client.send_produce_request(requests)
- while len(responses):
- request = requests.pop()
- response = responses.pop()
- self.assertEqual(request.topic, response.topic)
- self.assertEqual(request.partition, response.partition)
-
-
- ####################
- # Offset Tests #
- ####################
-
- @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
- def test_commit_fetch_offsets(self):
- req = OffsetCommitRequestPayload(self.topic, 0, 42, 'metadata')
- (resp,) = self.client.send_offset_commit_request('group', [req])
- self.assertEqual(resp.error, 0)
-
- req = OffsetFetchRequestPayload(self.topic, 0)
- (resp,) = self.client.send_offset_fetch_request('group', [req])
- self.assertEqual(resp.error, 0)
- self.assertEqual(resp.offset, 42)
- self.assertEqual(resp.metadata, '') # Metadata isn't stored for now
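The commit/fetch flow exercised above carries over to `KafkaConsumer`, which can commit and read back offsets for an assigned partition. A sketch assuming a local broker and placeholder topic/group names (here `committed()` returns the stored offset)::

    from kafka import KafkaConsumer
    from kafka.structs import TopicPartition, OffsetAndMetadata

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             group_id='group',
                             enable_auto_commit=False)
    tp = TopicPartition('my-topic', 0)
    consumer.assign([tp])
    consumer.commit({tp: OffsetAndMetadata(42, '')})
    assert consumer.committed(tp) == 42
    consumer.close()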
diff --git a/test/test_consumer.py b/test/test_consumer.py
index edcc2d8..436fe55 100644
--- a/test/test_consumer.py
+++ b/test/test_consumer.py
@@ -1,15 +1,7 @@
-import sys
-
-from mock import MagicMock, patch
-from . import unittest
import pytest
-from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
-from kafka.errors import (
- FailedPayloadsError, KafkaConfigurationError, NotLeaderForPartitionError,
- UnknownTopicOrPartitionError)
-from kafka.structs import (
- FetchResponsePayload, OffsetAndMessage, OffsetFetchResponsePayload)
+from kafka import KafkaConsumer
+from kafka.errors import KafkaConfigurationError
class TestKafkaConsumer:
@@ -32,126 +24,3 @@ class TestKafkaConsumer:
assert sub == set(['foo'])
sub.add('fizz')
assert consumer.subscription() == set(['foo'])
-
-
-class TestMultiProcessConsumer(unittest.TestCase):
- @unittest.skipIf(sys.platform.startswith('win'), 'test mocking fails on windows')
- def test_partition_list(self):
- client = MagicMock()
- partitions = (0,)
- with patch.object(MultiProcessConsumer, 'fetch_last_known_offsets') as fetch_last_known_offsets:
- MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
- self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
- self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member
-
-
-class TestSimpleConsumer(unittest.TestCase):
- def test_non_integer_partitions(self):
- with self.assertRaises(AssertionError):
- SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0'])
-
- def test_simple_consumer_failed_payloads(self):
- client = MagicMock()
- consumer = SimpleConsumer(client, group=None,
- topic='topic', partitions=[0, 1],
- auto_commit=False)
-
- def failed_payloads(payload):
- return FailedPayloadsError(payload)
-
- client.send_fetch_request.side_effect = self.fail_requests_factory(failed_payloads)
-
- # This should not raise an exception
- consumer.get_messages(5)
-
- def test_simple_consumer_leader_change(self):
- client = MagicMock()
- consumer = SimpleConsumer(client, group=None,
- topic='topic', partitions=[0, 1],
- auto_commit=False)
-
- # Mock so that only the first request gets a valid response
- def not_leader(request):
- return FetchResponsePayload(request.topic, request.partition,
- NotLeaderForPartitionError.errno, -1, ())
-
- client.send_fetch_request.side_effect = self.fail_requests_factory(not_leader)
-
- # This should not raise an exception
- consumer.get_messages(20)
-
- # client should have updated metadata
- self.assertGreaterEqual(client.reset_topic_metadata.call_count, 1)
- self.assertGreaterEqual(client.load_metadata_for_topics.call_count, 1)
-
- def test_simple_consumer_unknown_topic_partition(self):
- client = MagicMock()
- consumer = SimpleConsumer(client, group=None,
- topic='topic', partitions=[0, 1],
- auto_commit=False)
-
- # Mock so that only the first request gets a valid response
- def unknown_topic_partition(request):
- return FetchResponsePayload(request.topic, request.partition,
- UnknownTopicOrPartitionError.errno, -1, ())
-
- client.send_fetch_request.side_effect = self.fail_requests_factory(unknown_topic_partition)
-
- # This should not raise an exception
- with self.assertRaises(UnknownTopicOrPartitionError):
- consumer.get_messages(20)
-
- def test_simple_consumer_commit_does_not_raise(self):
- client = MagicMock()
- client.get_partition_ids_for_topic.return_value = [0, 1]
-
- def mock_offset_fetch_request(group, payloads, **kwargs):
- return [OffsetFetchResponsePayload(p.topic, p.partition, 0, b'', 0) for p in payloads]
-
- client.send_offset_fetch_request.side_effect = mock_offset_fetch_request
-
- def mock_offset_commit_request(group, payloads, **kwargs):
- raise FailedPayloadsError(payloads[0])
-
- client.send_offset_commit_request.side_effect = mock_offset_commit_request
-
- consumer = SimpleConsumer(client, group='foobar',
- topic='topic', partitions=[0, 1],
- auto_commit=False)
-
- # Mock internal commit check
- consumer.count_since_commit = 10
-
- # This should not raise an exception
- self.assertFalse(consumer.commit(partitions=[0, 1]))
-
- def test_simple_consumer_reset_partition_offset(self):
- client = MagicMock()
-
- def mock_offset_request(payloads, **kwargs):
- raise FailedPayloadsError(payloads[0])
-
- client.send_offset_request.side_effect = mock_offset_request
-
- consumer = SimpleConsumer(client, group='foobar',
- topic='topic', partitions=[0, 1],
- auto_commit=False)
-
- # This should not raise an exception
- self.assertEqual(consumer.reset_partition_offset(0), None)
-
- @staticmethod
- def fail_requests_factory(error_factory):
- # Mock so that only the first request gets a valid response
- def fail_requests(payloads, **kwargs):
- responses = [
- FetchResponsePayload(payloads[0].topic, payloads[0].partition, 0, 0,
- [OffsetAndMessage(
- payloads[0].offset + i,
- "msg %d" % (payloads[0].offset + i))
- for i in range(10)]),
- ]
- for failure in payloads[1:]:
- responses.append(error_factory(failure))
- return responses
- return fail_requests
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index d6fd41c..6e6bc94 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -1,29 +1,17 @@
import logging
-import os
import time
from mock import patch
import pytest
from kafka.vendor.six.moves import range
-from . import unittest
-from kafka import (
- KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message,
- create_gzip_message, KafkaProducer
-)
import kafka.codec
-from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.errors import (
- ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
- KafkaTimeoutError, UnsupportedCodecError
-)
-from kafka.protocol.message import PartialMessage
-from kafka.structs import (
- ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
+ KafkaTimeoutError, UnsupportedCodecError, UnsupportedVersionError
)
+from kafka.structs import TopicPartition, OffsetAndTimestamp
-from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, Timer, assert_message_count, env_kafka_version, random_string
+from test.testutil import Timer, assert_message_count, env_kafka_version, random_string
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
@@ -63,486 +51,6 @@ def test_kafka_consumer_unsupported_encoding(
consumer.poll(timeout_ms=2000)
-class TestConsumerIntegration(KafkaIntegrationTestCase):
- maxDiff = None
-
- @classmethod
- def setUpClass(cls):
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- cls.zk = ZookeeperFixture.instance()
- chroot = random_string(10)
- cls.server1 = KafkaFixture.instance(0, cls.zk,
- zk_chroot=chroot)
- cls.server2 = KafkaFixture.instance(1, cls.zk,
- zk_chroot=chroot)
-
- cls.server = cls.server1 # Bootstrapping server
-
- @classmethod
- def tearDownClass(cls):
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- cls.server1.close()
- cls.server2.close()
- cls.zk.close()
-
- def send_messages(self, partition, messages):
- messages = [ create_message(self.msg(str(msg))) for msg in messages ]
- produce = ProduceRequestPayload(self.topic, partition, messages = messages)
- resp, = self.client.send_produce_request([produce])
- self.assertEqual(resp.error, 0)
-
- return [ x.value for x in messages ]
-
- def send_gzip_message(self, partition, messages):
- message = create_gzip_message([(self.msg(str(msg)), None) for msg in messages])
- produce = ProduceRequestPayload(self.topic, partition, messages = [message])
- resp, = self.client.send_produce_request([produce])
- self.assertEqual(resp.error, 0)
-
- def assert_message_count(self, messages, num_messages):
- # Make sure we got them all
- self.assertEqual(len(messages), num_messages)
-
- # Make sure there are no duplicates
- self.assertEqual(len(set(messages)), num_messages)
-
- def consumer(self, **kwargs):
- if os.environ['KAFKA_VERSION'] == "0.8.0":
- # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
- kwargs['group'] = None
- kwargs['auto_commit'] = False
- else:
- kwargs.setdefault('group', None)
- kwargs.setdefault('auto_commit', False)
-
- consumer_class = kwargs.pop('consumer', SimpleConsumer)
- group = kwargs.pop('group', None)
- topic = kwargs.pop('topic', self.topic)
-
- if consumer_class in [SimpleConsumer, MultiProcessConsumer]:
- kwargs.setdefault('iter_timeout', 0)
-
- return consumer_class(self.client, group, topic, **kwargs)
-
- def kafka_consumer(self, **configs):
- brokers = '%s:%d' % (self.server.host, self.server.port)
- consumer = KafkaConsumer(self.topic,
- bootstrap_servers=brokers,
- **configs)
- return consumer
-
- def kafka_producer(self, **configs):
- brokers = '%s:%d' % (self.server.host, self.server.port)
- producer = KafkaProducer(
- bootstrap_servers=brokers, **configs)
- return producer
-
- def test_simple_consumer(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Start a consumer
- consumer = self.consumer()
-
- self.assert_message_count([ message for message in consumer ], 200)
-
- consumer.stop()
-
- def test_simple_consumer_gzip(self):
- self.send_gzip_message(0, range(0, 100))
- self.send_gzip_message(1, range(100, 200))
-
- # Start a consumer
- consumer = self.consumer()
-
- self.assert_message_count([ message for message in consumer ], 200)
-
- consumer.stop()
-
- def test_simple_consumer_smallest_offset_reset(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- consumer = self.consumer(auto_offset_reset='smallest')
- # Move fetch offset ahead of 300 message (out of range)
- consumer.seek(300, 2)
- # Since auto_offset_reset is set to smallest we should read all 200
- # messages from beginning.
- self.assert_message_count([message for message in consumer], 200)
-
- def test_simple_consumer_largest_offset_reset(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Default largest
- consumer = self.consumer()
- # Move fetch offset ahead of 300 message (out of range)
- consumer.seek(300, 2)
- # Since auto_offset_reset is set to largest we should not read any
- # messages.
- self.assert_message_count([message for message in consumer], 0)
- # Send 200 new messages to the queue
- self.send_messages(0, range(200, 300))
- self.send_messages(1, range(300, 400))
- # Since the offset is set to largest we should read all the new messages.
- self.assert_message_count([message for message in consumer], 200)
-
- def test_simple_consumer_no_reset(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Default largest
- consumer = self.consumer(auto_offset_reset=None)
- # Move fetch offset ahead of 300 message (out of range)
- consumer.seek(300, 2)
- with self.assertRaises(OffsetOutOfRangeError):
- consumer.get_message()
-
- @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
- def test_simple_consumer_load_initial_offsets(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Create 1st consumer and change offsets
- consumer = self.consumer(group='test_simple_consumer_load_initial_offsets')
- self.assertEqual(consumer.offsets, {0: 0, 1: 0})
- consumer.offsets.update({0:51, 1:101})
- # Update counter after manual offsets update
- consumer.count_since_commit += 1
- consumer.commit()
-
- # Create 2nd consumer and check initial offsets
- consumer = self.consumer(group='test_simple_consumer_load_initial_offsets',
- auto_commit=False)
- self.assertEqual(consumer.offsets, {0: 51, 1: 101})
-
- def test_simple_consumer__seek(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- consumer = self.consumer()
-
- # Rewind 10 messages from the end
- consumer.seek(-10, 2)
- self.assert_message_count([ message for message in consumer ], 10)
-
- # Rewind 13 messages from the end
- consumer.seek(-13, 2)
- self.assert_message_count([ message for message in consumer ], 13)
-
- # Set absolute offset
- consumer.seek(100)
- self.assert_message_count([ message for message in consumer ], 0)
- consumer.seek(100, partition=0)
- self.assert_message_count([ message for message in consumer ], 0)
- consumer.seek(101, partition=1)
- self.assert_message_count([ message for message in consumer ], 0)
- consumer.seek(90, partition=0)
- self.assert_message_count([ message for message in consumer ], 10)
- consumer.seek(20, partition=1)
- self.assert_message_count([ message for message in consumer ], 80)
- consumer.seek(0, partition=1)
- self.assert_message_count([ message for message in consumer ], 100)
-
- consumer.stop()
-
- @pytest.mark.skipif(env_kafka_version() >= (2, 0),
- reason="SimpleConsumer blocking does not handle PartialMessage change in kafka 2.0+")
- def test_simple_consumer_blocking(self):
- consumer = self.consumer()
-
- # Ask for 5 messages, nothing in queue, block 1 second
- with Timer() as t:
- messages = consumer.get_messages(block=True, timeout=1)
- self.assert_message_count(messages, 0)
- self.assertGreaterEqual(t.interval, 1)
-
- self.send_messages(0, range(0, 5))
- self.send_messages(1, range(5, 10))
-
- # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
- with Timer() as t:
- messages = consumer.get_messages(count=5, block=True, timeout=3)
- self.assert_message_count(messages, 5)
- self.assertLess(t.interval, 3)
-
- # Ask for 10 messages, get 5 back, block 1 second
- with Timer() as t:
- messages = consumer.get_messages(count=10, block=True, timeout=1)
- self.assert_message_count(messages, 5)
- self.assertGreaterEqual(t.interval, 1)
-
- # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
- # second, get 5 back, no blocking
- self.send_messages(0, range(0, 3))
- self.send_messages(1, range(3, 5))
- with Timer() as t:
- messages = consumer.get_messages(count=10, block=1, timeout=1)
- self.assert_message_count(messages, 5)
- self.assertLessEqual(t.interval, 1)
-
- consumer.stop()
-
- def test_simple_consumer_pending(self):
- # make sure that we start with no pending messages
- consumer = self.consumer()
- self.assertEquals(consumer.pending(), 0)
- self.assertEquals(consumer.pending(partitions=[0]), 0)
- self.assertEquals(consumer.pending(partitions=[1]), 0)
-
- # Produce 10 messages to partitions 0 and 1
- self.send_messages(0, range(0, 10))
- self.send_messages(1, range(10, 20))
-
- consumer = self.consumer()
-
- self.assertEqual(consumer.pending(), 20)
- self.assertEqual(consumer.pending(partitions=[0]), 10)
- self.assertEqual(consumer.pending(partitions=[1]), 10)
-
- # move to last message, so one partition should have 1 pending
- # message and other 0
- consumer.seek(-1, 2)
- self.assertEqual(consumer.pending(), 1)
-
- pending_part1 = consumer.pending(partitions=[0])
- pending_part2 = consumer.pending(partitions=[1])
- self.assertEquals(set([0, 1]), set([pending_part1, pending_part2]))
- consumer.stop()
-
- @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
- def test_multi_process_consumer(self):
- # Produce 100 messages to partitions 0 and 1
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- consumer = self.consumer(consumer = MultiProcessConsumer)
-
- self.assert_message_count([ message for message in consumer ], 200)
-
- consumer.stop()
-
- @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
- def test_multi_process_consumer_blocking(self):
- consumer = self.consumer(consumer = MultiProcessConsumer)
-
- # Ask for 5 messages, No messages in queue, block 1 second
- with Timer() as t:
- messages = consumer.get_messages(block=True, timeout=1)
- self.assert_message_count(messages, 0)
-
- self.assertGreaterEqual(t.interval, 1)
-
- # Send 10 messages
- self.send_messages(0, range(0, 10))
-
- # Ask for 5 messages, 10 messages in queue, block 0 seconds
- with Timer() as t:
- messages = consumer.get_messages(count=5, block=True, timeout=5)
- self.assert_message_count(messages, 5)
- self.assertLessEqual(t.interval, 1)
-
- # Ask for 10 messages, 5 in queue, block 1 second
- with Timer() as t:
- messages = consumer.get_messages(count=10, block=True, timeout=1)
- self.assert_message_count(messages, 5)
- self.assertGreaterEqual(t.interval, 1)
-
- # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
- # second, get at least one back, no blocking
- self.send_messages(0, range(0, 5))
- with Timer() as t:
- messages = consumer.get_messages(count=10, block=1, timeout=1)
- received_message_count = len(messages)
- self.assertGreaterEqual(received_message_count, 1)
- self.assert_message_count(messages, received_message_count)
- self.assertLessEqual(t.interval, 1)
-
- consumer.stop()
-
- @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
- def test_multi_proc_pending(self):
- self.send_messages(0, range(0, 10))
- self.send_messages(1, range(10, 20))
-
- # set group to None and auto_commit to False to avoid interactions w/
- # offset commit/fetch apis
- consumer = MultiProcessConsumer(self.client, None, self.topic,
- auto_commit=False, iter_timeout=0)
-
- self.assertEqual(consumer.pending(), 20)
- self.assertEqual(consumer.pending(partitions=[0]), 10)
- self.assertEqual(consumer.pending(partitions=[1]), 10)
-
- consumer.stop()
-
- @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
- @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
- def test_multi_process_consumer_load_initial_offsets(self):
- self.send_messages(0, range(0, 10))
- self.send_messages(1, range(10, 20))
-
- # Create 1st consumer and change offsets
- consumer = self.consumer(group='test_multi_process_consumer_load_initial_offsets')
- self.assertEqual(consumer.offsets, {0: 0, 1: 0})
- consumer.offsets.update({0:5, 1:15})
- # Update counter after manual offsets update
- consumer.count_since_commit += 1
- consumer.commit()
-
- # Create 2nd consumer and check initial offsets
- consumer = self.consumer(consumer = MultiProcessConsumer,
- group='test_multi_process_consumer_load_initial_offsets',
- auto_commit=False)
- self.assertEqual(consumer.offsets, {0: 5, 1: 15})
-
- def test_large_messages(self):
- # Produce 10 "normal" size messages
- small_messages = self.send_messages(0, [ str(x) for x in range(10) ])
-
- # Produce 10 messages that are large (bigger than default fetch size)
- large_messages = self.send_messages(0, [ random_string(5000) for x in range(10) ])
-
- # Brokers prior to 0.11 will return the next message
- # if it is smaller than max_bytes (called buffer_size in SimpleConsumer)
- # Brokers 0.11 and later that store messages in v2 format
- # internally will return the next message only if the
- # full MessageSet is smaller than max_bytes.
- # For that reason, we set the max buffer size to a little more
- # than the size of all large messages combined
- consumer = self.consumer(max_buffer_size=60000)
-
- expected_messages = set(small_messages + large_messages)
- actual_messages = set([x.message.value for x in consumer
- if not isinstance(x.message, PartialMessage)])
- self.assertEqual(expected_messages, actual_messages)
-
- consumer.stop()
-
- def test_huge_messages(self):
- huge_message, = self.send_messages(0, [
- create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)),
- ])
-
- # Create a consumer with the default buffer size
- consumer = self.consumer()
-
- # This consumer fails to get the message
- with self.assertRaises(ConsumerFetchSizeTooSmall):
- consumer.get_message(False, 0.1)
-
- consumer.stop()
-
- # Create a consumer with no fetch size limit
- big_consumer = self.consumer(
- max_buffer_size = None,
- partitions = [0],
- )
-
- # Seek to the last message
- big_consumer.seek(-1, 2)
-
- # Consume giant message successfully
- message = big_consumer.get_message(block=False, timeout=10)
- self.assertIsNotNone(message)
- self.assertEqual(message.message.value, huge_message)
-
- big_consumer.stop()
-
- @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
- def test_offset_behavior__resuming_behavior(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Start a consumer
- consumer1 = self.consumer(
- group='test_offset_behavior__resuming_behavior',
- auto_commit=True,
- auto_commit_every_t = None,
- auto_commit_every_n = 20,
- )
-
- # Grab the first 195 messages
- output_msgs1 = [ consumer1.get_message().message.value for _ in range(195) ]
- self.assert_message_count(output_msgs1, 195)
-
- # The total offset across both partitions should be at 180
- consumer2 = self.consumer(
- group='test_offset_behavior__resuming_behavior',
- auto_commit=True,
- auto_commit_every_t = None,
- auto_commit_every_n = 20,
- )
-
- # 181-200
- self.assert_message_count([ message for message in consumer2 ], 20)
-
- consumer1.stop()
- consumer2.stop()
-
- @unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
- @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
- def test_multi_process_offset_behavior__resuming_behavior(self):
- self.send_messages(0, range(0, 100))
- self.send_messages(1, range(100, 200))
-
- # Start a consumer
- consumer1 = self.consumer(
- consumer=MultiProcessConsumer,
- group='test_multi_process_offset_behavior__resuming_behavior',
- auto_commit=True,
- auto_commit_every_t = None,
- auto_commit_every_n = 20,
- )
-
- # Grab the first 195 messages
- output_msgs1 = []
- idx = 0
- for message in consumer1:
- output_msgs1.append(message.message.value)
- idx += 1
- if idx >= 195:
- break
- self.assert_message_count(output_msgs1, 195)
-
- # The total offset across both partitions should be at 180
- consumer2 = self.consumer(
- consumer=MultiProcessConsumer,
- group='test_multi_process_offset_behavior__resuming_behavior',
- auto_commit=True,
- auto_commit_every_t = None,
- auto_commit_every_n = 20,
- )
-
- # 181-200
- self.assert_message_count([ message for message in consumer2 ], 20)
-
- consumer1.stop()
- consumer2.stop()
-
- # TODO: Make this a unit test -- should not require integration
- def test_fetch_buffer_size(self):
-
- # Test parameters (see issue 135 / PR 136)
- TEST_MESSAGE_SIZE=1048
- INIT_BUFFER_SIZE=1024
- MAX_BUFFER_SIZE=2048
- assert TEST_MESSAGE_SIZE > INIT_BUFFER_SIZE
- assert TEST_MESSAGE_SIZE < MAX_BUFFER_SIZE
- assert MAX_BUFFER_SIZE == 2 * INIT_BUFFER_SIZE
-
- self.send_messages(0, [ "x" * 1048 ])
- self.send_messages(1, [ "x" * 1048 ])
-
- consumer = self.consumer(buffer_size=1024, max_buffer_size=2048)
- messages = [ message for message in consumer ]
- self.assertEqual(len(messages), 2)
-
-
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
def test_kafka_consumer__blocking(kafka_consumer_factory, topic, send_messages):
TIMEOUT_MS = 500
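
The blocking and offset-resume behavior exercised by the deleted SimpleConsumer/MultiProcessConsumer tests maps onto KafkaConsumer configuration. A minimal sketch, assuming a broker at localhost:9092 and placeholder topic/group names that are not part of this change:

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers='localhost:9092',
        group_id='resume-demo',
        auto_offset_reset='earliest',  # start from the beginning on first run
        enable_auto_commit=True,       # committed offsets let a restart resume
        consumer_timeout_ms=1000,      # stop iterating after 1s with no records
    )

    # Iteration blocks up to consumer_timeout_ms waiting for records -- the
    # rough analogue of SimpleConsumer.get_messages(block=True, timeout=1).
    count = sum(1 for _ in consumer)
    consumer.close()
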
diff --git a/test/test_context.py b/test/test_context.py
deleted file mode 100644
index 3d41ba6..0000000
--- a/test/test_context.py
+++ /dev/null
@@ -1,117 +0,0 @@
-"""
-OffsetCommitContext tests.
-"""
-from . import unittest
-
-from mock import MagicMock, patch
-
-from kafka.context import OffsetCommitContext
-from kafka.errors import OffsetOutOfRangeError
-
-
-class TestOffsetCommitContext(unittest.TestCase):
- """
- OffsetCommitContext tests.
- """
-
- def setUp(self):
- self.client = MagicMock()
- self.consumer = MagicMock()
- self.topic = "topic"
- self.group = "group"
- self.partition = 0
- self.consumer.topic = self.topic
- self.consumer.group = self.group
- self.consumer.client = self.client
- self.consumer.offsets = {self.partition: 0}
- self.context = OffsetCommitContext(self.consumer)
-
- def test_noop(self):
- """
- Should revert consumer after context exit with no mark() call.
- """
- with self.context:
- # advance offset
- self.consumer.offsets = {self.partition: 1}
-
- # offset restored
- self.assertEqual(self.consumer.offsets, {self.partition: 0})
- # and seek called with relative zero delta
- self.assertEqual(self.consumer.seek.call_count, 1)
- self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
-
- def test_mark(self):
- """
-        Should remain at marked location after context exit.
- """
- with self.context as context:
- context.mark(self.partition, 0)
- # advance offset
- self.consumer.offsets = {self.partition: 1}
-
- # offset sent to client
- self.assertEqual(self.client.send_offset_commit_request.call_count, 1)
-
- # offset remains advanced
- self.assertEqual(self.consumer.offsets, {self.partition: 1})
-
- # and seek called with relative zero delta
- self.assertEqual(self.consumer.seek.call_count, 1)
- self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
-
- def test_mark_multiple(self):
- """
- Should remain at highest marked location after context exit.
- """
- with self.context as context:
- context.mark(self.partition, 0)
- context.mark(self.partition, 1)
- context.mark(self.partition, 2)
- # advance offset
- self.consumer.offsets = {self.partition: 3}
-
- # offset sent to client
- self.assertEqual(self.client.send_offset_commit_request.call_count, 1)
-
- # offset remains advanced
- self.assertEqual(self.consumer.offsets, {self.partition: 3})
-
- # and seek called with relative zero delta
- self.assertEqual(self.consumer.seek.call_count, 1)
- self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
-
- def test_rollback(self):
- """
- Should rollback to initial offsets on context exit with exception.
- """
- with self.assertRaises(Exception):
- with self.context as context:
- context.mark(self.partition, 0)
- # advance offset
- self.consumer.offsets = {self.partition: 1}
-
- raise Exception("Intentional failure")
-
- # offset rolled back (ignoring mark)
- self.assertEqual(self.consumer.offsets, {self.partition: 0})
-
- # and seek called with relative zero delta
- self.assertEqual(self.consumer.seek.call_count, 1)
- self.assertEqual(self.consumer.seek.call_args[0], (0, 1))
-
- def test_out_of_range(self):
- """
- Should reset to beginning of valid offsets on `OffsetOutOfRangeError`
- """
- def _seek(offset, whence):
- # seek must be called with 0, 0 to find the beginning of the range
- self.assertEqual(offset, 0)
- self.assertEqual(whence, 0)
- # set offsets to something different
- self.consumer.offsets = {self.partition: 100}
-
- with patch.object(self.consumer, "seek", _seek):
- with self.context:
- raise OffsetOutOfRangeError()
-
- self.assertEqual(self.consumer.offsets, {self.partition: 100})
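
kafka.context.OffsetCommitContext is removed along with these tests. The commit-on-success / rewind-on-failure pattern can be approximated with KafkaConsumer's manual commit and seek APIs; the sketch below is illustrative only, with placeholder broker, topic, and group names:

    from kafka import KafkaConsumer
    from kafka.structs import OffsetAndMetadata, TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers='localhost:9092',
        group_id='commit-demo',
        enable_auto_commit=False,   # commits are explicit, as in the old context
    )
    tp = TopicPartition('my-topic', 0)
    consumer.assign([tp])

    start = consumer.position(tp)   # remember where this batch began
    try:
        records = consumer.poll(timeout_ms=1000)
        # ... process records ...
        # "mark" progress by committing the next offset to read
        consumer.commit({tp: OffsetAndMetadata(consumer.position(tp), '')})
    except Exception:
        # roll back to the start of the batch, like the old context manager
        consumer.seek(tp, start)
        raise
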
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
deleted file mode 100644
index ad7dcb9..0000000
--- a/test/test_failover_integration.py
+++ /dev/null
@@ -1,240 +0,0 @@
-import logging
-import os
-import time
-
-from kafka import SimpleClient, SimpleConsumer, KeyedProducer
-from kafka.errors import (
- FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
- NotLeaderForPartitionError)
-from kafka.producer.base import Producer
-from kafka.structs import TopicPartition
-
-from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, random_string
-
-
-log = logging.getLogger(__name__)
-
-
-class TestFailover(KafkaIntegrationTestCase):
- create_client = False
-
- def setUp(self):
- if not os.environ.get('KAFKA_VERSION'):
- self.skipTest('integration test requires KAFKA_VERSION')
-
- zk_chroot = random_string(10)
- replicas = 3
- partitions = 3
-
- # mini zookeeper, 3 kafka brokers
- self.zk = ZookeeperFixture.instance()
- kk_kwargs = {'zk_chroot': zk_chroot, 'replicas': replicas,
- 'partitions': partitions}
- self.brokers = [KafkaFixture.instance(i, self.zk, **kk_kwargs)
- for i in range(replicas)]
-
- hosts = ['%s:%d' % (b.host, b.port) for b in self.brokers]
- self.client = SimpleClient(hosts, timeout=2)
- super(TestFailover, self).setUp()
-
- def tearDown(self):
- super(TestFailover, self).tearDown()
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- self.client.close()
- for broker in self.brokers:
- broker.close()
- self.zk.close()
-
- def test_switch_leader(self):
- topic = self.topic
- partition = 0
-
- # Testing the base Producer class here so that we can easily send
- # messages to a specific partition, kill the leader for that partition
- # and check that after another broker takes leadership the producer
- # is able to resume sending messages
-
- # require that the server commit messages to all in-sync replicas
- # so that failover doesn't lose any messages on server-side
- # and we can assert that server-side message count equals client-side
- producer = Producer(self.client, async_send=False,
- req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
-
- # Send 100 random messages to a specific partition
- self._send_random_messages(producer, topic, partition, 100)
-
- # kill leader for partition
- self._kill_leader(topic, partition)
-
- # expect failure, but don't wait more than 60 secs to recover
- recovered = False
- started = time.time()
- timeout = 60
- while not recovered and (time.time() - started) < timeout:
- try:
- log.debug("attempting to send 'success' message after leader killed")
- producer.send_messages(topic, partition, b'success')
- log.debug("success!")
- recovered = True
- except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
- NotLeaderForPartitionError):
- log.debug("caught exception sending message -- will retry")
- continue
-
- # Verify we successfully sent the message
- self.assertTrue(recovered)
-
- # send some more messages to new leader
- self._send_random_messages(producer, topic, partition, 100)
-
- # count number of messages
- # Should be equal to 100 before + 1 recovery + 100 after
- # at_least=True because exactly once delivery isn't really a thing
- self.assert_message_count(topic, 201, partitions=(partition,),
- at_least=True)
-
- def test_switch_leader_async(self):
- topic = self.topic
- partition = 0
-
- # Test the base class Producer -- send_messages to a specific partition
- producer = Producer(self.client, async_send=True,
- batch_send_every_n=15,
- batch_send_every_t=3,
- req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
- async_log_messages_on_error=False)
-
- # Send 10 random messages
- self._send_random_messages(producer, topic, partition, 10)
- self._send_random_messages(producer, topic, partition + 1, 10)
-
- # kill leader for partition
- self._kill_leader(topic, partition)
-
- log.debug("attempting to send 'success' message after leader killed")
-
- # in async mode, this should return immediately
- producer.send_messages(topic, partition, b'success')
- producer.send_messages(topic, partition + 1, b'success')
-
- # send to new leader
- self._send_random_messages(producer, topic, partition, 10)
- self._send_random_messages(producer, topic, partition + 1, 10)
-
- # Stop the producer and wait for it to shutdown
- producer.stop()
- started = time.time()
- timeout = 60
- while (time.time() - started) < timeout:
- if not producer.thread.is_alive():
- break
- time.sleep(0.1)
- else:
- self.fail('timeout waiting for producer queue to empty')
-
- # count number of messages
- # Should be equal to 10 before + 1 recovery + 10 after
- # at_least=True because exactly once delivery isn't really a thing
- self.assert_message_count(topic, 21, partitions=(partition,),
- at_least=True)
- self.assert_message_count(topic, 21, partitions=(partition + 1,),
- at_least=True)
-
- def test_switch_leader_keyed_producer(self):
- topic = self.topic
-
- producer = KeyedProducer(self.client, async_send=False)
-
- # Send 10 random messages
- for _ in range(10):
- key = random_string(3).encode('utf-8')
- msg = random_string(10).encode('utf-8')
- producer.send_messages(topic, key, msg)
-
- # kill leader for partition 0
- self._kill_leader(topic, 0)
-
- recovered = False
- started = time.time()
- timeout = 60
- while not recovered and (time.time() - started) < timeout:
- try:
- key = random_string(3).encode('utf-8')
- msg = random_string(10).encode('utf-8')
- producer.send_messages(topic, key, msg)
- if producer.partitioners[topic].partition(key) == 0:
- recovered = True
- except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
- NotLeaderForPartitionError):
- log.debug("caught exception sending message -- will retry")
- continue
-
- # Verify we successfully sent the message
- self.assertTrue(recovered)
-
- # send some more messages just to make sure no more exceptions
- for _ in range(10):
- key = random_string(3).encode('utf-8')
- msg = random_string(10).encode('utf-8')
- producer.send_messages(topic, key, msg)
-
- def test_switch_leader_simple_consumer(self):
- producer = Producer(self.client, async_send=False)
- consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)
- self._send_random_messages(producer, self.topic, 0, 2)
- consumer.get_messages()
- self._kill_leader(self.topic, 0)
- consumer.get_messages()
-
- def _send_random_messages(self, producer, topic, partition, n):
- for j in range(n):
- msg = 'msg {0}: {1}'.format(j, random_string(10))
- log.debug('_send_random_message %s to %s:%d', msg, topic, partition)
- while True:
- try:
- producer.send_messages(topic, partition, msg.encode('utf-8'))
- except Exception:
- log.exception('failure in _send_random_messages - retrying')
- continue
- else:
- break
-
- def _kill_leader(self, topic, partition):
- leader = self.client.topics_to_brokers[TopicPartition(topic, partition)]
- broker = self.brokers[leader.nodeId]
- broker.close()
- return broker
-
- def assert_message_count(self, topic, check_count, timeout=10,
- partitions=None, at_least=False):
- hosts = ','.join(['%s:%d' % (broker.host, broker.port)
- for broker in self.brokers])
-
- client = SimpleClient(hosts, timeout=2)
- consumer = SimpleConsumer(client, None, topic,
- partitions=partitions,
- auto_commit=False,
- iter_timeout=timeout)
-
- started_at = time.time()
- pending = -1
- while pending < check_count and (time.time() - started_at < timeout):
- try:
- pending = consumer.pending(partitions)
- except FailedPayloadsError:
- pass
- time.sleep(0.5)
-
- consumer.stop()
- client.close()
-
- if pending < check_count:
- self.fail('Too few pending messages: found %d, expected %d' %
- (pending, check_count))
- elif pending > check_count and not at_least:
- self.fail('Too many pending messages: found %d, expected %d' %
- (pending, check_count))
- return True
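
The failover scenarios above relied on the legacy base Producer with req_acks=ACK_AFTER_CLUSTER_COMMIT. With KafkaProducer the comparable durability settings are acks='all' plus retries; the following is a rough sketch under assumed broker addresses and topic name, not a drop-in replacement for these tests:

    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=['broker1:9092', 'broker2:9092', 'broker3:9092'],
        acks='all',   # wait for the full ISR, like ACK_AFTER_CLUSTER_COMMIT
        retries=5,    # retriable errors (e.g. NotLeaderForPartitionError) are retried
    )

    # send() is asynchronous; .get() blocks until the broker acks or the
    # request fails, which is how a test would assert delivery across a
    # leader change.
    metadata = producer.send('my-topic', value=b'success', partition=0).get(timeout=60)
    print(metadata.partition, metadata.offset)
    producer.close()
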
diff --git a/test/test_package.py b/test/test_package.py
index e520f3f..aa42c9c 100644
--- a/test/test_package.py
+++ b/test/test_package.py
@@ -6,20 +6,20 @@ class TestPackage:
assert kafka1.codec.__name__ == "kafka.codec"
def test_submodule_namespace(self):
- import kafka.client as client1
- assert client1.__name__ == "kafka.client"
+ import kafka.client_async as client1
+ assert client1.__name__ == "kafka.client_async"
- from kafka import client as client2
- assert client2.__name__ == "kafka.client"
+ from kafka import client_async as client2
+ assert client2.__name__ == "kafka.client_async"
- from kafka.client import SimpleClient as SimpleClient1
- assert SimpleClient1.__name__ == "SimpleClient"
+ from kafka.client_async import KafkaClient as KafkaClient1
+ assert KafkaClient1.__name__ == "KafkaClient"
+
+ from kafka import KafkaClient as KafkaClient2
+ assert KafkaClient2.__name__ == "KafkaClient"
from kafka.codec import gzip_encode as gzip_encode1
assert gzip_encode1.__name__ == "gzip_encode"
- from kafka import SimpleClient as SimpleClient2
- assert SimpleClient2.__name__ == "SimpleClient"
-
from kafka.codec import snappy_encode
assert snappy_encode.__name__ == "snappy_encode"
diff --git a/test/test_partitioner.py b/test/test_partitioner.py
index 3a5264b..853fbf6 100644
--- a/test/test_partitioner.py
+++ b/test/test_partitioner.py
@@ -2,8 +2,7 @@ from __future__ import absolute_import
import pytest
-from kafka.partitioner import DefaultPartitioner, Murmur2Partitioner, RoundRobinPartitioner
-from kafka.partitioner.hashed import murmur2
+from kafka.partitioner import DefaultPartitioner, murmur2
def test_default_partitioner():
@@ -22,45 +21,15 @@ def test_default_partitioner():
assert partitioner(None, all_partitions, []) in all_partitions
-def test_roundrobin_partitioner():
- partitioner = RoundRobinPartitioner()
- all_partitions = available = list(range(100))
- # partitioner should cycle between partitions
- i = 0
- max_partition = all_partitions[len(all_partitions) - 1]
- while i <= max_partition:
- assert i == partitioner(None, all_partitions, available)
- i += 1
-
- i = 0
- while i <= int(max_partition / 2):
- assert i == partitioner(None, all_partitions, available)
- i += 1
-
- # test dynamic partition re-assignment
- available = available[:-25]
-
- while i <= max(available):
- assert i == partitioner(None, all_partitions, available)
- i += 1
-
- all_partitions = list(range(200))
- available = all_partitions
-
- max_partition = all_partitions[len(all_partitions) - 1]
- while i <= max_partition:
- assert i == partitioner(None, all_partitions, available)
- i += 1
-
-
@pytest.mark.parametrize("bytes_payload,partition_number", [
(b'', 681), (b'a', 524), (b'ab', 434), (b'abc', 107), (b'123456789', 566),
(b'\x00 ', 742)
])
def test_murmur2_java_compatibility(bytes_payload, partition_number):
- p = Murmur2Partitioner(range(1000))
+ partitioner = DefaultPartitioner()
+ all_partitions = available = list(range(1000))
# compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
- assert p.partition(bytes_payload) == partition_number
+ assert partitioner(bytes_payload, all_partitions, available) == partition_number
def test_murmur2_not_ascii():
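
With Murmur2Partitioner and RoundRobinPartitioner gone, DefaultPartitioner (murmur2 over the key bytes) is the remaining built-in, and the rewritten test above already shows its callable form. A short usage sketch with an assumed six-partition topic:

    from kafka.partitioner import DefaultPartitioner, murmur2

    partitioner = DefaultPartitioner()
    all_partitions = available = list(range(6))

    # A keyed message hashes deterministically onto one partition...
    idx = partitioner(b'user-42', all_partitions, available)
    assert idx == (murmur2(b'user-42') & 0x7fffffff) % len(all_partitions)

    # ...while a None key falls back to a random choice among the available ones.
    assert partitioner(None, all_partitions, available) in available
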
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
deleted file mode 100644
index 8f32cf8..0000000
--- a/test/test_producer_integration.py
+++ /dev/null
@@ -1,529 +0,0 @@
-import os
-import time
-import uuid
-
-import pytest
-from kafka.vendor.six.moves import range
-
-from kafka import (
- SimpleProducer, KeyedProducer,
- create_message, create_gzip_message, create_snappy_message,
- RoundRobinPartitioner, HashedPartitioner
-)
-from kafka.codec import has_snappy
-from kafka.errors import UnknownTopicOrPartitionError, LeaderNotAvailableError
-from kafka.producer.base import Producer
-from kafka.protocol.message import PartialMessage
-from kafka.structs import FetchRequestPayload, ProduceRequestPayload
-
-from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, env_kafka_version, current_offset
-
-
-# TODO: This duplicates a TestKafkaProducerIntegration method temporarily
-# while the migration to pytest is in progress
-def assert_produce_request(client, topic, messages, initial_offset, message_ct,
- partition=0):
- """Verify the correctness of a produce request
- """
- produce = ProduceRequestPayload(topic, partition, messages=messages)
-
- # There should only be one response message from the server.
- # This will throw an exception if there's more than one.
- resp = client.send_produce_request([produce])
- assert_produce_response(resp, initial_offset)
-
- assert current_offset(client, topic, partition) == initial_offset + message_ct
-
-
-def assert_produce_response(resp, initial_offset):
- """Verify that a produce response is well-formed
- """
- assert len(resp) == 1
- assert resp[0].error == 0
- assert resp[0].offset == initial_offset
-
-
-@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-def test_produce_many_simple(simple_client, topic):
- """Test multiple produces using the SimpleClient
- """
- start_offset = current_offset(simple_client, topic, 0)
-
- assert_produce_request(
- simple_client, topic,
- [create_message(("Test message %d" % i).encode('utf-8'))
- for i in range(100)],
- start_offset,
- 100,
- )
-
- assert_produce_request(
- simple_client, topic,
- [create_message(("Test message %d" % i).encode('utf-8'))
- for i in range(100)],
- start_offset+100,
- 100,
- )
-
-
-class TestKafkaProducerIntegration(KafkaIntegrationTestCase):
-
- @classmethod
- def setUpClass(cls): # noqa
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- cls.zk = ZookeeperFixture.instance()
- cls.server = KafkaFixture.instance(0, cls.zk)
-
- @classmethod
- def tearDownClass(cls): # noqa
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- cls.server.close()
- cls.zk.close()
-
- def test_produce_10k_simple(self):
- start_offset = self.current_offset(self.topic, 0)
-
- self.assert_produce_request(
- [create_message(("Test message %d" % i).encode('utf-8'))
- for i in range(10000)],
- start_offset,
- 10000,
- )
-
- def test_produce_many_gzip(self):
- start_offset = self.current_offset(self.topic, 0)
-
- message1 = create_gzip_message([
- (("Gzipped 1 %d" % i).encode('utf-8'), None) for i in range(100)])
- message2 = create_gzip_message([
- (("Gzipped 2 %d" % i).encode('utf-8'), None) for i in range(100)])
-
- self.assert_produce_request(
- [ message1, message2 ],
- start_offset,
- 200,
- )
-
- def test_produce_many_snappy(self):
- self.skipTest("All snappy integration tests fail with nosnappyjava")
- start_offset = self.current_offset(self.topic, 0)
-
- self.assert_produce_request([
- create_snappy_message([("Snappy 1 %d" % i, None) for i in range(100)]),
- create_snappy_message([("Snappy 2 %d" % i, None) for i in range(100)]),
- ],
- start_offset,
- 200,
- )
-
- def test_produce_mixed(self):
- start_offset = self.current_offset(self.topic, 0)
-
- msg_count = 1+100
- messages = [
- create_message(b"Just a plain message"),
- create_gzip_message([
- (("Gzipped %d" % i).encode('utf-8'), None) for i in range(100)]),
- ]
-
- # All snappy integration tests fail with nosnappyjava
- if False and has_snappy():
- msg_count += 100
- messages.append(create_snappy_message([("Snappy %d" % i, None) for i in range(100)]))
-
- self.assert_produce_request(messages, start_offset, msg_count)
-
- def test_produce_100k_gzipped(self):
- start_offset = self.current_offset(self.topic, 0)
-
- self.assert_produce_request([
- create_gzip_message([
- (("Gzipped batch 1, message %d" % i).encode('utf-8'), None)
- for i in range(50000)])
- ],
- start_offset,
- 50000,
- )
-
- self.assert_produce_request([
- create_gzip_message([
- (("Gzipped batch 1, message %d" % i).encode('utf-8'), None)
- for i in range(50000)])
- ],
- start_offset+50000,
- 50000,
- )
-
- ############################
- # SimpleProducer Tests #
- ############################
-
- def test_simple_producer_new_topic(self):
- producer = SimpleProducer(self.client)
- resp = producer.send_messages('new_topic', self.msg('foobar'))
- self.assert_produce_response(resp, 0)
- producer.stop()
-
- def test_simple_producer(self):
- partitions = self.client.get_partition_ids_for_topic(self.topic)
- start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
- producer = SimpleProducer(self.client, random_start=False)
-
- # Goes to first partition, randomly.
- resp = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
- self.assert_produce_response(resp, start_offsets[0])
-
- # Goes to the next partition, randomly.
- resp = producer.send_messages(self.topic, self.msg("three"))
- self.assert_produce_response(resp, start_offsets[1])
-
- self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two") ])
- self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("three") ])
-
- # Goes back to the first partition because there's only two partitions
- resp = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
- self.assert_produce_response(resp, start_offsets[0]+2)
- self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("two"), self.msg("four"), self.msg("five") ])
-
- producer.stop()
-
- def test_producer_random_order(self):
- producer = SimpleProducer(self.client, random_start=True)
- resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
- resp2 = producer.send_messages(self.topic, self.msg("three"))
- resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
-
- self.assertEqual(resp1[0].partition, resp3[0].partition)
- self.assertNotEqual(resp1[0].partition, resp2[0].partition)
-
- def test_producer_ordered_start(self):
- producer = SimpleProducer(self.client, random_start=False)
- resp1 = producer.send_messages(self.topic, self.msg("one"), self.msg("two"))
- resp2 = producer.send_messages(self.topic, self.msg("three"))
- resp3 = producer.send_messages(self.topic, self.msg("four"), self.msg("five"))
-
- self.assertEqual(resp1[0].partition, 0)
- self.assertEqual(resp2[0].partition, 1)
- self.assertEqual(resp3[0].partition, 0)
-
- def test_async_simple_producer(self):
- partition = self.client.get_partition_ids_for_topic(self.topic)[0]
- start_offset = self.current_offset(self.topic, partition)
-
- producer = SimpleProducer(self.client, async_send=True, random_start=False)
- resp = producer.send_messages(self.topic, self.msg("one"))
- self.assertEqual(len(resp), 0)
-
- # flush messages
- producer.stop()
-
- self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
-
-
- def test_batched_simple_producer__triggers_by_message(self):
- partitions = self.client.get_partition_ids_for_topic(self.topic)
- start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
- # Configure batch producer
- batch_messages = 5
- batch_interval = 5
- producer = SimpleProducer(
- self.client,
- async_send=True,
- batch_send_every_n=batch_messages,
- batch_send_every_t=batch_interval,
- random_start=False)
-
- # Send 4 messages -- should not trigger a batch
- resp = producer.send_messages(
- self.topic,
- self.msg("one"),
- self.msg("two"),
- self.msg("three"),
- self.msg("four"),
- )
-
- # Batch mode is async. No ack
- self.assertEqual(len(resp), 0)
-
- # It hasn't sent yet
- self.assert_fetch_offset(partitions[0], start_offsets[0], [])
- self.assert_fetch_offset(partitions[1], start_offsets[1], [])
-
- # send 3 more messages -- should trigger batch on first 5
- resp = producer.send_messages(
- self.topic,
- self.msg("five"),
- self.msg("six"),
- self.msg("seven"),
- )
-
- # Batch mode is async. No ack
- self.assertEqual(len(resp), 0)
-
- # Wait until producer has pulled all messages from internal queue
- # this should signal that the first batch was sent, and the producer
- # is now waiting for enough messages to batch again (or a timeout)
- timeout = 5
- start = time.time()
- while not producer.queue.empty():
- if time.time() - start > timeout:
- self.fail('timeout waiting for producer queue to empty')
- time.sleep(0.1)
-
-        # send_messages groups all *msgs in a single call to the same partition
- # so we should see all messages from the first call in one partition
- self.assert_fetch_offset(partitions[0], start_offsets[0], [
- self.msg("one"),
- self.msg("two"),
- self.msg("three"),
- self.msg("four"),
- ])
-
- # Because we are batching every 5 messages, we should only see one
- self.assert_fetch_offset(partitions[1], start_offsets[1], [
- self.msg("five"),
- ])
-
- producer.stop()
-
- def test_batched_simple_producer__triggers_by_time(self):
- self.skipTest("Flakey test -- should be refactored or removed")
- partitions = self.client.get_partition_ids_for_topic(self.topic)
- start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
- batch_interval = 5
- producer = SimpleProducer(
- self.client,
- async_send=True,
- batch_send_every_n=100,
- batch_send_every_t=batch_interval,
- random_start=False)
-
- # Send 5 messages and do a fetch
- resp = producer.send_messages(
- self.topic,
- self.msg("one"),
- self.msg("two"),
- self.msg("three"),
- self.msg("four"),
- )
-
- # Batch mode is async. No ack
- self.assertEqual(len(resp), 0)
-
- # It hasn't sent yet
- self.assert_fetch_offset(partitions[0], start_offsets[0], [])
- self.assert_fetch_offset(partitions[1], start_offsets[1], [])
-
- resp = producer.send_messages(self.topic,
- self.msg("five"),
- self.msg("six"),
- self.msg("seven"),
- )
-
- # Batch mode is async. No ack
- self.assertEqual(len(resp), 0)
-
- # Wait the timeout out
- time.sleep(batch_interval)
-
- self.assert_fetch_offset(partitions[0], start_offsets[0], [
- self.msg("one"),
- self.msg("two"),
- self.msg("three"),
- self.msg("four"),
- ])
-
- self.assert_fetch_offset(partitions[1], start_offsets[1], [
- self.msg("five"),
- self.msg("six"),
- self.msg("seven"),
- ])
-
- producer.stop()
-
-
- ############################
- # KeyedProducer Tests #
- ############################
-
- @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
- def test_keyedproducer_null_payload(self):
- partitions = self.client.get_partition_ids_for_topic(self.topic)
- start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
- producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
- key = "test"
-
- resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
- self.assert_produce_response(resp, start_offsets[0])
- resp = producer.send_messages(self.topic, self.key("key2"), None)
- self.assert_produce_response(resp, start_offsets[1])
- resp = producer.send_messages(self.topic, self.key("key3"), None)
- self.assert_produce_response(resp, start_offsets[0]+1)
- resp = producer.send_messages(self.topic, self.key("key4"), self.msg("four"))
- self.assert_produce_response(resp, start_offsets[1]+1)
-
- self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), None ])
- self.assert_fetch_offset(partitions[1], start_offsets[1], [ None, self.msg("four") ])
-
- producer.stop()
-
- def test_round_robin_partitioner(self):
- partitions = self.client.get_partition_ids_for_topic(self.topic)
- start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
- producer = KeyedProducer(self.client, partitioner=RoundRobinPartitioner)
- resp1 = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
- resp2 = producer.send_messages(self.topic, self.key("key2"), self.msg("two"))
- resp3 = producer.send_messages(self.topic, self.key("key3"), self.msg("three"))
- resp4 = producer.send_messages(self.topic, self.key("key4"), self.msg("four"))
-
- self.assert_produce_response(resp1, start_offsets[0]+0)
- self.assert_produce_response(resp2, start_offsets[1]+0)
- self.assert_produce_response(resp3, start_offsets[0]+1)
- self.assert_produce_response(resp4, start_offsets[1]+1)
-
- self.assert_fetch_offset(partitions[0], start_offsets[0], [ self.msg("one"), self.msg("three") ])
- self.assert_fetch_offset(partitions[1], start_offsets[1], [ self.msg("two"), self.msg("four") ])
-
- producer.stop()
-
- def test_hashed_partitioner(self):
- partitions = self.client.get_partition_ids_for_topic(self.topic)
- start_offsets = [self.current_offset(self.topic, p) for p in partitions]
-
- producer = KeyedProducer(self.client, partitioner=HashedPartitioner)
- resp1 = producer.send_messages(self.topic, self.key("1"), self.msg("one"))
- resp2 = producer.send_messages(self.topic, self.key("2"), self.msg("two"))
- resp3 = producer.send_messages(self.topic, self.key("3"), self.msg("three"))
- resp4 = producer.send_messages(self.topic, self.key("3"), self.msg("four"))
- resp5 = producer.send_messages(self.topic, self.key("4"), self.msg("five"))
-
- offsets = {partitions[0]: start_offsets[0], partitions[1]: start_offsets[1]}
- messages = {partitions[0]: [], partitions[1]: []}
-
- keys = [self.key(k) for k in ["1", "2", "3", "3", "4"]]
- resps = [resp1, resp2, resp3, resp4, resp5]
- msgs = [self.msg(m) for m in ["one", "two", "three", "four", "five"]]
-
- for key, resp, msg in zip(keys, resps, msgs):
- k = hash(key) % 2
- partition = partitions[k]
- offset = offsets[partition]
- self.assert_produce_response(resp, offset)
- offsets[partition] += 1
- messages[partition].append(msg)
-
- self.assert_fetch_offset(partitions[0], start_offsets[0], messages[partitions[0]])
- self.assert_fetch_offset(partitions[1], start_offsets[1], messages[partitions[1]])
-
- producer.stop()
-
- def test_async_keyed_producer(self):
- partition = self.client.get_partition_ids_for_topic(self.topic)[0]
- start_offset = self.current_offset(self.topic, partition)
-
- producer = KeyedProducer(self.client,
- partitioner=RoundRobinPartitioner,
- async_send=True,
- batch_send_every_t=1)
-
- resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
- self.assertEqual(len(resp), 0)
-
- # wait for the server to report a new highwatermark
- while self.current_offset(self.topic, partition) == start_offset:
- time.sleep(0.1)
-
- self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
-
- producer.stop()
-
- ############################
- # Producer ACK Tests #
- ############################
-
- def test_acks_none(self):
- partition = self.client.get_partition_ids_for_topic(self.topic)[0]
- start_offset = self.current_offset(self.topic, partition)
-
- producer = Producer(
- self.client,
- req_acks=Producer.ACK_NOT_REQUIRED,
- )
- resp = producer.send_messages(self.topic, partition, self.msg("one"))
-
- # No response from produce request with no acks required
- self.assertEqual(len(resp), 0)
-
- # But the message should still have been delivered
- self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
- producer.stop()
-
- def test_acks_local_write(self):
- partition = self.client.get_partition_ids_for_topic(self.topic)[0]
- start_offset = self.current_offset(self.topic, partition)
-
- producer = Producer(
- self.client,
- req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
- )
- resp = producer.send_messages(self.topic, partition, self.msg("one"))
-
- self.assert_produce_response(resp, start_offset)
- self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
-
- producer.stop()
-
- def test_acks_cluster_commit(self):
- partition = self.client.get_partition_ids_for_topic(self.topic)[0]
- start_offset = self.current_offset(self.topic, partition)
-
- producer = Producer(
- self.client,
- req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
- )
-
- resp = producer.send_messages(self.topic, partition, self.msg("one"))
- self.assert_produce_response(resp, start_offset)
- self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ])
-
- producer.stop()
-
- def assert_produce_request(self, messages, initial_offset, message_ct,
- partition=0):
- produce = ProduceRequestPayload(self.topic, partition, messages=messages)
-
- # There should only be one response message from the server.
- # This will throw an exception if there's more than one.
- resp = self.client.send_produce_request([ produce ])
- self.assert_produce_response(resp, initial_offset)
-
- self.assertEqual(self.current_offset(self.topic, partition), initial_offset + message_ct)
-
- def assert_produce_response(self, resp, initial_offset):
- self.assertEqual(len(resp), 1)
- self.assertEqual(resp[0].error, 0)
- self.assertEqual(resp[0].offset, initial_offset)
-
- def assert_fetch_offset(self, partition, start_offset, expected_messages):
- # There should only be one response message from the server.
- # This will throw an exception if there's more than one.
-
- resp, = self.client.send_fetch_request([FetchRequestPayload(self.topic, partition, start_offset, 1024)])
-
- self.assertEqual(resp.error, 0)
- self.assertEqual(resp.partition, partition)
- messages = [ x.message.value for x in resp.messages
- if not isinstance(x.message, PartialMessage) ]
-
- self.assertEqual(messages, expected_messages)
- self.assertEqual(resp.highwaterMark, start_offset+len(expected_messages))
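
The SimpleProducer/KeyedProducer behaviors covered above (sync vs. async sends, batching by count or time, keyed partitioning, ack levels) correspond to KafkaProducer configuration. A hedged sketch with placeholder broker and topic names:

    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        acks=1,              # ACK_AFTER_LOCAL_WRITE; 0 and 'all' are the other levels
        linger_ms=5000,      # batch by time, roughly batch_send_every_t
        batch_size=16384,    # batch by size in bytes rather than message count
        key_serializer=lambda k: k.encode('utf-8'),
        value_serializer=lambda v: v.encode('utf-8'),
    )

    # Keyed sends replace KeyedProducer: equal keys map to the same partition
    # via DefaultPartitioner.
    future = producer.send('my-topic', key='key1', value='one')
    record = future.get(timeout=10)   # block for the ack, like a sync producer
    print(record.partition, record.offset)

    producer.flush()   # push out any lingering batch before shutdown
    producer.close()
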
diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py
deleted file mode 100644
index ab80ee7..0000000
--- a/test/test_producer_legacy.py
+++ /dev/null
@@ -1,257 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import collections
-import logging
-import threading
-import time
-
-from mock import MagicMock, patch
-from . import unittest
-
-from kafka import SimpleClient, SimpleProducer, KeyedProducer
-from kafka.errors import (
- AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError)
-from kafka.producer.base import Producer, _send_upstream
-from kafka.protocol import CODEC_NONE
-from kafka.structs import (
- ProduceResponsePayload, RetryOptions, TopicPartition)
-
-from kafka.vendor.six.moves import queue, range
-
-
-class TestKafkaProducer(unittest.TestCase):
- def test_producer_message_types(self):
-
- producer = Producer(MagicMock())
- topic = b"test-topic"
- partition = 0
-
- bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
- ('a', 'tuple'), {'a': 'dict'}, None,)
- for m in bad_data_types:
- with self.assertRaises(TypeError):
- logging.debug("attempting to send message of type %s", type(m))
- producer.send_messages(topic, partition, m)
-
- good_data_types = (b'a string!',)
- for m in good_data_types:
- # This should not raise an exception
- producer.send_messages(topic, partition, m)
-
- def test_keyedproducer_message_types(self):
- client = MagicMock()
- client.get_partition_ids_for_topic.return_value = [0, 1]
- producer = KeyedProducer(client)
- topic = b"test-topic"
- key = b"testkey"
-
- bad_data_types = (u'你怎么样?', 12, ['a', 'list'],
- ('a', 'tuple'), {'a': 'dict'},)
- for m in bad_data_types:
- with self.assertRaises(TypeError):
- logging.debug("attempting to send message of type %s", type(m))
- producer.send_messages(topic, key, m)
-
- good_data_types = (b'a string!', None,)
- for m in good_data_types:
- # This should not raise an exception
- producer.send_messages(topic, key, m)
-
- def test_topic_message_types(self):
- client = MagicMock()
-
- def partitions(topic):
- return [0, 1]
-
- client.get_partition_ids_for_topic = partitions
-
- producer = SimpleProducer(client, random_start=False)
- topic = b"test-topic"
- producer.send_messages(topic, b'hi')
- assert client.send_produce_request.called
-
- @patch('kafka.producer.base._send_upstream')
- def test_producer_async_queue_overfilled(self, mock):
- queue_size = 2
- producer = Producer(MagicMock(), async_send=True,
- async_queue_maxsize=queue_size)
-
- topic = b'test-topic'
- partition = 0
- message = b'test-message'
-
- with self.assertRaises(AsyncProducerQueueFull):
- message_list = [message] * (queue_size + 1)
- producer.send_messages(topic, partition, *message_list)
- self.assertEqual(producer.queue.qsize(), queue_size)
- for _ in range(producer.queue.qsize()):
- producer.queue.get()
-
- def test_producer_sync_fail_on_error(self):
- error = FailedPayloadsError('failure')
- with patch.object(SimpleClient, 'load_metadata_for_topics'):
- with patch.object(SimpleClient, 'ensure_topic_exists'):
- with patch.object(SimpleClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
- with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):
-
- client = SimpleClient(MagicMock())
- producer = SimpleProducer(client, async_send=False, sync_fail_on_error=False)
-
- # This should not raise
- (response,) = producer.send_messages('foobar', b'test message')
- self.assertEqual(response, error)
-
- producer = SimpleProducer(client, async_send=False, sync_fail_on_error=True)
- with self.assertRaises(FailedPayloadsError):
- producer.send_messages('foobar', b'test message')
-
- def test_cleanup_is_not_called_on_stopped_producer(self):
- producer = Producer(MagicMock(), async_send=True)
- producer.stopped = True
- with patch.object(producer, 'stop') as mocked_stop:
- producer._cleanup_func(producer)
- self.assertEqual(mocked_stop.call_count, 0)
-
- def test_cleanup_is_called_on_running_producer(self):
- producer = Producer(MagicMock(), async_send=True)
- producer.stopped = False
- with patch.object(producer, 'stop') as mocked_stop:
- producer._cleanup_func(producer)
- self.assertEqual(mocked_stop.call_count, 1)
-
-
-class TestKafkaProducerSendUpstream(unittest.TestCase):
-
- def setUp(self):
- self.client = MagicMock()
- self.queue = queue.Queue()
-
- def _run_process(self, retries_limit=3, sleep_timeout=1):
- # run _send_upstream process with the queue
- stop_event = threading.Event()
- retry_options = RetryOptions(limit=retries_limit,
- backoff_ms=50,
- retry_on_timeouts=False)
- self.thread = threading.Thread(
- target=_send_upstream,
- args=(self.queue, self.client, CODEC_NONE,
- 0.3, # batch time (seconds)
- 3, # batch length
- Producer.ACK_AFTER_LOCAL_WRITE,
- Producer.DEFAULT_ACK_TIMEOUT,
- retry_options,
- stop_event))
- self.thread.daemon = True
- self.thread.start()
- time.sleep(sleep_timeout)
- stop_event.set()
-
- def test_wo_retries(self):
-
- # lets create a queue and add 10 messages for 1 partition
- for i in range(10):
- self.queue.put((TopicPartition("test", 0), "msg %i", "key %i"))
-
- self._run_process()
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
-        # there should be 4 non-void calls:
- # 3 batches of 3 msgs each + 1 batch of 1 message
- self.assertEqual(self.client.send_produce_request.call_count, 4)
-
- def test_first_send_failed(self):
-
- # lets create a queue and add 10 messages for 10 different partitions
- # to show how retries should work ideally
- for i in range(10):
- self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
-
- # Mock offsets counter for closure
- offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
- self.client.is_first_time = True
- def send_side_effect(reqs, *args, **kwargs):
- if self.client.is_first_time:
- self.client.is_first_time = False
- return [FailedPayloadsError(req) for req in reqs]
- responses = []
- for req in reqs:
- offset = offsets[req.topic][req.partition]
- offsets[req.topic][req.partition] += len(req.messages)
- responses.append(
- ProduceResponsePayload(req.topic, req.partition, 0, offset)
- )
- return responses
-
- self.client.send_produce_request.side_effect = send_side_effect
-
- self._run_process(2)
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
- # there should be 5 non-void calls: 1st failed batch of 3 msgs
- # plus 3 batches of 3 msgs each + 1 batch of 1 message
- self.assertEqual(self.client.send_produce_request.call_count, 5)
-
- def test_with_limited_retries(self):
-
- # lets create a queue and add 10 messages for 10 different partitions
- # to show how retries should work ideally
- for i in range(10):
- self.queue.put((TopicPartition("test", i), "msg %i" % i, "key %i" % i))
-
- def send_side_effect(reqs, *args, **kwargs):
- return [FailedPayloadsError(req) for req in reqs]
-
- self.client.send_produce_request.side_effect = send_side_effect
-
- self._run_process(3, 3)
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
- # there should be 16 non-void calls:
- # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg +
- # 3 retries of the batches above = (1 + 3 retries) * 4 batches = 16
- self.assertEqual(self.client.send_produce_request.call_count, 16)
-
- def test_async_producer_not_leader(self):
-
- for i in range(10):
- self.queue.put((TopicPartition("test", i), "msg %i", "key %i"))
-
- # Mock offsets counter for closure
- offsets = collections.defaultdict(lambda: collections.defaultdict(lambda: 0))
- self.client.is_first_time = True
- def send_side_effect(reqs, *args, **kwargs):
- if self.client.is_first_time:
- self.client.is_first_time = False
- return [ProduceResponsePayload(req.topic, req.partition,
- NotLeaderForPartitionError.errno, -1)
- for req in reqs]
-
- responses = []
- for req in reqs:
- offset = offsets[req.topic][req.partition]
- offsets[req.topic][req.partition] += len(req.messages)
- responses.append(
- ProduceResponsePayload(req.topic, req.partition, 0, offset)
- )
- return responses
-
- self.client.send_produce_request.side_effect = send_side_effect
-
- self._run_process(2)
-
- # the queue should be void at the end of the test
- self.assertEqual(self.queue.empty(), True)
-
- # there should be 5 non-void calls: 1st failed batch of 3 msgs
- # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5
- self.assertEqual(self.client.send_produce_request.call_count, 5)
-
- def tearDown(self):
- for _ in range(self.queue.qsize()):
- self.queue.get()
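
The unit tests above pinned down two legacy behaviors: payloads must be bytes, and failed batches are retried a bounded number of times. KafkaProducer keeps both ideas as configuration; the sketch below assumes a local broker and a placeholder topic, and the exception handling is deliberately broad since the exact error type for a non-bytes payload is an implementation detail:

    from kafka import KafkaProducer

    # Without serializers, key and value must already be bytes (or None),
    # mirroring the type checks in the deleted tests.
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        retries=3,             # bounded retries, replacing RetryOptions(limit=3)
        retry_backoff_ms=50,   # backoff between retriable failures
    )

    producer.send('test-topic', value=b'a string!')                  # accepted
    try:
        producer.send('test-topic', value=u'plain unicode, not bytes')  # rejected
    except (AssertionError, TypeError):
        pass
    producer.close()
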
diff --git a/test/test_protocol_legacy.py b/test/test_protocol_legacy.py
deleted file mode 100644
index 1341af0..0000000
--- a/test/test_protocol_legacy.py
+++ /dev/null
@@ -1,848 +0,0 @@
-#pylint: skip-file
-from contextlib import contextmanager
-import struct
-
-from kafka.vendor import six
-from mock import patch, sentinel
-from . import unittest
-
-from kafka.codec import has_snappy, gzip_decode, snappy_decode
-from kafka.errors import (
- ChecksumError, KafkaUnavailableError, UnsupportedCodecError,
- ConsumerFetchSizeTooSmall, ProtocolError)
-from kafka.protocol import (
- ATTRIBUTE_CODEC_MASK, CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, KafkaProtocol,
- create_message, create_gzip_message, create_snappy_message,
- create_message_set)
-from kafka.structs import (
- OffsetRequestPayload, OffsetResponsePayload,
- OffsetCommitRequestPayload, OffsetCommitResponsePayload,
- OffsetFetchRequestPayload, OffsetFetchResponsePayload,
- ProduceRequestPayload, ProduceResponsePayload,
- FetchRequestPayload, FetchResponsePayload,
- Message, OffsetAndMessage, BrokerMetadata, ConsumerMetadataResponse)
-
-
-class TestProtocol(unittest.TestCase):
- def test_create_message(self):
- payload = "test"
- key = "key"
- msg = create_message(payload, key)
- self.assertEqual(msg.magic, 0)
- self.assertEqual(msg.attributes, 0)
- self.assertEqual(msg.key, key)
- self.assertEqual(msg.value, payload)
-
- def test_create_gzip(self):
- payloads = [(b"v1", None), (b"v2", None)]
- msg = create_gzip_message(payloads)
- self.assertEqual(msg.magic, 0)
- self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
- self.assertEqual(msg.key, None)
- # Need to decode to check since gzipped payload is non-deterministic
- decoded = gzip_decode(msg.value)
- expect = b"".join([
- struct.pack(">q", 0), # MsgSet offset
- struct.pack(">i", 16), # MsgSet size
- struct.pack(">i", 1285512130), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", -1), # -1 indicates a null key
- struct.pack(">i", 2), # Msg length (bytes)
- b"v1", # Message contents
-
- struct.pack(">q", 0), # MsgSet offset
- struct.pack(">i", 16), # MsgSet size
- struct.pack(">i", -711587208), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", -1), # -1 indicates a null key
- struct.pack(">i", 2), # Msg length (bytes)
- b"v2", # Message contents
- ])
-
- self.assertEqual(decoded, expect)
-
- def test_create_gzip_keyed(self):
- payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
- msg = create_gzip_message(payloads)
- self.assertEqual(msg.magic, 0)
- self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
- self.assertEqual(msg.key, None)
- # Need to decode to check since gzipped payload is non-deterministic
- decoded = gzip_decode(msg.value)
- expect = b"".join([
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", 1474775406), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k1", # Key
- struct.pack(">i", 2), # Length of value
- b"v1", # Value
-
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", -16383415), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k2", # Key
- struct.pack(">i", 2), # Length of value
- b"v2", # Value
- ])
-
- self.assertEqual(decoded, expect)
-
- @unittest.skipUnless(has_snappy(), "Snappy not available")
- def test_create_snappy(self):
- payloads = [(b"v1", None), (b"v2", None)]
- msg = create_snappy_message(payloads)
- self.assertEqual(msg.magic, 0)
- self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
- self.assertEqual(msg.key, None)
- decoded = snappy_decode(msg.value)
- expect = b"".join([
- struct.pack(">q", 0), # MsgSet offset
- struct.pack(">i", 16), # MsgSet size
- struct.pack(">i", 1285512130), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", -1), # -1 indicates a null key
- struct.pack(">i", 2), # Msg length (bytes)
- b"v1", # Message contents
-
- struct.pack(">q", 0), # MsgSet offset
- struct.pack(">i", 16), # MsgSet size
- struct.pack(">i", -711587208), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", -1), # -1 indicates a null key
- struct.pack(">i", 2), # Msg length (bytes)
- b"v2", # Message contents
- ])
-
- self.assertEqual(decoded, expect)
-
- @unittest.skipUnless(has_snappy(), "Snappy not available")
- def test_create_snappy_keyed(self):
- payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
- msg = create_snappy_message(payloads)
- self.assertEqual(msg.magic, 0)
- self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
- self.assertEqual(msg.key, None)
- decoded = snappy_decode(msg.value)
- expect = b"".join([
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", 1474775406), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k1", # Key
- struct.pack(">i", 2), # Length of value
- b"v1", # Value
-
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", -16383415), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k2", # Key
- struct.pack(">i", 2), # Length of value
- b"v2", # Value
- ])
-
- self.assertEqual(decoded, expect)
-
- def test_encode_message_header(self):
- expect = b"".join([
- struct.pack(">h", 10), # API Key
- struct.pack(">h", 0), # API Version
- struct.pack(">i", 4), # Correlation Id
- struct.pack(">h", len("client3")), # Length of clientId
- b"client3", # ClientId
- ])
-
- encoded = KafkaProtocol._encode_message_header(b"client3", 4, 10)
- self.assertEqual(encoded, expect)
-
- def test_encode_message(self):
- message = create_message(b"test", b"key")
- encoded = KafkaProtocol._encode_message(message)
- expect = b"".join([
- struct.pack(">i", -1427009701), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 3), # Length of key
- b"key", # key
- struct.pack(">i", 4), # Length of value
- b"test", # value
- ])
-
- self.assertEqual(encoded, expect)
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_message(self):
- encoded = b"".join([
- struct.pack(">i", -1427009701), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 3), # Length of key
- b"key", # key
- struct.pack(">i", 4), # Length of value
- b"test", # value
- ])
-
- offset = 10
- (returned_offset, decoded_message) = list(KafkaProtocol._decode_message(encoded, offset))[0]
-
- self.assertEqual(returned_offset, offset)
- self.assertEqual(decoded_message, create_message(b"test", b"key"))
-
- def test_encode_message_failure(self):
- with self.assertRaises(ProtocolError):
- KafkaProtocol._encode_message(Message(1, 0, "key", "test"))
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_message_set(self):
- message_set = [
- create_message(b"v1", b"k1"),
- create_message(b"v2", b"k2")
- ]
-
- encoded = KafkaProtocol._encode_message_set(message_set)
- expect = b"".join([
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", 1474775406), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k1", # Key
- struct.pack(">i", 2), # Length of value
- b"v1", # Value
-
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", -16383415), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k2", # Key
- struct.pack(">i", 2), # Length of value
- b"v2", # Value
- ])
-
- self.assertEqual(encoded, expect)
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_message_set(self):
- encoded = b"".join([
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", 1474775406), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k1", # Key
- struct.pack(">i", 2), # Length of value
- b"v1", # Value
-
- struct.pack(">q", 1), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", -16383415), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k2", # Key
- struct.pack(">i", 2), # Length of value
- b"v2", # Value
- ])
-
- msgs = list(KafkaProtocol._decode_message_set_iter(encoded))
- self.assertEqual(len(msgs), 2)
- msg1, msg2 = msgs
-
- returned_offset1, decoded_message1 = msg1
- returned_offset2, decoded_message2 = msg2
-
- self.assertEqual(returned_offset1, 0)
- self.assertEqual(decoded_message1, create_message(b"v1", b"k1"))
-
- self.assertEqual(returned_offset2, 1)
- self.assertEqual(decoded_message2, create_message(b"v2", b"k2"))
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_message_gzip(self):
- gzip_encoded = (b'\xc0\x11\xb2\xf0\x00\x01\xff\xff\xff\xff\x00\x00\x000'
- b'\x1f\x8b\x08\x00\xa1\xc1\xc5R\x02\xffc`\x80\x03\x01'
- b'\x9f\xf9\xd1\x87\x18\x18\xfe\x03\x01\x90\xc7Tf\xc8'
- b'\x80$wu\x1aW\x05\x92\x9c\x11\x00z\xc0h\x888\x00\x00'
- b'\x00')
- offset = 11
- messages = list(KafkaProtocol._decode_message(gzip_encoded, offset))
-
- self.assertEqual(len(messages), 2)
- msg1, msg2 = messages
-
- returned_offset1, decoded_message1 = msg1
- self.assertEqual(returned_offset1, 0)
- self.assertEqual(decoded_message1, create_message(b"v1"))
-
- returned_offset2, decoded_message2 = msg2
- self.assertEqual(returned_offset2, 0)
- self.assertEqual(decoded_message2, create_message(b"v2"))
-
- @unittest.skip('needs updating for new protocol classes')
- @unittest.skipUnless(has_snappy(), "Snappy not available")
- def test_decode_message_snappy(self):
- snappy_encoded = (b'\xec\x80\xa1\x95\x00\x02\xff\xff\xff\xff\x00\x00'
- b'\x00,8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff'
- b'\xff\xff\xff\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5'
- b'\x96\nx\x00\x00\xff\xff\xff\xff\x00\x00\x00\x02v2')
- offset = 11
- messages = list(KafkaProtocol._decode_message(snappy_encoded, offset))
- self.assertEqual(len(messages), 2)
-
- msg1, msg2 = messages
-
- returned_offset1, decoded_message1 = msg1
- self.assertEqual(returned_offset1, 0)
- self.assertEqual(decoded_message1, create_message(b"v1"))
-
- returned_offset2, decoded_message2 = msg2
- self.assertEqual(returned_offset2, 0)
- self.assertEqual(decoded_message2, create_message(b"v2"))
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_message_checksum_error(self):
- invalid_encoded_message = b"This is not a valid encoded message"
- iter = KafkaProtocol._decode_message(invalid_encoded_message, 0)
- self.assertRaises(ChecksumError, list, iter)
-
- # NOTE: The error handling in _decode_message_set_iter() is questionable.
- # If it's modified, the next two tests might need to be fixed.
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_message_set_fetch_size_too_small(self):
- with self.assertRaises(ConsumerFetchSizeTooSmall):
- list(KafkaProtocol._decode_message_set_iter('a'))
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_message_set_stop_iteration(self):
- encoded = b"".join([
- struct.pack(">q", 0), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", 1474775406), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k1", # Key
- struct.pack(">i", 2), # Length of value
- b"v1", # Value
-
- struct.pack(">q", 1), # MsgSet Offset
- struct.pack(">i", 18), # Msg Size
- struct.pack(">i", -16383415), # CRC
- struct.pack(">bb", 0, 0), # Magic, flags
- struct.pack(">i", 2), # Length of key
- b"k2", # Key
- struct.pack(">i", 2), # Length of value
- b"v2", # Value
- b"@1$%(Y!", # Random padding
- ])
-
- msgs = MessageSet.decode(io.BytesIO(encoded))
- self.assertEqual(len(msgs), 2)
- msg1, msg2 = msgs
-
- returned_offset1, msg_size1, decoded_message1 = msg1
- returned_offset2, msg_size2, decoded_message2 = msg2
-
- self.assertEqual(returned_offset1, 0)
- self.assertEqual(decoded_message1.value, b"v1")
- self.assertEqual(decoded_message1.key, b"k1")
-
- self.assertEqual(returned_offset2, 1)
- self.assertEqual(decoded_message2.value, b"v2")
- self.assertEqual(decoded_message2.key, b"k2")
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_produce_request(self):
- requests = [
- ProduceRequestPayload("topic1", 0, [
- kafka.protocol.message.Message(b"a"),
- kafka.protocol.message.Message(b"b")
- ]),
- ProduceRequestPayload("topic2", 1, [
- kafka.protocol.message.Message(b"c")
- ])
- ]
-
- msg_a_binary = KafkaProtocol._encode_message(create_message(b"a"))
- msg_b_binary = KafkaProtocol._encode_message(create_message(b"b"))
- msg_c_binary = KafkaProtocol._encode_message(create_message(b"c"))
-
- header = b"".join([
- struct.pack('>i', 0x94), # The length of the message overall
- struct.pack('>h', 0), # Msg Header, Message type = Produce
- struct.pack('>h', 0), # Msg Header, API version
- struct.pack('>i', 2), # Msg Header, Correlation ID
- struct.pack('>h7s', 7, b"client1"), # Msg Header, The client ID
- struct.pack('>h', 2), # Num acks required
- struct.pack('>i', 100), # Request Timeout
- struct.pack('>i', 2), # The number of requests
- ])
-
- total_len = len(msg_a_binary) + len(msg_b_binary)
- topic1 = b"".join([
- struct.pack('>h6s', 6, b'topic1'), # The topic1
- struct.pack('>i', 1), # One message set
- struct.pack('>i', 0), # Partition 0
- struct.pack('>i', total_len + 24), # Size of the incoming message set
- struct.pack('>q', 0), # No offset specified
- struct.pack('>i', len(msg_a_binary)), # Length of message
- msg_a_binary, # Actual message
- struct.pack('>q', 0), # No offset specified
- struct.pack('>i', len(msg_b_binary)), # Length of message
- msg_b_binary, # Actual message
- ])
-
- topic2 = b"".join([
- struct.pack('>h6s', 6, b'topic2'), # The topic2
- struct.pack('>i', 1), # One message set
- struct.pack('>i', 1), # Partition 1
- struct.pack('>i', len(msg_c_binary) + 12), # Size of the incoming message set
- struct.pack('>q', 0), # No offset specified
- struct.pack('>i', len(msg_c_binary)), # Length of message
- msg_c_binary, # Actual message
- ])
-
- expected1 = b"".join([ header, topic1, topic2 ])
- expected2 = b"".join([ header, topic2, topic1 ])
-
- encoded = KafkaProtocol.encode_produce_request(b"client1", 2, requests, 2, 100)
- self.assertIn(encoded, [ expected1, expected2 ])
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_produce_response(self):
- t1 = b"topic1"
- t2 = b"topic2"
- _long = int
- if six.PY2:
- _long = long
- encoded = struct.pack('>iih%dsiihqihqh%dsiihq' % (len(t1), len(t2)),
- 2, 2, len(t1), t1, 2, 0, 0, _long(10), 1, 1, _long(20),
- len(t2), t2, 1, 0, 0, _long(30))
- responses = list(KafkaProtocol.decode_produce_response(encoded))
- self.assertEqual(responses,
- [ProduceResponse(t1, 0, 0, _long(10)),
- ProduceResponse(t1, 1, 1, _long(20)),
- ProduceResponse(t2, 0, 0, _long(30))])
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_fetch_request(self):
- requests = [
- FetchRequest(b"topic1", 0, 10, 1024),
- FetchRequest(b"topic2", 1, 20, 100),
- ]
-
- header = b"".join([
- struct.pack('>i', 89), # The length of the message overall
- struct.pack('>h', 1), # Msg Header, Message type = Fetch
- struct.pack('>h', 0), # Msg Header, API version
- struct.pack('>i', 3), # Msg Header, Correlation ID
- struct.pack('>h7s', 7, b"client1"),# Msg Header, The client ID
- struct.pack('>i', -1), # Replica Id
- struct.pack('>i', 2), # Max wait time
- struct.pack('>i', 100), # Min bytes
- struct.pack('>i', 2), # Num requests
- ])
-
- topic1 = b"".join([
- struct.pack('>h6s', 6, b'topic1'),# Topic
- struct.pack('>i', 1), # Num Payloads
- struct.pack('>i', 0), # Partition 0
- struct.pack('>q', 10), # Offset
- struct.pack('>i', 1024), # Max Bytes
- ])
-
- topic2 = b"".join([
- struct.pack('>h6s', 6, b'topic2'),# Topic
- struct.pack('>i', 1), # Num Payloads
- struct.pack('>i', 1), # Partition 1
- struct.pack('>q', 20), # Offset
- struct.pack('>i', 100), # Max Bytes
- ])
-
- expected1 = b"".join([ header, topic1, topic2 ])
- expected2 = b"".join([ header, topic2, topic1 ])
-
- encoded = KafkaProtocol.encode_fetch_request(b"client1", 3, requests, 2, 100)
- self.assertIn(encoded, [ expected1, expected2 ])
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_fetch_response(self):
- t1 = b"topic1"
- t2 = b"topic2"
- msgs = [create_message(msg)
- for msg in [b"message1", b"hi", b"boo", b"foo", b"so fun!"]]
- ms1 = KafkaProtocol._encode_message_set([msgs[0], msgs[1]])
- ms2 = KafkaProtocol._encode_message_set([msgs[2]])
- ms3 = KafkaProtocol._encode_message_set([msgs[3], msgs[4]])
-
- encoded = struct.pack('>iih%dsiihqi%dsihqi%dsh%dsiihqi%ds' %
- (len(t1), len(ms1), len(ms2), len(t2), len(ms3)),
- 4, 2, len(t1), t1, 2, 0, 0, 10, len(ms1), ms1, 1,
- 1, 20, len(ms2), ms2, len(t2), t2, 1, 0, 0, 30,
- len(ms3), ms3)
-
- responses = list(KafkaProtocol.decode_fetch_response(encoded))
- def expand_messages(response):
- return FetchResponsePayload(response.topic, response.partition,
- response.error, response.highwaterMark,
- list(response.messages))
-
- expanded_responses = list(map(expand_messages, responses))
- expect = [FetchResponsePayload(t1, 0, 0, 10, [OffsetAndMessage(0, msgs[0]),
- OffsetAndMessage(0, msgs[1])]),
- FetchResponsePayload(t1, 1, 1, 20, [OffsetAndMessage(0, msgs[2])]),
- FetchResponsePayload(t2, 0, 0, 30, [OffsetAndMessage(0, msgs[3]),
- OffsetAndMessage(0, msgs[4])])]
- self.assertEqual(expanded_responses, expect)
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_metadata_request_no_topics(self):
- expected = b"".join([
- struct.pack(">i", 17), # Total length of the request
- struct.pack('>h', 3), # API key metadata fetch
- struct.pack('>h', 0), # API version
- struct.pack('>i', 4), # Correlation ID
- struct.pack('>h3s', 3, b"cid"),# The client ID
- struct.pack('>i', 0), # No topics, give all the data!
- ])
-
- encoded = KafkaProtocol.encode_metadata_request(b"cid", 4)
-
- self.assertEqual(encoded, expected)
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_metadata_request_with_topics(self):
- expected = b"".join([
- struct.pack(">i", 25), # Total length of the request
- struct.pack('>h', 3), # API key metadata fetch
- struct.pack('>h', 0), # API version
- struct.pack('>i', 4), # Correlation ID
- struct.pack('>h3s', 3, b"cid"),# The client ID
- struct.pack('>i', 2), # Number of topics in the request
- struct.pack('>h2s', 2, b"t1"), # Topic "t1"
- struct.pack('>h2s', 2, b"t2"), # Topic "t2"
- ])
-
- encoded = KafkaProtocol.encode_metadata_request(b"cid", 4, [b"t1", b"t2"])
-
- self.assertEqual(encoded, expected)
-
- def _create_encoded_metadata_response(self, brokers, topics):
- encoded = []
- encoded.append(struct.pack('>ii', 3, len(brokers)))
- for broker in brokers:
- encoded.append(struct.pack('>ih%dsi' % len(broker.host),
- broker.nodeId, len(broker.host),
- broker.host, broker.port))
-
- encoded.append(struct.pack('>i', len(topics)))
- for topic in topics:
- encoded.append(struct.pack('>hh%dsi' % len(topic.topic),
- topic.error, len(topic.topic),
- topic.topic, len(topic.partitions)))
- for metadata in topic.partitions:
- encoded.append(struct.pack('>hiii', metadata.error,
- metadata.partition, metadata.leader,
- len(metadata.replicas)))
- if len(metadata.replicas) > 0:
- encoded.append(struct.pack('>%di' % len(metadata.replicas),
- *metadata.replicas))
-
- encoded.append(struct.pack('>i', len(metadata.isr)))
- if len(metadata.isr) > 0:
- encoded.append(struct.pack('>%di' % len(metadata.isr),
- *metadata.isr))
- return b''.join(encoded)
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_metadata_response(self):
- node_brokers = [
- BrokerMetadata(0, b"brokers1.kafka.rdio.com", 1000),
- BrokerMetadata(1, b"brokers1.kafka.rdio.com", 1001),
- BrokerMetadata(3, b"brokers2.kafka.rdio.com", 1000)
- ]
-
- '''
- topic_partitions = [
- TopicMetadata(b"topic1", 0, [
- PartitionMetadata(b"topic1", 0, 1, (0, 2), (2,), 0),
- PartitionMetadata(b"topic1", 1, 3, (0, 1), (0, 1), 1)
- ]),
- TopicMetadata(b"topic2", 1, [
- PartitionMetadata(b"topic2", 0, 0, (), (), 0),
- ]),
- ]
- encoded = self._create_encoded_metadata_response(node_brokers,
- topic_partitions)
- decoded = KafkaProtocol.decode_metadata_response(encoded)
- self.assertEqual(decoded, (node_brokers, topic_partitions))
- '''
-
- def test_encode_consumer_metadata_request(self):
- expected = b"".join([
- struct.pack(">i", 17), # Total length of the request
- struct.pack('>h', 10), # API key consumer metadata
- struct.pack('>h', 0), # API version
- struct.pack('>i', 4), # Correlation ID
- struct.pack('>h3s', 3, b"cid"),# The client ID
- struct.pack('>h2s', 2, b"g1"), # Group "g1"
- ])
-
- encoded = KafkaProtocol.encode_consumer_metadata_request(b"cid", 4, b"g1")
-
- self.assertEqual(encoded, expected)
-
- def test_decode_consumer_metadata_response(self):
- encoded = b"".join([
- struct.pack(">i", 42), # Correlation ID
- struct.pack(">h", 0), # No Error
- struct.pack(">i", 1), # Broker ID
- struct.pack(">h23s", 23, b"brokers1.kafka.rdio.com"), # Broker Host
- struct.pack(">i", 1000), # Broker Port
- ])
-
- results = KafkaProtocol.decode_consumer_metadata_response(encoded)
- self.assertEqual(results,
- ConsumerMetadataResponse(error = 0, nodeId = 1, host = b'brokers1.kafka.rdio.com', port = 1000)
- )
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_offset_request(self):
- expected = b"".join([
- struct.pack(">i", 21), # Total length of the request
- struct.pack('>h', 2), # Message type = offset fetch
- struct.pack('>h', 0), # API version
- struct.pack('>i', 4), # Correlation ID
- struct.pack('>h3s', 3, b"cid"), # The client ID
- struct.pack('>i', -1), # Replica Id
- struct.pack('>i', 0), # No topic/partitions
- ])
-
- encoded = KafkaProtocol.encode_offset_request(b"cid", 4)
-
- self.assertEqual(encoded, expected)
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_offset_request__no_payload(self):
- expected = b"".join([
- struct.pack(">i", 65), # Total length of the request
-
- struct.pack('>h', 2), # Message type = offset fetch
- struct.pack('>h', 0), # API version
- struct.pack('>i', 4), # Correlation ID
- struct.pack('>h3s', 3, b"cid"), # The client ID
- struct.pack('>i', -1), # Replica Id
- struct.pack('>i', 1), # Num topics
- struct.pack(">h6s", 6, b"topic1"),# Topic for the request
- struct.pack(">i", 2), # Two partitions
-
- struct.pack(">i", 3), # Partition 3
- struct.pack(">q", -1), # No time offset
- struct.pack(">i", 1), # One offset requested
-
i", 4), # Partition 3">
- struct.pack(">i", 4), # Partition 4
- struct.pack(">q", -1), # No time offset
- struct.pack(">i", 1), # One offset requested
- ])
-
- encoded = KafkaProtocol.encode_offset_request(b"cid", 4, [
- OffsetRequest(b'topic1', 3, -1, 1),
- OffsetRequest(b'topic1', 4, -1, 1),
- ])
-
- self.assertEqual(encoded, expected)
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_offset_response(self):
- encoded = b"".join([
- struct.pack(">i", 42), # Correlation ID
i", 1), # One topics">
- struct.pack(">i", 1), # One topic
- struct.pack(">h6s", 6, b"topic1"),# First topic
- struct.pack(">i", 2), # Two partitions
-
- struct.pack(">i", 2), # Partition 2
- struct.pack(">h", 0), # No error
- struct.pack(">i", 1), # One offset
- struct.pack(">q", 4), # Offset 4
-
- struct.pack(">i", 4), # Partition 4
- struct.pack(">h", 0), # No error
- struct.pack(">i", 1), # One offset
- struct.pack(">q", 8), # Offset 8
- ])
-
- results = KafkaProtocol.decode_offset_response(encoded)
- self.assertEqual(set(results), set([
- OffsetResponse(topic = b'topic1', partition = 2, error = 0, offsets=(4,)),
- OffsetResponse(topic = b'topic1', partition = 4, error = 0, offsets=(8,)),
- ]))
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_offset_commit_request(self):
- header = b"".join([
- struct.pack('>i', 99), # Total message length
-
- struct.pack('>h', 8), # Message type = offset commit
- struct.pack('>h', 0), # API version
- struct.pack('>i', 42), # Correlation ID
- struct.pack('>h9s', 9, b"client_id"),# The client ID
- struct.pack('>h8s', 8, b"group_id"), # The group to commit for
- struct.pack('>i', 2), # Num topics
- ])
-
- topic1 = b"".join([
- struct.pack(">h6s", 6, b"topic1"), # Topic for the request
- struct.pack(">i", 2), # Two partitions
- struct.pack(">i", 0), # Partition 0
- struct.pack(">q", 123), # Offset 123
- struct.pack(">h", -1), # Null metadata
- struct.pack(">i", 1), # Partition 1
- struct.pack(">q", 234), # Offset 234
- struct.pack(">h", -1), # Null metadata
- ])
-
- topic2 = b"".join([
- struct.pack(">h6s", 6, b"topic2"), # Topic for the request
- struct.pack(">i", 1), # One partition
- struct.pack(">i", 2), # Partition 2
- struct.pack(">q", 345), # Offset 345
- struct.pack(">h", -1), # Null metadata
- ])
-
- expected1 = b"".join([ header, topic1, topic2 ])
- expected2 = b"".join([ header, topic2, topic1 ])
-
- encoded = KafkaProtocol.encode_offset_commit_request(b"client_id", 42, b"group_id", [
- OffsetCommitRequest(b"topic1", 0, 123, None),
- OffsetCommitRequest(b"topic1", 1, 234, None),
- OffsetCommitRequest(b"topic2", 2, 345, None),
- ])
-
- self.assertIn(encoded, [ expected1, expected2 ])
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_offset_commit_response(self):
- encoded = b"".join([
- struct.pack(">i", 42), # Correlation ID
- struct.pack(">i", 1), # One topic
- struct.pack(">h6s", 6, b"topic1"),# First topic
- struct.pack(">i", 2), # Two partitions
-
- struct.pack(">i", 2), # Partition 2
- struct.pack(">h", 0), # No error
-
- struct.pack(">i", 4), # Partition 4
- struct.pack(">h", 0), # No error
- ])
-
- results = KafkaProtocol.decode_offset_commit_response(encoded)
- self.assertEqual(set(results), set([
- OffsetCommitResponse(topic = b'topic1', partition = 2, error = 0),
- OffsetCommitResponse(topic = b'topic1', partition = 4, error = 0),
- ]))
-
- @unittest.skip('needs updating for new protocol classes')
- def test_encode_offset_fetch_request(self):
- header = b"".join([
- struct.pack('>i', 69), # Total message length
- struct.pack('>h', 9), # Message type = offset fetch
- struct.pack('>h', 0), # API version
- struct.pack('>i', 42), # Correlation ID
- struct.pack('>h9s', 9, b"client_id"),# The client ID
- struct.pack('>h8s', 8, b"group_id"), # The group to commit for
- struct.pack('>i', 2), # Num topics
- ])
-
- topic1 = b"".join([
- struct.pack(">h6s", 6, b"topic1"), # Topic for the request
- struct.pack(">i", 2), # Two partitions
- struct.pack(">i", 0), # Partition 0
- struct.pack(">i", 1), # Partition 1
- ])
-
- topic2 = b"".join([
- struct.pack(">h6s", 6, b"topic2"), # Topic for the request
i", 1), # One partitions">
- struct.pack(">i", 1), # One partition
- struct.pack(">i", 2), # Partition 2
- ])
-
- expected1 = b"".join([ header, topic1, topic2 ])
- expected2 = b"".join([ header, topic2, topic1 ])
-
- encoded = KafkaProtocol.encode_offset_fetch_request(b"client_id", 42, b"group_id", [
- OffsetFetchRequest(b"topic1", 0),
- OffsetFetchRequest(b"topic1", 1),
- OffsetFetchRequest(b"topic2", 2),
- ])
-
- self.assertIn(encoded, [ expected1, expected2 ])
-
- @unittest.skip('needs updating for new protocol classes')
- def test_decode_offset_fetch_response(self):
- encoded = b"".join([
- struct.pack(">i", 42), # Correlation ID
i", 1), # One topics">
- struct.pack(">i", 1), # One topic
- struct.pack(">h6s", 6, b"topic1"),# First topic
- struct.pack(">i", 2), # Two partitions
-
- struct.pack(">i", 2), # Partition 2
- struct.pack(">q", 4), # Offset 4
- struct.pack(">h4s", 4, b"meta"), # Metadata
- struct.pack(">h", 0), # No error
-
- struct.pack(">i", 4), # Partition 4
- struct.pack(">q", 8), # Offset 8
- struct.pack(">h4s", 4, b"meta"), # Metadata
- struct.pack(">h", 0), # No error
- ])
-
- results = KafkaProtocol.decode_offset_fetch_response(encoded)
- self.assertEqual(set(results), set([
- OffsetFetchResponse(topic = b'topic1', partition = 2, offset = 4, error = 0, metadata = b"meta"),
- OffsetFetchResponse(topic = b'topic1', partition = 4, offset = 8, error = 0, metadata = b"meta"),
- ]))
-
- @contextmanager
- def mock_create_message_fns(self):
- import kafka.protocol
- with patch.object(kafka.protocol.legacy, "create_message",
- return_value=sentinel.message):
- with patch.object(kafka.protocol.legacy, "create_gzip_message",
- return_value=sentinel.gzip_message):
- with patch.object(kafka.protocol.legacy, "create_snappy_message",
- return_value=sentinel.snappy_message):
- yield
-
- def test_create_message_set(self):
- messages = [(1, "k1"), (2, "k2"), (3, "k3")]
-
- # Default codec is CODEC_NONE. Expect list of regular messages.
- expect = [sentinel.message] * len(messages)
- with self.mock_create_message_fns():
- message_set = create_message_set(messages)
- self.assertEqual(message_set, expect)
-
- # CODEC_NONE: Expect list of regular messages.
- expect = [sentinel.message] * len(messages)
- with self.mock_create_message_fns():
- message_set = create_message_set(messages, CODEC_NONE)
- self.assertEqual(message_set, expect)
-
- # CODEC_GZIP: Expect list of one gzip-encoded message.
- expect = [sentinel.gzip_message]
- with self.mock_create_message_fns():
- message_set = create_message_set(messages, CODEC_GZIP)
- self.assertEqual(message_set, expect)
-
- # CODEC_SNAPPY: Expect list of one snappy-encoded message.
- expect = [sentinel.snappy_message]
- with self.mock_create_message_fns():
- message_set = create_message_set(messages, CODEC_SNAPPY)
- self.assertEqual(message_set, expect)
-
- # Unknown codec should raise UnsupportedCodecError.
- with self.assertRaises(UnsupportedCodecError):
- create_message_set(messages, -1)
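Most of the deleted tests above had long been skipped as "needs updating for new protocol classes" and are dropped here rather than ported. For reference only, a minimal pytest-style sketch of the kind of round-trip check the versioned protocol classes support is shown below; it is not part of this commit, and it assumes Message(value, key=...), Message.encode() and Message.decode() behave as the MessageSet usage in test_decode_message_set_stop_iteration above implies.

# Illustrative sketch only -- not part of this commit. Round-trips one message
# through kafka.protocol.message.Message; encode()/decode() and the key= kwarg
# are assumptions inferred from the skipped legacy tests above.
import io

from kafka.protocol.message import Message


def test_message_roundtrip():
    original = Message(b"v1", key=b"k1")
    encoded = original.encode()                    # wire bytes (assumed API)
    decoded = Message.decode(io.BytesIO(encoded))
    assert decoded.key == b"k1"
    assert decoded.value == b"v1"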
diff --git a/test/test_util.py b/test/test_util.py
deleted file mode 100644
index a4dbaa5..0000000
--- a/test/test_util.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# -*- coding: utf-8 -*-
-import struct
-
-from kafka.vendor import six
-from . import unittest
-
-import kafka.errors
-import kafka.structs
-import kafka.util
-
-
-class UtilTest(unittest.TestCase):
- @unittest.skip("Unwritten")
- def test_relative_unpack(self):
- pass
-
- def test_write_int_string(self):
- self.assertEqual(
- kafka.util.write_int_string(b'some string'),
- b'\x00\x00\x00\x0bsome string'
- )
-
- def test_write_int_string__unicode(self):
- with self.assertRaises(TypeError) as cm:
- kafka.util.write_int_string(u'unicode')
- #: :type: TypeError
- te = cm.exception
- if six.PY2:
- self.assertIn('unicode', str(te))
- else:
- self.assertIn('str', str(te))
- self.assertIn('to be bytes', str(te))
-
- def test_write_int_string__empty(self):
- self.assertEqual(
- kafka.util.write_int_string(b''),
- b'\x00\x00\x00\x00'
- )
-
- def test_write_int_string__null(self):
- self.assertEqual(
- kafka.util.write_int_string(None),
- b'\xff\xff\xff\xff'
- )
-
- def test_read_short_string(self):
- self.assertEqual(kafka.util.read_short_string(b'\xff\xff', 0), (None, 2))
- self.assertEqual(kafka.util.read_short_string(b'\x00\x00', 0), (b'', 2))
- self.assertEqual(kafka.util.read_short_string(b'\x00\x0bsome string', 0), (b'some string', 13))
-
- def test_relative_unpack2(self):
- self.assertEqual(
- kafka.util.relative_unpack('>hh', b'\x00\x01\x00\x00\x02', 0),
- ((1, 0), 4)
- )
-
- def test_relative_unpack3(self):
- with self.assertRaises(kafka.errors.BufferUnderflowError):
- kafka.util.relative_unpack('>hh', '\x00', 0)
-
- def test_group_by_topic_and_partition(self):
- t = kafka.structs.TopicPartition
-
- l = [
- t("a", 1),
- t("a", 2),
- t("a", 3),
- t("b", 3),
- ]
-
- self.assertEqual(kafka.util.group_by_topic_and_partition(l), {
- "a": {
- 1: t("a", 1),
- 2: t("a", 2),
- 3: t("a", 3),
- },
- "b": {
- 3: t("b", 3),
- }
- })
-
- # should not be able to group duplicate topic-partitions
- t1 = t("a", 1)
- with self.assertRaises(AssertionError):
- kafka.util.group_by_topic_and_partition([t1, t1])
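The test_util.py cases above documented the length-prefixed string framing behind the removed kafka.util helpers. For anyone who depended on them, here is a minimal stand-alone sketch of that framing, derived from the expected byte strings in the deleted assertions rather than from any remaining kafka-python API; the helper names mirror the removed functions but this is illustrative, not a drop-in replacement.

# Stand-alone sketch of the framing exercised above.
import struct


def write_int_string(value):
    # int32 length prefix; -1 (b'\xff\xff\xff\xff') encodes a null value
    if value is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(value)) + value


def read_short_string(data, cur):
    # int16 length prefix; returns (value, new_cursor), with None for length -1
    (strlen,) = struct.unpack(">h", data[cur:cur + 2])
    if strlen == -1:
        return None, cur + 2
    end = cur + 2 + strlen
    return data[cur + 2:end], end


assert write_int_string(b"some string") == b"\x00\x00\x00\x0bsome string"
assert write_int_string(None) == b"\xff\xff\xff\xff"
assert read_short_string(b"\x00\x0bsome string", 0) == (b"some string", 13)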
diff --git a/test/testutil.py b/test/testutil.py
index 650f9bf..77a6673 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -4,18 +4,6 @@ import os
import random
import string
import time
-import uuid
-
-import pytest
-from . import unittest
-
-from kafka import SimpleClient
-from kafka.errors import (
- LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError,
- NotLeaderForPartitionError, UnknownTopicOrPartitionError,
- FailedPayloadsError
-)
-from kafka.structs import OffsetRequestPayload
def random_string(length):
@@ -32,21 +20,6 @@ def env_kafka_version():
return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
-def current_offset(client, topic, partition, kafka_broker=None):
- """Get the current offset of a topic's partition
- """
- try:
- offsets, = client.send_offset_request([OffsetRequestPayload(topic,
- partition, -1, 1)])
- except Exception:
- # XXX: We've seen some UnknownErrors here and can't debug w/o server logs
- if kafka_broker:
- kafka_broker.dump_logs()
- raise
- else:
- return offsets.offsets[0]
-
-
def assert_message_count(messages, num_messages):
"""Check that we received the expected number of messages with no duplicates."""
# Make sure we got them all
@@ -58,84 +31,6 @@ def assert_message_count(messages, num_messages):
assert len(unique_messages) == num_messages
-class KafkaIntegrationTestCase(unittest.TestCase):
- create_client = True
- topic = None
- zk = None
- server = None
-
- def setUp(self):
- super(KafkaIntegrationTestCase, self).setUp()
- if not os.environ.get('KAFKA_VERSION'):
- self.skipTest('Integration test requires KAFKA_VERSION')
-
- if not self.topic:
- topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
- self.topic = topic
-
- if self.create_client:
- self.client = SimpleClient('%s:%d' % (self.server.host, self.server.port))
-
- timeout = time.time() + 30
- while time.time() < timeout:
- try:
- self.client.load_metadata_for_topics(self.topic, ignore_leadernotavailable=False)
- if self.client.has_metadata_for_topic(topic):
- break
- except (LeaderNotAvailableError, InvalidTopicError):
- time.sleep(1)
- else:
- raise KafkaTimeoutError('Timeout loading topic metadata!')
-
-
- # Ensure topic partitions have been created on all brokers to avoid UnknownPartitionErrors
- # TODO: It might be a good idea to move this to self.client.ensure_topic_exists
- for partition in self.client.get_partition_ids_for_topic(self.topic):
- while True:
- try:
- req = OffsetRequestPayload(self.topic, partition, -1, 100)
- self.client.send_offset_request([req])
- break
- except (NotLeaderForPartitionError, UnknownTopicOrPartitionError, FailedPayloadsError) as e:
- if time.time() > timeout:
- raise KafkaTimeoutError('Timeout loading topic metadata!')
- time.sleep(.1)
-
- self._messages = {}
-
- def tearDown(self):
- super(KafkaIntegrationTestCase, self).tearDown()
- if not os.environ.get('KAFKA_VERSION'):
- return
-
- if self.create_client:
- self.client.close()
-
- def current_offset(self, topic, partition):
- try:
- offsets, = self.client.send_offset_request([OffsetRequestPayload(topic,
- partition, -1, 1)])
- except Exception:
- # XXX: We've seen some UnknownErrors here and can't debug w/o server logs
- self.zk.child.dump_logs()
- self.server.child.dump_logs()
- raise
- else:
- return offsets.offsets[0]
-
- def msgs(self, iterable):
- return [self.msg(x) for x in iterable]
-
- def msg(self, s):
- if s not in self._messages:
- self._messages[s] = '%s-%s-%s' % (s, self.id(), str(uuid.uuid4()))
-
- return self._messages[s].encode('utf-8')
-
- def key(self, k):
- return k.encode('utf-8')
-
-
class Timer(object):
def __enter__(self):
self.start = time.time()
diff --git a/tox.ini b/tox.ini
index 14255d0..06403d6 100644
--- a/tox.ini
+++ b/tox.ini
@@ -3,7 +3,6 @@ envlist = py{26,27,34,35,36,37,py}, docs
[pytest]
testpaths = kafka test
-doctest_optionflags = modules
addopts = --durations=10
log_format = %(created)f %(filename)-23s %(threadName)s %(message)s
@@ -19,7 +18,6 @@ deps =
lz4
xxhash
crc32c
- py26: unittest2
commands =
py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc}
setenv =