author | Dana Powers <dana.powers@gmail.com> | 2016-07-17 11:56:17 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-07-17 11:56:17 -0700 |
commit | 0bda9a4734167bf88b3f1361928559d035df9392 (patch) | |
tree | 8745e406bee0f2a0426c320481a7ab56bca5f504 /kafka/conn.py | |
parent | 68c8a4a276237488fba1495fcec43b572a80289c (diff) | |
download | kafka-python-0bda9a4734167bf88b3f1361928559d035df9392.tar.gz | |
Delete KafkaConnection class (#769)
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 187 |
1 files changed, 0 insertions, 187 deletions
```diff
diff --git a/kafka/conn.py b/kafka/conn.py
index 38829c6..5489d1f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -6,10 +6,7 @@ import io
 from random import shuffle
 import socket
 import ssl
-import struct
-from threading import local
 import time
-import warnings
 
 import six
 
@@ -27,7 +24,6 @@ if six.PY2:
 
 log = logging.getLogger(__name__)
 
-DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
 DEFAULT_KAFKA_PORT = 9092
 
 # support older ssl libraries
@@ -745,186 +741,3 @@ def collect_hosts(hosts, randomize=True):
         shuffle(result)
 
     return result
-
-
-class KafkaConnection(local):
-    """A socket connection to a single Kafka broker
-
-    Arguments:
-        host: the host name or IP address of a kafka broker
-        port: the port number the kafka broker is listening on
-        timeout: default 120. The socket timeout for sending and receiving data
-            in seconds. None means no timeout, so a request can block forever.
-    """
-    def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
-        warnings.warn('KafkaConnection has been deprecated and will be'
-                      ' removed in a future release', DeprecationWarning)
-        super(KafkaConnection, self).__init__()
-        self.host = host
-        self.port = port
-        self.timeout = timeout
-        self._sock = None
-
-        self.reinit()
-
-    def __getnewargs__(self):
-        return (self.host, self.port, self.timeout)
-
-    def __repr__(self):
-        return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
-
-    ###################
-    #   Private API   #
-    ###################
-
-    def _raise_connection_error(self):
-        # Cleanup socket if we have one
-        if self._sock:
-            self.close()
-
-        # And then raise
-        raise Errors.ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
-
-    def _read_bytes(self, num_bytes):
-        bytes_left = num_bytes
-        responses = []
-
-        log.debug("About to read %d bytes from Kafka", num_bytes)
-
-        # Make sure we have a connection
-        if not self._sock:
-            self.reinit()
-
-        while bytes_left:
-
-            try:
-                # pylint: disable-msg=no-member
-                data = self._sock.recv(min(bytes_left, 4096))
-
-                # Receiving empty string from recv signals
-                # that the socket is in error. we will never get
-                # more data from this socket
-                if data == b'':
-                    raise socket.error("Not enough data to read message -- did server kill socket?")
-
-            except socket.error:
-                log.exception('Unable to receive data from Kafka')
-                self._raise_connection_error()
-
-            bytes_left -= len(data)
-            log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
-            responses.append(data)
-
-        return b''.join(responses)
-
-    ##################
-    #   Public API   #
-    ##################
-
-    # TODO multiplex socket communication to allow for multi-threaded clients
-
-    def get_connected_socket(self):
-        if not self._sock:
-            self.reinit()
-        return self._sock
-
-    def send(self, request_id, payload):
-        """
-        Send a request to Kafka
-
-        Arguments::
-            request_id (int): can be any int (used only for debug logging...)
-            payload: an encoded kafka packet (see KafkaProtocol)
-        """
-
-        log.debug("About to send %d bytes to Kafka, request %d" % (len(payload), request_id))
-
-        # Make sure we have a connection
-        if not self._sock:
-            self.reinit()
-
-        try:
-            # pylint: disable-msg=no-member
-            self._sock.sendall(payload)
-        except socket.error:
-            log.exception('Unable to send payload to Kafka')
-            self._raise_connection_error()
-
-    def recv(self, request_id):
-        """
-        Get a response packet from Kafka
-
-        Arguments:
-            request_id: can be any int (only used for debug logging...)
-
-        Returns:
-            str: Encoded kafka packet response from server
-        """
-        log.debug("Reading response %d from Kafka" % request_id)
-
-        # Make sure we have a connection
-        if not self._sock:
-            self.reinit()
-
-        # Read the size off of the header
-        resp = self._read_bytes(4)
-        (size,) = struct.unpack('>i', resp)
-
-        # Read the remainder of the response
-        resp = self._read_bytes(size)
-        return resp
-
-    def copy(self):
-        """
-        Create an inactive copy of the connection object, suitable for
-        passing to a background thread.
-
-        The returned copy is not connected; you must call reinit() before
-        using.
-        """
-        c = copy.deepcopy(self)
-        # Python 3 doesn't copy custom attributes of the threadlocal subclass
-        c.host = copy.copy(self.host)
-        c.port = copy.copy(self.port)
-        c.timeout = copy.copy(self.timeout)
-        c._sock = None
-        return c
-
-    def close(self):
-        """
-        Shutdown and close the connection socket
-        """
-        log.debug("Closing socket connection for %s:%d" % (self.host, self.port))
-        if self._sock:
-            # Call shutdown to be a good TCP client
-            # But expect an error if the socket has already been
-            # closed by the server
-            try:
-                # pylint: disable-msg=no-member
-                self._sock.shutdown(socket.SHUT_RDWR)
-            except socket.error:
-                pass
-
-            # Closing the socket should always succeed
-            self._sock.close()
-            self._sock = None
-        else:
-            log.debug("No socket found to close!")
-
-    def reinit(self):
-        """
-        Re-initialize the socket connection
-        close current socket (if open)
-        and start a fresh connection
-        raise ConnectionError on error
-        """
-        log.debug("Reinitializing socket connection for %s:%d" % (self.host, self.port))
-
-        if self._sock:
-            self.close()
-
-        try:
-            self._sock = socket.create_connection((self.host, self.port), self.timeout)
-        except socket.error:
-            log.exception('Unable to connect to kafka broker at %s:%d' % (self.host, self.port))
-            self._raise_connection_error()
```
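For readers skimming the removed code: `recv()` framed every response as a 4-byte big-endian size header followed by the payload, read in at most 4096-byte chunks (see `struct.unpack('>i', ...)` and `_read_bytes()` above). Below is a minimal standalone sketch of that blocking framing loop; the helper names are illustrative only and are not part of kafka-python:

```python
import socket
import struct


def read_exact(sock, num_bytes):
    """Read exactly num_bytes from a blocking socket, like the deleted _read_bytes()."""
    chunks = []
    bytes_left = num_bytes
    while bytes_left:
        data = sock.recv(min(bytes_left, 4096))
        if data == b'':
            # An empty read means the broker closed the connection mid-message.
            raise socket.error("socket closed before the full message was read")
        bytes_left -= len(data)
        chunks.append(data)
    return b''.join(chunks)


def read_response(sock):
    """Read one size-prefixed response: a 4-byte big-endian length, then that many payload bytes."""
    (size,) = struct.unpack('>i', read_exact(sock, 4))
    return read_exact(sock, size)
```

The surviving `BrokerConnection` class in kafka/conn.py speaks the same size-prefixed wire format, so this deletion removes only the deprecated blocking wrapper, not anything protocol-level.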