diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-04 19:54:01 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-09 09:24:39 -0700 |
commit | 01f03656cc613a2281d22521da4a016c7fa4a8ba (patch) | |
tree | 418d675f4bb16ef2662c0c8ce2ca60bbb2c18183 | |
parent | d1bfccfce1a9c1784ad17a38faf84d8fdab1e8ce (diff) | |
download | kafka-python-01f03656cc613a2281d22521da4a016c7fa4a8ba.tar.gz |
Add SSL configuration kwargs to KafkaClient, KafkaConsumer, KafkaProducer
-rw-r--r-- | kafka/client_async.py | 21 | ||||
-rw-r--r-- | kafka/consumer/group.py | 21 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 21 |
3 files changed, 63 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index b91ae35..2eb86cf 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -53,6 +53,12 @@ class KafkaClient(object): 'send_buffer_bytes': None, 'retry_backoff_ms': 100, 'metadata_max_age_ms': 300000, + 'security_protocol': 'PLAINTEXT', + 'ssl_context': None, + 'ssl_check_hostname': True, + 'ssl_cafile': None, + 'ssl_certfile': None, + 'ssl_keyfile': None, } def __init__(self, **configs): @@ -90,6 +96,21 @@ class KafkaClient(object): brokers or partitions. Default: 300000 retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100. + security_protocol (str): Protocol used to communicate with brokers. + Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. + ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping + socket connections. If provided, all other ssl_* configurations + will be ignored. Default: None. + ssl_check_hostname (bool): flag to configure whether ssl handshake + should verify that the certificate matches the brokers hostname. + default: true. + ssl_cafile (str): optional filename of ca file to use in certificate + veriication. default: none. + ssl_certfile (str): optional filename of file in pem format containing + the client certificate, as well as any ca certificates needed to + establish the certificate's authenticity. default: none. + ssl_keyfile (str): optional filename containing the client private key. + default: none. """ self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 151e644..0a78e7f 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -122,6 +122,21 @@ class KafkaConsumer(six.Iterator): consumer_timeout_ms (int): number of millisecond to throw a timeout exception to the consumer if no message is available for consumption. Default: -1 (dont throw exception) + security_protocol (str): Protocol used to communicate with brokers. + Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. + ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping + socket connections. If provided, all other ssl_* configurations + will be ignored. Default: None. + ssl_check_hostname (bool): flag to configure whether ssl handshake + should verify that the certificate matches the brokers hostname. + default: true. + ssl_cafile (str): optional filename of ca file to use in certificate + veriication. default: none. + ssl_certfile (str): optional filename of file in pem format containing + the client certificate, as well as any ca certificates needed to + establish the certificate's authenticity. default: none. + ssl_keyfile (str): optional filename containing the client private key. + default: none. api_version (str): specify which kafka API version to use. 0.9 enables full group coordination features; 0.8.2 enables kafka-storage offset commits; 0.8.1 enables zookeeper-storage @@ -158,6 +173,12 @@ class KafkaConsumer(six.Iterator): 'send_buffer_bytes': None, 'receive_buffer_bytes': None, 'consumer_timeout_ms': -1, + 'security_protocol': 'PLAINTEXT', + 'ssl_context': None, + 'ssl_check_hostname': True, + 'ssl_cafile': None, + 'ssl_certfile': None, + 'ssl_keyfile': None, 'api_version': 'auto', 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet #'metric_reporters': None, diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 0aecdc5..1862f8d 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -192,6 +192,21 @@ class KafkaProducer(object): max_in_flight_requests_per_connection (int): Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Default: 5. + security_protocol (str): Protocol used to communicate with brokers. + Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. + ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping + socket connections. If provided, all other ssl_* configurations + will be ignored. Default: None. + ssl_check_hostname (bool): flag to configure whether ssl handshake + should verify that the certificate matches the brokers hostname. + default: true. + ssl_cafile (str): optional filename of ca file to use in certificate + veriication. default: none. + ssl_certfile (str): optional filename of file in pem format containing + the client certificate, as well as any ca certificates needed to + establish the certificate's authenticity. default: none. + ssl_keyfile (str): optional filename containing the client private key. + default: none. api_version (str): specify which kafka API version to use. If set to 'auto', will attempt to infer the broker version by probing various APIs. Default: auto @@ -222,6 +237,12 @@ class KafkaProducer(object): 'send_buffer_bytes': None, 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, + 'security_protocol': 'PLAINTEXT', + 'ssl_context': None, + 'ssl_check_hostname': True, + 'ssl_cafile': None, + 'ssl_certfile': None, + 'ssl_keyfile': None, 'api_version': 'auto', } |