summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Smith <ansmith@redhat.com>2018-09-17 13:21:42 -0400
committerAndy Smith <ansmith@redhat.com>2018-12-04 11:25:07 -0500
commit5a842ae15582e4eedfb1b2510eaf4a8997701f58 (patch)
tree283024cba8b0b6b736f85978aba5568280af1d51
parent252844879d9e34c81b1777a92ad4407fab4a6853 (diff)
downloadoslo-messaging-5a842ae15582e4eedfb1b2510eaf4a8997701f58.tar.gz
Switch driver to confluent-kafka client library
This patch switches the kafka python client from kafka-python to confluent-kafka due to documented threading issues with the kafka-python consumer and the recommendation to use multiprocessing. The confluent-kafka client leverages the high performance librdkafka C client and is safe for multiple thread use. This patch: * switches to confluent-kafka library * revises consumer and producer message operations * utilizes event.tpool method for confluent-kafka blocking calls * updates unit tests * adds kafka specific timeouts for functional tests * adds release note Depends-On: Ice374dca539b8ed1b1965b75379bad5140121483 Change-Id: Idfb9fe3700d882c8285c6dc56b0620951178eba2
-rw-r--r--.zuul.yaml5
-rw-r--r--bindep.txt2
-rw-r--r--doc/requirements.txt2
-rw-r--r--lower-constraints.txt2
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py245
-rw-r--r--oslo_messaging/tests/drivers/test_impl_kafka.py59
-rw-r--r--oslo_messaging/tests/functional/test_functional.py41
-rw-r--r--oslo_messaging/tests/functional/utils.py3
-rw-r--r--releasenotes/notes/kafka-client-library-change-fe16d5a34550db7f.yaml13
-rwxr-xr-xsetup-test-env-kafka.sh2
-rw-r--r--setup.cfg3
-rw-r--r--test-requirements.txt3
12 files changed, 222 insertions, 158 deletions
diff --git a/.zuul.yaml b/.zuul.yaml
index 0f7bfad..bf8704f 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -77,7 +77,7 @@
devstack_plugins:
devstack-plugin-amqp1: git://git.openstack.org/openstack/devstack-plugin-amqp1
zuul_copy_output:
- '{{ devstack_base_dir }}/logs/qdrouterd.log': logs
+ '{{ devstack_log_dir }}/qdrouterd.log': logs
- job:
@@ -102,8 +102,7 @@
devstack_plugins:
devstack-plugin-kafka: git://git.openstack.org/openstack/devstack-plugin-kafka
zuul_copy_output:
- '{{ devstack_base_dir }}/logs/qdrouterd.log': logs
-
+ '{{ devstack_log_dir }}/server.log': logs
- job:
name: oslo.messaging-src-dsvm-full-kafka-centos-7
diff --git a/bindep.txt b/bindep.txt
index 36f4ccd..c5ad6cb 100644
--- a/bindep.txt
+++ b/bindep.txt
@@ -32,6 +32,8 @@ swig [platform:rpm amqp1]
# kafka dpkg
openjdk-8-jdk [platform:dpkg kafka]
+librdkafka1 [platform:dpkg kafka]
# kafka rpm
java-1.8.0-openjdk [platform:rpm kafka]
+librdkafka [platform:rpm kafka]
diff --git a/doc/requirements.txt b/doc/requirements.txt
index 5d126f7..9df2d5b 100644
--- a/doc/requirements.txt
+++ b/doc/requirements.txt
@@ -8,6 +8,6 @@ reno>=2.5.0 # Apache-2.0
# imported when the source code is parsed for generating documentation:
fixtures>=3.0.0 # Apache-2.0/BSD
-kafka-python>=1.3.1 # Apache-2.0
+confluent-kafka>=0.11.6 # Apache-2.0
pyngus>=2.2.0 # Apache-2.0
tenacity>=3.2.1 # Apache-2.0
diff --git a/lower-constraints.txt b/lower-constraints.txt
index ef88244..a7e932d 100644
--- a/lower-constraints.txt
+++ b/lower-constraints.txt
@@ -7,6 +7,7 @@ cachetools==2.0.0
cffi==1.7.0
cliff==2.8.0
cmd2==0.8.0
+confluent-kafka==0.11.6
contextlib2==0.4.0
coverage==4.0
debtcollector==1.2.0
@@ -26,7 +27,6 @@ hacking==0.12.0
imagesize==0.7.1
iso8601==0.1.11
Jinja2==2.10
-kafka-python==1.3.1
keystoneauth1==3.4.0
kombu==4.0.0
linecache2==1.0.0
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index 20b97ca..1871be1 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -12,54 +12,30 @@
# License for the specific language governing permissions and limitations
# under the License.
-# Following code fixes 2 issues with kafka-python and
-# The current release of eventlet (0.19.0) does not actually remove
-# select.poll [1]. Because of kafka-python.selectors34 selects
-# PollSelector instead of SelectSelector [2]. PollSelector relies on
-# select.poll, which does not work when eventlet/greenlet is used. This
-# bug in evenlet is fixed in the master branch [3], but there's no
-# release of eventlet that includes this fix at this point.
-
-import json
+import logging
import threading
-import kafka
-from kafka.client_async import selectors
-import kafka.errors
-from oslo_log import log as logging
+import confluent_kafka
+from confluent_kafka import KafkaException
+from oslo_serialization import jsonutils
from oslo_utils import eventletutils
-import tenacity
+from oslo_utils import importutils
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers.kafka_driver import kafka_options
-from oslo_messaging._i18n import _LE
-from oslo_messaging._i18n import _LW
-from oslo_serialization import jsonutils
-
-import logging as l
-l.basicConfig(level=l.INFO)
-l.getLogger("kafka").setLevel(l.WARN)
-l.getLogger("stevedore").setLevel(l.WARN)
-if eventletutils.is_monkey_patched('select'):
- # monkeypatch the vendored SelectSelector._select like eventlet does
- # https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32
- from eventlet.green import select
- selectors.SelectSelector._select = staticmethod(select.select)
-
- # Force to use the select selectors
- KAFKA_SELECTOR = selectors.SelectSelector
-else:
- KAFKA_SELECTOR = selectors.DefaultSelector
+if eventletutils.EVENTLET_AVAILABLE:
+ tpool = importutils.try_import('eventlet.tpool')
LOG = logging.getLogger(__name__)
def unpack_message(msg):
+ """Unpack context and msg."""
context = {}
message = None
- msg = json.loads(msg)
+ msg = jsonutils.loads(msg)
message = driver_common.deserialize_msg(msg)
context = message['_context']
del message['_context']
@@ -68,7 +44,6 @@ def unpack_message(msg):
def pack_message(ctxt, msg):
"""Pack context into msg."""
-
if isinstance(ctxt, dict):
context_d = ctxt
else:
@@ -97,25 +72,28 @@ def target_to_topic(target, priority=None, vhost=None):
return concat(".", [target.topic, priority, vhost])
-def retry_on_retriable_kafka_error(exc):
- return (isinstance(exc, kafka.errors.KafkaError) and exc.retriable)
+class ConsumerTimeout(KafkaException):
+ pass
-def with_reconnect(retries=None):
- def decorator(func):
- @tenacity.retry(
- retry=tenacity.retry_if_exception(retry_on_retriable_kafka_error),
- wait=tenacity.wait_fixed(1),
- stop=tenacity.stop_after_attempt(retries),
- reraise=True
- )
- def wrapper(*args, **kwargs):
- return func(*args, **kwargs)
- return wrapper
- return decorator
+class AssignedPartition(object):
+ """This class is used by the ConsumerConnection to track the
+ assigned partitions.
+ """
+ def __init__(self, topic, partition):
+ super(AssignedPartition, self).__init__()
+ self.topic = topic
+ self.partition = partition
+ self.skey = '%s %d' % (self.topic, self.partition)
+
+ def to_dict(self):
+ return {'topic': self.topic, 'partition': self.partition}
class Connection(object):
+ """This is the base class for consumer and producer connections for
+ transport attributes.
+ """
def __init__(self, conf, url):
@@ -141,7 +119,7 @@ class Connection(object):
self.password = host.password
else:
if self.username != host.username:
- LOG.warning(_LW("Different transport usernames detected"))
+ LOG.warning("Different transport usernames detected")
if host.hostname:
self.hostaddrs.append("%s:%s" % (host.hostname, host.port))
@@ -152,7 +130,8 @@ class Connection(object):
class ConsumerConnection(Connection):
-
+ """This is the class for kafka topic/assigned partition consumer
+ """
def __init__(self, conf, url):
super(ConsumerConnection, self).__init__(conf, url)
@@ -160,28 +139,59 @@ class ConsumerConnection(Connection):
self.consumer_timeout = self.driver_conf.kafka_consumer_timeout
self.max_fetch_bytes = self.driver_conf.kafka_max_fetch_bytes
self.group_id = self.driver_conf.consumer_group
- self.enable_auto_commit = self.driver_conf.enable_auto_commit
+ self.use_auto_commit = self.driver_conf.enable_auto_commit
self.max_poll_records = self.driver_conf.max_poll_records
self._consume_loop_stopped = False
+ self.assignment_dict = dict()
+
+ def find_assignment(self, topic, partition):
+ """Find and return existing assignment based on topic and partition"""
+ skey = '%s %d' % (topic, partition)
+ return self.assignment_dict.get(skey)
+
+ def on_assign(self, consumer, topic_partitions):
+ """Rebalance on_assign callback"""
+ assignment = [AssignedPartition(p.topic, p.partition)
+ for p in topic_partitions]
+ self.assignment_dict = {a.skey: a for a in assignment}
+ for t in topic_partitions:
+ LOG.debug("Topic %s assigned to partition %d",
+ t.topic, t.partition)
+
+ def on_revoke(self, consumer, topic_partitions):
+ """Rebalance on_revoke callback"""
+ self.assignment_dict = dict()
+ for t in topic_partitions:
+ LOG.debug("Topic %s revoked from partition %d",
+ t.topic, t.partition)
- @with_reconnect()
def _poll_messages(self, timeout):
- messages = self.consumer.poll(timeout * 1000.0)
- messages = [record.value
- for records in messages.values() if records
- for record in records]
- if not messages:
- # NOTE(sileht): really ? you return payload but no messages...
- # simulate timeout to consume message again
- raise kafka.errors.ConsumerNoMoreData()
-
- if not self.enable_auto_commit:
- self.consumer.commit()
+ """Consume messages, callbacks and return list of messages"""
+ msglist = self.consumer.consume(self.max_poll_records,
+ timeout)
+
+ if ((len(self.assignment_dict) == 0) or (len(msglist) == 0)):
+ raise ConsumerTimeout()
+
+ messages = []
+ for message in msglist:
+ if message is None:
+ break
+ a = self.find_assignment(message.topic(), message.partition())
+ if a is None:
+ LOG.warning(("Message for %s received on unassigned "
+ "partition %d"),
+ message.topic(), message.partition())
+ else:
+ messages.append(message.value())
+
+ if not self.use_auto_commit:
+ self.consumer.commit(asynchronous=False)
return messages
def consume(self, timeout=None):
- """Receive up to 'max_fetch_messages' messages.
+ """Receive messages.
:param timeout: poll timeout in seconds
"""
@@ -199,12 +209,14 @@ class ConsumerConnection(Connection):
if self._consume_loop_stopped:
return
try:
+ if eventletutils.is_monkey_patched('thread'):
+ return tpool.execute(self._poll_messages, poll_timeout)
return self._poll_messages(poll_timeout)
- except kafka.errors.ConsumerNoMoreData as exc:
+ except ConsumerTimeout as exc:
poll_timeout = timer.check_return(
_raise_timeout, exc, maximum=self.consumer_timeout)
except Exception:
- LOG.exception(_LE("Failed to consume messages"))
+ LOG.exception("Failed to consume messages")
return
def stop_consuming(self):
@@ -215,21 +227,25 @@ class ConsumerConnection(Connection):
self.consumer.close()
self.consumer = None
- @with_reconnect()
def declare_topic_consumer(self, topics, group=None):
- self.consumer = kafka.KafkaConsumer(
- *topics, group_id=(group or self.group_id),
- enable_auto_commit=self.enable_auto_commit,
- bootstrap_servers=self.hostaddrs,
- max_partition_fetch_bytes=self.max_fetch_bytes,
- max_poll_records=self.max_poll_records,
- security_protocol=self.security_protocol,
- sasl_mechanism=self.sasl_mechanism,
- sasl_plain_username=self.username,
- sasl_plain_password=self.password,
- ssl_cafile=self.ssl_cafile,
- selector=KAFKA_SELECTOR
- )
+ conf = {
+ 'bootstrap.servers': ",".join(self.hostaddrs),
+ 'group.id': (group or self.group_id),
+ 'enable.auto.commit': self.use_auto_commit,
+ 'max.partition.fetch.bytes': self.max_fetch_bytes,
+ 'security.protocol': self.security_protocol,
+ 'sasl.mechanism': self.sasl_mechanism,
+ 'sasl.username': self.username,
+ 'sasl.password': self.password,
+ 'ssl.ca.location': self.ssl_cafile,
+ 'enable.partition.eof': False,
+ 'default.topic.config': {'auto.offset.reset': 'latest'}
+ }
+ LOG.debug("Subscribing to %s as %s", topics, (group or self.group_id))
+ self.consumer = confluent_kafka.Consumer(conf)
+ self.consumer.subscribe(topics,
+ on_assign=self.on_assign,
+ on_revoke=self.on_revoke)
class ProducerConnection(Connection):
@@ -242,6 +258,20 @@ class ProducerConnection(Connection):
self.producer = None
self.producer_lock = threading.Lock()
+ def _produce_message(self, topic, message):
+ while True:
+ try:
+ self.producer.produce(topic, message)
+ except KafkaException as e:
+ LOG.error("Produce message failed: %s" % str(e))
+ except BufferError:
+ LOG.debug("Produce message queue full, waiting for deliveries")
+ self.producer.poll(0.5)
+ continue
+ break
+
+ self.producer.poll(0)
+
def notify_send(self, topic, ctxt, msg, retry):
"""Send messages to Kafka broker.
@@ -254,16 +284,11 @@ class ProducerConnection(Connection):
message = pack_message(ctxt, msg)
message = jsonutils.dumps(message).encode('utf-8')
- @with_reconnect(retries=retry)
- def wrapped_with_reconnect():
- self._ensure_producer()
- # NOTE(sileht): This returns a future, we can use get()
- # if we want to block like other driver
- future = self.producer.send(topic, message)
- future.get()
-
try:
- wrapped_with_reconnect()
+ self._ensure_producer()
+ if eventletutils.is_monkey_patched('thread'):
+ return tpool.execute(self._produce_message, topic, message)
+ return self._produce_message(topic, message)
except Exception:
# NOTE(sileht): if something goes wrong close the producer
# connection
@@ -276,7 +301,10 @@ class ProducerConnection(Connection):
def _close_producer(self):
with self.producer_lock:
if self.producer:
- self.producer.close()
+ try:
+ self.producer.flush()
+ except KafkaException:
+ LOG.error("Flush error during producer close")
self.producer = None
def _ensure_producer(self):
@@ -285,16 +313,17 @@ class ProducerConnection(Connection):
with self.producer_lock:
if self.producer:
return
- self.producer = kafka.KafkaProducer(
- bootstrap_servers=self.hostaddrs,
- linger_ms=self.linger_ms,
- batch_size=self.batch_size,
- security_protocol=self.security_protocol,
- sasl_mechanism=self.sasl_mechanism,
- sasl_plain_username=self.username,
- sasl_plain_password=self.password,
- ssl_cafile=self.ssl_cafile,
- selector=KAFKA_SELECTOR)
+ conf = {
+ 'bootstrap.servers': ",".join(self.hostaddrs),
+ 'linger.ms': self.linger_ms,
+ 'batch.num.messages': self.batch_size,
+ 'security.protocol': self.security_protocol,
+ 'sasl.mechanism': self.sasl_mechanism,
+ 'sasl.username': self.username,
+ 'sasl.password': self.password,
+ 'ssl.ca.location': self.ssl_cafile
+ }
+ self.producer = confluent_kafka.Producer(conf)
class OsloKafkaMessage(base.RpcIncomingMessage):
@@ -303,13 +332,13 @@ class OsloKafkaMessage(base.RpcIncomingMessage):
super(OsloKafkaMessage, self).__init__(ctxt, message)
def requeue(self):
- LOG.warning(_LW("requeue is not supported"))
+ LOG.warning("requeue is not supported")
def reply(self, reply=None, failure=None):
- LOG.warning(_LW("reply is not supported"))
+ LOG.warning("reply is not supported")
def heartbeat(self):
- LOG.warning(_LW("heartbeat is not supported"))
+ LOG.warning("heartbeat is not supported")
class KafkaListener(base.PollStyleListener):
@@ -347,8 +376,9 @@ class KafkaListener(base.PollStyleListener):
class KafkaDriver(base.BaseDriver):
- """Note: Current implementation of this driver is experimental.
- We will have functional and/or integrated testing enabled for this driver.
+ """Kafka Driver
+
+ Note: Current implementation of this driver is experimental.
"""
def __init__(self, conf, url, default_exchange=None,
@@ -366,6 +396,7 @@ class KafkaDriver(base.BaseDriver):
for c in self.listeners:
c.close()
self.listeners = []
+ LOG.info("Kafka messaging driver shutdown")
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
call_monitor_timeout=None, retry=None):
@@ -414,9 +445,9 @@ class KafkaDriver(base.BaseDriver):
:type pool: string
"""
conn = ConsumerConnection(self.conf, self._url)
- topics = set()
+ topics = []
for target, priority in targets_and_priorities:
- topics.add(target_to_topic(target, priority))
+ topics.append(target_to_topic(target, priority))
conn.declare_topic_consumer(topics, pool)
diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py
index dff6176..80af576 100644
--- a/oslo_messaging/tests/drivers/test_impl_kafka.py
+++ b/oslo_messaging/tests/drivers/test_impl_kafka.py
@@ -11,8 +11,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-import kafka
-import kafka.errors
from six.moves import mock
import testscenarios
@@ -77,6 +75,7 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
self.addCleanup(transport.cleanup)
driver = transport._driver
+ self.assertIsInstance(driver, kafka_driver.KafkaDriver)
self.assertEqual(self.expected['hostaddrs'], driver.pconn.hostaddrs)
self.assertEqual(self.expected['username'], driver.pconn.username)
self.assertEqual(self.expected['password'], driver.pconn.password)
@@ -101,14 +100,20 @@ class TestKafkaDriver(test_utils.BaseTestCase):
def test_send_notification(self):
target = oslo_messaging.Target(topic="topic_test")
- with mock.patch("kafka.KafkaProducer") as fake_producer_class:
- fake_producer = fake_producer_class.return_value
- fake_producer.send.side_effect = kafka.errors.NoBrokersAvailable
- self.assertRaises(kafka.errors.NoBrokersAvailable,
- self.driver.send_notification,
- target, {}, {"payload": ["test_1"]},
- None, retry=3)
- self.assertEqual(3, fake_producer.send.call_count)
+ with mock.patch("confluent_kafka.Producer") as producer:
+ self.driver.send_notification(
+ target, {}, {"payload": ["test_1"]},
+ None, retry=3)
+ producer.assert_called_once_with({
+ 'bootstrap.servers': '',
+ 'linger.ms': mock.ANY,
+ 'batch.num.messages': mock.ANY,
+ 'security.protocol': 'PLAINTEXT',
+ 'sasl.mechanism': 'PLAIN',
+ 'sasl.username': mock.ANY,
+ 'sasl.password': mock.ANY,
+ 'ssl.ca.location': ''
+ })
def test_listen(self):
target = oslo_messaging.Target(topic="topic_test")
@@ -119,23 +124,22 @@ class TestKafkaDriver(test_utils.BaseTestCase):
targets_and_priorities = [
(oslo_messaging.Target(topic="topic_test_1"), "sample"),
]
- expected_topics = ["topic_test_1.sample"]
- with mock.patch("kafka.KafkaConsumer") as consumer:
+ with mock.patch("confluent_kafka.Consumer") as consumer:
self.driver.listen_for_notifications(
targets_and_priorities, "kafka_test", 1000, 10)
- consumer.assert_called_once_with(
- *expected_topics, group_id="kafka_test",
- enable_auto_commit=mock.ANY,
- bootstrap_servers=[],
- max_partition_fetch_bytes=mock.ANY,
- max_poll_records=mock.ANY,
- security_protocol='PLAINTEXT',
- sasl_mechanism='PLAIN',
- sasl_plain_username=mock.ANY,
- sasl_plain_password=mock.ANY,
- ssl_cafile='',
- selector=mock.ANY
- )
+ consumer.assert_called_once_with({
+ 'bootstrap.servers': '',
+ 'enable.partition.eof': False,
+ 'group.id': 'kafka_test',
+ 'enable.auto.commit': mock.ANY,
+ 'max.partition.fetch.bytes': mock.ANY,
+ 'security.protocol': 'PLAINTEXT',
+ 'sasl.mechanism': 'PLAIN',
+ 'sasl.username': mock.ANY,
+ 'sasl.password': mock.ANY,
+ 'ssl.ca.location': '',
+ 'default.topic.config': {'auto.offset.reset': 'latest'}
+ })
def test_cleanup(self):
listeners = [mock.MagicMock(), mock.MagicMock()]
@@ -155,10 +159,9 @@ class TestKafkaConnection(test_utils.BaseTestCase):
def test_notify(self):
- with mock.patch("kafka.KafkaProducer") as fake_producer_class:
- fake_producer = fake_producer_class.return_value
+ with mock.patch("confluent_kafka.Producer") as producer:
self.driver.pconn.notify_send("fake_topic",
{"fake_ctxt": "fake_param"},
{"fake_text": "fake_message_1"},
10)
- self.assertEqual(2, len(fake_producer.send.mock_calls))
+ assert producer.call_count == 1
diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py
index 6e13577..0ce17ed 100644
--- a/oslo_messaging/tests/functional/test_functional.py
+++ b/oslo_messaging/tests/functional/test_functional.py
@@ -328,8 +328,13 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
# NOTE(sileht): Each test must not use the same topics
# to be run in parallel
+ # NOTE(ansmith): kafka partition assignment delay requires
+ # longer timeouts for test completion
+
def test_simple(self):
+ get_timeout = 1
if self.url.startswith("kafka://"):
+ get_timeout = 5
self.conf.set_override('consumer_group', 'test_simple',
group='oslo_messaging_kafka')
@@ -338,14 +343,16 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
notifier = listener.notifier('abc')
notifier.info({}, 'test', 'Hello World!')
- event = listener.events.get(timeout=1)
+ event = listener.events.get(timeout=get_timeout)
self.assertEqual('info', event[0])
self.assertEqual('test', event[1])
self.assertEqual('Hello World!', event[2])
self.assertEqual('abc', event[3])
def test_multiple_topics(self):
+ get_timeout = 1
if self.url.startswith("kafka://"):
+ get_timeout = 5
self.conf.set_override('consumer_group', 'test_multiple_topics',
group='oslo_messaging_kafka')
@@ -363,7 +370,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
received = {}
while len(received) < len(sent):
- e = listener.events.get(timeout=1)
+ e = listener.events.get(timeout=get_timeout)
received[e[3]] = e
for key in received:
@@ -374,10 +381,15 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(expected[2], actual[2])
def test_multiple_servers(self):
+ timeout = 0.5
if self.url.startswith("amqp:"):
self.skipTest("QPID-6307")
- if self.url.startswith("kafka"):
- self.skipTest("Kafka: Need to be fixed")
+ if self.url.startswith("kafka://"):
+ self.skipTest("Kafka: needs to be fixed")
+ timeout = 5
+ self.conf.set_override('consumer_group',
+ 'test_multiple_servers',
+ group='oslo_messaging_kafka')
listener_a = self.useFixture(
utils.NotificationFixture(self.conf, self.url, ['test-topic']))
@@ -391,15 +403,17 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
for event_type, payload in events_out:
n.info({}, event_type, payload)
- events_in = [[(e[1], e[2]) for e in listener_a.get_events()],
- [(e[1], e[2]) for e in listener_b.get_events()]]
+ events_in = [[(e[1], e[2]) for e in listener_a.get_events(timeout)],
+ [(e[1], e[2]) for e in listener_b.get_events(timeout)]]
self.assertThat(events_in, utils.IsValidDistributionOf(events_out))
for stream in events_in:
self.assertThat(len(stream), matchers.GreaterThan(0))
def test_independent_topics(self):
+ get_timeout = 0.5
if self.url.startswith("kafka://"):
+ get_timeout = 5
self.conf.set_override('consumer_group',
'test_independent_topics_a',
group='oslo_messaging_kafka')
@@ -425,7 +439,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
b.info({}, event_type, payload)
def check_received(listener, publisher, messages):
- actuals = sorted([listener.events.get(timeout=0.5)
+ actuals = sorted([listener.events.get(timeout=get_timeout)
for __ in range(len(a_out))])
expected = sorted([['info', m[0], m[1], publisher]
for m in messages])
@@ -435,7 +449,9 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
check_received(listener_b, "pub-2", b_out)
def test_all_categories(self):
+ get_timeout = 1
if self.url.startswith("kafka://"):
+ get_timeout = 5
self.conf.set_override('consumer_group', 'test_all_categories',
group='oslo_messaging_kafka')
@@ -451,7 +467,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
# order between events with different categories is not guaranteed
received = {}
for expected in events:
- e = listener.events.get(timeout=1)
+ e = listener.events.get(timeout=get_timeout)
received[e[0]] = e
for expected in events:
@@ -461,6 +477,8 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(expected[3], actual[2])
def test_simple_batch(self):
+ get_timeout = 3
+ batch_timeout = 2
if self.url.startswith("amqp:"):
backend = os.environ.get("AMQP1_BACKEND")
if backend == "qdrouterd":
@@ -468,18 +486,21 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
# sender pends until batch_size or timeout reached
self.skipTest("qdrouterd backend")
if self.url.startswith("kafka://"):
+ get_timeout = 10
+ batch_timeout = 5
self.conf.set_override('consumer_group', 'test_simple_batch',
group='oslo_messaging_kafka')
listener = self.useFixture(
utils.BatchNotificationFixture(self.conf, self.url,
['test_simple_batch'],
- batch_size=100, batch_timeout=2))
+ batch_size=100,
+ batch_timeout=batch_timeout))
notifier = listener.notifier('abc')
for i in six.moves.range(0, 205):
notifier.info({}, 'test%s' % i, 'Hello World!')
- events = listener.get_events(timeout=3)
+ events = listener.get_events(timeout=get_timeout)
self.assertEqual(3, len(events))
self.assertEqual(100, len(events[0][1]))
self.assertEqual(100, len(events[1][1]))
diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py
index fcebba8..4d403a0 100644
--- a/oslo_messaging/tests/functional/utils.py
+++ b/oslo_messaging/tests/functional/utils.py
@@ -313,9 +313,6 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
kafka_options.register_opts(conf, transport_url)
- self.config(producer_batch_size=0,
- group='oslo_messaging_kafka')
-
class NotificationFixture(fixtures.Fixture):
def __init__(self, conf, url, topics, batch=None):
diff --git a/releasenotes/notes/kafka-client-library-change-fe16d5a34550db7f.yaml b/releasenotes/notes/kafka-client-library-change-fe16d5a34550db7f.yaml
new file mode 100644
index 0000000..ed2fcae
--- /dev/null
+++ b/releasenotes/notes/kafka-client-library-change-fe16d5a34550db7f.yaml
@@ -0,0 +1,13 @@
+---
+fixes:
+ - |
+ Threading issues with the kafka-python consumer client were identified
+ and documented. The driver has been updated to integrate the
+ confluent-kafka python library. The confluent-kafka client
+ leverages the high performance librdkafka C client and is safe
+ for multiple thread use.
+upgrade:
+ - |
+ With the change in the client library used, projects using the
+ Kafka driver should use extras oslo.messaging[kafka] to pull in
+ dependencies for the driver.
diff --git a/setup-test-env-kafka.sh b/setup-test-env-kafka.sh
index 8d58cdb..98f1d91 100755
--- a/setup-test-env-kafka.sh
+++ b/setup-test-env-kafka.sh
@@ -4,7 +4,7 @@ set -e
. tools/functions.sh
SCALA_VERSION=${SCALA_VERSION:-"2.12"}
-KAFKA_VERSION=${KAFKA_VERSION:-"1.1.0"}
+KAFKA_VERSION=${KAFKA_VERSION:-"2.0.0"}
if [[ -z "$(which kafka-server-start)" ]] && [[ -z $(which kafka-server-start.sh) ]]; then
DATADIR=$(mktemp -d /tmp/OSLOMSG-KAFKA.XXXXX)
diff --git a/setup.cfg b/setup.cfg
index 3c03ffa..c431072 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -25,8 +25,7 @@ classifier =
amqp1 =
pyngus>=2.2.0 # Apache-2.0
kafka =
- kafka-python>=1.3.1 # Apache-2.0
- tenacity>=4.4.0 # Apache-2.0
+ confluent-kafka>=0.11.6 # Apache-2.0
[files]
packages =
diff --git a/test-requirements.txt b/test-requirements.txt
index 22636cb..25b019b 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -14,8 +14,7 @@ oslotest>=3.2.0 # Apache-2.0
pifpaf>=0.10.0 # Apache-2.0
# for test_impl_kafka
-tenacity>=4.4.0 # Apache-2.0
-kafka-python>=1.3.1 # Apache-2.0
+confluent-kafka>=0.11.6 # Apache-2.0
# when we can require tox>= 1.4, this can go into tox.ini:
# [testenv:cover]