diff options
-rw-r--r--  kafka/client_async.py   |  6
-rw-r--r--  kafka/conn.py           | 19
-rw-r--r--  kafka/consumer/group.py | 10
-rw-r--r--  kafka/producer/kafka.py |  3
4 files changed, 22 insertions, 16 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index a90c0d4..602c0c1 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -548,7 +548,7 @@ class KafkaClient(object): task_future.success(result) # If we got a future that is already done, don't block in _poll - if future and future.is_done: + if future is not None and future.is_done: timeout = 0 else: idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms() @@ -566,7 +566,7 @@ class KafkaClient(object): # If all we had was a timeout (future is None) - only do one poll # If we do have a future, we keep looping until it is done - if not future or future.is_done: + if future is None or future.is_done: break return responses @@ -678,7 +678,7 @@ class KafkaClient(object): conn = self._conns.get(node_id) connected = conn is not None and conn.connected() blacked_out = conn is not None and conn.blacked_out() - curr_inflight = len(conn.in_flight_requests) if conn else 0 + curr_inflight = len(conn.in_flight_requests) if conn is not None else 0 if connected and curr_inflight == 0: # if we find an established connection # with no in-flight requests, we can stop right away diff --git a/kafka/conn.py b/kafka/conn.py index 9a9e786..4c21b8c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -544,20 +544,19 @@ class BrokerConnection(object): return future.success(True) def _try_authenticate_gssapi(self, future): - data = b'' - gssname = self.config['sasl_kerberos_service_name'] + '@' + self.hostname - ctx_Name = gssapi.Name(gssname, name_type=gssapi.NameType.hostbased_service) - ctx_CanonName = ctx_Name.canonicalize(gssapi.MechType.kerberos) - log.debug('%s: canonical Servicename: %s', self, ctx_CanonName) - ctx_Context = gssapi.SecurityContext(name=ctx_CanonName, usage='initiate') - log.debug("%s: initiator name: %s", self, ctx_Context.initiator_name) + gssapi_name = gssapi.Name( + self.config['sasl_kerberos_service_name'] + '@' + self.hostname, + name_type=gssapi.NameType.hostbased_service +
).canonicalize(gssapi.MechType.kerberos) + log.debug('%s: GSSAPI name: %s', self, gssapi_name) # Exchange tokens until authentication either succeeds or fails + client_ctx = gssapi.SecurityContext(name=gssapi_name, usage='initiate') received_token = None try: - while not ctx_Context.complete: + while not client_ctx.complete: # calculate an output token from kafka token (or None if first iteration) - output_token = ctx_Context.step(received_token) + output_token = client_ctx.step(received_token) # pass output token to kafka try: @@ -582,7 +581,7 @@ class BrokerConnection(object): except Exception as e: return future.failure(e) - log.info('%s: Authenticated as %s via GSSAPI', self, gssname) + log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name) return future.success(True) def blacked_out(self): diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index cbfd720..985a733 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -219,6 +219,8 @@ class KafkaConsumer(six.Iterator): Default: None sasl_plain_password (str): Password for sasl PLAIN authentication. Default: None + sasl_kerberos_service_name (str): Service name to include in GSSAPI + sasl mechanism handshake. Default: 'kafka' Note: Configuration parameters are described in more detail at @@ -274,6 +276,7 @@ class KafkaConsumer(six.Iterator): 'sasl_mechanism': None, 'sasl_plain_username': None, 'sasl_plain_password': None, + 'sasl_kerberos_service_name': 'kafka' } def __init__(self, *topics, **configs): @@ -990,9 +993,10 @@ class KafkaConsumer(six.Iterator): partition and no offset reset policy is defined. """ # Lookup any positions for partitions which are awaiting reset (which may be the - # case if the user called seekToBeginning or seekToEnd. We do this check first to - # avoid an unnecessary lookup of committed offsets (which typically occurs when - # the user is manually assigning partitions and managing their own offsets).
+ # case if the user called :meth:`seek_to_beginning` or :meth:`seek_to_end`. We do + # this check first to avoid an unnecessary lookup of committed offsets (which + # typically occurs when the user is manually assigning partitions and managing + # their own offsets). self._fetcher.reset_offsets_if_needed(partitions) if not self._subscription.has_all_fetch_positions(): diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 5638b61..0ffc29c 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -263,6 +263,8 @@ class KafkaProducer(object): Default: None sasl_plain_password (str): password for sasl PLAIN authentication. Default: None + sasl_kerberos_service_name (str): Service name to include in GSSAPI + sasl mechanism handshake. Default: 'kafka' Note: Configuration parameters are described in more detail at @@ -309,6 +311,7 @@ class KafkaProducer(object): 'sasl_mechanism': None, 'sasl_plain_username': None, 'sasl_plain_password': None, + 'sasl_kerberos_service_name': 'kafka' } _COMPRESSORS = {