diff options
author | Sergey Shepelev <temotor@gmail.com> | 2017-04-28 23:58:32 +0300 |
---|---|---|
committer | Sergey Shepelev <temotor@gmail.com> | 2017-04-30 20:56:40 +0300 |
commit | 7f53465578543156e7251e243c0636e087a8445f (patch) | |
tree | f95752a440b97a5d5a8e2eb167af6b89aa3bc5f9 | |
parent | ef830554411c86b6f09f55a67afdbb3ab31ba535 (diff) | |
download | eventlet-7f53465578543156e7251e243c0636e087a8445f.tar.gz |
wsgi: close idle connections (also applies to websockets)wsgi-close-idle-188
https://github.com/eventlet/eventlet/issues/188
-rw-r--r-- | eventlet/wsgi.py | 91 | ||||
-rw-r--r-- | tests/websocket_test.py | 25 | ||||
-rw-r--r-- | tests/wsgi_test.py | 28 |
3 files changed, 115 insertions, 29 deletions
diff --git a/eventlet/wsgi.py b/eventlet/wsgi.py index d26bc27..38a18c7 100644 --- a/eventlet/wsgi.py +++ b/eventlet/wsgi.py @@ -24,8 +24,15 @@ MINIMUM_CHUNK_SIZE = 4096 # %(client_port)s is also available DEFAULT_LOG_FORMAT = ('%(client_ip)s - - [%(date_time)s] "%(request_line)s"' ' %(status_code)s %(body_length)s %(wall_seconds).6f') +RESPONSE_414 = b'''HTTP/1.0 414 Request URI Too Long\r\n\ +Connection: close\r\n\ +Content-Length: 0\r\n\r\n''' is_accepting = True +STATE_IDLE = 'idle' +STATE_REQUEST = 'request' +STATE_CLOSE = 'close' + __all__ = ['server', 'format_date_time'] # Weekday and month names for HTTP date/time formatting; always English! @@ -318,6 +325,17 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): # so before going back to unbuffered, remove any usage of `writelines`. wbufsize = 16 << 10 + def __init__(self, conn_state, server): + self.request = conn_state[1] + self.client_address = conn_state[0] + self.conn_state = conn_state + self.server = server + self.setup() + try: + self.handle() + finally: + self.finish() + def setup(self): # overriding SocketServer.setup to correctly handle SSL.Connection objects conn = self.connection = self.request @@ -343,32 +361,42 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler): raise NotImplementedError( '''eventlet.wsgi doesn't support sockets of type {0}'''.format(type(conn))) - def handle_one_request(self): - if self.server.max_http_version: - self.protocol_version = self.server.max_http_version + def handle(self): + self.close_connection = True + while True: + self.handle_one_request() + if self.conn_state[2] == STATE_CLOSE: + self.close_connection = 1 + if self.close_connection: + break + + def _read_request_line(self): if self.rfile.closed: self.close_connection = 1 - return + return '' try: - self.raw_requestline = self.rfile.readline(self.server.url_length_limit) - if len(self.raw_requestline) >= self.server.url_length_limit: - self.wfile.write( - b"HTTP/1.0 414 Request URI Too Long\r\n" - b"Connection: close\r\nContent-length: 0\r\n\r\n") - self.close_connection = 1 - return + return self.rfile.readline(self.server.url_length_limit) except greenio.SSL.ZeroReturnError: - self.raw_requestline = '' + pass except socket.error as e: if support.get_errno(e) not in BAD_SOCK: raise - self.raw_requestline = '' + return '' + + def handle_one_request(self): + if self.server.max_http_version: + self.protocol_version = self.server.max_http_version + self.raw_requestline = self._read_request_line() if not self.raw_requestline: self.close_connection = 1 return + if len(self.raw_requestline) >= self.server.url_length_limit: + self.wfile.write(RESPONSE_414) + self.close_connection = 1 + return orig_rfile = self.rfile try: @@ -736,22 +764,21 @@ class Server(BaseHTTPServer.HTTPServer): d.update(self.environ) return d - def process_request(self, sock_params): + def process_request(self, conn_state): # The actual request handling takes place in __init__, so we need to # set minimum_chunk_size before __init__ executes and we don't want to modify # class variable - sock, address = sock_params[:2] proto = new(self.protocol) if self.minimum_chunk_size is not None: proto.minimum_chunk_size = self.minimum_chunk_size proto.capitalize_response_headers = self.capitalize_response_headers try: - proto.__init__(sock, address, self) + proto.__init__(conn_state, self) except socket.timeout: # Expected exceptions are not exceptional - sock.close() + conn_state[1].close() # similar to logging "accepted" in server() - self.log.debug('({0}) timed out {1!r}'.format(self.pid, address)) + self.log.debug('({0}) timed out {1!r}'.format(self.pid, conn_state[0])) def log_message(self, message): raise AttributeError('''\ @@ -893,19 +920,30 @@ def server(sock, site, else: pool = eventlet.GreenPool(max_size) - if not (hasattr(pool, 'spawn_n') and hasattr(pool, 'waitall')): + if not (hasattr(pool, 'spawn') and hasattr(pool, 'waitall')): raise AttributeError('''\ -eventlet.wsgi.Server pool must provide methods: `spawn_n`, `waitall`. +eventlet.wsgi.Server pool must provide methods: `spawn`, `waitall`. If unsure, use eventlet.GreenPool.''') + # [addr, socket, state] + connections = {} + + def _clean_connection(_, conn): + connections.pop(conn[0], None) + conn[2] = STATE_CLOSE + greenio.shutdown_safe(conn[1]) + conn[1].close() + try: serv.log.info('({0}) wsgi starting up on {1}'.format(serv.pid, socket_repr(sock))) while is_accepting: try: - client_socket = sock.accept() - client_socket[0].settimeout(serv.socket_timeout) - serv.log.debug('({0}) accepted {1!r}'.format(serv.pid, client_socket[1])) - pool.spawn_n(serv.process_request, client_socket) + client_socket, client_addr = sock.accept() + client_socket.settimeout(serv.socket_timeout) + serv.log.debug('({0}) accepted {1!r}'.format(serv.pid, client_addr)) + connections[client_addr] = connection = [client_addr, client_socket, STATE_IDLE] + (pool.spawn(serv.process_request, connection) + .link(_clean_connection, connection)) except ACCEPT_EXCEPTIONS as e: if support.get_errno(e) not in ACCEPT_ERRNO: raise @@ -913,6 +951,11 @@ If unsure, use eventlet.GreenPool.''') serv.log.info('wsgi exiting') break finally: + for cs in six.itervalues(connections): + prev_state = cs[2] + cs[2] = STATE_CLOSE + if prev_state == STATE_IDLE: + greenio.shutdown_safe(cs[1]) pool.waitall() serv.log.info('({0}) wsgi exited, is_accepting={1}'.format(serv.pid, is_accepting)) try: diff --git a/tests/websocket_test.py b/tests/websocket_test.py index 3ff500a..caeda53 100644 --- a/tests/websocket_test.py +++ b/tests/websocket_test.py @@ -1,5 +1,6 @@ import errno import socket +import sys import eventlet from eventlet import event @@ -503,6 +504,30 @@ class TestWebSocket(tests.wsgi_test._TestBase): done_with_request.wait() assert error_detected[0] + def test_close_idle(self): + pool = eventlet.GreenPool() + # use log=stderr when test runner can capture it + self.spawn_server(custom_pool=pool, log=sys.stdout) + connect = ( + 'GET /echo HTTP/1.1', + 'Upgrade: WebSocket', + 'Connection: Upgrade', + 'Host: %s:%s' % self.server_addr, + 'Origin: http://%s:%s' % self.server_addr, + 'Sec-WebSocket-Protocol: ws', + 'Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5', + 'Sec-WebSocket-Key2: 12998 5 Y3 1 .P00', + ) + sock = eventlet.connect(self.server_addr) + sock.sendall(six.b('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U')) + sock.recv(1024) + sock.sendall(b'\x00hello\xff') + result = sock.recv(1024) + assert result, b'\x00hello\xff' + self.killer.kill(KeyboardInterrupt) + with eventlet.Timeout(1): + pool.waitall() + class TestWebSocketSSL(tests.wsgi_test._TestBase): def set_site(self): diff --git a/tests/wsgi_test.py b/tests/wsgi_test.py index 1bc6abb..809335e 100644 --- a/tests/wsgi_test.py +++ b/tests/wsgi_test.py @@ -255,7 +255,7 @@ class _TestBase(tests.LimitedTestCase): if self.killer: greenthread.kill(self.killer) - self.killer = eventlet.spawn_n(target, **kwargs) + self.killer = eventlet.spawn(target, **kwargs) def set_site(self): raise NotImplementedError @@ -557,8 +557,8 @@ class TestHttpd(_TestBase): def server(sock, site, log): try: serv = wsgi.Server(sock, sock.getsockname(), site, log) - client_socket = sock.accept() - serv.process_request(client_socket) + client_socket, addr = sock.accept() + serv.process_request([addr, client_socket, wsgi.STATE_IDLE]) return True except Exception: traceback.print_exc() @@ -1471,13 +1471,14 @@ class TestHttpd(_TestBase): sock.close() request_thread = eventlet.spawn(make_request) - server_conn = server_sock.accept() + client_sock, addr = server_sock.accept() # Next line must not raise IOError -32 Broken pipe - server.process_request(server_conn) + server.process_request([addr, client_sock, wsgi.STATE_IDLE]) request_thread.wait() server_sock.close() def test_server_connection_timeout_exception(self): + self.reset_timeout(5) # Handle connection socket timeouts # https://bitbucket.org/eventlet/eventlet/issue/143/ # Runs tests.wsgi_test_conntimeout in a separate process. @@ -1582,6 +1583,23 @@ class TestHttpd(_TestBase): log_content = self.logfile.getvalue() assert log_content == '' + def test_close_idle_connections(self): + self.reset_timeout(2) + pool = eventlet.GreenPool() + self.spawn_server(custom_pool=pool) + # https://github.com/eventlet/eventlet/issues/188 + sock = eventlet.connect(self.server_addr) + + sock.sendall(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n') + result = read_http(sock) + assert result.status == 'HTTP/1.1 200 OK', 'Received status {0!r}'.format(result.status) + self.killer.kill(KeyboardInterrupt) + try: + with eventlet.Timeout(1): + pool.waitall() + except Exception: + assert False, self.logfile.getvalue() + def read_headers(sock): fd = sock.makefile('rb') |