summaryrefslogtreecommitdiff
path: root/oslo_messaging/_drivers/impl_kafka.py
diff options
context:
space:
mode:
Diffstat (limited to 'oslo_messaging/_drivers/impl_kafka.py')
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py159
1 files changed, 86 insertions, 73 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index deb5da0..676b1ad 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -33,7 +33,6 @@ import tenacity
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers.kafka_driver import kafka_options
-from oslo_messaging._drivers import pool as driver_pool
from oslo_messaging._i18n import _LE
from oslo_messaging._i18n import _LW
from oslo_serialization import jsonutils
@@ -81,17 +80,21 @@ def pack_message(ctxt, msg):
return msg
-def target_to_topic(target, priority=None):
+def concat(sep, items):
+ return sep.join(filter(bool, items))
+
+
+def target_to_topic(target, priority=None, vhost=None):
"""Convert target into topic string
:param target: Message destination target
:type target: oslo_messaging.Target
:param priority: Notification priority
:type priority: string
+ :param priority: Notification vhost
+ :type priority: string
"""
- if not priority:
- return target.topic
- return target.topic + '.' + priority
+ return concat(".", [target.topic, priority, vhost])
def retry_on_retriable_kafka_error(exc):
@@ -114,22 +117,12 @@ def with_reconnect(retries=None):
class Connection(object):
- def __init__(self, conf, url, purpose):
+ def __init__(self, conf, url):
- self.client = None
- driver_conf = conf.oslo_messaging_kafka
- self.batch_size = driver_conf.producer_batch_size
- self.linger_ms = driver_conf.producer_batch_timeout * 1000
self.conf = conf
- self.producer = None
- self.producer_lock = threading.Lock()
- self.consumer = None
- self.consumer_timeout = driver_conf.kafka_consumer_timeout
- self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
- self.group_id = driver_conf.consumer_group
self.url = url
+ self.virtual_host = url.virtual_host
self._parse_url()
- self._consume_loop_stopped = False
def _parse_url(self):
driver_conf = self.conf.oslo_messaging_kafka
@@ -145,33 +138,22 @@ class Connection(object):
self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host,
driver_conf.kafka_default_port))
- def notify_send(self, topic, ctxt, msg, retry):
- """Send messages to Kafka broker.
+ def reset(self):
+ """Reset a connection so it can be used again."""
+ pass
- :param topic: String of the topic
- :param ctxt: context for the messages
- :param msg: messages for publishing
- :param retry: the number of retry
- """
- retry = retry if retry >= 0 else None
- message = pack_message(ctxt, msg)
- message = jsonutils.dumps(message)
- @with_reconnect(retries=retry)
- def wrapped_with_reconnect():
- self._ensure_producer()
- # NOTE(sileht): This returns a future, we can use get()
- # if we want to block like other driver
- future = self.producer.send(topic, message)
- future.get()
+class ConsumerConnection(Connection):
- try:
- wrapped_with_reconnect()
- except Exception:
- # NOTE(sileht): if something goes wrong close the producer
- # connection
- self._close_producer()
- raise
+ def __init__(self, conf, url):
+
+ super(ConsumerConnection, self).__init__(conf, url)
+ driver_conf = self.conf.oslo_messaging_kafka
+ self.consumer = None
+ self.consumer_timeout = driver_conf.kafka_consumer_timeout
+ self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
+ self.group_id = driver_conf.consumer_group
+ self._consume_loop_stopped = False
@with_reconnect()
def _poll_messages(self, timeout):
@@ -215,16 +197,67 @@ class Connection(object):
def stop_consuming(self):
self._consume_loop_stopped = True
- def reset(self):
- """Reset a connection so it can be used again."""
- pass
-
def close(self):
- self._close_producer()
if self.consumer:
self.consumer.close()
self.consumer = None
+ @with_reconnect()
+ def declare_topic_consumer(self, topics, group=None):
+ # TODO(Support for manual/auto_commit functionality)
+ # When auto_commit is False, consumer can manually notify
+ # the completion of the subscription.
+ # Currently we don't support for non auto commit option
+ self.consumer = kafka.KafkaConsumer(
+ *topics, group_id=(group or self.group_id),
+ bootstrap_servers=self.hostaddrs,
+ max_partition_fetch_bytes=self.max_fetch_bytes,
+ selector=KAFKA_SELECTOR
+ )
+
+
+class ProducerConnection(Connection):
+
+ def __init__(self, conf, url):
+
+ super(ProducerConnection, self).__init__(conf, url)
+ driver_conf = self.conf.oslo_messaging_kafka
+ self.batch_size = driver_conf.producer_batch_size
+ self.linger_ms = driver_conf.producer_batch_timeout * 1000
+ self.producer = None
+ self.producer_lock = threading.Lock()
+
+ def notify_send(self, topic, ctxt, msg, retry):
+ """Send messages to Kafka broker.
+
+ :param topic: String of the topic
+ :param ctxt: context for the messages
+ :param msg: messages for publishing
+ :param retry: the number of retry
+ """
+ retry = retry if retry >= 0 else None
+ message = pack_message(ctxt, msg)
+ message = jsonutils.dumps(message)
+
+ @with_reconnect(retries=retry)
+ def wrapped_with_reconnect():
+ self._ensure_producer()
+ # NOTE(sileht): This returns a future, we can use get()
+ # if we want to block like other driver
+ future = self.producer.send(topic, message)
+ future.get()
+
+ try:
+ wrapped_with_reconnect()
+ except Exception:
+ # NOTE(sileht): if something goes wrong close the producer
+ # connection
+ self._close_producer()
+ raise
+
+ def close(self):
+ self._close_producer()
+
def _close_producer(self):
with self.producer_lock:
if self.producer:
@@ -243,19 +276,6 @@ class Connection(object):
batch_size=self.batch_size,
selector=KAFKA_SELECTOR)
- @with_reconnect()
- def declare_topic_consumer(self, topics, group=None):
- # TODO(Support for manual/auto_commit functionality)
- # When auto_commit is False, consumer can manually notify
- # the completion of the subscription.
- # Currently we don't support for non auto commit option
- self.consumer = kafka.KafkaConsumer(
- *topics, group_id=(group or self.group_id),
- bootstrap_servers=self.hostaddrs,
- max_partition_fetch_bytes=self.max_fetch_bytes,
- selector=KAFKA_SELECTOR
- )
-
class OsloKafkaMessage(base.RpcIncomingMessage):
@@ -314,17 +334,12 @@ class KafkaDriver(base.BaseDriver):
super(KafkaDriver, self).__init__(
conf, url, default_exchange, allowed_remote_exmods)
- # the pool configuration properties
- max_size = self.conf.oslo_messaging_kafka.pool_size
- min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size
- ttl = self.conf.oslo_messaging_kafka.conn_pool_ttl
-
- self.connection_pool = driver_pool.ConnectionPool(
- self.conf, max_size, min_size, ttl,
- self._url, Connection)
self.listeners = []
+ self.virtual_host = url.virtual_host
+ self.pconn = ProducerConnection(conf, url)
def cleanup(self):
+ self.pconn.close()
for c in self.listeners:
c.close()
self.listeners = []
@@ -351,8 +366,9 @@ class KafkaDriver(base.BaseDriver):
N means N retries
:type retry: int
"""
- with self._get_connection(purpose=driver_common.PURPOSE_SEND) as conn:
- conn.notify_send(target_to_topic(target), ctxt, message, retry)
+ self.pconn.notify_send(target_to_topic(target,
+ vhost=self.virtual_host),
+ ctxt, message, retry)
def listen(self, target, batch_size, batch_timeout):
raise NotImplementedError(
@@ -370,7 +386,7 @@ class KafkaDriver(base.BaseDriver):
:param pool: consumer group of Kafka consumers
:type pool: string
"""
- conn = self._get_connection(purpose=driver_common.PURPOSE_LISTEN)
+ conn = ConsumerConnection(self.conf, self._url)
topics = set()
for target, priority in targets_and_priorities:
topics.add(target_to_topic(target, priority))
@@ -380,6 +396,3 @@ class KafkaDriver(base.BaseDriver):
listener = KafkaListener(conn)
return base.PollStyleListenerAdapter(listener, batch_size,
batch_timeout)
-
- def _get_connection(self, purpose):
- return driver_common.ConnectionContext(self.connection_pool, purpose)