author    Jeff Widman <jeff@jeffwidman.com>    2018-05-10 16:12:19 -0700
committer Jeff Widman <jeff@jeffwidman.com>    2018-05-23 15:19:01 -0700
commit    11cf3973bfc64ab0b4e471fc56dae911df1ec8d9 (patch)
tree      474937a8c01bb32a3b12a944d9a9ad6b32d81800
parent    9221fcf83528b5c3657e43636cb84c1d18025acd (diff)
download  kafka-python-11cf3973bfc64ab0b4e471fc56dae911df1ec8d9.tar.gz
Stop shadowing `ConnectionError`
In Python 3, `ConnectionError` is a native exception, so rename our custom exception to `KafkaConnectionError` to stop it from accidentally shadowing the native one. Note that there are still valid uses of `ConnectionError` in this code: those call sites already expect the native Python 3 `ConnectionError`, and they already handle the Python 2 compatibility issues.
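For context, a minimal sketch of the shadowing problem this rename avoids. The stand-in `KafkaError` base class and the `read_response` helper are illustrative only, not code from this commit:

    class KafkaError(Exception):
        retriable = False

    class KafkaConnectionError(KafkaError):
        # mirrors the renamed exception in kafka/errors.py below
        retriable = True
        invalid_metadata = True

    def read_response(sock):
        try:
            return sock.recv(4)
        except ConnectionError as e:
            # Python 3's builtin ConnectionError is no longer hidden by an import,
            # so socket-level failures can be wrapped into the Kafka-level error
            raise KafkaConnectionError(str(e))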
-rw-r--r--  kafka/client.py                    | 12
-rw-r--r--  kafka/client_async.py              |  2
-rw-r--r--  kafka/conn.py                      | 28
-rw-r--r--  kafka/errors.py                    |  6
-rw-r--r--  kafka/producer/base.py             |  1
-rw-r--r--  test/test_client.py                |  2
-rw-r--r--  test/test_conn.py                  |  4
-rw-r--r--  test/test_failover_integration.py  |  6
8 files changed, 30 insertions(+), 31 deletions(-)
diff --git a/kafka/client.py b/kafka/client.py
index 10b1724..789d4da 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -11,7 +11,7 @@ import select
from kafka.vendor import six
import kafka.errors
-from kafka.errors import (UnknownError, ConnectionError, FailedPayloadsError,
+from kafka.errors import (UnknownError, KafkaConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
NotLeaderForPartitionError, ReplicaNotAvailableError)
@@ -73,7 +73,7 @@ class SimpleClient(object):
conn = self._conns[host_key]
if not conn.connect_blocking(self.timeout):
conn.close()
- raise ConnectionError("%s:%s (%s)" % (host, port, afi))
+ raise KafkaConnectionError("%s:%s (%s)" % (host, port, afi))
return conn
def _get_leader_for_partition(self, topic, partition):
@@ -156,7 +156,7 @@ class SimpleClient(object):
for (host, port, afi) in hosts:
try:
conn = self._get_conn(host, port, afi)
- except ConnectionError:
+ except KafkaConnectionError:
log.warning("Skipping unconnected connection: %s:%s (AFI %s)",
host, port, afi)
continue
@@ -242,7 +242,7 @@ class SimpleClient(object):
host, port, afi = get_ip_port_afi(broker.host)
try:
conn = self._get_conn(host, broker.port, afi)
- except ConnectionError:
+ except KafkaConnectionError:
refresh_metadata = True
failed_payloads(broker_payloads)
continue
@@ -344,8 +344,8 @@ class SimpleClient(object):
try:
host, port, afi = get_ip_port_afi(broker.host)
conn = self._get_conn(host, broker.port, afi)
- except ConnectionError as e:
- log.warning('ConnectionError attempting to send request %s '
+ except KafkaConnectionError as e:
+ log.warning('KafkaConnectionError attempting to send request %s '
'to server %s: %s', request_id, broker, e)
for payload in payloads:
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 9556eca..a9704fa 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -602,7 +602,7 @@ class KafkaClient(object):
log.warning('Protocol out of sync on %r, closing', conn)
except socket.error:
pass
- conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests'))
+ conn.close(Errors.KafkaConnectionError('Socket EVENT_READ without in-flight-requests'))
continue
self._idle_expiry_manager.update(conn.node_id)
diff --git a/kafka/conn.py b/kafka/conn.py
index daaa234..f67edfb 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -327,7 +327,7 @@ class BrokerConnection(object):
self.last_attempt = time.time()
next_lookup = self._next_afi_sockaddr()
if not next_lookup:
- self.close(Errors.ConnectionError('DNS failure'))
+ self.close(Errors.KafkaConnectionError('DNS failure'))
return
else:
log.debug('%s: creating new socket', self)
@@ -381,12 +381,12 @@ class BrokerConnection(object):
log.error('Connect attempt to %s returned error %s.'
' Disconnecting.', self, ret)
errstr = errno.errorcode.get(ret, 'UNKNOWN')
- self.close(Errors.ConnectionError('{} {}'.format(ret, errstr)))
+ self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr)))
# Connection timed out
elif time.time() > request_timeout + self.last_attempt:
log.error('Connection attempt to %s timed out', self)
- self.close(Errors.ConnectionError('timeout'))
+ self.close(Errors.KafkaConnectionError('timeout'))
# Needs retry
else:
@@ -463,7 +463,7 @@ class BrokerConnection(object):
pass
except (SSLZeroReturnError, ConnectionError, SSLEOFError):
log.warning('SSL connection closed by server during handshake.')
- self.close(Errors.ConnectionError('SSL connection closed by server during handshake'))
+ self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
# Other SSLErrors will be raised to user
return False
@@ -488,7 +488,7 @@ class BrokerConnection(object):
return False
elif self._sasl_auth_future.failed():
ex = self._sasl_auth_future.exception
- if not isinstance(ex, Errors.ConnectionError):
+ if not isinstance(ex, Errors.KafkaConnectionError):
raise ex # pylint: disable-msg=raising-bad-type
return self._sasl_auth_future.succeeded()
@@ -558,8 +558,8 @@ class BrokerConnection(object):
data = self._recv_bytes_blocking(4)
except ConnectionError as e:
- log.exception("%s: Error receiving reply from server", self)
- error = Errors.ConnectionError("%s: %s" % (self, e))
+ log.exception("%s: Error receiving reply from server", self)
+ error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)
@@ -621,7 +621,7 @@ class BrokerConnection(object):
except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
- error = Errors.ConnectionError("%s: %s" % (self, e))
+ error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)
except Exception as e:
@@ -701,7 +701,7 @@ class BrokerConnection(object):
Arguments:
error (Exception, optional): pending in-flight-requests
will be failed with this exception.
- Default: kafka.errors.ConnectionError.
+ Default: kafka.errors.KafkaConnectionError.
"""
if self.state is ConnectionStates.DISCONNECTED:
if error is not None:
@@ -733,7 +733,7 @@ class BrokerConnection(object):
if self.connecting():
return future.failure(Errors.NodeNotReadyError(str(self)))
elif not self.connected():
- return future.failure(Errors.ConnectionError(str(self)))
+ return future.failure(Errors.KafkaConnectionError(str(self)))
elif not self.can_send_more():
return future.failure(Errors.TooManyInFlightRequests(str(self)))
return self._send(request)
@@ -753,7 +753,7 @@ class BrokerConnection(object):
self._sensors.bytes_sent.record(total_bytes)
except ConnectionError as e:
log.exception("Error sending %s to %s", request, self)
- error = Errors.ConnectionError("%s: %s" % (self, e))
+ error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)
log.debug('%s Request %d: %s', self, correlation_id, request)
@@ -781,7 +781,7 @@ class BrokerConnection(object):
# If requests are pending, we should close the socket and
# fail all the pending request futures
if self.in_flight_requests:
- self.close(Errors.ConnectionError('Socket not connected during recv with in-flight-requests'))
+ self.close(Errors.KafkaConnectionError('Socket not connected during recv with in-flight-requests'))
return ()
elif not self.in_flight_requests:
@@ -821,7 +821,7 @@ class BrokerConnection(object):
# without an exception raised
if not data:
log.error('%s: socket disconnected', self)
- self.close(error=Errors.ConnectionError('socket disconnected'))
+ self.close(error=Errors.KafkaConnectionError('socket disconnected'))
return []
else:
recvd.append(data)
@@ -833,7 +833,7 @@ class BrokerConnection(object):
break
log.exception('%s: Error receiving network data'
' closing socket', self)
- self.close(error=Errors.ConnectionError(e))
+ self.close(error=Errors.KafkaConnectionError(e))
return []
except BlockingIOError:
if six.PY3:
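The conn.py hunks above all follow one pattern: catch the builtin `ConnectionError` (or a socket error), wrap it in `Errors.KafkaConnectionError`, close the connection so in-flight requests fail, and fail the pending future. A rough sketch of that pattern, assuming stand-in `conn`, `sock`, `future`, and logger objects rather than the real `BrokerConnection` internals:

    import logging

    from kafka.errors import KafkaConnectionError

    log = logging.getLogger(__name__)

    def recv_reply(conn, sock, future, nbytes=4):
        try:
            data = sock.recv(nbytes)
        except ConnectionError as e:
            # builtin ConnectionError from the socket layer
            log.exception("%s: Error receiving reply from server", conn)
            error = KafkaConnectionError("%s: %s" % (conn, e))
            conn.close(error=error)  # pending in-flight requests are failed with this error
            return future.failure(error)
        return future.success(data)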
diff --git a/kafka/errors.py b/kafka/errors.py
index c70853c..f4c8740 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -447,7 +447,7 @@ class FailedPayloadsError(KafkaError):
self.payload = payload
-class ConnectionError(KafkaError):
+class KafkaConnectionError(KafkaError):
retriable = True
invalid_metadata = True
@@ -517,13 +517,13 @@ def check_error(response):
RETRY_BACKOFF_ERROR_TYPES = (
KafkaUnavailableError, LeaderNotAvailableError,
- ConnectionError, FailedPayloadsError
+ KafkaConnectionError, FailedPayloadsError
)
RETRY_REFRESH_ERROR_TYPES = (
NotLeaderForPartitionError, UnknownTopicOrPartitionError,
- LeaderNotAvailableError, ConnectionError
+ LeaderNotAvailableError, KafkaConnectionError
)
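Since `KafkaConnectionError` keeps `retriable = True` and `invalid_metadata = True`, it stays in both retry tuples. A small illustration of how calling code might consult them; the `classify_retry` helper is an assumption for the sketch, not part of kafka-python:

    from kafka.errors import RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES

    def classify_retry(exc):
        # (back off before retrying?, refresh cluster metadata before retrying?)
        return (isinstance(exc, RETRY_BACKOFF_ERROR_TYPES),
                isinstance(exc, RETRY_REFRESH_ERROR_TYPES))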
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index e8d6c3d..c9dd6c3 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -372,7 +372,6 @@ class Producer(object):
Raises:
FailedPayloadsError: low-level connection error, can be caused by
networking failures, or a malformed request.
- ConnectionError:
KafkaUnavailableError: all known brokers are down when attempting
to refresh metadata.
LeaderNotAvailableError: topic or partition is initializing or
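A caller-side retry loop matching the Raises section above (and the failover tests further down); `send_with_retry` and its arguments are placeholders, assuming a `kafka.producer.base.Producer`-style object:

    from kafka.errors import (FailedPayloadsError, KafkaConnectionError,
                              RequestTimedOutError, NotLeaderForPartitionError)

    def send_with_retry(producer, topic, partition, msg, attempts=5):
        for _ in range(attempts):
            try:
                producer.send_messages(topic, partition, msg)
                return True
            except (FailedPayloadsError, KafkaConnectionError,
                    RequestTimedOutError, NotLeaderForPartitionError):
                # transient failure: retry, as the docstring above describes
                continue
        return False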
diff --git a/test/test_client.py b/test/test_client.py
index d02c621..c53983c 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -8,7 +8,7 @@ from . import unittest
from kafka import SimpleClient
from kafka.errors import (
KafkaUnavailableError, LeaderNotAvailableError, KafkaTimeoutError,
- UnknownTopicOrPartitionError, ConnectionError, FailedPayloadsError)
+ UnknownTopicOrPartitionError, FailedPayloadsError)
from kafka.future import Future
from kafka.protocol import KafkaProtocol, create_message
from kafka.protocol.metadata import MetadataResponse
diff --git a/test/test_conn.py b/test/test_conn.py
index 12a32ef..fbdeeb9 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -99,7 +99,7 @@ def test_send_disconnected(conn):
conn.state = ConnectionStates.DISCONNECTED
f = conn.send('foobar')
assert f.failed() is True
- assert isinstance(f.exception, Errors.ConnectionError)
+ assert isinstance(f.exception, Errors.KafkaConnectionError)
def test_send_connecting(conn):
@@ -162,7 +162,7 @@ def test_send_error(_socket, conn):
_socket.send.side_effect = socket.error
f = conn.send(req)
assert f.failed() is True
- assert isinstance(f.exception, Errors.ConnectionError)
+ assert isinstance(f.exception, Errors.KafkaConnectionError)
assert _socket.close.call_count == 1
assert conn.state is ConnectionStates.DISCONNECTED
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 797e1c8..ad7dcb9 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -4,7 +4,7 @@ import time
from kafka import SimpleClient, SimpleConsumer, KeyedProducer
from kafka.errors import (
- FailedPayloadsError, ConnectionError, RequestTimedOutError,
+ FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
NotLeaderForPartitionError)
from kafka.producer.base import Producer
from kafka.structs import TopicPartition
@@ -79,7 +79,7 @@ class TestFailover(KafkaIntegrationTestCase):
producer.send_messages(topic, partition, b'success')
log.debug("success!")
recovered = True
- except (FailedPayloadsError, ConnectionError, RequestTimedOutError,
+ except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
NotLeaderForPartitionError):
log.debug("caught exception sending message -- will retry")
continue
@@ -167,7 +167,7 @@ class TestFailover(KafkaIntegrationTestCase):
producer.send_messages(topic, key, msg)
if producer.partitioners[topic].partition(key) == 0:
recovered = True
- except (FailedPayloadsError, ConnectionError, RequestTimedOutError,
+ except (FailedPayloadsError, KafkaConnectionError, RequestTimedOutError,
NotLeaderForPartitionError):
log.debug("caught exception sending message -- will retry")
continue