summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2018-03-09 10:17:43 -0500
committerGitHub <noreply@github.com>2018-03-09 10:17:43 -0500
commitce96752c3d4ca53222aebe1f824a47865bcb3aff (patch)
tree276a7cef95e35efc4b89d85edf91bd9edf0baba7
parent4abdb1baea2468408c36cc983dfef1e8b8f54654 (diff)
downloadkafka-python-ce96752c3d4ca53222aebe1f824a47865bcb3aff.tar.gz
Make BrokerConnection .host / .port / .afi immutable, use _sock_* attributes for current lookups (#1422)
-rw-r--r--kafka/conn.py40
-rw-r--r--test/test_conn.py29
2 files changed, 45 insertions, 24 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index d778c31..798f85a 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -78,6 +78,14 @@ except ImportError:
gssapi = None
GSSError = None
+
+AFI_NAMES = {
+ socket.AF_UNSPEC: "unspecified",
+ socket.AF_INET: "IPv4",
+ socket.AF_INET6: "IPv6",
+}
+
+
class ConnectionStates(object):
DISCONNECTING = '<disconnecting>'
DISCONNECTED = '<disconnected>'
@@ -204,13 +212,12 @@ class BrokerConnection(object):
SASL_MECHANISMS = ('PLAIN', 'GSSAPI')
def __init__(self, host, port, afi, **configs):
- self.hostname = host
self.host = host
self.port = port
self.afi = afi
- self._init_host = host
- self._init_port = port
- self._init_afi = afi
+ self._sock_ip = host
+ self._sock_port = port
+ self._sock_afi = afi
self.in_flight_requests = collections.deque()
self._api_versions = None
@@ -266,10 +273,10 @@ class BrokerConnection(object):
def _next_afi_host_port(self):
if not self._gai:
- self._gai = dns_lookup(self._init_host, self._init_port, self._init_afi)
+ self._gai = dns_lookup(self.host, self.port, self.afi)
if not self._gai:
log.error('DNS lookup failed for %s:%i (%s)',
- self._init_host, self._init_port, self._init_afi)
+ self.host, self.port, self.afi)
return
afi, _, __, ___, sockaddr = self._gai.pop(0)
@@ -286,8 +293,8 @@ class BrokerConnection(object):
return
else:
log.debug('%s: creating new socket', self)
- self.afi, self.host, self.port = next_lookup
- self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
+ self._sock_afi, self._sock_ip, self._sock_port = next_lookup
+ self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
for option in self.config['socket_options']:
log.debug('%s: setting socket option %s', self, option)
@@ -301,7 +308,9 @@ class BrokerConnection(object):
# so we need to double check that we are still connecting before
if self.connecting():
self.config['state_change_callback'](self)
- log.info('%s: connecting to %s:%d', self, self.host, self.port)
+ log.info('%s: connecting to %s:%d [%s:%d %s]', self, self.host,
+ self.port, self._sock_ip, self._sock_port,
+ AFI_NAMES[self._sock_afi])
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -309,7 +318,7 @@ class BrokerConnection(object):
request_timeout = self.config['request_timeout_ms'] / 1000.0
ret = None
try:
- ret = self._sock.connect_ex((self.host, self.port))
+ ret = self._sock.connect_ex((self._sock_ip, self._sock_port))
except socket.error as err:
ret = err.errno
@@ -400,7 +409,7 @@ class BrokerConnection(object):
try:
self._sock = self._ssl_context.wrap_socket(
self._sock,
- server_hostname=self.hostname,
+ server_hostname=self.host,
do_handshake_on_connect=False)
except ssl.SSLError as e:
log.exception('%s: Failed to wrap socket in SSLContext!', self)
@@ -524,7 +533,7 @@ class BrokerConnection(object):
return future.success(True)
def _try_authenticate_gssapi(self, future):
- auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
+ auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.host
gssapi_name = gssapi.Name(
auth_id,
name_type=gssapi.NameType.hostbased_service
@@ -962,9 +971,10 @@ class BrokerConnection(object):
self.config[key] = stashed[key]
return version
- def __repr__(self):
- return "<BrokerConnection node_id=%s host=%s/%s port=%d>" % (
- self.node_id, self.hostname, self.host, self.port)
+ def __str__(self):
+ return "<BrokerConnection node_id=%s host=%s:%d %s [%s:%d %s]>" % (
+ self.node_id, self.host, self.port, self.state,
+ self._sock_ip, self._sock_port, AFI_NAMES[self._sock_afi])
class BrokerConnectionMetrics(object):
diff --git a/test/test_conn.py b/test/test_conn.py
index f35cebe..44ee9ee 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -255,20 +255,26 @@ def test_lookup_on_connect():
hostname = 'example.org'
port = 9092
conn = BrokerConnection(hostname, port, socket.AF_UNSPEC)
- assert conn.host == conn.hostname == hostname
+ assert conn.host == hostname
+ assert conn.port == port
+ assert conn.afi == socket.AF_UNSPEC
ip1 = '127.0.0.1'
+ afi1 = socket.AF_INET
mock_return1 = [
- (2, 2, 17, '', (ip1, 9092)),
+ (afi1, socket.SOCK_STREAM, 6, '', (ip1, 9092)),
]
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
conn.connect()
m.assert_called_once_with(hostname, port, 0, 1)
conn.close()
- assert conn.host == ip1
+ assert conn._sock_ip == ip1
+ assert conn._sock_port == 9092
+ assert conn._sock_afi == afi1
- ip2 = '127.0.0.2'
+ ip2 = '::1'
+ afi2 = socket.AF_INET6
mock_return2 = [
- (2, 2, 17, '', (ip2, 9092)),
+ (afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)),
]
with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
@@ -276,14 +282,16 @@ def test_lookup_on_connect():
conn.connect()
m.assert_called_once_with(hostname, port, 0, 1)
conn.close()
- assert conn.host == ip2
+ assert conn._sock_ip == ip2
+ assert conn._sock_port == 9092
+ assert conn._sock_afi == afi2
def test_relookup_on_failure():
hostname = 'example.org'
port = 9092
conn = BrokerConnection(hostname, port, socket.AF_UNSPEC)
- assert conn.host == conn.hostname == hostname
+ assert conn.host == hostname
mock_return1 = []
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
last_attempt = conn.last_attempt
@@ -293,8 +301,9 @@ def test_relookup_on_failure():
assert conn.last_attempt > last_attempt
ip2 = '127.0.0.2'
+ afi2 = socket.AF_INET
mock_return2 = [
- (2, 2, 17, '', (ip2, 9092)),
+ (afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)),
]
with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
@@ -302,4 +311,6 @@ def test_relookup_on_failure():
conn.connect()
m.assert_called_once_with(hostname, port, 0, 1)
conn.close()
- assert conn.host == ip2
+ assert conn._sock_ip == ip2
+ assert conn._sock_port == 9092
+ assert conn._sock_afi == afi2