summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhang-shaoman <zhang.shaoman@zte.com.cn>2019-05-28 16:15:57 +0800
committerzhang-shaoman <zhang.shaoman@zte.com.cn>2019-06-20 11:18:35 +0800
commit9a752862e2cf5a9d7b011b9815c77c38a14cb756 (patch)
treea5761eccd13d0d039b999c19dcebe81fd23e446e
parentfe0ac3195ef1dcb1b481f0f5cf6ff2b55ca6056d (diff)
downloadoslo-messaging-9a752862e2cf5a9d7b011b9815c77c38a14cb756.tar.gz
Support kafka message compression
When the message is large, in order to improve the efficiency of kafka, we need to compress the message before send it, so we need to support kafka message compression. Change-Id: I9e86d43ad934c1f82dc3dcf93d317538f9d2568e Implements: blueprint support-kafka-compression
-rw-r--r--doc/source/admin/kafka.rst7
-rw-r--r--oslo_messaging/_drivers/impl_kafka.py2
-rw-r--r--oslo_messaging/_drivers/kafka_driver/kafka_options.py7
-rw-r--r--oslo_messaging/tests/drivers/test_impl_kafka.py1
-rw-r--r--oslo_messaging/tests/functional/test_functional.py24
5 files changed, 41 insertions, 0 deletions
diff --git a/doc/source/admin/kafka.rst b/doc/source/admin/kafka.rst
index 3c0505c..10361a9 100644
--- a/doc/source/admin/kafka.rst
+++ b/doc/source/admin/kafka.rst
@@ -183,6 +183,13 @@ producer_batch_timeout
producer_batch_size
The maximum number of messages batched into one message set
+compression_codec
+ The compression codec for all data generated by the producer, valid values
+ are: none, gzip, snappy, lz4, zstd. Note that the legal option of this
+ depends on the kafka version, please refer to `kafka documentation`_.
+
+.. _kafka documentation: https://kafka.apache.org/documentation/
+
Security Options
^^^^^^^^^^^^^^^^
diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py
index ea3d102..f38af75 100644
--- a/oslo_messaging/_drivers/impl_kafka.py
+++ b/oslo_messaging/_drivers/impl_kafka.py
@@ -255,6 +255,7 @@ class ProducerConnection(Connection):
super(ProducerConnection, self).__init__(conf, url)
self.batch_size = self.driver_conf.producer_batch_size
self.linger_ms = self.driver_conf.producer_batch_timeout * 1000
+ self.compression_codec = self.driver_conf.compression_codec
self.producer = None
self.producer_lock = threading.Lock()
@@ -317,6 +318,7 @@ class ProducerConnection(Connection):
'bootstrap.servers': ",".join(self.hostaddrs),
'linger.ms': self.linger_ms,
'batch.num.messages': self.batch_size,
+ 'compression.codec': self.compression_codec,
'security.protocol': self.security_protocol,
'sasl.mechanism': self.sasl_mechanism,
'sasl.username': self.username,
diff --git a/oslo_messaging/_drivers/kafka_driver/kafka_options.py b/oslo_messaging/_drivers/kafka_driver/kafka_options.py
index 5fbe7b2..42c990c 100644
--- a/oslo_messaging/_drivers/kafka_driver/kafka_options.py
+++ b/oslo_messaging/_drivers/kafka_driver/kafka_options.py
@@ -48,6 +48,13 @@ KAFKA_OPTS = [
cfg.IntOpt('producer_batch_size', default=16384,
help='Size of batch for the producer async send'),
+ cfg.StrOpt('compression_codec', default='none',
+ choices=['none', 'gzip', 'snappy', 'lz4', 'zstd'],
+ help='The compression codec for all data generated by the '
+ 'producer. Valid values are: gzip, snappy, lz4, zstd. If '
+ 'not set, compression will not be used. Note that the '
+ 'legal option of this depends on the kafka version'),
+
cfg.BoolOpt('enable_auto_commit',
default=False,
help='Enable asynchronous consumer commits'),
diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py
index 80af576..0af8c05 100644
--- a/oslo_messaging/tests/drivers/test_impl_kafka.py
+++ b/oslo_messaging/tests/drivers/test_impl_kafka.py
@@ -108,6 +108,7 @@ class TestKafkaDriver(test_utils.BaseTestCase):
'bootstrap.servers': '',
'linger.ms': mock.ANY,
'batch.num.messages': mock.ANY,
+ 'compression.codec': 'none',
'security.protocol': 'PLAINTEXT',
'sasl.mechanism': 'PLAIN',
'sasl.username': mock.ANY,
diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py
index 4fa8b48..7fc5bd5 100644
--- a/oslo_messaging/tests/functional/test_functional.py
+++ b/oslo_messaging/tests/functional/test_functional.py
@@ -505,3 +505,27 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(100, len(events[0][1]))
self.assertEqual(100, len(events[1][1]))
self.assertEqual(5, len(events[2][1]))
+
+ def test_compression(self):
+ get_timeout = 1
+ if self.url.startswith("amqp:"):
+ self.conf.set_override('kombu_compression', 'gzip',
+ group='oslo_messaging_rabbit')
+ if self.url.startswith("kafka://"):
+ get_timeout = 5
+ self.conf.set_override('compression_codec', 'gzip',
+ group='oslo_messaging_kafka')
+ self.conf.set_override('consumer_group', 'test_compression',
+ group='oslo_messaging_kafka')
+
+ listener = self.useFixture(
+ utils.NotificationFixture(self.conf, self.url,
+ ['test_compression']))
+ notifier = listener.notifier('abc')
+
+ notifier.info({}, 'test', 'Hello World!')
+ event = listener.events.get(timeout=get_timeout)
+ self.assertEqual('info', event[0])
+ self.assertEqual('test', event[1])
+ self.assertEqual('Hello World!', event[2])
+ self.assertEqual('abc', event[3])