summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-04 19:54:01 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-09 09:24:39 -0700
commit01f03656cc613a2281d22521da4a016c7fa4a8ba (patch)
tree418d675f4bb16ef2662c0c8ce2ca60bbb2c18183
parentd1bfccfce1a9c1784ad17a38faf84d8fdab1e8ce (diff)
downloadkafka-python-01f03656cc613a2281d22521da4a016c7fa4a8ba.tar.gz
Add SSL configuration kwargs to KafkaClient, KafkaConsumer, KafkaProducer
-rw-r--r--kafka/client_async.py21
-rw-r--r--kafka/consumer/group.py21
-rw-r--r--kafka/producer/kafka.py21
3 files changed, 63 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index b91ae35..2eb86cf 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -53,6 +53,12 @@ class KafkaClient(object):
'send_buffer_bytes': None,
'retry_backoff_ms': 100,
'metadata_max_age_ms': 300000,
+ 'security_protocol': 'PLAINTEXT',
+ 'ssl_context': None,
+ 'ssl_check_hostname': True,
+ 'ssl_cafile': None,
+ 'ssl_certfile': None,
+ 'ssl_keyfile': None,
}
def __init__(self, **configs):
@@ -90,6 +96,21 @@ class KafkaClient(object):
brokers or partitions. Default: 300000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
+ security_protocol (str): Protocol used to communicate with brokers.
+ Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
+ ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
+ socket connections. If provided, all other ssl_* configurations
+ will be ignored. Default: None.
+ ssl_check_hostname (bool): flag to configure whether ssl handshake
+ should verify that the certificate matches the brokers hostname.
+ default: true.
+ ssl_cafile (str): optional filename of ca file to use in certificate
+ veriication. default: none.
+ ssl_certfile (str): optional filename of file in pem format containing
+ the client certificate, as well as any ca certificates needed to
+ establish the certificate's authenticity. default: none.
+ ssl_keyfile (str): optional filename containing the client private key.
+ default: none.
"""
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 151e644..0a78e7f 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -122,6 +122,21 @@ class KafkaConsumer(six.Iterator):
consumer_timeout_ms (int): number of millisecond to throw a timeout
exception to the consumer if no message is available for
consumption. Default: -1 (dont throw exception)
+ security_protocol (str): Protocol used to communicate with brokers.
+ Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
+ ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
+ socket connections. If provided, all other ssl_* configurations
+ will be ignored. Default: None.
+ ssl_check_hostname (bool): flag to configure whether ssl handshake
+ should verify that the certificate matches the brokers hostname.
+ default: true.
+ ssl_cafile (str): optional filename of ca file to use in certificate
+ veriication. default: none.
+ ssl_certfile (str): optional filename of file in pem format containing
+ the client certificate, as well as any ca certificates needed to
+ establish the certificate's authenticity. default: none.
+ ssl_keyfile (str): optional filename containing the client private key.
+ default: none.
api_version (str): specify which kafka API version to use.
0.9 enables full group coordination features; 0.8.2 enables
kafka-storage offset commits; 0.8.1 enables zookeeper-storage
@@ -158,6 +173,12 @@ class KafkaConsumer(six.Iterator):
'send_buffer_bytes': None,
'receive_buffer_bytes': None,
'consumer_timeout_ms': -1,
+ 'security_protocol': 'PLAINTEXT',
+ 'ssl_context': None,
+ 'ssl_check_hostname': True,
+ 'ssl_cafile': None,
+ 'ssl_certfile': None,
+ 'ssl_keyfile': None,
'api_version': 'auto',
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
#'metric_reporters': None,
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 0aecdc5..1862f8d 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -192,6 +192,21 @@ class KafkaProducer(object):
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Default: 5.
+ security_protocol (str): Protocol used to communicate with brokers.
+ Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
+ ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
+ socket connections. If provided, all other ssl_* configurations
+ will be ignored. Default: None.
+ ssl_check_hostname (bool): flag to configure whether ssl handshake
+ should verify that the certificate matches the brokers hostname.
+ default: true.
+ ssl_cafile (str): optional filename of ca file to use in certificate
+ veriication. default: none.
+ ssl_certfile (str): optional filename of file in pem format containing
+ the client certificate, as well as any ca certificates needed to
+ establish the certificate's authenticity. default: none.
+ ssl_keyfile (str): optional filename containing the client private key.
+ default: none.
api_version (str): specify which kafka API version to use.
If set to 'auto', will attempt to infer the broker version by
probing various APIs. Default: auto
@@ -222,6 +237,12 @@ class KafkaProducer(object):
'send_buffer_bytes': None,
'reconnect_backoff_ms': 50,
'max_in_flight_requests_per_connection': 5,
+ 'security_protocol': 'PLAINTEXT',
+ 'ssl_context': None,
+ 'ssl_check_hostname': True,
+ 'ssl_cafile': None,
+ 'ssl_certfile': None,
+ 'ssl_keyfile': None,
'api_version': 'auto',
}