From cfa06f7380bc0bbe702151002d74d01476dcfe1b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 26 Jun 2016 19:28:59 -0700 Subject: Delete KafkaConnection class --- kafka/client.py | 3 +- kafka/conn.py | 187 ----------------------------------------- test/test_conn_legacy.py | 210 ----------------------------------------------- 3 files changed, 2 insertions(+), 398 deletions(-) delete mode 100644 test/test_conn_legacy.py diff --git a/kafka/client.py b/kafka/client.py index 8a34cc4..056d623 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -15,7 +15,7 @@ from kafka.errors import (UnknownError, ConnectionError, FailedPayloadsError, from kafka.structs import TopicPartition, BrokerMetadata from kafka.conn import ( - collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS, + collect_hosts, BrokerConnection, ConnectionStates, get_ip_port_afi) from kafka.protocol import KafkaProtocol @@ -32,6 +32,7 @@ log = logging.getLogger(__name__) class SimpleClient(object): CLIENT_ID = b'kafka-python' + DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 # NOTE: The timeout given to the client should always be greater than the # one passed to SimpleConsumer.get_message(), otherwise you can get a diff --git a/kafka/conn.py b/kafka/conn.py index 38829c6..5489d1f 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -6,10 +6,7 @@ import io from random import shuffle import socket import ssl -import struct -from threading import local import time -import warnings import six @@ -27,7 +24,6 @@ if six.PY2: log = logging.getLogger(__name__) -DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 DEFAULT_KAFKA_PORT = 9092 # support older ssl libraries @@ -745,186 +741,3 @@ def collect_hosts(hosts, randomize=True): shuffle(result) return result - - -class KafkaConnection(local): - """A socket connection to a single Kafka broker - - Arguments: - host: the host name or IP address of a kafka broker - port: the port number the kafka broker is listening on - timeout: default 120. The socket timeout for sending and receiving data - in seconds. None means no timeout, so a request can block forever. - """ - def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS): - warnings.warn('KafkaConnection has been deprecated and will be' - ' removed in a future release', DeprecationWarning) - super(KafkaConnection, self).__init__() - self.host = host - self.port = port - self.timeout = timeout - self._sock = None - - self.reinit() - - def __getnewargs__(self): - return (self.host, self.port, self.timeout) - - def __repr__(self): - return "" % (self.host, self.port) - - ################### - # Private API # - ################### - - def _raise_connection_error(self): - # Cleanup socket if we have one - if self._sock: - self.close() - - # And then raise - raise Errors.ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port)) - - def _read_bytes(self, num_bytes): - bytes_left = num_bytes - responses = [] - - log.debug("About to read %d bytes from Kafka", num_bytes) - - # Make sure we have a connection - if not self._sock: - self.reinit() - - while bytes_left: - - try: - # pylint: disable-msg=no-member - data = self._sock.recv(min(bytes_left, 4096)) - - # Receiving empty string from recv signals - # that the socket is in error. we will never get - # more data from this socket - if data == b'': - raise socket.error("Not enough data to read message -- did server kill socket?") - - except socket.error: - log.exception('Unable to receive data from Kafka') - self._raise_connection_error() - - bytes_left -= len(data) - log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes) - responses.append(data) - - return b''.join(responses) - - ################## - # Public API # - ################## - - # TODO multiplex socket communication to allow for multi-threaded clients - - def get_connected_socket(self): - if not self._sock: - self.reinit() - return self._sock - - def send(self, request_id, payload): - """ - Send a request to Kafka - - Arguments:: - request_id (int): can be any int (used only for debug logging...) - payload: an encoded kafka packet (see KafkaProtocol) - """ - - log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id)) - - # Make sure we have a connection - if not self._sock: - self.reinit() - - try: - # pylint: disable-msg=no-member - self._sock.sendall(payload) - except socket.error: - log.exception('Unable to send payload to Kafka') - self._raise_connection_error() - - def recv(self, request_id): - """ - Get a response packet from Kafka - - Arguments: - request_id: can be any int (only used for debug logging...) - - Returns: - str: Encoded kafka packet response from server - """ - log.debug("Reading response %d from Kafka" % request_id) - - # Make sure we have a connection - if not self._sock: - self.reinit() - - # Read the size off of the header - resp = self._read_bytes(4) - (size,) = struct.unpack('>i', resp) - - # Read the remainder of the response - resp = self._read_bytes(size) - return resp - - def copy(self): - """ - Create an inactive copy of the connection object, suitable for - passing to a background thread. - - The returned copy is not connected; you must call reinit() before - using. - """ - c = copy.deepcopy(self) - # Python 3 doesn't copy custom attributes of the threadlocal subclass - c.host = copy.copy(self.host) - c.port = copy.copy(self.port) - c.timeout = copy.copy(self.timeout) - c._sock = None - return c - - def close(self): - """ - Shutdown and close the connection socket - """ - log.debug("Closing socket connection for %s:%d" % (self.host, self.port)) - if self._sock: - # Call shutdown to be a good TCP client - # But expect an error if the socket has already been - # closed by the server - try: - # pylint: disable-msg=no-member - self._sock.shutdown(socket.SHUT_RDWR) - except socket.error: - pass - - # Closing the socket should always succeed - self._sock.close() - self._sock = None - else: - log.debug("No socket found to close!") - - def reinit(self): - """ - Re-initialize the socket connection - close current socket (if open) - and start a fresh connection - raise ConnectionError on error - """ - log.debug("Reinitializing socket connection for %s:%d" % (self.host, self.port)) - - if self._sock: - self.close() - - try: - self._sock = socket.create_connection((self.host, self.port), self.timeout) - except socket.error: - log.exception('Unable to connect to kafka broker at %s:%d' % (self.host, self.port)) - self._raise_connection_error() diff --git a/test/test_conn_legacy.py b/test/test_conn_legacy.py deleted file mode 100644 index ca3b17a..0000000 --- a/test/test_conn_legacy.py +++ /dev/null @@ -1,210 +0,0 @@ -import socket -import struct -from threading import Thread -import time - -import mock -from . import unittest - -from kafka.errors import ConnectionError -from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS -from test.testutil import Timer - - -class ConnTest(unittest.TestCase): - def setUp(self): - - self.config = { - 'host': 'localhost', - 'port': 9090, - 'request_id': 0, - 'payload': b'test data', - 'payload2': b'another packet' - } - - # Mocking socket.create_connection will cause _sock to always be a - # MagicMock() - patcher = mock.patch('socket.create_connection', spec=True) - self.MockCreateConn = patcher.start() - self.addCleanup(patcher.stop) - - # Also mock socket.sendall() to appear successful - self.MockCreateConn().sendall.return_value = None - - # And mock socket.recv() to return two payloads, then '', then raise - # Note that this currently ignores the num_bytes parameter to sock.recv() - payload_size = len(self.config['payload']) - payload2_size = len(self.config['payload2']) - self.MockCreateConn().recv.side_effect = [ - struct.pack('>i', payload_size), - struct.pack('>%ds' % payload_size, self.config['payload']), - struct.pack('>i', payload2_size), - struct.pack('>%ds' % payload2_size, self.config['payload2']), - b'' - ] - - # Create a connection object - self.conn = KafkaConnection(self.config['host'], self.config['port']) - - # Reset any mock counts caused by __init__ - self.MockCreateConn.reset_mock() - - def test_send(self): - self.conn.send(self.config['request_id'], self.config['payload']) - self.conn._sock.sendall.assert_called_with(self.config['payload']) - - def test_init_creates_socket_connection(self): - KafkaConnection(self.config['host'], self.config['port']) - self.MockCreateConn.assert_called_with((self.config['host'], self.config['port']), DEFAULT_SOCKET_TIMEOUT_SECONDS) - - def test_init_failure_raises_connection_error(self): - - def raise_error(*args): - raise socket.error - - assert socket.create_connection is self.MockCreateConn - socket.create_connection.side_effect=raise_error - with self.assertRaises(ConnectionError): - KafkaConnection(self.config['host'], self.config['port']) - - def test_send__reconnects_on_dirty_conn(self): - - # Dirty the connection - try: - self.conn._raise_connection_error() - except ConnectionError: - pass - - # Now test that sending attempts to reconnect - self.assertEqual(self.MockCreateConn.call_count, 0) - self.conn.send(self.config['request_id'], self.config['payload']) - self.assertEqual(self.MockCreateConn.call_count, 1) - - def test_send__failure_sets_dirty_connection(self): - - def raise_error(*args): - raise socket.error - - assert isinstance(self.conn._sock, mock.Mock) - self.conn._sock.sendall.side_effect=raise_error - try: - self.conn.send(self.config['request_id'], self.config['payload']) - except ConnectionError: - self.assertIsNone(self.conn._sock) - - def test_recv(self): - - self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload']) - - def test_recv__reconnects_on_dirty_conn(self): - - # Dirty the connection - try: - self.conn._raise_connection_error() - except ConnectionError: - pass - - # Now test that recv'ing attempts to reconnect - self.assertEqual(self.MockCreateConn.call_count, 0) - self.conn.recv(self.config['request_id']) - self.assertEqual(self.MockCreateConn.call_count, 1) - - def test_recv__failure_sets_dirty_connection(self): - - def raise_error(*args): - raise socket.error - - # test that recv'ing attempts to reconnect - assert isinstance(self.conn._sock, mock.Mock) - self.conn._sock.recv.side_effect=raise_error - try: - self.conn.recv(self.config['request_id']) - except ConnectionError: - self.assertIsNone(self.conn._sock) - - def test_recv__doesnt_consume_extra_data_in_stream(self): - - # Here just test that each call to recv will return a single payload - self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload']) - self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2']) - - def test_get_connected_socket(self): - s = self.conn.get_connected_socket() - - self.assertEqual(s, self.MockCreateConn()) - - def test_get_connected_socket_on_dirty_conn(self): - # Dirty the connection - try: - self.conn._raise_connection_error() - except ConnectionError: - pass - - # Test that get_connected_socket tries to connect - self.assertEqual(self.MockCreateConn.call_count, 0) - self.conn.get_connected_socket() - self.assertEqual(self.MockCreateConn.call_count, 1) - - def test_close__object_is_reusable(self): - - # test that sending to a closed connection - # will re-connect and send data to the socket - self.conn.close() - self.conn.send(self.config['request_id'], self.config['payload']) - self.assertEqual(self.MockCreateConn.call_count, 1) - self.conn._sock.sendall.assert_called_with(self.config['payload']) - - -class TestKafkaConnection(unittest.TestCase): - @mock.patch('socket.create_connection') - def test_copy(self, socket): - """KafkaConnection copies work as expected""" - - conn = KafkaConnection('kafka', 9092) - self.assertEqual(socket.call_count, 1) - - copy = conn.copy() - self.assertEqual(socket.call_count, 1) - self.assertEqual(copy.host, 'kafka') - self.assertEqual(copy.port, 9092) - self.assertEqual(copy._sock, None) - - copy.reinit() - self.assertEqual(socket.call_count, 2) - self.assertNotEqual(copy._sock, None) - - @mock.patch('socket.create_connection') - def test_copy_thread(self, socket): - """KafkaConnection copies work in other threads""" - - err = [] - copy = KafkaConnection('kafka', 9092).copy() - - def thread_func(err, copy): - try: - self.assertEqual(copy.host, 'kafka') - self.assertEqual(copy.port, 9092) - self.assertNotEqual(copy._sock, None) - except Exception as e: - err.append(e) - else: - err.append(None) - thread = Thread(target=thread_func, args=(err, copy)) - thread.start() - thread.join() - - self.assertEqual(err, [None]) - self.assertEqual(socket.call_count, 2) - - def test_timeout(self): - def _timeout(*args, **kwargs): - timeout = args[1] - time.sleep(timeout) - raise socket.timeout - - with mock.patch.object(socket, "create_connection", side_effect=_timeout): - - with Timer() as t: - with self.assertRaises(ConnectionError): - KafkaConnection("nowhere", 1234, 1.0) - self.assertGreaterEqual(t.interval, 1.0) -- cgit v1.2.1