diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-09 10:29:08 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-09 10:29:08 -0700 |
commit | 0c94b83a2dff8113b5fd7c16df8a11ca03c4377b (patch) | |
tree | 54c8520e94af2d72ca715c4db9bb855fbfa5574d | |
parent | cda2d59da4ff952adae1a75d906eaa3a99ac7f67 (diff) | |
parent | 097198cceaed97d5b804166d0c76a816c8dfead0 (diff) | |
download | kafka-python-0c94b83a2dff8113b5fd7c16df8a11ca03c4377b.tar.gz |
Merge pull request #621 from dpkp/ssl_support
Support SSL connections
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | kafka/client_async.py | 39 | ||||
-rw-r--r-- | kafka/conn.py | 94 | ||||
-rw-r--r-- | kafka/consumer/group.py | 21 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 21 | ||||
-rw-r--r-- | servers/0.10.0.0/resources/kafka.properties | 13 | ||||
-rw-r--r-- | servers/0.9.0.0/resources/kafka.properties | 13 | ||||
-rw-r--r-- | servers/0.9.0.1/resources/kafka.properties | 13 | ||||
-rw-r--r-- | test/conftest.py | 3 | ||||
-rw-r--r-- | test/fixtures.py | 14 |
10 files changed, 216 insertions, 16 deletions
@@ -6,6 +6,7 @@ dist MANIFEST env servers/*/kafka-bin* +servers/*/resources/ssl* .coverage* .noseids docs/_build diff --git a/kafka/client_async.py b/kafka/client_async.py index 36e808c..2eb86cf 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -53,6 +53,12 @@ class KafkaClient(object): 'send_buffer_bytes': None, 'retry_backoff_ms': 100, 'metadata_max_age_ms': 300000, + 'security_protocol': 'PLAINTEXT', + 'ssl_context': None, + 'ssl_check_hostname': True, + 'ssl_cafile': None, + 'ssl_certfile': None, + 'ssl_keyfile': None, } def __init__(self, **configs): @@ -90,6 +96,21 @@ class KafkaClient(object): brokers or partitions. Default: 300000 retry_backoff_ms (int): Milliseconds to backoff when retrying on errors. Default: 100. + security_protocol (str): Protocol used to communicate with brokers. + Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. + ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping + socket connections. If provided, all other ssl_* configurations + will be ignored. Default: None. + ssl_check_hostname (bool): flag to configure whether ssl handshake + should verify that the certificate matches the brokers hostname. + default: true. + ssl_cafile (str): optional filename of ca file to use in certificate + veriication. default: none. + ssl_certfile (str): optional filename of file in pem format containing + the client certificate, as well as any ca certificates needed to + establish the certificate's authenticity. default: none. + ssl_keyfile (str): optional filename containing the client private key. + default: none. """ self.config = copy.copy(self.DEFAULT_CONFIG) for key in self.config: @@ -168,8 +189,10 @@ class KafkaClient(object): def _conn_state_change(self, node_id, conn): if conn.connecting(): - self._connecting.add(node_id) - self._selector.register(conn._sock, selectors.EVENT_WRITE) + # SSL connections can enter this state 2x (second during Handshake) + if node_id not in self._connecting: + self._connecting.add(node_id) + self._selector.register(conn._sock, selectors.EVENT_WRITE) elif conn.connected(): log.debug("Node %s connected", node_id) @@ -412,7 +435,9 @@ class KafkaClient(object): def _poll(self, timeout, sleep=True): # select on reads across all connected sockets, blocking up to timeout assert self.in_flight_request_count() > 0 or self._connecting or sleep + responses = [] + processed = set() for key, events in self._selector.select(timeout): if key.fileobj is self._wake_r: self._clear_wake_fd() @@ -420,6 +445,7 @@ class KafkaClient(object): elif not (events & selectors.EVENT_READ): continue conn = key.data + processed.add(conn) while conn.in_flight_requests: response = conn.recv() # Note: conn.recv runs callbacks / errbacks @@ -428,6 +454,15 @@ class KafkaClient(object): if not response: break responses.append(response) + + # Check for additional pending SSL bytes + if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): + # TODO: optimize + for conn in self._conns.values(): + if conn not in processed and conn.connected() and conn._sock.pending(): + response = conn.recv() + if response: + responses.append(response) return responses def in_flight_request_count(self, node_id=None): diff --git a/kafka/conn.py b/kafka/conn.py index 28c09d9..f13ab64 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -5,6 +5,7 @@ import logging import io from random import shuffle import socket +import ssl import struct from threading import local import time @@ -29,11 +30,25 @@ log = logging.getLogger(__name__) DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 DEFAULT_KAFKA_PORT = 9092 +# support older ssl libraries +try: + assert ssl.SSLWantReadError + assert ssl.SSLWantWriteError + assert ssl.SSLZeroReturnError +except: + log.warning('old ssl module detected.' + ' ssl error handling may not operate cleanly.' + ' Consider upgrading to python 3.5 or 2.7') + ssl.SSLWantReadError = ssl.SSLError + ssl.SSLWantWriteError = ssl.SSLError + ssl.SSLZeroReturnError = ssl.SSLError + class ConnectionStates(object): DISCONNECTING = '<disconnecting>' DISCONNECTED = '<disconnected>' CONNECTING = '<connecting>' + HANDSHAKE = '<handshake>' CONNECTED = '<connected>' @@ -49,6 +64,12 @@ class BrokerConnection(object): 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, + 'security_protocol': 'PLAINTEXT', + 'ssl_context': None, + 'ssl_check_hostname': True, + 'ssl_cafile': None, + 'ssl_certfile': None, + 'ssl_keyfile': None, 'api_version': (0, 8, 2), # default to most restrictive 'state_change_callback': lambda conn: True, } @@ -66,6 +87,9 @@ class BrokerConnection(object): self.state = ConnectionStates.DISCONNECTED self._sock = None + self._ssl_context = None + if self.config['ssl_context'] is not None: + self._ssl_context = self.config['ssl_context'] self._rbuffer = io.BytesIO() self._receiving = False self._next_payload_bytes = 0 @@ -87,6 +111,8 @@ class BrokerConnection(object): self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, self.config['send_buffer_bytes']) self._sock.setblocking(False) + if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): + self._wrap_ssl() self.state = ConnectionStates.CONNECTING self.last_attempt = time.time() self.config['state_change_callback'](self) @@ -103,7 +129,11 @@ class BrokerConnection(object): # Connection succeeded if not ret or ret == errno.EISCONN: log.debug('%s: established TCP connection', str(self)) - self.state = ConnectionStates.CONNECTED + if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): + log.debug('%s: initiating SSL handshake', str(self)) + self.state = ConnectionStates.HANDSHAKE + else: + self.state = ConnectionStates.CONNECTED self.config['state_change_callback'](self) # Connection failed @@ -122,8 +152,60 @@ class BrokerConnection(object): else: pass + if self.state is ConnectionStates.HANDSHAKE: + if self._try_handshake(): + log.debug('%s: completed SSL handshake.', str(self)) + self.state = ConnectionStates.CONNECTED + self.config['state_change_callback'](self) + return self.state + def _wrap_ssl(self): + assert self.config['security_protocol'] in ('SSL', 'SASL_SSL') + if self._ssl_context is None: + log.debug('%s: configuring default SSL Context', str(self)) + self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) # pylint: disable=no-member + self._ssl_context.options |= ssl.OP_NO_SSLv2 # pylint: disable=no-member + self._ssl_context.options |= ssl.OP_NO_SSLv3 # pylint: disable=no-member + self._ssl_context.verify_mode = ssl.CERT_OPTIONAL + if self.config['ssl_check_hostname']: + self._ssl_context.check_hostname = True + if self.config['ssl_cafile']: + log.info('%s: Loading SSL CA from %s', str(self), self.config['ssl_cafile']) + self._ssl_context.load_verify_locations(self.config['ssl_cafile']) + self._ssl_context.verify_mode = ssl.CERT_REQUIRED + if self.config['ssl_certfile'] and self.config['ssl_keyfile']: + log.info('%s: Loading SSL Cert from %s', str(self), self.config['ssl_certfile']) + log.info('%s: Loading SSL Key from %s', str(self), self.config['ssl_keyfile']) + self._ssl_context.load_cert_chain( + certfile=self.config['ssl_certfile'], + keyfile=self.config['ssl_keyfile']) + log.debug('%s: wrapping socket in ssl context', str(self)) + try: + self._sock = self._ssl_context.wrap_socket( + self._sock, + server_hostname=self.host, + do_handshake_on_connect=False) + except ssl.SSLError: + log.exception('%s: Failed to wrap socket in SSLContext!', str(self)) + self.close() + self.last_failure = time.time() + + def _try_handshake(self): + assert self.config['security_protocol'] in ('SSL', 'SASL_SSL') + try: + self._sock.do_handshake() + return True + # old ssl in python2.6 will swallow all SSLErrors here... + except (ssl.SSLWantReadError, ssl.SSLWantWriteError): + pass + except ssl.SSLZeroReturnError: + log.warning('SSL connection closed by server during handshake.') + self.close() + # Other SSLErrors will be raised to user + + return False + def blacked_out(self): """ Return true if we are disconnected from the given node and can't @@ -140,8 +222,10 @@ class BrokerConnection(object): return self.state is ConnectionStates.CONNECTED def connecting(self): - """Return True iff socket is in intermediate connecting state.""" - return self.state is ConnectionStates.CONNECTING + """Returns True if still connecting (this may encompass several + different states, such as SSL handshake, authorization, etc).""" + return self.state in (ConnectionStates.CONNECTING, + ConnectionStates.HANDSHAKE) def disconnected(self): """Return True iff socket is closed""" @@ -260,6 +344,8 @@ class BrokerConnection(object): # An extremely small, but non-zero, probability that there are # more than 0 but not yet 4 bytes available to read self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell())) + except ssl.SSLWantReadError: + return None except ConnectionError as e: if six.PY2 and e.errno == errno.EWOULDBLOCK: return None @@ -286,6 +372,8 @@ class BrokerConnection(object): staged_bytes = self._rbuffer.tell() try: self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes)) + except ssl.SSLWantReadError: + return None except ConnectionError as e: # Extremely small chance that we have exactly 4 bytes for a # header, but nothing to read in the body yet diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 151e644..0a78e7f 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -122,6 +122,21 @@ class KafkaConsumer(six.Iterator): consumer_timeout_ms (int): number of millisecond to throw a timeout exception to the consumer if no message is available for consumption. Default: -1 (dont throw exception) + security_protocol (str): Protocol used to communicate with brokers. + Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. + ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping + socket connections. If provided, all other ssl_* configurations + will be ignored. Default: None. + ssl_check_hostname (bool): flag to configure whether ssl handshake + should verify that the certificate matches the brokers hostname. + default: true. + ssl_cafile (str): optional filename of ca file to use in certificate + veriication. default: none. + ssl_certfile (str): optional filename of file in pem format containing + the client certificate, as well as any ca certificates needed to + establish the certificate's authenticity. default: none. + ssl_keyfile (str): optional filename containing the client private key. + default: none. api_version (str): specify which kafka API version to use. 0.9 enables full group coordination features; 0.8.2 enables kafka-storage offset commits; 0.8.1 enables zookeeper-storage @@ -158,6 +173,12 @@ class KafkaConsumer(six.Iterator): 'send_buffer_bytes': None, 'receive_buffer_bytes': None, 'consumer_timeout_ms': -1, + 'security_protocol': 'PLAINTEXT', + 'ssl_context': None, + 'ssl_check_hostname': True, + 'ssl_cafile': None, + 'ssl_certfile': None, + 'ssl_keyfile': None, 'api_version': 'auto', 'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet #'metric_reporters': None, diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 0aecdc5..1862f8d 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -192,6 +192,21 @@ class KafkaProducer(object): max_in_flight_requests_per_connection (int): Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Default: 5. + security_protocol (str): Protocol used to communicate with brokers. + Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT. + ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping + socket connections. If provided, all other ssl_* configurations + will be ignored. Default: None. + ssl_check_hostname (bool): flag to configure whether ssl handshake + should verify that the certificate matches the brokers hostname. + default: true. + ssl_cafile (str): optional filename of ca file to use in certificate + veriication. default: none. + ssl_certfile (str): optional filename of file in pem format containing + the client certificate, as well as any ca certificates needed to + establish the certificate's authenticity. default: none. + ssl_keyfile (str): optional filename containing the client private key. + default: none. api_version (str): specify which kafka API version to use. If set to 'auto', will attempt to infer the broker version by probing various APIs. Default: auto @@ -222,6 +237,12 @@ class KafkaProducer(object): 'send_buffer_bytes': None, 'reconnect_backoff_ms': 50, 'max_in_flight_requests_per_connection': 5, + 'security_protocol': 'PLAINTEXT', + 'ssl_context': None, + 'ssl_check_hostname': True, + 'ssl_cafile': None, + 'ssl_certfile': None, + 'ssl_keyfile': None, 'api_version': 'auto', } diff --git a/servers/0.10.0.0/resources/kafka.properties b/servers/0.10.0.0/resources/kafka.properties index 2fd9c54..7a19a11 100644 --- a/servers/0.10.0.0/resources/kafka.properties +++ b/servers/0.10.0.0/resources/kafka.properties @@ -21,11 +21,20 @@ broker.id={broker_id} ############################# Socket Server Settings ############################# +listeners={transport}://{host}:{port} +security.inter.broker.protocol={transport} + +ssl.keystore.location={ssl_dir}/server.keystore.jks +ssl.keystore.password=foobar +ssl.key.password=foobar +ssl.truststore.location={ssl_dir}/server.truststore.jks +ssl.truststore.password=foobar + # The port the socket server listens on -port={port} +#port=9092 # Hostname the broker will bind to. If not set, the server will bind to all interfaces -host.name={host} +#host.name=localhost # Hostname the broker will advertise to producers and consumers. If not set, it uses the # value for "host.name" if configured. Otherwise, it will use the value returned from diff --git a/servers/0.9.0.0/resources/kafka.properties b/servers/0.9.0.0/resources/kafka.properties index 0592c1e..b70a0da 100644 --- a/servers/0.9.0.0/resources/kafka.properties +++ b/servers/0.9.0.0/resources/kafka.properties @@ -21,11 +21,20 @@ broker.id={broker_id} ############################# Socket Server Settings ############################# +listeners={transport}://{host}:{port} +security.inter.broker.protocol={transport} + +ssl.keystore.location={ssl_dir}/server.keystore.jks +ssl.keystore.password=foobar +ssl.key.password=foobar +ssl.truststore.location={ssl_dir}/server.truststore.jks +ssl.truststore.password=foobar + # The port the socket server listens on -port={port} +#port=9092 # Hostname the broker will bind to. If not set, the server will bind to all interfaces -host.name={host} +#host.name=localhost # Hostname the broker will advertise to producers and consumers. If not set, it uses the # value for "host.name" if configured. Otherwise, it will use the value returned from diff --git a/servers/0.9.0.1/resources/kafka.properties b/servers/0.9.0.1/resources/kafka.properties index 2fd9c54..7a19a11 100644 --- a/servers/0.9.0.1/resources/kafka.properties +++ b/servers/0.9.0.1/resources/kafka.properties @@ -21,11 +21,20 @@ broker.id={broker_id} ############################# Socket Server Settings ############################# +listeners={transport}://{host}:{port} +security.inter.broker.protocol={transport} + +ssl.keystore.location={ssl_dir}/server.keystore.jks +ssl.keystore.password=foobar +ssl.key.password=foobar +ssl.truststore.location={ssl_dir}/server.truststore.jks +ssl.truststore.password=foobar + # The port the socket server listens on -port={port} +#port=9092 # Hostname the broker will bind to. If not set, the server will bind to all interfaces -host.name={host} +#host.name=localhost # Hostname the broker will advertise to producers and consumers. If not set, it uses the # value for "host.name" if configured. Otherwise, it will use the value returned from diff --git a/test/conftest.py b/test/conftest.py index 1f37960..c2ef1dd 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -51,7 +51,8 @@ def conn(mocker): return state conn._set_conn_state = _set_conn_state conn.connect.side_effect = lambda: conn.state - conn.connecting = lambda: conn.state is ConnectionStates.CONNECTING + conn.connecting = lambda: conn.state in (ConnectionStates.CONNECTING, + ConnectionStates.HANDSHAKE) conn.connected = lambda: conn.state is ConnectionStates.CONNECTED conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED return conn diff --git a/test/fixtures.py b/test/fixtures.py index e25ac22..826d037 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -182,8 +182,8 @@ class ZookeeperFixture(Fixture): class KafkaFixture(Fixture): @classmethod - def instance(cls, broker_id, zk_host, zk_port, - zk_chroot=None, port=None, replicas=1, partitions=2): + def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None, port=None, + transport='PLAINTEXT', replicas=1, partitions=2): if zk_chroot is None: zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_") if "KAFKA_URI" in os.environ: @@ -194,16 +194,21 @@ class KafkaFixture(Fixture): if port is None: port = get_open_port() host = "127.0.0.1" - fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot, + fixture = KafkaFixture(host, port, broker_id, + zk_host, zk_port, zk_chroot, + transport=transport, replicas=replicas, partitions=partitions) fixture.open() return fixture - def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2): + def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, + replicas=1, partitions=2, transport='PLAINTEXT'): self.host = host self.port = port self.broker_id = broker_id + self.transport = transport.upper() + self.ssl_dir = self.test_resource('ssl') self.zk_host = zk_host self.zk_port = zk_port @@ -233,6 +238,7 @@ class KafkaFixture(Fixture): self.out("Running local instance...") log.info(" host = %s", self.host) log.info(" port = %s", self.port) + log.info(" transport = %s", self.transport) log.info(" broker_id = %s", self.broker_id) log.info(" zk_host = %s", self.zk_host) log.info(" zk_port = %s", self.zk_port) |