summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client_async.py6
-rw-r--r--kafka/conn.py19
-rw-r--r--kafka/consumer/group.py10
-rw-r--r--kafka/producer/kafka.py3
4 files changed, 22 insertions, 16 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index a90c0d4..602c0c1 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -548,7 +548,7 @@ class KafkaClient(object):
task_future.success(result)
# If we got a future that is already done, don't block in _poll
- if future and future.is_done:
+ if future is not None and future.is_done:
timeout = 0
else:
idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms()
@@ -566,7 +566,7 @@ class KafkaClient(object):
# If all we had was a timeout (future is None) - only do one poll
# If we do have a future, we keep looping until it is done
- if not future or future.is_done:
+ if future is None or future.is_done:
break
return responses
@@ -678,7 +678,7 @@ class KafkaClient(object):
conn = self._conns.get(node_id)
connected = conn is not None and conn.connected()
blacked_out = conn is not None and conn.blacked_out()
- curr_inflight = len(conn.in_flight_requests) if conn else 0
+ curr_inflight = len(conn.in_flight_requests) if conn is not None else 0
if connected and curr_inflight == 0:
# if we find an established connection
# with no in-flight requests, we can stop right away
diff --git a/kafka/conn.py b/kafka/conn.py
index 9a9e786..4c21b8c 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -544,20 +544,19 @@ class BrokerConnection(object):
return future.success(True)
def _try_authenticate_gssapi(self, future):
- data = b''
- gssname = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
- ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service)
- ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos)
- log.debug('%s: canonical Servicename: %s', self, ctx_CanonName)
- ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate')
- log.debug("%s: initiator name: %s", self, ctx_Context.initiator_name)
+ gssapi_name = gssapi.Name(
+ self.config['sasl_kerberos_service_name'] + '@' + self.hostname,
+ name_type=gssapi.NameType.hostbased_service
+ ).canonicalize(gssapi.MechType.kerberos)
+ log.debug('%s: GSSAPI name: %s', self, gssapi_name)
# Exchange tokens until authentication either succeeds or fails
+ client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate')
received_token = None
try:
- while not ctx_Context.complete:
+ while not client_ctx.complete:
# calculate an output token from kafka token (or None if first iteration)
- output_token = ctx_Context.step(received_token)
+ output_token = client_ctx.step(received_token)
# pass output token to kafka
try:
@@ -582,7 +581,7 @@ class BrokerConnection(object):
except Exception as e:
return future.failure(e)
- log.info('%s: Authenticated as %s via GSSAPI', self, gssname)
+ log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name)
return future.success(True)
def blacked_out(self):
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index cbfd720..985a733 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -219,6 +219,8 @@ class KafkaConsumer(six.Iterator):
Default: None
sasl_plain_password (str): Password for sasl PLAIN authentication.
Default: None
+ sasl_kerberos_service_name (str): Service name to include in GSSAPI
+ sasl mechanism handshake. Default: 'kafka'
Note:
Configuration parameters are described in more detail at
@@ -274,6 +276,7 @@ class KafkaConsumer(six.Iterator):
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
+ 'sasl_kerberos_service_name': 'kafka'
}
def __init__(self, *topics, **configs):
@@ -990,9 +993,10 @@ class KafkaConsumer(six.Iterator):
partition and no offset reset policy is defined.
"""
# Lookup any positions for partitions which are awaiting reset (which may be the
- # case if the user called seekToBeginning or seekToEnd. We do this check first to
- # avoid an unnecessary lookup of committed offsets (which typically occurs when
- # the user is manually assigning partitions and managing their own offsets).
+ # case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do
+ # this check first to avoid an unnecessary lookup of committed offsets (which
+ # typically occurs when the user is manually assigning partitions and managing
+ # their own offsets).
self._fetcher.reset_offsets_if_needed(partitions)
if not self._subscription.has_all_fetch_positions():
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 5638b61..0ffc29c 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -263,6 +263,8 @@ class KafkaProducer(object):
Default: None
sasl_plain_password (str): password for sasl PLAIN authentication.
Default: None
+ sasl_kerberos_service_name (str): Service name to include in GSSAPI
+ sasl mechanism handshake. Default: 'kafka'
Note:
Configuration parameters are described in more detail at
@@ -309,6 +311,7 @@ class KafkaProducer(object):
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
+ 'sasl_kerberos_service_name': 'kafka'
}
_COMPRESSORS = {