diff options
Diffstat (limited to 'oslo_messaging/_drivers/impl_kafka.py')
-rw-r--r-- | oslo_messaging/_drivers/impl_kafka.py | 159 |
1 files changed, 86 insertions, 73 deletions
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index deb5da0..676b1ad 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -33,7 +33,6 @@ import tenacity from oslo_messaging._drivers import base from oslo_messaging._drivers import common as driver_common from oslo_messaging._drivers.kafka_driver import kafka_options -from oslo_messaging._drivers import pool as driver_pool from oslo_messaging._i18n import _LE from oslo_messaging._i18n import _LW from oslo_serialization import jsonutils @@ -81,17 +80,21 @@ def pack_message(ctxt, msg): return msg -def target_to_topic(target, priority=None): +def concat(sep, items): + return sep.join(filter(bool, items)) + + +def target_to_topic(target, priority=None, vhost=None): """Convert target into topic string :param target: Message destination target :type target: oslo_messaging.Target :param priority: Notification priority :type priority: string + :param priority: Notification vhost + :type priority: string """ - if not priority: - return target.topic - return target.topic + '.' + priority + return concat(".", [target.topic, priority, vhost]) def retry_on_retriable_kafka_error(exc): @@ -114,22 +117,12 @@ def with_reconnect(retries=None): class Connection(object): - def __init__(self, conf, url, purpose): + def __init__(self, conf, url): - self.client = None - driver_conf = conf.oslo_messaging_kafka - self.batch_size = driver_conf.producer_batch_size - self.linger_ms = driver_conf.producer_batch_timeout * 1000 self.conf = conf - self.producer = None - self.producer_lock = threading.Lock() - self.consumer = None - self.consumer_timeout = driver_conf.kafka_consumer_timeout - self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes - self.group_id = driver_conf.consumer_group self.url = url + self.virtual_host = url.virtual_host self._parse_url() - self._consume_loop_stopped = False def _parse_url(self): driver_conf = self.conf.oslo_messaging_kafka @@ -145,33 +138,22 @@ class Connection(object): self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host, driver_conf.kafka_default_port)) - def notify_send(self, topic, ctxt, msg, retry): - """Send messages to Kafka broker. + def reset(self): + """Reset a connection so it can be used again.""" + pass - :param topic: String of the topic - :param ctxt: context for the messages - :param msg: messages for publishing - :param retry: the number of retry - """ - retry = retry if retry >= 0 else None - message = pack_message(ctxt, msg) - message = jsonutils.dumps(message) - @with_reconnect(retries=retry) - def wrapped_with_reconnect(): - self._ensure_producer() - # NOTE(sileht): This returns a future, we can use get() - # if we want to block like other driver - future = self.producer.send(topic, message) - future.get() +class ConsumerConnection(Connection): - try: - wrapped_with_reconnect() - except Exception: - # NOTE(sileht): if something goes wrong close the producer - # connection - self._close_producer() - raise + def __init__(self, conf, url): + + super(ConsumerConnection, self).__init__(conf, url) + driver_conf = self.conf.oslo_messaging_kafka + self.consumer = None + self.consumer_timeout = driver_conf.kafka_consumer_timeout + self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes + self.group_id = driver_conf.consumer_group + self._consume_loop_stopped = False @with_reconnect() def _poll_messages(self, timeout): @@ -215,16 +197,67 @@ class Connection(object): def stop_consuming(self): self._consume_loop_stopped = True - def reset(self): - """Reset a connection so it can be used again.""" - pass - def close(self): - self._close_producer() if self.consumer: self.consumer.close() self.consumer = None + @with_reconnect() + def declare_topic_consumer(self, topics, group=None): + # TODO(Support for manual/auto_commit functionality) + # When auto_commit is False, consumer can manually notify + # the completion of the subscription. + # Currently we don't support for non auto commit option + self.consumer = kafka.KafkaConsumer( + *topics, group_id=(group or self.group_id), + bootstrap_servers=self.hostaddrs, + max_partition_fetch_bytes=self.max_fetch_bytes, + selector=KAFKA_SELECTOR + ) + + +class ProducerConnection(Connection): + + def __init__(self, conf, url): + + super(ProducerConnection, self).__init__(conf, url) + driver_conf = self.conf.oslo_messaging_kafka + self.batch_size = driver_conf.producer_batch_size + self.linger_ms = driver_conf.producer_batch_timeout * 1000 + self.producer = None + self.producer_lock = threading.Lock() + + def notify_send(self, topic, ctxt, msg, retry): + """Send messages to Kafka broker. + + :param topic: String of the topic + :param ctxt: context for the messages + :param msg: messages for publishing + :param retry: the number of retry + """ + retry = retry if retry >= 0 else None + message = pack_message(ctxt, msg) + message = jsonutils.dumps(message) + + @with_reconnect(retries=retry) + def wrapped_with_reconnect(): + self._ensure_producer() + # NOTE(sileht): This returns a future, we can use get() + # if we want to block like other driver + future = self.producer.send(topic, message) + future.get() + + try: + wrapped_with_reconnect() + except Exception: + # NOTE(sileht): if something goes wrong close the producer + # connection + self._close_producer() + raise + + def close(self): + self._close_producer() + def _close_producer(self): with self.producer_lock: if self.producer: @@ -243,19 +276,6 @@ class Connection(object): batch_size=self.batch_size, selector=KAFKA_SELECTOR) - @with_reconnect() - def declare_topic_consumer(self, topics, group=None): - # TODO(Support for manual/auto_commit functionality) - # When auto_commit is False, consumer can manually notify - # the completion of the subscription. - # Currently we don't support for non auto commit option - self.consumer = kafka.KafkaConsumer( - *topics, group_id=(group or self.group_id), - bootstrap_servers=self.hostaddrs, - max_partition_fetch_bytes=self.max_fetch_bytes, - selector=KAFKA_SELECTOR - ) - class OsloKafkaMessage(base.RpcIncomingMessage): @@ -314,17 +334,12 @@ class KafkaDriver(base.BaseDriver): super(KafkaDriver, self).__init__( conf, url, default_exchange, allowed_remote_exmods) - # the pool configuration properties - max_size = self.conf.oslo_messaging_kafka.pool_size - min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size - ttl = self.conf.oslo_messaging_kafka.conn_pool_ttl - - self.connection_pool = driver_pool.ConnectionPool( - self.conf, max_size, min_size, ttl, - self._url, Connection) self.listeners = [] + self.virtual_host = url.virtual_host + self.pconn = ProducerConnection(conf, url) def cleanup(self): + self.pconn.close() for c in self.listeners: c.close() self.listeners = [] @@ -351,8 +366,9 @@ class KafkaDriver(base.BaseDriver): N means N retries :type retry: int """ - with self._get_connection(purpose=driver_common.PURPOSE_SEND) as conn: - conn.notify_send(target_to_topic(target), ctxt, message, retry) + self.pconn.notify_send(target_to_topic(target, + vhost=self.virtual_host), + ctxt, message, retry) def listen(self, target, batch_size, batch_timeout): raise NotImplementedError( @@ -370,7 +386,7 @@ class KafkaDriver(base.BaseDriver): :param pool: consumer group of Kafka consumers :type pool: string """ - conn = self._get_connection(purpose=driver_common.PURPOSE_LISTEN) + conn = ConsumerConnection(self.conf, self._url) topics = set() for target, priority in targets_and_priorities: topics.add(target_to_topic(target, priority)) @@ -380,6 +396,3 @@ class KafkaDriver(base.BaseDriver): listener = KafkaListener(conn) return base.PollStyleListenerAdapter(listener, batch_size, batch_timeout) - - def _get_connection(self, purpose): - return driver_common.ConnectionContext(self.connection_pool, purpose) |