import errno import io import socket from ..exceptions import ConnectionError, TimeoutError from ..utils import SSL_AVAILABLE NONBLOCKING_EXCEPTION_ERROR_NUMBERS = {BlockingIOError: errno.EWOULDBLOCK} if SSL_AVAILABLE: import ssl if hasattr(ssl, "SSLWantReadError"): NONBLOCKING_EXCEPTION_ERROR_NUMBERS[ssl.SSLWantReadError] = 2 NONBLOCKING_EXCEPTION_ERROR_NUMBERS[ssl.SSLWantWriteError] = 2 else: NONBLOCKING_EXCEPTION_ERROR_NUMBERS[ssl.SSLError] = 2 NONBLOCKING_EXCEPTIONS = tuple(NONBLOCKING_EXCEPTION_ERROR_NUMBERS.keys()) SERVER_CLOSED_CONNECTION_ERROR = "Connection closed by server." SENTINEL = object() class SocketBuffer: def __init__(self, socket, socket_read_size, socket_timeout): self._sock = socket self.socket_read_size = socket_read_size self.socket_timeout = socket_timeout self._buffer = io.BytesIO() # number of bytes written to the buffer from the socket self.bytes_written = 0 # number of bytes read from the buffer self.bytes_read = 0 @property def length(self): return self.bytes_written - self.bytes_read def _read_from_socket(self, length=None, timeout=SENTINEL, raise_on_timeout=True): sock = self._sock socket_read_size = self.socket_read_size buf = self._buffer buf.seek(self.bytes_written) marker = 0 custom_timeout = timeout is not SENTINEL try: if custom_timeout: sock.settimeout(timeout) while True: data = self._sock.recv(socket_read_size) # an empty string indicates the server shutdown the socket if isinstance(data, bytes) and len(data) == 0: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) buf.write(data) data_length = len(data) self.bytes_written += data_length marker += data_length if length is not None and length > marker: continue return True except socket.timeout: if raise_on_timeout: raise TimeoutError("Timeout reading from socket") return False except NONBLOCKING_EXCEPTIONS as ex: # if we're in nonblocking mode and the recv raises a # blocking error, simply return False indicating that # there's no data to be read. otherwise raise the # original exception. allowed = NONBLOCKING_EXCEPTION_ERROR_NUMBERS.get(ex.__class__, -1) if not raise_on_timeout and ex.errno == allowed: return False raise ConnectionError(f"Error while reading from socket: {ex.args}") finally: if custom_timeout: sock.settimeout(self.socket_timeout) def can_read(self, timeout): return bool(self.length) or self._read_from_socket( timeout=timeout, raise_on_timeout=False ) def read(self, length): length = length + 2 # make sure to read the \r\n terminator # make sure we've read enough data from the socket if length > self.length: self._read_from_socket(length - self.length) self._buffer.seek(self.bytes_read) data = self._buffer.read(length) self.bytes_read += len(data) # purge the buffer when we've consumed it all so it doesn't # grow forever if self.bytes_read == self.bytes_written: self.purge() return data[:-2] def readline(self): buf = self._buffer buf.seek(self.bytes_read) data = buf.readline() while not data.endswith(b"\r\n"): # there's more data in the socket that we need self._read_from_socket() buf.seek(self.bytes_read) data = buf.readline() self.bytes_read += len(data) # purge the buffer when we've consumed it all so it doesn't # grow forever if self.bytes_read == self.bytes_written: self.purge() return data[:-2] def purge(self): self._buffer.seek(0) self._buffer.truncate() self.bytes_written = 0 self.bytes_read = 0 def close(self): try: self.purge() self._buffer.close() except Exception: # issue #633 suggests the purge/close somehow raised a # BadFileDescriptor error. Perhaps the client ran out of # memory or something else? It's probably OK to ignore # any error being raised from purge/close since we're # removing the reference to the instance below. pass self._buffer = None self._sock = None