author     Dana Powers <dana.powers@gmail.com>  2016-06-26 19:28:59 -0700
committer  Dana Powers <dana.powers@gmail.com>  2016-07-17 09:48:03 -0700
commit     cfa06f7380bc0bbe702151002d74d01476dcfe1b (patch)
tree       1982ae2e9380b1763a363dfc7d5dbb470b611b56
parent     ade3160a4b954f5460f4a0aa34d4664d07a0e378 (diff)
download   kafka-python-conn_legacy.tar.gz

Delete KafkaConnection class (conn_legacy)

-rw-r--r--  kafka/client.py            3
-rw-r--r--  kafka/conn.py            187
-rw-r--r--  test/test_conn_legacy.py 210
3 files changed, 2 insertions(+), 398 deletions(-)
diff --git a/kafka/client.py b/kafka/client.py
index 8a34cc4..056d623 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -15,7 +15,7 @@ from kafka.errors import (UnknownError, ConnectionError, FailedPayloadsError,
 from kafka.structs import TopicPartition, BrokerMetadata
 from kafka.conn import (
-    collect_hosts, BrokerConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS,
+    collect_hosts, BrokerConnection,
     ConnectionStates, get_ip_port_afi)
 from kafka.protocol import KafkaProtocol
@@ -32,6 +32,7 @@ log = logging.getLogger(__name__)
 class SimpleClient(object):
     CLIENT_ID = b'kafka-python'
+    DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
     # NOTE: The timeout given to the client should always be greater than the
     # one passed to SimpleConsumer.get_message(), otherwise you can get a
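The hunks above fold the module-level DEFAULT_SOCKET_TIMEOUT_SECONDS constant from kafka.conn into SimpleClient as a class attribute. A minimal sketch of the resulting lookup pattern follows; only the attribute name and its value of 120 come from the hunk above, while the stub class and the resolve_timeout() helper are purely illustrative and not part of kafka-python:

# Illustrative only: the stub mirrors the class attribute added above;
# resolve_timeout() is a hypothetical helper, not a kafka-python API.
class SimpleClient(object):
    CLIENT_ID = b'kafka-python'
    DEFAULT_SOCKET_TIMEOUT_SECONDS = 120


def resolve_timeout(timeout=None):
    # Fall back to the class-level default when the caller passes nothing.
    if timeout is None:
        return SimpleClient.DEFAULT_SOCKET_TIMEOUT_SECONDS
    return timeout


assert resolve_timeout() == 120
assert resolve_timeout(5) == 5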
diff --git a/kafka/conn.py b/kafka/conn.py
index 38829c6..5489d1f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -6,10 +6,7 @@ import io
 from random import shuffle
 import socket
 import ssl
-import struct
-from threading import local
 import time
-import warnings
 import six
@@ -27,7 +24,6 @@ if six.PY2:
 log = logging.getLogger(__name__)
-DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
 DEFAULT_KAFKA_PORT = 9092
 # support older ssl libraries
@@ -745,186 +741,3 @@ def collect_hosts(hosts, randomize=True):
         shuffle(result)
     return result
-
-
-class KafkaConnection(local):
-    """A socket connection to a single Kafka broker
-
-    Arguments:
-        host: the host name or IP address of a kafka broker
-        port: the port number the kafka broker is listening on
-        timeout: default 120. The socket timeout for sending and receiving data
-            in seconds. None means no timeout, so a request can block forever.
-    """
-    def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
-        warnings.warn('KafkaConnection has been deprecated and will be'
-                      ' removed in a future release', DeprecationWarning)
-        super(KafkaConnection, self).__init__()
-        self.host = host
-        self.port = port
-        self.timeout = timeout
-        self._sock = None
-
-        self.reinit()
-
-    def __getnewargs__(self):
-        return (self.host, self.port, self.timeout)
-
-    def __repr__(self):
-        return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
-
-    ###################
-    #   Private API   #
-    ###################
-
-    def _raise_connection_error(self):
-        # Cleanup socket if we have one
-        if self._sock:
-            self.close()
-
-        # And then raise
-        raise Errors.ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
-
-    def _read_bytes(self, num_bytes):
-        bytes_left = num_bytes
-        responses = []
-
-        log.debug("About to read %d bytes from Kafka", num_bytes)
-
-        # Make sure we have a connection
-        if not self._sock:
-            self.reinit()
-
-        while bytes_left:
-
-            try:
-                # pylint: disable-msg=no-member
-                data = self._sock.recv(min(bytes_left, 4096))
-
-                # Receiving empty string from recv signals
-                # that the socket is in error. we will never get
-                # more data from this socket
-                if data == b'':
-                    raise socket.error("Not enough data to read message -- did server kill socket?")
-
-            except socket.error:
-                log.exception('Unable to receive data from Kafka')
-                self._raise_connection_error()
-
-            bytes_left -= len(data)
-            log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
-            responses.append(data)
-
-        return b''.join(responses)
-
-    ##################
-    #   Public API   #
-    ##################
-
-    # TODO multiplex socket communication to allow for multi-threaded clients
-
-    def get_connected_socket(self):
-        if not self._sock:
-            self.reinit()
-        return self._sock
-
-    def send(self, request_id, payload):
-        """
-        Send a request to Kafka
-
-        Arguments::
-            request_id (int): can be any int (used only for debug logging...)
-            payload: an encoded kafka packet (see KafkaProtocol)
-        """
-
-        log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
-
-        # Make sure we have a connection
-        if not self._sock:
-            self.reinit()
-
-        try:
-            # pylint: disable-msg=no-member
-            self._sock.sendall(payload)
-        except socket.error:
-            log.exception('Unable to send payload to Kafka')
-            self._raise_connection_error()
-
-    def recv(self, request_id):
-        """
-        Get a response packet from Kafka
-
-        Arguments:
-            request_id: can be any int (only used for debug logging...)
-
-        Returns:
-            str: Encoded kafka packet response from server
-        """
-        log.debug("Reading response %d from Kafka" % request_id)
-
-        # Make sure we have a connection
-        if not self._sock:
-            self.reinit()
-
-        # Read the size off of the header
-        resp = self._read_bytes(4)
-        (size,) = struct.unpack('>i', resp)
-
-        # Read the remainder of the response
-        resp = self._read_bytes(size)
-        return resp
-
-    def copy(self):
-        """
-        Create an inactive copy of the connection object, suitable for
-        passing to a background thread.
-
-        The returned copy is not connected; you must call reinit() before
-        using.
-        """
-        c = copy.deepcopy(self)
-        # Python 3 doesn't copy custom attributes of the threadlocal subclass
-        c.host = copy.copy(self.host)
-        c.port = copy.copy(self.port)
-        c.timeout = copy.copy(self.timeout)
-        c._sock = None
-        return c
-
-    def close(self):
-        """
-        Shutdown and close the connection socket
-        """
-        log.debug("Closing socket connection for %s:%d" % (self.host, self.port))
-        if self._sock:
-            # Call shutdown to be a good TCP client
-            # But expect an error if the socket has already been
-            # closed by the server
-            try:
-                # pylint: disable-msg=no-member
-                self._sock.shutdown(socket.SHUT_RDWR)
-            except socket.error:
-                pass
-
-            # Closing the socket should always succeed
-            self._sock.close()
-            self._sock = None
-        else:
-            log.debug("No socket found to close!")
-
-    def reinit(self):
-        """
-        Re-initialize the socket connection
-        close current socket (if open)
-        and start a fresh connection
-        raise ConnectionError on error
-        """
-        log.debug("Reinitializing socket connection for %s:%d" % (self.host, self.port))
-
-        if self._sock:
-            self.close()
-
-        try:
-            self._sock = socket.create_connection((self.host, self.port), self.timeout)
-        except socket.error:
-            log.exception('Unable to connect to kafka broker at %s:%d' % (self.host, self.port))
-            self._raise_connection_error()
diff --git a/test/test_conn_legacy.py b/test/test_conn_legacy.py
deleted file mode 100644
index ca3b17a..0000000
--- a/test/test_conn_legacy.py
+++ /dev/null
@@ -1,210 +0,0 @@
-import socket
-import struct
-from threading import Thread
-import time
-
-import mock
-from . import unittest
-
-from kafka.errors import ConnectionError
-from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
-from test.testutil import Timer
-
-
-class ConnTest(unittest.TestCase):
-    def setUp(self):
-
-        self.config = {
-            'host': 'localhost',
-            'port': 9090,
-            'request_id': 0,
-            'payload': b'test data',
-            'payload2': b'another packet'
-        }
-
-        # Mocking socket.create_connection will cause _sock to always be a
-        # MagicMock()
-        patcher = mock.patch('socket.create_connection', spec=True)
-        self.MockCreateConn = patcher.start()
-        self.addCleanup(patcher.stop)
-
-        # Also mock socket.sendall() to appear successful
-        self.MockCreateConn().sendall.return_value = None
-
-        # And mock socket.recv() to return two payloads, then '', then raise
-        # Note that this currently ignores the num_bytes parameter to sock.recv()
-        payload_size = len(self.config['payload'])
-        payload2_size = len(self.config['payload2'])
-        self.MockCreateConn().recv.side_effect = [
-            struct.pack('>i', payload_size),
-            struct.pack('>%ds' % payload_size, self.config['payload']),
-            struct.pack('>i', payload2_size),
-            struct.pack('>%ds' % payload2_size, self.config['payload2']),
-            b''
-        ]
-
-        # Create a connection object
-        self.conn = KafkaConnection(self.config['host'], self.config['port'])
-
-        # Reset any mock counts caused by __init__
-        self.MockCreateConn.reset_mock()
-
-    def test_send(self):
-        self.conn.send(self.config['request_id'], self.config['payload'])
-        self.conn._sock.sendall.assert_called_with(self.config['payload'])
-
-    def test_init_creates_socket_connection(self):
-        KafkaConnection(self.config['host'], self.config['port'])
-        self.MockCreateConn.assert_called_with((self.config['host'], self.config['port']), DEFAULT_SOCKET_TIMEOUT_SECONDS)
-
-    def test_init_failure_raises_connection_error(self):
-
-        def raise_error(*args):
-            raise socket.error
-
-        assert socket.create_connection is self.MockCreateConn
-        socket.create_connection.side_effect=raise_error
-        with self.assertRaises(ConnectionError):
-            KafkaConnection(self.config['host'], self.config['port'])
-
-    def test_send__reconnects_on_dirty_conn(self):
-
-        # Dirty the connection
-        try:
-            self.conn._raise_connection_error()
-        except ConnectionError:
-            pass
-
-        # Now test that sending attempts to reconnect
-        self.assertEqual(self.MockCreateConn.call_count, 0)
-        self.conn.send(self.config['request_id'], self.config['payload'])
-        self.assertEqual(self.MockCreateConn.call_count, 1)
-
-    def test_send__failure_sets_dirty_connection(self):
-
-        def raise_error(*args):
-            raise socket.error
-
-        assert isinstance(self.conn._sock, mock.Mock)
-        self.conn._sock.sendall.side_effect=raise_error
-        try:
-            self.conn.send(self.config['request_id'], self.config['payload'])
-        except ConnectionError:
-            self.assertIsNone(self.conn._sock)
-
-    def test_recv(self):
-
-        self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
-
-    def test_recv__reconnects_on_dirty_conn(self):
-
-        # Dirty the connection
-        try:
-            self.conn._raise_connection_error()
-        except ConnectionError:
-            pass
-
-        # Now test that recv'ing attempts to reconnect
-        self.assertEqual(self.MockCreateConn.call_count, 0)
-        self.conn.recv(self.config['request_id'])
-        self.assertEqual(self.MockCreateConn.call_count, 1)
-
-    def test_recv__failure_sets_dirty_connection(self):
-
-        def raise_error(*args):
-            raise socket.error
-
-        # test that recv'ing attempts to reconnect
-        assert isinstance(self.conn._sock, mock.Mock)
-        self.conn._sock.recv.side_effect=raise_error
-        try:
-            self.conn.recv(self.config['request_id'])
-        except ConnectionError:
-            self.assertIsNone(self.conn._sock)
-
-    def test_recv__doesnt_consume_extra_data_in_stream(self):
-
-        # Here just test that each call to recv will return a single payload
-        self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
-        self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2'])
-
-    def test_get_connected_socket(self):
-        s = self.conn.get_connected_socket()
-
-        self.assertEqual(s, self.MockCreateConn())
-
-    def test_get_connected_socket_on_dirty_conn(self):
-        # Dirty the connection
-        try:
-            self.conn._raise_connection_error()
-        except ConnectionError:
-            pass
-
-        # Test that get_connected_socket tries to connect
-        self.assertEqual(self.MockCreateConn.call_count, 0)
-        self.conn.get_connected_socket()
-        self.assertEqual(self.MockCreateConn.call_count, 1)
-
-    def test_close__object_is_reusable(self):
-
-        # test that sending to a closed connection
-        # will re-connect and send data to the socket
-        self.conn.close()
-        self.conn.send(self.config['request_id'], self.config['payload'])
-        self.assertEqual(self.MockCreateConn.call_count, 1)
-        self.conn._sock.sendall.assert_called_with(self.config['payload'])
-
-
-class TestKafkaConnection(unittest.TestCase):
-    @mock.patch('socket.create_connection')
-    def test_copy(self, socket):
-        """KafkaConnection copies work as expected"""
-
-        conn = KafkaConnection('kafka', 9092)
-        self.assertEqual(socket.call_count, 1)
-
-        copy = conn.copy()
-        self.assertEqual(socket.call_count, 1)
-        self.assertEqual(copy.host, 'kafka')
-        self.assertEqual(copy.port, 9092)
-        self.assertEqual(copy._sock, None)
-
-        copy.reinit()
-        self.assertEqual(socket.call_count, 2)
-        self.assertNotEqual(copy._sock, None)
-
-    @mock.patch('socket.create_connection')
-    def test_copy_thread(self, socket):
-        """KafkaConnection copies work in other threads"""
-
-        err = []
-        copy = KafkaConnection('kafka', 9092).copy()
-
-        def thread_func(err, copy):
-            try:
-                self.assertEqual(copy.host, 'kafka')
-                self.assertEqual(copy.port, 9092)
-                self.assertNotEqual(copy._sock, None)
-            except Exception as e:
-                err.append(e)
-            else:
-                err.append(None)
-        thread = Thread(target=thread_func, args=(err, copy))
-        thread.start()
-        thread.join()
-
-        self.assertEqual(err, [None])
-        self.assertEqual(socket.call_count, 2)
-
-    def test_timeout(self):
-        def _timeout(*args, **kwargs):
-            timeout = args[1]
-            time.sleep(timeout)
-            raise socket.timeout
-
-        with mock.patch.object(socket, "create_connection", side_effect=_timeout):
-
-            with Timer() as t:
-                with self.assertRaises(ConnectionError):
-                    KafkaConnection("nowhere", 1234, 1.0)
-            self.assertGreaterEqual(t.interval, 1.0)
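The deleted ConnTest relied on patching socket.create_connection so the connection ends up holding a MagicMock socket, and on queueing framed byte strings through recv.side_effect. That technique is independent of KafkaConnection and works for any code that reads length-prefixed responses. A small self-contained sketch, assuming only the standard library's unittest.mock in place of the external mock package used above (the call arguments and final asserts are illustrative):

import struct
from unittest import mock

payload = b'test data'

with mock.patch('socket.create_connection', spec=True) as create_conn:
    # Queue one framed response: a 4-byte big-endian size header followed by
    # the payload, then b'' to signal that the peer closed the socket.
    create_conn.return_value.recv.side_effect = [
        struct.pack('>i', len(payload)),
        payload,
        b'',
    ]

    sock = create_conn(('localhost', 9090), 120)  # returns the MagicMock socket
    assert sock.recv(4) == struct.pack('>i', len(payload))
    assert sock.recv(len(payload)) == payload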