summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Smith <ansmith@redhat.com>2017-12-27 11:09:02 -0500
committerAndrew Smith <ansmith@redhat.com>2017-12-27 12:57:39 -0500
commit1ccdccddaa838a4715fd21e8eb67e207d172c240 (patch)
tree022d7d6ecdb0c7e3beb3538319447a0f9f0ec33a
parente43240168f9c4cbafb98c1ee4eff927ecc5d24f8 (diff)
downloadoslo-messaging-1ccdccddaa838a4715fd21e8eb67e207d172c240.tar.gz
Add kafka driver vhost emulation5.35.0
Emulate vhost support by adding the virtual host name to the topic created on the kafka server. Also, update connection management for producer/consumer. This patch: * updates target to topic generation * add consumer and producer connection classes * remove connection pool * update driver test Change-Id: Idd164444c04e9f465a43ee909af840a41bb090c0
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py159
-rw-r--r--oslo_messaging/_drivers/kafka_driver/kafka_options.py6
-rw-r--r--oslo_messaging/tests/drivers/test_impl_kafka.py30
-rw-r--r--tox.ini2
4 files changed, 112 insertions, 85 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index deb5da0..676b1ad 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -33,7 +33,6 @@ import tenacity
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers.kafka_driver import kafka_options
-from oslo_messaging._drivers import pool as driver_pool
from oslo_messaging._i18n import _LE
from oslo_messaging._i18n import _LW
from oslo_serialization import jsonutils
@@ -81,17 +80,21 @@ def pack_message(ctxt, msg):
return msg
-def target_to_topic(target, priority=None):
+def concat(sep, items):
+ return sep.join(filter(bool, items))
+
+
+def target_to_topic(target, priority=None, vhost=None):
"""Convert target into topic string
:param target: Message destination target
:type target: oslo_messaging.Target
:param priority: Notification priority
:type priority: string
+ :param priority: Notification vhost
+ :type priority: string
"""
- if not priority:
- return target.topic
- return target.topic + '.' + priority
+ return concat(".", [target.topic, priority, vhost])
def retry_on_retriable_kafka_error(exc):
@@ -114,22 +117,12 @@ def with_reconnect(retries=None):
class Connection(object):
- def __init__(self, conf, url, purpose):
+ def __init__(self, conf, url):
- self.client = None
- driver_conf = conf.oslo_messaging_kafka
- self.batch_size = driver_conf.producer_batch_size
- self.linger_ms = driver_conf.producer_batch_timeout * 1000
self.conf = conf
- self.producer = None
- self.producer_lock = threading.Lock()
- self.consumer = None
- self.consumer_timeout = driver_conf.kafka_consumer_timeout
- self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
- self.group_id = driver_conf.consumer_group
self.url = url
+ self.virtual_host = url.virtual_host
self._parse_url()
- self._consume_loop_stopped = False
def _parse_url(self):
driver_conf = self.conf.oslo_messaging_kafka
@@ -145,33 +138,22 @@ class Connection(object):
self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host,
driver_conf.kafka_default_port))
- def notify_send(self, topic, ctxt, msg, retry):
- """Send messages to Kafka broker.
+ def reset(self):
+ """Reset a connection so it can be used again."""
+ pass
- :param topic: String of the topic
- :param ctxt: context for the messages
- :param msg: messages for publishing
- :param retry: the number of retry
- """
- retry = retry if retry >= 0 else None
- message = pack_message(ctxt, msg)
- message = jsonutils.dumps(message)
- @with_reconnect(retries=retry)
- def wrapped_with_reconnect():
- self._ensure_producer()
- # NOTE(sileht): This returns a future, we can use get()
- # if we want to block like other driver
- future = self.producer.send(topic, message)
- future.get()
+class ConsumerConnection(Connection):
- try:
- wrapped_with_reconnect()
- except Exception:
- # NOTE(sileht): if something goes wrong close the producer
- # connection
- self._close_producer()
- raise
+ def __init__(self, conf, url):
+
+ super(ConsumerConnection, self).__init__(conf, url)
+ driver_conf = self.conf.oslo_messaging_kafka
+ self.consumer = None
+ self.consumer_timeout = driver_conf.kafka_consumer_timeout
+ self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
+ self.group_id = driver_conf.consumer_group
+ self._consume_loop_stopped = False
@with_reconnect()
def _poll_messages(self, timeout):
@@ -215,16 +197,67 @@ class Connection(object):
def stop_consuming(self):
self._consume_loop_stopped = True
- def reset(self):
- """Reset a connection so it can be used again."""
- pass
-
def close(self):
- self._close_producer()
if self.consumer:
self.consumer.close()
self.consumer = None
+ @with_reconnect()
+ def declare_topic_consumer(self, topics, group=None):
+ # TODO(Support for manual/auto_commit functionality)
+ # When auto_commit is False, consumer can manually notify
+ # the completion of the subscription.
+ # Currently we don't support for non auto commit option
+ self.consumer = kafka.KafkaConsumer(
+ *topics, group_id=(group or self.group_id),
+ bootstrap_servers=self.hostaddrs,
+ max_partition_fetch_bytes=self.max_fetch_bytes,
+ selector=KAFKA_SELECTOR
+ )
+
+
+class ProducerConnection(Connection):
+
+ def __init__(self, conf, url):
+
+ super(ProducerConnection, self).__init__(conf, url)
+ driver_conf = self.conf.oslo_messaging_kafka
+ self.batch_size = driver_conf.producer_batch_size
+ self.linger_ms = driver_conf.producer_batch_timeout * 1000
+ self.producer = None
+ self.producer_lock = threading.Lock()
+
+ def notify_send(self, topic, ctxt, msg, retry):
+ """Send messages to Kafka broker.
+
+ :param topic: String of the topic
+ :param ctxt: context for the messages
+ :param msg: messages for publishing
+ :param retry: the number of retry
+ """
+ retry = retry if retry >= 0 else None
+ message = pack_message(ctxt, msg)
+ message = jsonutils.dumps(message)
+
+ @with_reconnect(retries=retry)
+ def wrapped_with_reconnect():
+ self._ensure_producer()
+ # NOTE(sileht): This returns a future, we can use get()
+ # if we want to block like other driver
+ future = self.producer.send(topic, message)
+ future.get()
+
+ try:
+ wrapped_with_reconnect()
+ except Exception:
+ # NOTE(sileht): if something goes wrong close the producer
+ # connection
+ self._close_producer()
+ raise
+
+ def close(self):
+ self._close_producer()
+
def _close_producer(self):
with self.producer_lock:
if self.producer:
@@ -243,19 +276,6 @@ class Connection(object):
batch_size=self.batch_size,
selector=KAFKA_SELECTOR)
- @with_reconnect()
- def declare_topic_consumer(self, topics, group=None):
- # TODO(Support for manual/auto_commit functionality)
- # When auto_commit is False, consumer can manually notify
- # the completion of the subscription.
- # Currently we don't support for non auto commit option
- self.consumer = kafka.KafkaConsumer(
- *topics, group_id=(group or self.group_id),
- bootstrap_servers=self.hostaddrs,
- max_partition_fetch_bytes=self.max_fetch_bytes,
- selector=KAFKA_SELECTOR
- )
-
class OsloKafkaMessage(base.RpcIncomingMessage):
@@ -314,17 +334,12 @@ class KafkaDriver(base.BaseDriver):
super(KafkaDriver, self).__init__(
conf, url, default_exchange, allowed_remote_exmods)
- # the pool configuration properties
- max_size = self.conf.oslo_messaging_kafka.pool_size
- min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size
- ttl = self.conf.oslo_messaging_kafka.conn_pool_ttl
-
- self.connection_pool = driver_pool.ConnectionPool(
- self.conf, max_size, min_size, ttl,
- self._url, Connection)
self.listeners = []
+ self.virtual_host = url.virtual_host
+ self.pconn = ProducerConnection(conf, url)
def cleanup(self):
+ self.pconn.close()
for c in self.listeners:
c.close()
self.listeners = []
@@ -351,8 +366,9 @@ class KafkaDriver(base.BaseDriver):
N means N retries
:type retry: int
"""
- with self._get_connection(purpose=driver_common.PURPOSE_SEND) as conn:
- conn.notify_send(target_to_topic(target), ctxt, message, retry)
+ self.pconn.notify_send(target_to_topic(target,
+ vhost=self.virtual_host),
+ ctxt, message, retry)
def listen(self, target, batch_size, batch_timeout):
raise NotImplementedError(
@@ -370,7 +386,7 @@ class KafkaDriver(base.BaseDriver):
:param pool: consumer group of Kafka consumers
:type pool: string
"""
- conn = self._get_connection(purpose=driver_common.PURPOSE_LISTEN)
+ conn = ConsumerConnection(self.conf, self._url)
topics = set()
for target, priority in targets_and_priorities:
topics.add(target_to_topic(target, priority))
@@ -380,6 +396,3 @@ class KafkaDriver(base.BaseDriver):
listener = KafkaListener(conn)
return base.PollStyleListenerAdapter(listener, batch_size,
batch_timeout)
-
- def _get_connection(self, purpose):
- return driver_common.ConnectionContext(self.connection_pool, purpose)
diff --git a/oslo_messaging/_drivers/kafka_driver/kafka_options.py b/oslo_messaging/_drivers/kafka_driver/kafka_options.py
index 0bdcde5..398f707 100644
--- a/oslo_messaging/_drivers/kafka_driver/kafka_options.py
+++ b/oslo_messaging/_drivers/kafka_driver/kafka_options.py
@@ -33,12 +33,18 @@ KAFKA_OPTS = [
help='Default timeout(s) for Kafka consumers'),
cfg.IntOpt('pool_size', default=10,
+ deprecated_for_removal=True,
+ deprecated_reason='Driver no longer uses connection pool. ',
help='Pool Size for Kafka Consumers'),
cfg.IntOpt('conn_pool_min_size', default=2,
+ deprecated_for_removal=True,
+ deprecated_reason='Driver no longer uses connection pool. ',
help='The pool size limit for connections expiration policy'),
cfg.IntOpt('conn_pool_ttl', default=1200,
+ deprecated_for_removal=True,
+ deprecated_reason='Driver no longer uses connection pool. ',
help='The time-to-live in sec of idle connections in the pool'),
cfg.StrOpt('consumer_group', default="oslo_messaging_consumer",
diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py
index e7dc115..f2f6f7f 100644
--- a/oslo_messaging/tests/drivers/test_impl_kafka.py
+++ b/oslo_messaging/tests/drivers/test_impl_kafka.py
@@ -17,7 +17,6 @@ from six.moves import mock
import testscenarios
import oslo_messaging
-from oslo_messaging._drivers import common as common_driver
from oslo_messaging._drivers import impl_kafka as kafka_driver
from oslo_messaging.tests import utils as test_utils
@@ -39,16 +38,24 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
scenarios = [
('none', dict(url=None,
- expected=dict(hostaddrs=['localhost:9092']))),
+ expected=dict(hostaddrs=['localhost:9092'],
+ vhost=None))),
('empty', dict(url='kafka:///',
- expected=dict(hostaddrs=['localhost:9092']))),
+ expected=dict(hostaddrs=['localhost:9092'],
+ vhost=''))),
('host', dict(url='kafka://127.0.0.1',
- expected=dict(hostaddrs=['127.0.0.1:9092']))),
+ expected=dict(hostaddrs=['127.0.0.1:9092'],
+ vhost=None))),
('port', dict(url='kafka://localhost:1234',
- expected=dict(hostaddrs=['localhost:1234']))),
+ expected=dict(hostaddrs=['localhost:1234'],
+ vhost=None))),
+ ('vhost', dict(url='kafka://localhost:1234/my_host',
+ expected=dict(hostaddrs=['localhost:1234'],
+ vhost='my_host'))),
('two', dict(url='kafka://localhost:1234,localhost2:1234',
expected=dict(hostaddrs=['localhost:1234',
- 'localhost2:1234']))),
+ 'localhost2:1234'],
+ vhost=None))),
]
@@ -62,8 +69,8 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
self.addCleanup(transport.cleanup)
driver = transport._driver
- conn = driver._get_connection(common_driver.PURPOSE_SEND)
- self.assertEqual(self.expected['hostaddrs'], conn.hostaddrs)
+ self.assertEqual(self.expected['hostaddrs'], driver.pconn.hostaddrs)
+ self.assertEqual(self.expected['vhost'], driver.virtual_host)
class TestKafkaDriver(test_utils.BaseTestCase):
@@ -130,10 +137,11 @@ class TestKafkaConnection(test_utils.BaseTestCase):
self.driver = transport._driver
def test_notify(self):
- conn = self.driver._get_connection(common_driver.PURPOSE_SEND)
with mock.patch("kafka.KafkaProducer") as fake_producer_class:
fake_producer = fake_producer_class.return_value
- conn.notify_send("fake_topic", {"fake_ctxt": "fake_param"},
- {"fake_text": "fake_message_1"}, 10)
+ self.driver.pconn.notify_send("fake_topic",
+ {"fake_ctxt": "fake_param"},
+ {"fake_text": "fake_message_1"},
+ 10)
self.assertEqual(2, len(fake_producer.send.mock_calls))
diff --git a/tox.ini b/tox.ini
index bbf06cd..d772882 100644
--- a/tox.ini
+++ b/tox.ini
@@ -53,7 +53,7 @@ commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args=
[testenv:py27-func-kafka]
setenv =
{[testenv]setenv}
- TRANSPORT_URL=kafka://127.0.0.1:9092//
+ TRANSPORT_URL=kafka://127.0.0.1:9092/
OS_GROUP_REGEX=oslo_messaging.tests.functional
commands = {toxinidir}/setup-test-env-kafka.sh python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'