summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Shepelev <temotor@gmail.com>2017-04-28 23:58:32 +0300
committerSergey Shepelev <temotor@gmail.com>2017-04-30 20:56:40 +0300
commit7f53465578543156e7251e243c0636e087a8445f (patch)
treef95752a440b97a5d5a8e2eb167af6b89aa3bc5f9
parentef830554411c86b6f09f55a67afdbb3ab31ba535 (diff)
downloadeventlet-7f53465578543156e7251e243c0636e087a8445f.tar.gz
wsgi: close idle connections (also applies to websockets)wsgi-close-idle-188
https://github.com/eventlet/eventlet/issues/188
-rw-r--r--eventlet/wsgi.py91
-rw-r--r--tests/websocket_test.py25
-rw-r--r--tests/wsgi_test.py28
3 files changed, 115 insertions, 29 deletions
diff --git a/eventlet/wsgi.py b/eventlet/wsgi.py
index d26bc27..38a18c7 100644
--- a/eventlet/wsgi.py
+++ b/eventlet/wsgi.py
@@ -24,8 +24,15 @@ MINIMUM_CHUNK_SIZE = 4096
# %(client_port)s is also available
DEFAULT_LOG_FORMAT = ('%(client_ip)s - - [%(date_time)s] "%(request_line)s"'
' %(status_code)s %(body_length)s %(wall_seconds).6f')
+RESPONSE_414 = b'''HTTP/1.0 414 Request URI Too Long\r\n\
+Connection: close\r\n\
+Content-Length: 0\r\n\r\n'''
is_accepting = True
+STATE_IDLE = 'idle'
+STATE_REQUEST = 'request'
+STATE_CLOSE = 'close'
+
__all__ = ['server', 'format_date_time']
# Weekday and month names for HTTP date/time formatting; always English!
@@ -318,6 +325,17 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
# so before going back to unbuffered, remove any usage of `writelines`.
wbufsize = 16 << 10
+ def __init__(self, conn_state, server):
+ self.request = conn_state[1]
+ self.client_address = conn_state[0]
+ self.conn_state = conn_state
+ self.server = server
+ self.setup()
+ try:
+ self.handle()
+ finally:
+ self.finish()
+
def setup(self):
# overriding SocketServer.setup to correctly handle SSL.Connection objects
conn = self.connection = self.request
@@ -343,32 +361,42 @@ class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler):
raise NotImplementedError(
'''eventlet.wsgi doesn't support sockets of type {0}'''.format(type(conn)))
- def handle_one_request(self):
- if self.server.max_http_version:
- self.protocol_version = self.server.max_http_version
+ def handle(self):
+ self.close_connection = True
+ while True:
+ self.handle_one_request()
+ if self.conn_state[2] == STATE_CLOSE:
+ self.close_connection = 1
+ if self.close_connection:
+ break
+
+ def _read_request_line(self):
if self.rfile.closed:
self.close_connection = 1
- return
+ return ''
try:
- self.raw_requestline = self.rfile.readline(self.server.url_length_limit)
- if len(self.raw_requestline) >= self.server.url_length_limit:
- self.wfile.write(
- b"HTTP/1.0 414 Request URI Too Long\r\n"
- b"Connection: close\r\nContent-length: 0\r\n\r\n")
- self.close_connection = 1
- return
+ return self.rfile.readline(self.server.url_length_limit)
except greenio.SSL.ZeroReturnError:
- self.raw_requestline = ''
+ pass
except socket.error as e:
if support.get_errno(e) not in BAD_SOCK:
raise
- self.raw_requestline = ''
+ return ''
+
+ def handle_one_request(self):
+ if self.server.max_http_version:
+ self.protocol_version = self.server.max_http_version
+ self.raw_requestline = self._read_request_line()
if not self.raw_requestline:
self.close_connection = 1
return
+ if len(self.raw_requestline) >= self.server.url_length_limit:
+ self.wfile.write(RESPONSE_414)
+ self.close_connection = 1
+ return
orig_rfile = self.rfile
try:
@@ -736,22 +764,21 @@ class Server(BaseHTTPServer.HTTPServer):
d.update(self.environ)
return d
- def process_request(self, sock_params):
+ def process_request(self, conn_state):
# The actual request handling takes place in __init__, so we need to
# set minimum_chunk_size before __init__ executes and we don't want to modify
# class variable
- sock, address = sock_params[:2]
proto = new(self.protocol)
if self.minimum_chunk_size is not None:
proto.minimum_chunk_size = self.minimum_chunk_size
proto.capitalize_response_headers = self.capitalize_response_headers
try:
- proto.__init__(sock, address, self)
+ proto.__init__(conn_state, self)
except socket.timeout:
# Expected exceptions are not exceptional
- sock.close()
+ conn_state[1].close()
# similar to logging "accepted" in server()
- self.log.debug('({0}) timed out {1!r}'.format(self.pid, address))
+ self.log.debug('({0}) timed out {1!r}'.format(self.pid, conn_state[0]))
def log_message(self, message):
raise AttributeError('''\
@@ -893,19 +920,30 @@ def server(sock, site,
else:
pool = eventlet.GreenPool(max_size)
- if not (hasattr(pool, 'spawn_n') and hasattr(pool, 'waitall')):
+ if not (hasattr(pool, 'spawn') and hasattr(pool, 'waitall')):
raise AttributeError('''\
-eventlet.wsgi.Server pool must provide methods: `spawn_n`, `waitall`.
+eventlet.wsgi.Server pool must provide methods: `spawn`, `waitall`.
If unsure, use eventlet.GreenPool.''')
+ # [addr, socket, state]
+ connections = {}
+
+ def _clean_connection(_, conn):
+ connections.pop(conn[0], None)
+ conn[2] = STATE_CLOSE
+ greenio.shutdown_safe(conn[1])
+ conn[1].close()
+
try:
serv.log.info('({0}) wsgi starting up on {1}'.format(serv.pid, socket_repr(sock)))
while is_accepting:
try:
- client_socket = sock.accept()
- client_socket[0].settimeout(serv.socket_timeout)
- serv.log.debug('({0}) accepted {1!r}'.format(serv.pid, client_socket[1]))
- pool.spawn_n(serv.process_request, client_socket)
+ client_socket, client_addr = sock.accept()
+ client_socket.settimeout(serv.socket_timeout)
+ serv.log.debug('({0}) accepted {1!r}'.format(serv.pid, client_addr))
+ connections[client_addr] = connection = [client_addr, client_socket, STATE_IDLE]
+ (pool.spawn(serv.process_request, connection)
+ .link(_clean_connection, connection))
except ACCEPT_EXCEPTIONS as e:
if support.get_errno(e) not in ACCEPT_ERRNO:
raise
@@ -913,6 +951,11 @@ If unsure, use eventlet.GreenPool.''')
serv.log.info('wsgi exiting')
break
finally:
+ for cs in six.itervalues(connections):
+ prev_state = cs[2]
+ cs[2] = STATE_CLOSE
+ if prev_state == STATE_IDLE:
+ greenio.shutdown_safe(cs[1])
pool.waitall()
serv.log.info('({0}) wsgi exited, is_accepting={1}'.format(serv.pid, is_accepting))
try:
diff --git a/tests/websocket_test.py b/tests/websocket_test.py
index 3ff500a..caeda53 100644
--- a/tests/websocket_test.py
+++ b/tests/websocket_test.py
@@ -1,5 +1,6 @@
import errno
import socket
+import sys
import eventlet
from eventlet import event
@@ -503,6 +504,30 @@ class TestWebSocket(tests.wsgi_test._TestBase):
done_with_request.wait()
assert error_detected[0]
+ def test_close_idle(self):
+ pool = eventlet.GreenPool()
+ # use log=stderr when test runner can capture it
+ self.spawn_server(custom_pool=pool, log=sys.stdout)
+ connect = (
+ 'GET /echo HTTP/1.1',
+ 'Upgrade: WebSocket',
+ 'Connection: Upgrade',
+ 'Host: %s:%s' % self.server_addr,
+ 'Origin: http://%s:%s' % self.server_addr,
+ 'Sec-WebSocket-Protocol: ws',
+ 'Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5',
+ 'Sec-WebSocket-Key2: 12998 5 Y3 1 .P00',
+ )
+ sock = eventlet.connect(self.server_addr)
+ sock.sendall(six.b('\r\n'.join(connect) + '\r\n\r\n^n:ds[4U'))
+ sock.recv(1024)
+ sock.sendall(b'\x00hello\xff')
+ result = sock.recv(1024)
+ assert result, b'\x00hello\xff'
+ self.killer.kill(KeyboardInterrupt)
+ with eventlet.Timeout(1):
+ pool.waitall()
+
class TestWebSocketSSL(tests.wsgi_test._TestBase):
def set_site(self):
diff --git a/tests/wsgi_test.py b/tests/wsgi_test.py
index 1bc6abb..809335e 100644
--- a/tests/wsgi_test.py
+++ b/tests/wsgi_test.py
@@ -255,7 +255,7 @@ class _TestBase(tests.LimitedTestCase):
if self.killer:
greenthread.kill(self.killer)
- self.killer = eventlet.spawn_n(target, **kwargs)
+ self.killer = eventlet.spawn(target, **kwargs)
def set_site(self):
raise NotImplementedError
@@ -557,8 +557,8 @@ class TestHttpd(_TestBase):
def server(sock, site, log):
try:
serv = wsgi.Server(sock, sock.getsockname(), site, log)
- client_socket = sock.accept()
- serv.process_request(client_socket)
+ client_socket, addr = sock.accept()
+ serv.process_request([addr, client_socket, wsgi.STATE_IDLE])
return True
except Exception:
traceback.print_exc()
@@ -1471,13 +1471,14 @@ class TestHttpd(_TestBase):
sock.close()
request_thread = eventlet.spawn(make_request)
- server_conn = server_sock.accept()
+ client_sock, addr = server_sock.accept()
# Next line must not raise IOError -32 Broken pipe
- server.process_request(server_conn)
+ server.process_request([addr, client_sock, wsgi.STATE_IDLE])
request_thread.wait()
server_sock.close()
def test_server_connection_timeout_exception(self):
+ self.reset_timeout(5)
# Handle connection socket timeouts
# https://bitbucket.org/eventlet/eventlet/issue/143/
# Runs tests.wsgi_test_conntimeout in a separate process.
@@ -1582,6 +1583,23 @@ class TestHttpd(_TestBase):
log_content = self.logfile.getvalue()
assert log_content == ''
+ def test_close_idle_connections(self):
+ self.reset_timeout(2)
+ pool = eventlet.GreenPool()
+ self.spawn_server(custom_pool=pool)
+ # https://github.com/eventlet/eventlet/issues/188
+ sock = eventlet.connect(self.server_addr)
+
+ sock.sendall(b'GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
+ result = read_http(sock)
+ assert result.status == 'HTTP/1.1 200 OK', 'Received status {0!r}'.format(result.status)
+ self.killer.kill(KeyboardInterrupt)
+ try:
+ with eventlet.Timeout(1):
+ pool.waitall()
+ except Exception:
+ assert False, self.logfile.getvalue()
+
def read_headers(sock):
fd = sock.makefile('rb')