summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-09 10:29:08 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-09 10:29:08 -0700
commit0c94b83a2dff8113b5fd7c16df8a11ca03c4377b (patch)
tree54c8520e94af2d72ca715c4db9bb855fbfa5574d
parentcda2d59da4ff952adae1a75d906eaa3a99ac7f67 (diff)
parent097198cceaed97d5b804166d0c76a816c8dfead0 (diff)
downloadkafka-python-0c94b83a2dff8113b5fd7c16df8a11ca03c4377b.tar.gz
Merge pull request #621 from dpkp/ssl_support
Support SSL connections
-rw-r--r--.gitignore1
-rw-r--r--kafka/client_async.py39
-rw-r--r--kafka/conn.py94
-rw-r--r--kafka/consumer/group.py21
-rw-r--r--kafka/producer/kafka.py21
-rw-r--r--servers/0.10.0.0/resources/kafka.properties13
-rw-r--r--servers/0.9.0.0/resources/kafka.properties13
-rw-r--r--servers/0.9.0.1/resources/kafka.properties13
-rw-r--r--test/conftest.py3
-rw-r--r--test/fixtures.py14
10 files changed, 216 insertions, 16 deletions
diff --git a/.gitignore b/.gitignore
index 7e28e05..13be591 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@ dist
MANIFEST
env
servers/*/kafka-bin*
+servers/*/resources/ssl*
.coverage*
.noseids
docs/_build
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 36e808c..2eb86cf 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -53,6 +53,12 @@ class KafkaClient(object):
'send_buffer_bytes': None,
'retry_backoff_ms': 100,
'metadata_max_age_ms': 300000,
+ 'security_protocol': 'PLAINTEXT',
+ 'ssl_context': None,
+ 'ssl_check_hostname': True,
+ 'ssl_cafile': None,
+ 'ssl_certfile': None,
+ 'ssl_keyfile': None,
}
def __init__(self, **configs):
@@ -90,6 +96,21 @@ class KafkaClient(object):
brokers or partitions. Default: 300000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
+ security_protocol (str): Protocol used to communicate with brokers.
+ Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
+ ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
+ socket connections. If provided, all other ssl_* configurations
+ will be ignored. Default: None.
+ ssl_check_hostname (bool): Flag to configure whether the SSL handshake
+ should verify that the certificate matches the broker's hostname.
+ Default: True.
+ ssl_cafile (str): Optional filename of the CA file to use in certificate
+ verification. Default: None.
+ ssl_certfile (str): Optional filename of a file in PEM format containing
+ the client certificate, as well as any CA certificates needed to
+ establish the certificate's authenticity. Default: None.
+ ssl_keyfile (str): Optional filename containing the client private key.
+ Default: None.
"""
self.config = copy.copy(self.DEFAULT_CONFIG)
for key in self.config:
@@ -168,8 +189,10 @@ class KafkaClient(object):
def _conn_state_change(self, node_id, conn):
if conn.connecting():
- self._connecting.add(node_id)
- self._selector.register(conn._sock, selectors.EVENT_WRITE)
+ # SSL connections can enter this state 2x (second during Handshake)
+ if node_id not in self._connecting:
+ self._connecting.add(node_id)
+ self._selector.register(conn._sock, selectors.EVENT_WRITE)
elif conn.connected():
log.debug("Node %s connected", node_id)
@@ -412,7 +435,9 @@ class KafkaClient(object):
def _poll(self, timeout, sleep=True):
# select on reads across all connected sockets, blocking up to timeout
assert self.in_flight_request_count() > 0 or self._connecting or sleep
+
responses = []
+ processed = set()
for key, events in self._selector.select(timeout):
if key.fileobj is self._wake_r:
self._clear_wake_fd()
@@ -420,6 +445,7 @@ class KafkaClient(object):
elif not (events & selectors.EVENT_READ):
continue
conn = key.data
+ processed.add(conn)
while conn.in_flight_requests:
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
@@ -428,6 +454,15 @@ class KafkaClient(object):
if not response:
break
responses.append(response)
+
+ # Check for additional pending SSL bytes
+ if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
+ # TODO: optimize
+ for conn in self._conns.values():
+ if conn not in processed and conn.connected() and conn._sock.pending():
+ response = conn.recv()
+ if response:
+ responses.append(response)
return responses
def in_flight_request_count(self, node_id=None):
diff --git a/kafka/conn.py b/kafka/conn.py
index 28c09d9..f13ab64 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -5,6 +5,7 @@ import logging
import io
from random import shuffle
import socket
+import ssl
import struct
from threading import local
import time
@@ -29,11 +30,25 @@ log = logging.getLogger(__name__)
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
DEFAULT_KAFKA_PORT = 9092
+# support older ssl libraries
+try:
+ assert ssl.SSLWantReadError
+ assert ssl.SSLWantWriteError
+ assert ssl.SSLZeroReturnError
+except:
+ log.warning('old ssl module detected.'
+ ' ssl error handling may not operate cleanly.'
+ ' Consider upgrading to python 3.5 or 2.7')
+ ssl.SSLWantReadError = ssl.SSLError
+ ssl.SSLWantWriteError = ssl.SSLError
+ ssl.SSLZeroReturnError = ssl.SSLError
+
class ConnectionStates(object):
DISCONNECTING = '<disconnecting>'
DISCONNECTED = '<disconnected>'
CONNECTING = '<connecting>'
+ HANDSHAKE = '<handshake>'
CONNECTED = '<connected>'
@@ -49,6 +64,12 @@ class BrokerConnection(object):
'max_in_flight_requests_per_connection': 5,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
+ 'security_protocol': 'PLAINTEXT',
+ 'ssl_context': None,
+ 'ssl_check_hostname': True,
+ 'ssl_cafile': None,
+ 'ssl_certfile': None,
+ 'ssl_keyfile': None,
'api_version': (0, 8, 2), # default to most restrictive
'state_change_callback': lambda conn: True,
}
@@ -66,6 +87,9 @@ class BrokerConnection(object):
self.state = ConnectionStates.DISCONNECTED
self._sock = None
+ self._ssl_context = None
+ if self.config['ssl_context'] is not None:
+ self._ssl_context = self.config['ssl_context']
self._rbuffer = io.BytesIO()
self._receiving = False
self._next_payload_bytes = 0
@@ -87,6 +111,8 @@ class BrokerConnection(object):
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF,
self.config['send_buffer_bytes'])
self._sock.setblocking(False)
+ if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
+ self._wrap_ssl()
self.state = ConnectionStates.CONNECTING
self.last_attempt = time.time()
self.config['state_change_callback'](self)
@@ -103,7 +129,11 @@ class BrokerConnection(object):
# Connection succeeded
if not ret or ret == errno.EISCONN:
log.debug('%s: established TCP connection', str(self))
- self.state = ConnectionStates.CONNECTED
+ if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
+ log.debug('%s: initiating SSL handshake', str(self))
+ self.state = ConnectionStates.HANDSHAKE
+ else:
+ self.state = ConnectionStates.CONNECTED
self.config['state_change_callback'](self)
# Connection failed
@@ -122,8 +152,60 @@ class BrokerConnection(object):
else:
pass
+ if self.state is ConnectionStates.HANDSHAKE:
+ if self._try_handshake():
+ log.debug('%s: completed SSL handshake.', str(self))
+ self.state = ConnectionStates.CONNECTED
+ self.config['state_change_callback'](self)
+
return self.state
+ def _wrap_ssl(self):
+ assert self.config['security_protocol'] in ('SSL', 'SASL_SSL')
+ if self._ssl_context is None:
+ log.debug('%s: configuring default SSL Context', str(self))
+ self._ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) # pylint: disable=no-member
+ self._ssl_context.options |= ssl.OP_NO_SSLv2 # pylint: disable=no-member
+ self._ssl_context.options |= ssl.OP_NO_SSLv3 # pylint: disable=no-member
+ self._ssl_context.verify_mode = ssl.CERT_OPTIONAL
+ if self.config['ssl_check_hostname']:
+ self._ssl_context.check_hostname = True
+ if self.config['ssl_cafile']:
+ log.info('%s: Loading SSL CA from %s', str(self), self.config['ssl_cafile'])
+ self._ssl_context.load_verify_locations(self.config['ssl_cafile'])
+ self._ssl_context.verify_mode = ssl.CERT_REQUIRED
+ if self.config['ssl_certfile'] and self.config['ssl_keyfile']:
+ log.info('%s: Loading SSL Cert from %s', str(self), self.config['ssl_certfile'])
+ log.info('%s: Loading SSL Key from %s', str(self), self.config['ssl_keyfile'])
+ self._ssl_context.load_cert_chain(
+ certfile=self.config['ssl_certfile'],
+ keyfile=self.config['ssl_keyfile'])
+ log.debug('%s: wrapping socket in ssl context', str(self))
+ try:
+ self._sock = self._ssl_context.wrap_socket(
+ self._sock,
+ server_hostname=self.host,
+ do_handshake_on_connect=False)
+ except ssl.SSLError:
+ log.exception('%s: Failed to wrap socket in SSLContext!', str(self))
+ self.close()
+ self.last_failure = time.time()
+
+ def _try_handshake(self):
+ assert self.config['security_protocol'] in ('SSL', 'SASL_SSL')
+ try:
+ self._sock.do_handshake()
+ return True
+ # old ssl in python2.6 will swallow all SSLErrors here...
+ except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
+ pass
+ except ssl.SSLZeroReturnError:
+ log.warning('SSL connection closed by server during handshake.')
+ self.close()
+ # Other SSLErrors will be raised to user
+
+ return False
+
def blacked_out(self):
"""
Return true if we are disconnected from the given node and can't
@@ -140,8 +222,10 @@ class BrokerConnection(object):
return self.state is ConnectionStates.CONNECTED
def connecting(self):
- """Return True iff socket is in intermediate connecting state."""
- return self.state is ConnectionStates.CONNECTING
+ """Returns True if still connecting (this may encompass several
+ different states, such as SSL handshake, authorization, etc)."""
+ return self.state in (ConnectionStates.CONNECTING,
+ ConnectionStates.HANDSHAKE)
def disconnected(self):
"""Return True iff socket is closed"""
@@ -260,6 +344,8 @@ class BrokerConnection(object):
# An extremely small, but non-zero, probability that there are
# more than 0 but not yet 4 bytes available to read
self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell()))
+ except ssl.SSLWantReadError:
+ return None
except ConnectionError as e:
if six.PY2 and e.errno == errno.EWOULDBLOCK:
return None
@@ -286,6 +372,8 @@ class BrokerConnection(object):
staged_bytes = self._rbuffer.tell()
try:
self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes))
+ except ssl.SSLWantReadError:
+ return None
except ConnectionError as e:
# Extremely small chance that we have exactly 4 bytes for a
# header, but nothing to read in the body yet
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 151e644..0a78e7f 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -122,6 +122,21 @@ class KafkaConsumer(six.Iterator):
consumer_timeout_ms (int): number of millisecond to throw a timeout
exception to the consumer if no message is available for
consumption. Default: -1 (dont throw exception)
+ security_protocol (str): Protocol used to communicate with brokers.
+ Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
+ ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
+ socket connections. If provided, all other ssl_* configurations
+ will be ignored. Default: None.
+ ssl_check_hostname (bool): Flag to configure whether the SSL handshake
+ should verify that the certificate matches the broker's hostname.
+ Default: True.
+ ssl_cafile (str): Optional filename of the CA file to use in certificate
+ verification. Default: None.
+ ssl_certfile (str): Optional filename of a file in PEM format containing
+ the client certificate, as well as any CA certificates needed to
+ establish the certificate's authenticity. Default: None.
+ ssl_keyfile (str): Optional filename containing the client private key.
+ Default: None.
api_version (str): specify which kafka API version to use.
0.9 enables full group coordination features; 0.8.2 enables
kafka-storage offset commits; 0.8.1 enables zookeeper-storage
@@ -158,6 +173,12 @@ class KafkaConsumer(six.Iterator):
'send_buffer_bytes': None,
'receive_buffer_bytes': None,
'consumer_timeout_ms': -1,
+ 'security_protocol': 'PLAINTEXT',
+ 'ssl_context': None,
+ 'ssl_check_hostname': True,
+ 'ssl_cafile': None,
+ 'ssl_certfile': None,
+ 'ssl_keyfile': None,
'api_version': 'auto',
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
#'metric_reporters': None,
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 0aecdc5..1862f8d 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -192,6 +192,21 @@ class KafkaProducer(object):
max_in_flight_requests_per_connection (int): Requests are pipelined
to kafka brokers up to this number of maximum requests per
broker connection. Default: 5.
+ security_protocol (str): Protocol used to communicate with brokers.
+ Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
+ ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
+ socket connections. If provided, all other ssl_* configurations
+ will be ignored. Default: None.
+ ssl_check_hostname (bool): Flag to configure whether the SSL handshake
+ should verify that the certificate matches the broker's hostname.
+ Default: True.
+ ssl_cafile (str): Optional filename of the CA file to use in certificate
+ verification. Default: None.
+ ssl_certfile (str): Optional filename of a file in PEM format containing
+ the client certificate, as well as any CA certificates needed to
+ establish the certificate's authenticity. Default: None.
+ ssl_keyfile (str): Optional filename containing the client private key.
+ Default: None.
api_version (str): specify which kafka API version to use.
If set to 'auto', will attempt to infer the broker version by
probing various APIs. Default: auto
@@ -222,6 +237,12 @@ class KafkaProducer(object):
'send_buffer_bytes': None,
'reconnect_backoff_ms': 50,
'max_in_flight_requests_per_connection': 5,
+ 'security_protocol': 'PLAINTEXT',
+ 'ssl_context': None,
+ 'ssl_check_hostname': True,
+ 'ssl_cafile': None,
+ 'ssl_certfile': None,
+ 'ssl_keyfile': None,
'api_version': 'auto',
}
diff --git a/servers/0.10.0.0/resources/kafka.properties b/servers/0.10.0.0/resources/kafka.properties
index 2fd9c54..7a19a11 100644
--- a/servers/0.10.0.0/resources/kafka.properties
+++ b/servers/0.10.0.0/resources/kafka.properties
@@ -21,11 +21,20 @@ broker.id={broker_id}
############################# Socket Server Settings #############################
+listeners={transport}://{host}:{port}
+security.inter.broker.protocol={transport}
+
+ssl.keystore.location={ssl_dir}/server.keystore.jks
+ssl.keystore.password=foobar
+ssl.key.password=foobar
+ssl.truststore.location={ssl_dir}/server.truststore.jks
+ssl.truststore.password=foobar
+
# The port the socket server listens on
-port={port}
+#port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
-host.name={host}
+#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
diff --git a/servers/0.9.0.0/resources/kafka.properties b/servers/0.9.0.0/resources/kafka.properties
index 0592c1e..b70a0da 100644
--- a/servers/0.9.0.0/resources/kafka.properties
+++ b/servers/0.9.0.0/resources/kafka.properties
@@ -21,11 +21,20 @@ broker.id={broker_id}
############################# Socket Server Settings #############################
+listeners={transport}://{host}:{port}
+security.inter.broker.protocol={transport}
+
+ssl.keystore.location={ssl_dir}/server.keystore.jks
+ssl.keystore.password=foobar
+ssl.key.password=foobar
+ssl.truststore.location={ssl_dir}/server.truststore.jks
+ssl.truststore.password=foobar
+
# The port the socket server listens on
-port={port}
+#port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
-host.name={host}
+#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
diff --git a/servers/0.9.0.1/resources/kafka.properties b/servers/0.9.0.1/resources/kafka.properties
index 2fd9c54..7a19a11 100644
--- a/servers/0.9.0.1/resources/kafka.properties
+++ b/servers/0.9.0.1/resources/kafka.properties
@@ -21,11 +21,20 @@ broker.id={broker_id}
############################# Socket Server Settings #############################
+listeners={transport}://{host}:{port}
+security.inter.broker.protocol={transport}
+
+ssl.keystore.location={ssl_dir}/server.keystore.jks
+ssl.keystore.password=foobar
+ssl.key.password=foobar
+ssl.truststore.location={ssl_dir}/server.truststore.jks
+ssl.truststore.password=foobar
+
# The port the socket server listens on
-port={port}
+#port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
-host.name={host}
+#host.name=localhost
# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured. Otherwise, it will use the value returned from
diff --git a/test/conftest.py b/test/conftest.py
index 1f37960..c2ef1dd 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -51,7 +51,8 @@ def conn(mocker):
return state
conn._set_conn_state = _set_conn_state
conn.connect.side_effect = lambda: conn.state
- conn.connecting = lambda: conn.state is ConnectionStates.CONNECTING
+ conn.connecting = lambda: conn.state in (ConnectionStates.CONNECTING,
+ ConnectionStates.HANDSHAKE)
conn.connected = lambda: conn.state is ConnectionStates.CONNECTED
conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED
return conn
diff --git a/test/fixtures.py b/test/fixtures.py
index e25ac22..826d037 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -182,8 +182,8 @@ class ZookeeperFixture(Fixture):
class KafkaFixture(Fixture):
@classmethod
- def instance(cls, broker_id, zk_host, zk_port,
- zk_chroot=None, port=None, replicas=1, partitions=2):
+ def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None, port=None,
+ transport='PLAINTEXT', replicas=1, partitions=2):
if zk_chroot is None:
zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
if "KAFKA_URI" in os.environ:
@@ -194,16 +194,21 @@ class KafkaFixture(Fixture):
if port is None:
port = get_open_port()
host = "127.0.0.1"
- fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot,
+ fixture = KafkaFixture(host, port, broker_id,
+ zk_host, zk_port, zk_chroot,
+ transport=transport,
replicas=replicas, partitions=partitions)
fixture.open()
return fixture
- def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2):
+ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot,
+ replicas=1, partitions=2, transport='PLAINTEXT'):
self.host = host
self.port = port
self.broker_id = broker_id
+ self.transport = transport.upper()
+ self.ssl_dir = self.test_resource('ssl')
self.zk_host = zk_host
self.zk_port = zk_port
@@ -233,6 +238,7 @@ class KafkaFixture(Fixture):
self.out("Running local instance...")
log.info(" host = %s", self.host)
log.info(" port = %s", self.port)
+ log.info(" transport = %s", self.transport)
log.info(" broker_id = %s", self.broker_id)
log.info(" zk_host = %s", self.zk_host)
log.info(" zk_port = %s", self.zk_port)