summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2018-03-09 14:33:13 -0500
committerDana Powers <dana.powers@rd.io>2018-03-09 15:01:46 -0500
commit1b9843e69dfbdb139eec8fda9b8d779d80e41ea7 (patch)
treec7224a179ecdc67d8e08edba0d7dafddddb45e5e
parent1ffdd5caf7f10fb5372780cb9a5ac4a906cac342 (diff)
downloadkafka-python-ipv6_scope_id.tar.gz
Connect with sockaddrs to support non-zero ipv6 scope idsipv6_scope_id
-rw-r--r--kafka/conn.py23
-rw-r--r--test/test_conn.py27
2 files changed, 22 insertions, 28 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 4bbd744..e88499c 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -215,9 +215,8 @@ class BrokerConnection(object):
self.host = host
self.port = port
self.afi = afi
- self._sock_ip = host
- self._sock_port = port
self._sock_afi = afi
+ self._sock_addr = None
self.in_flight_requests = collections.deque()
self._api_versions = None
@@ -279,13 +278,12 @@ class BrokerConnection(object):
return False
return True
- def _next_afi_host_port(self):
+ def _next_afi_sockaddr(self):
if not self._gai:
if not self._dns_lookup():
return
afi, _, __, ___, sockaddr = self._gai.pop(0)
- host, port = sockaddr[:2]
- return (afi, host, port)
+ return (afi, sockaddr)
def connect_blocking(self, timeout=float('inf')):
if self.connected():
@@ -327,13 +325,13 @@ class BrokerConnection(object):
"""Attempt to connect and return ConnectionState"""
if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out():
self.last_attempt = time.time()
- next_lookup = self._next_afi_host_port()
+ next_lookup = self._next_afi_sockaddr()
if not next_lookup:
self.close(Errors.ConnectionError('DNS failure'))
return
else:
log.debug('%s: creating new socket', self)
- self._sock_afi, self._sock_ip, self._sock_port = next_lookup
+ self._sock_afi, self._sock_addr = next_lookup
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
for option in self.config['socket_options']:
@@ -348,9 +346,8 @@ class BrokerConnection(object):
# so we need to double check that we are still connecting before
if self.connecting():
self.config['state_change_callback'](self)
- log.info('%s: connecting to %s:%d [%s:%d %s]', self, self.host,
- self.port, self._sock_ip, self._sock_port,
- AFI_NAMES[self._sock_afi])
+ log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
+ self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -358,7 +355,7 @@ class BrokerConnection(object):
request_timeout = self.config['request_timeout_ms'] / 1000.0
ret = None
try:
- ret = self._sock.connect_ex((self._sock_ip, self._sock_port))
+ ret = self._sock.connect_ex(self._sock_addr)
except socket.error as err:
ret = err.errno
@@ -1009,9 +1006,9 @@ class BrokerConnection(object):
return version
def __str__(self):
- return "<BrokerConnection node_id=%s host=%s:%d %s [%s:%d %s]>" % (
+ return "<BrokerConnection node_id=%s host=%s:%d %s [%s %s]>" % (
self.node_id, self.host, self.port, self.state,
- self._sock_ip, self._sock_port, AFI_NAMES[self._sock_afi])
+ AFI_NAMES[self._sock_afi], self._sock_addr)
class BrokerConnectionMetrics(object):
diff --git a/test/test_conn.py b/test/test_conn.py
index 44ee9ee..12a32ef 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -258,33 +258,31 @@ def test_lookup_on_connect():
assert conn.host == hostname
assert conn.port == port
assert conn.afi == socket.AF_UNSPEC
- ip1 = '127.0.0.1'
afi1 = socket.AF_INET
+ sockaddr1 = ('127.0.0.1', 9092)
mock_return1 = [
- (afi1, socket.SOCK_STREAM, 6, '', (ip1, 9092)),
+ (afi1, socket.SOCK_STREAM, 6, '', sockaddr1),
]
with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m:
conn.connect()
m.assert_called_once_with(hostname, port, 0, 1)
- conn.close()
- assert conn._sock_ip == ip1
- assert conn._sock_port == 9092
assert conn._sock_afi == afi1
+ assert conn._sock_addr == sockaddr1
+ conn.close()
- ip2 = '::1'
afi2 = socket.AF_INET6
+ sockaddr2 = ('::1', 9092, 0, 0)
mock_return2 = [
- (afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)),
+ (afi2, socket.SOCK_STREAM, 6, '', sockaddr2),
]
with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
conn.last_attempt = 0
conn.connect()
m.assert_called_once_with(hostname, port, 0, 1)
- conn.close()
- assert conn._sock_ip == ip2
- assert conn._sock_port == 9092
assert conn._sock_afi == afi2
+ assert conn._sock_addr == sockaddr2
+ conn.close()
def test_relookup_on_failure():
@@ -300,17 +298,16 @@ def test_relookup_on_failure():
assert conn.disconnected()
assert conn.last_attempt > last_attempt
- ip2 = '127.0.0.2'
afi2 = socket.AF_INET
+ sockaddr2 = ('127.0.0.2', 9092)
mock_return2 = [
- (afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)),
+ (afi2, socket.SOCK_STREAM, 6, '', sockaddr2),
]
with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m:
conn.last_attempt = 0
conn.connect()
m.assert_called_once_with(hostname, port, 0, 1)
- conn.close()
- assert conn._sock_ip == ip2
- assert conn._sock_port == 9092
assert conn._sock_afi == afi2
+ assert conn._sock_addr == sockaddr2
+ conn.close()