summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-09-30 14:00:27 -0700
committerDana Powers <dana.powers@gmail.com>2017-09-30 14:00:48 -0700
commit04607f18bd5639e9ab82501cc36d9d6318f62412 (patch)
tree282ff0ef62cae71d716a88d317f359d2319369f8
parentb1ae45c10d46e881492b0fd37f0919cac6d79224 (diff)
downloadkafka-python-sasl_fixes.tar.gz
Small fixes to SASL documentation and logging; validate security_protocolsasl_fixes
-rw-r--r--kafka/conn.py47
1 files changed, 26 insertions, 21 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index af01efa..5b436e5 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -112,7 +112,8 @@ class BrokerConnection(object):
to apply to broker connection sockets. Default:
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
security_protocol (str): Protocol used to communicate with brokers.
- Valid values are: PLAINTEXT, SSL. Default: PLAINTEXT.
+ Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
+ Default: PLAINTEXT.
ssl_context (ssl.SSLContext): pre-configured SSLContext for wrapping
socket connections. If provided, all other ssl_* configurations
will be ignored. Default: None.
@@ -145,13 +146,15 @@ class BrokerConnection(object):
metrics (kafka.metrics.Metrics): Optionally provide a metrics
instance for capturing network IO stats. Default: None.
metric_group_prefix (str): Prefix for metric names. Default: ''
- sasl_mechanism (str): string picking sasl mechanism when security_protocol
- is SASL_PLAINTEXT or SASL_SSL. Currently only PLAIN is supported.
- Default: None
+ sasl_mechanism (str): Authentication mechanism when security_protocol
+ is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
+ PLAIN, GSSAPI. Default: PLAIN
sasl_plain_username (str): username for sasl PLAIN authentication.
Default: None
sasl_plain_password (str): password for sasl PLAIN authentication.
Default: None
+ sasl_kerberos_service_name (str): Service name to include in GSSAPI
+ sasl mechanism handshake. Default: 'kafka'
"""
DEFAULT_CONFIG = {
@@ -179,12 +182,10 @@ class BrokerConnection(object):
'sasl_mechanism': 'PLAIN',
'sasl_plain_username': None,
'sasl_plain_password': None,
- 'sasl_kerberos_service_name':'kafka'
+ 'sasl_kerberos_service_name': 'kafka'
}
- if gssapi is None:
- SASL_MECHANISMS = ('PLAIN',)
- else:
- SASL_MECHANISMS = ('PLAIN', 'GSSAPI')
+ SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
+ SASL_MECHANISMS = ('PLAIN', 'GSSAPI')
def __init__(self, host, port, afi, **configs):
self.hostname = host
@@ -213,6 +214,9 @@ class BrokerConnection(object):
(socket.SOL_SOCKET, socket.SO_SNDBUF,
self.config['send_buffer_bytes']))
+ assert self.config['security_protocol'] in self.SECURITY_PROTOCOLS, (
+ 'security_protcol must be in ' + ', '.join(self.SECURITY_PROTOCOLS))
+
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
assert ssl_available, "Python wasn't built with SSL support"
@@ -224,7 +228,7 @@ class BrokerConnection(object):
assert self.config['sasl_plain_password'] is not None, 'sasl_plain_password required for PLAIN sasl'
if self.config['sasl_mechanism'] == 'GSSAPI':
assert gssapi is not None, 'GSSAPI lib not available'
- assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_servicename_kafka required for GSSAPI sasl'
+ assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'
self.state = ConnectionStates.DISCONNECTED
self._reset_reconnect_backoff()
@@ -332,6 +336,7 @@ class BrokerConnection(object):
log.debug('%s: initiating SASL authentication', self)
self.state = ConnectionStates.AUTHENTICATING
else:
+ # security_protocol PLAINTEXT
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
@@ -367,7 +372,6 @@ class BrokerConnection(object):
if self.state is ConnectionStates.AUTHENTICATING:
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
if self._try_authenticate():
- log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
log.debug('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
@@ -500,21 +504,21 @@ class BrokerConnection(object):
if data != b'\x00\x00\x00\x00':
return future.failure(Errors.AuthenticationFailedError())
+ log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
return future.success(True)
def _try_authenticate_gssapi(self, future):
-
data = b''
gssname = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
- ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service)
+ ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service)
ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos)
log.debug('%s: canonical Servicename: %s', self, ctx_CanonName)
- ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate')
- #Exchange tokens until authentication either suceeded or failed:
+ ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate')
+ # Exchange tokens until authentication either succeeds or fails:
received_token = None
try:
while not ctx_Context.complete:
- #calculate the output token
+ # calculate the output token
try:
output_token = ctx_Context.step(received_token)
except GSSError as e:
@@ -533,10 +537,10 @@ class BrokerConnection(object):
size = Int32.encode(len(msg))
self._sock.sendall(size + msg)
- # The server will send a token back. processing of this token either
- # establishes a security context, or needs further token exchange
- # the gssapi will be able to identify the needed next step
- # The connection is closed on failure
+ # The server will send a token back. Processing of this token either
+ # establishes a security context, or it needs further token exchange.
+ # The gssapi will be able to identify the needed next step.
+ # The connection is closed on failure.
response = self._sock.recv(2000)
self._sock.setblocking(False)
@@ -546,7 +550,7 @@ class BrokerConnection(object):
future.failure(error)
self.close(error=error)
- #pass the received token back to gssapi, strip the first 4 bytes
+ # pass the received token back to gssapi, strip the first 4 bytes
received_token = response[4:]
except Exception as e:
@@ -555,6 +559,7 @@ class BrokerConnection(object):
future.failure(error)
self.close(error=error)
+ log.info('%s: Authenticated as %s', self, gssname)
return future.success(True)
def blacked_out(self):