From 1ca8eed2801f45a570a31476889f5b2cffcd088a Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Wed, 20 Mar 2013 10:11:28 -0700 Subject: http Response and Request helpers --- srv.py | 65 ++-- tests/http_protocol_test.py | 814 ++++++++++++++++++++++++++++++++++---------- tulip/http/client.py | 35 +- tulip/http/protocol.py | 701 +++++++++++++++++++++++++++++--------- tulip/selector_events.py | 2 +- 5 files changed, 1231 insertions(+), 386 deletions(-) diff --git a/srv.py b/srv.py index 540e63b..710e0e7 100644 --- a/srv.py +++ b/srv.py @@ -51,22 +51,36 @@ class HttpServer(tulip.Protocol): print(hdr, val) headers.add_header(hdr, val) - write = self.transport.write if isdir and not path.endswith('/'): - bpath = path.encode('ascii') - write(b'HTTP/1.0 302 Redirected\r\n' - b'URI: ' + bpath + b'/\r\n' - b'Location: ' + bpath + b'/\r\n' - b'\r\n') + path = path + '/' + response = tulip.http.Response(self.transport, 302) + response.add_headers( + ('URI', path), + ('Location', path)) + response.send_headers() + response.write_eof() + self.transport.close() return - write(b'HTTP/1.0 200 Ok\r\n') - if isdir: - write(b'Content-type: text/html\r\n') - else: - write(b'Content-type: text/plain\r\n') - write(b'\r\n') + + response = tulip.http.Response(self.transport, 200) + response.add_header('Transfer-Encoding', 'chunked') + + # content encoding + accept_encoding = headers.get('accept-encoding', '').lower() + if 'deflate' in accept_encoding: + response.add_header('Content-Encoding', 'deflate') + response.add_compression_filter('deflate') + elif 'gzip' in accept_encoding: + response.add_header('Content-Encoding', 'gzip') + response.add_compression_filter('gzip') + + response.add_chunking_filter(1025) + if isdir: - write(b'') else: + response.add_header('Content-type', 'text/plain') + response.send_headers() + try: - with open(path, 'rb') as f: - write(f.read()) + with open(path, 'rb') as fp: + chunk = fp.read(8196) + while chunk: + if not response.write(chunk): + break + chunk = fp.read(8196) except OSError: - write(b'Cannot open\r\n') + response.write(b'Cannot open') + + response.write_eof() self.transport.close() def connection_made(self, transport): diff --git a/tests/http_protocol_test.py b/tests/http_protocol_test.py index c095228..408bfc7 100644 --- a/tests/http_protocol_test.py +++ b/tests/http_protocol_test.py @@ -203,151 +203,6 @@ class HttpStreamReaderTests(LogTrackingTestCase): self.assertIn("limit request headers fields size", str(cm.exception)) - def test_read_payload_unknown_encoding(self): - self.assertRaises( - ValueError, self.stream.read_length_payload, encoding='unknown') - - def test_read_payload(self): - self.stream.feed_data(b'da') - self.stream.feed_data(b't') - self.stream.feed_data(b'ali') - self.stream.feed_data(b'ne') - - stream = self.stream.read_length_payload(4) - self.assertIsInstance(stream, tulip.StreamReader) - - data = self.loop.run_until_complete(tulip.Task(stream.read())) - self.assertEqual(b'data', data) - self.assertEqual(b'line', b''.join(self.stream.buffer)) - - def test_read_payload_eof(self): - self.stream.feed_data(b'da') - self.stream.feed_eof() - stream = self.stream.read_length_payload(4) - - self.assertRaises( - http.client.IncompleteRead, - self.loop.run_until_complete, tulip.Task(stream.read())) - - def test_read_payload_eof_exc(self): - self.stream.feed_data(b'da') - stream = self.stream.read_length_payload(4) - - def eof(): - yield from [] - self.stream.feed_eof() - - t1 = tulip.Task(stream.read()) - t2 = tulip.Task(eof()) - - self.loop.run_until_complete(tulip.Task(tulip.wait([t1, t2]))) - self.assertRaises(http.client.IncompleteRead, t1.result) - self.assertIsNone(self.stream._reader) - - def test_read_payload_deflate(self): - comp = zlib.compressobj(wbits=-zlib.MAX_WBITS) - - data = b''.join([comp.compress(b'data'), comp.flush()]) - stream = self.stream.read_length_payload(len(data), encoding='deflate') - - self.stream.feed_data(data) - - data = self.loop.run_until_complete(tulip.Task(stream.read())) - self.assertEqual(b'data', data) - - def _test_read_payload_compress_error(self): - data = b'123123123datadatadata' - reader = protocol.length_reader(4) - self.stream.feed_data(data) - stream = self.stream.read_payload(reader, 'deflate') - - self.assertRaises( - http.client.IncompleteRead, - self.loop.run_until_complete, tulip.Task(stream.read())) - - def test_read_chunked_payload(self): - stream = self.stream.read_chunked_payload() - self.stream.feed_data(b'4\r\ndata\r\n4\r\nline\r\n0\r\ntest\r\n\r\n') - - data = self.loop.run_until_complete(tulip.Task(stream.read())) - self.assertEqual(b'dataline', data) - - def test_read_chunked_payload_chunks(self): - stream = self.stream.read_chunked_payload() - - self.stream.feed_data(b'4\r\ndata\r') - self.stream.feed_data(b'\n4') - self.stream.feed_data(b'\r') - self.stream.feed_data(b'\n') - self.stream.feed_data(b'line\r\n0\r\n') - self.stream.feed_data(b'test\r\n\r\n') - - data = self.loop.run_until_complete(tulip.Task(stream.read())) - self.assertEqual(b'dataline', data) - - def test_read_chunked_payload_incomplete(self): - stream = self.stream.read_chunked_payload() - - self.stream.feed_data(b'4\r\ndata\r\n') - self.stream.feed_eof() - - self.assertRaises( - http.client.IncompleteRead, - self.loop.run_until_complete, tulip.Task(stream.read())) - - def test_read_chunked_payload_extension(self): - stream = self.stream.read_chunked_payload() - - self.stream.feed_data( - b'4;test\r\ndata\r\n4\r\nline\r\n0\r\ntest\r\n\r\n') - - data = self.loop.run_until_complete(tulip.Task(stream.read())) - self.assertEqual(b'dataline', data) - - def test_read_chunked_payload_size_error(self): - stream = self.stream.read_chunked_payload() - - self.stream.feed_data(b'blah\r\n') - self.assertRaises( - http.client.IncompleteRead, - self.loop.run_until_complete, tulip.Task(stream.read())) - - def test_read_length_payload(self): - stream = self.stream.read_length_payload(8) - - self.stream.feed_data(b'data') - self.stream.feed_data(b'data') - - data = self.loop.run_until_complete(tulip.Task(stream.read())) - self.assertEqual(b'datadata', data) - - def test_read_length_payload_zero(self): - stream = self.stream.read_length_payload(0) - - self.stream.feed_data(b'data') - - data = self.loop.run_until_complete(tulip.Task(stream.read())) - self.assertEqual(b'', data) - - def test_read_length_payload_incomplete(self): - stream = self.stream.read_length_payload(8) - - self.stream.feed_data(b'data') - self.stream.feed_eof() - - self.assertRaises( - http.client.IncompleteRead, - self.loop.run_until_complete, tulip.Task(stream.read())) - - def test_read_eof_payload(self): - stream = self.stream.read_eof_payload() - - self.stream.feed_data(b'data') - self.stream.feed_eof() - - data = self.loop.run_until_complete(tulip.Task(stream.read())) - self.assertEqual(b'data', data) - def test_read_message_should_close(self): self.stream.feed_data( b'Host: example.com\r\nConnection: close\r\n\r\n') @@ -477,7 +332,7 @@ class HttpStreamReaderTests(LogTrackingTestCase): payload = self.loop.run_until_complete(tulip.Task(msg.payload.read())) self.assertEqual(b'dataline', payload) - def test_read_message_readall(self): + def test_read_message_readall_eof(self): self.stream.feed_data( b'Host: example.com\r\n\r\n') self.stream.feed_data(b'data') @@ -490,46 +345,653 @@ class HttpStreamReaderTests(LogTrackingTestCase): payload = self.loop.run_until_complete(tulip.Task(msg.payload.read())) self.assertEqual(b'dataline', payload) + def test_read_message_payload(self): + self.stream.feed_data( + b'Host: example.com\r\n' + b'Content-Length: 8\r\n\r\n') + self.stream.feed_data(b'data') + self.stream.feed_data(b'data') + + msg = self.loop.run_until_complete( + tulip.Task(self.stream.read_message(readall=True))) + + data = self.loop.run_until_complete(tulip.Task(msg.payload.read())) + self.assertEqual(b'datadata', data) + + def test_read_message_payload_eof(self): + self.stream.feed_data( + b'Host: example.com\r\n' + b'Content-Length: 4\r\n\r\n') + self.stream.feed_data(b'da') + self.stream.feed_eof() + + msg = self.loop.run_until_complete( + tulip.Task(self.stream.read_message(readall=True))) + + self.assertRaises( + http.client.IncompleteRead, + self.loop.run_until_complete, tulip.Task(msg.payload.read())) + + def test_read_message_length_payload_zero(self): + self.stream.feed_data( + b'Host: example.com\r\n' + b'Content-Length: 0\r\n\r\n') + self.stream.feed_data(b'data') + + msg = self.loop.run_until_complete( + tulip.Task(self.stream.read_message())) + + data = self.loop.run_until_complete(tulip.Task(msg.payload.read())) + self.assertEqual(b'', data) + + def test_read_message_length_payload_incomplete(self): + self.stream.feed_data( + b'Host: example.com\r\n' + b'Content-Length: 8\r\n\r\n') + + msg = self.loop.run_until_complete( + tulip.Task(self.stream.read_message())) + + def coro(): + self.stream.feed_data(b'data') + self.stream.feed_eof() + return (yield from msg.payload.read()) + + self.assertRaises( + http.client.IncompleteRead, + self.loop.run_until_complete, tulip.Task(coro())) -class HttpStreamWriterTests(unittest.TestCase): + def test_read_message_eof_payload(self): + self.stream.feed_data(b'Host: example.com\r\n\r\n') + + msg = self.loop.run_until_complete( + tulip.Task(self.stream.read_message(readall=True))) + + def coro(): + self.stream.feed_data(b'data') + self.stream.feed_eof() + return (yield from msg.payload.read()) + + data = self.loop.run_until_complete(tulip.Task(coro())) + self.assertEqual(b'data', data) + + def test_read_message_length_payload(self): + self.stream.feed_data( + b'Host: example.com\r\n' + b'Content-Length: 4\r\n\r\n') + self.stream.feed_data(b'da') + self.stream.feed_data(b't') + self.stream.feed_data(b'ali') + self.stream.feed_data(b'ne') + + msg = self.loop.run_until_complete( + tulip.Task(self.stream.read_message(readall=True))) + + self.assertIsInstance(msg.payload, tulip.StreamReader) + + data = self.loop.run_until_complete(tulip.Task(msg.payload.read())) + self.assertEqual(b'data', data) + self.assertEqual(b'line', b''.join(self.stream.buffer)) + + def test_read_message_length_payload_extra(self): + self.stream.feed_data( + b'Host: example.com\r\n' + b'Content-Length: 4\r\n\r\n') + + msg = self.loop.run_until_complete( + tulip.Task(self.stream.read_message())) + + def coro(): + self.stream.feed_data(b'da') + self.stream.feed_data(b't') + self.stream.feed_data(b'ali') + self.stream.feed_data(b'ne') + return (yield from msg.payload.read()) + + data = self.loop.run_until_complete(tulip.Task(coro())) + self.assertEqual(b'data', data) + self.assertEqual(b'line', b''.join(self.stream.buffer)) + + def test_parse_length_payload_eof_exc(self): + parser = self.stream._parse_length_payload(4) + next(parser) + + stream = tulip.StreamReader() + parser.send(stream) + self.stream._parser = parser + self.stream.feed_data(b'da') + + def eof(): + yield from [] + self.stream.feed_eof() + + t1 = tulip.Task(stream.read()) + t2 = tulip.Task(eof()) + + self.loop.run_until_complete(tulip.Task(tulip.wait([t1, t2]))) + self.assertRaises(http.client.IncompleteRead, t1.result) + self.assertIsNone(self.stream._parser) + + def test_read_message_deflate_payload(self): + comp = zlib.compressobj(wbits=-zlib.MAX_WBITS) + + data = b''.join([comp.compress(b'data'), comp.flush()]) + + self.stream.feed_data( + b'Host: example.com\r\n' + b'Content-Encoding: deflate\r\n' + + ('Content-Length: %s\r\n\r\n' % len(data)).encode()) + + msg = self.loop.run_until_complete( + tulip.Task(self.stream.read_message(readall=True))) + + def coro(): + self.stream.feed_data(data) + return (yield from msg.payload.read()) + + data = self.loop.run_until_complete(tulip.Task(coro())) + self.assertEqual(b'data', data) + + def test_read_message_chunked_payload(self): + self.stream.feed_data( + b'Host: example.com\r\n' + b'Transfer-Encoding: chunked\r\n\r\n') + + msg = self.loop.run_until_complete( + tulip.Task(self.stream.read_message())) + + def coro(): + self.stream.feed_data( + b'4\r\ndata\r\n4\r\nline\r\n0\r\ntest\r\n\r\n') + return (yield from msg.payload.read()) + + data = self.loop.run_until_complete(tulip.Task(coro())) + self.assertEqual(b'dataline', data) + + def test_read_message_chunked_payload_chunks(self): + self.stream.feed_data( + b'Host: example.com\r\n' + b'Transfer-Encoding: chunked\r\n\r\n') + + msg = self.loop.run_until_complete( + tulip.Task(self.stream.read_message())) + + def coro(): + self.stream.feed_data(b'4\r\ndata\r') + self.stream.feed_data(b'\n4') + self.stream.feed_data(b'\r') + self.stream.feed_data(b'\n') + self.stream.feed_data(b'line\r\n0\r\n') + self.stream.feed_data(b'test\r\n\r\n') + return (yield from msg.payload.read()) + + data = self.loop.run_until_complete(tulip.Task(coro())) + self.assertEqual(b'dataline', data) + + def test_read_message_chunked_payload_incomplete(self): + self.stream.feed_data( + b'Host: example.com\r\n' + b'Transfer-Encoding: chunked\r\n\r\n') + + msg = self.loop.run_until_complete( + tulip.Task(self.stream.read_message())) + + def coro(): + self.stream.feed_data(b'4\r\ndata\r\n') + self.stream.feed_eof() + return (yield from msg.payload.read()) + + self.assertRaises( + http.client.IncompleteRead, + self.loop.run_until_complete, tulip.Task(coro())) + + def test_read_message_chunked_payload_extension(self): + self.stream.feed_data( + b'Host: example.com\r\n' + b'Transfer-Encoding: chunked\r\n\r\n') + + msg = self.loop.run_until_complete( + tulip.Task(self.stream.read_message())) + + def coro(): + self.stream.feed_data( + b'4;test\r\ndata\r\n4\r\nline\r\n0\r\ntest\r\n\r\n') + return (yield from msg.payload.read()) + + data = self.loop.run_until_complete(tulip.Task(coro())) + self.assertEqual(b'dataline', data) + + def test_read_message_chunked_payload_size_error(self): + self.stream.feed_data( + b'Host: example.com\r\n' + b'Transfer-Encoding: chunked\r\n\r\n') + + msg = self.loop.run_until_complete( + tulip.Task(self.stream.read_message())) + + def coro(): + self.stream.feed_data(b'blah\r\n') + return (yield from msg.payload.read()) + + self.assertRaises( + http.client.IncompleteRead, + self.loop.run_until_complete, tulip.Task(coro())) + + def test_deflate_stream_set_exception(self): + stream = tulip.StreamReader() + dstream = protocol.DeflateStream(stream, 'deflate') + + exc = ValueError() + dstream.set_exception(exc) + self.assertIs(exc, stream.exception()) + + def test_deflate_stream_feed_data(self): + stream = tulip.StreamReader() + dstream = protocol.DeflateStream(stream, 'deflate') + + dstream.zlib = unittest.mock.Mock() + dstream.zlib.decompress.return_value = b'line' + + dstream.feed_data(b'data') + self.assertEqual([b'line'], list(stream.buffer)) + + def test_deflate_stream_feed_data_err(self): + stream = tulip.StreamReader() + dstream = protocol.DeflateStream(stream, 'deflate') + + exc = ValueError() + dstream.zlib = unittest.mock.Mock() + dstream.zlib.decompress.side_effect = exc + + dstream.feed_data(b'data') + self.assertIsInstance(stream.exception(), http.client.IncompleteRead) + + def test_deflate_stream_feed_eof(self): + stream = tulip.StreamReader() + dstream = protocol.DeflateStream(stream, 'deflate') + + dstream.zlib = unittest.mock.Mock() + dstream.zlib.flush.return_value = b'line' + + dstream.feed_eof() + self.assertEqual([b'line'], list(stream.buffer)) + self.assertTrue(stream.eof) + + def test_deflate_stream_feed_eof_err(self): + stream = tulip.StreamReader() + dstream = protocol.DeflateStream(stream, 'deflate') + + dstream.zlib = unittest.mock.Mock() + dstream.zlib.flush.return_value = b'line' + dstream.zlib.eof = False + + dstream.feed_eof() + self.assertIsInstance(stream.exception(), http.client.IncompleteRead) + + +class HttpMessageTests(unittest.TestCase): def setUp(self): self.transport = unittest.mock.Mock() - self.writer = protocol.HttpStreamWriter(self.transport) - def test_ctor(self): - transport = unittest.mock.Mock() - writer = protocol.HttpStreamWriter(transport, 'latin-1') - self.assertIs(writer.transport, transport) - self.assertEqual(writer.encoding, 'latin-1') + def test_start_request(self): + msg = protocol.Request( + self.transport, 'GET', '/index.html', close=True) + + self.assertIs(msg.transport, self.transport) + self.assertIsNone(msg.status) + self.assertTrue(msg.closing) + self.assertEqual(msg.status_line, 'GET /index.html HTTP/1.1\r\n') + + def test_start_response(self): + msg = protocol.Response(self.transport, 200, close=True) + + self.assertIs(msg.transport, self.transport) + self.assertEqual(msg.status, 200) + self.assertTrue(msg.closing) + self.assertEqual(msg.status_line, 'HTTP/1.1 200 OK\r\n') + + def test_force_close(self): + msg = protocol.Response(self.transport, 200) + self.assertFalse(msg.closing) + msg.force_close() + self.assertTrue(msg.closing) + + def test_force_chunked(self): + msg = protocol.Response(self.transport, 200) + self.assertFalse(msg.chunked) + msg.force_chunked() + self.assertTrue(msg.chunked) + + def test_keep_alive(self): + msg = protocol.Response(self.transport, 200) + self.assertFalse(msg.keep_alive()) + msg.keepalive = True + self.assertTrue(msg.keep_alive()) + + msg.force_close() + self.assertFalse(msg.keep_alive()) + + def test_add_header(self): + msg = protocol.Response(self.transport, 200) + self.assertEqual([], msg.headers) - def test_encode(self): - self.assertEqual(b'test', self.writer.encode('test')) - self.assertEqual(b'test', self.writer.encode(b'test')) + msg.add_header('content-type', 'plain/html') + self.assertEqual([('CONTENT-TYPE', 'plain/html')], msg.headers) - def test_decode(self): - self.assertEqual('test', self.writer.decode('test')) - self.assertEqual('test', self.writer.decode(b'test')) + def test_add_headers(self): + msg = protocol.Response(self.transport, 200) + self.assertEqual([], msg.headers) - def test_write(self): - self.writer.write(b'test') - self.assertTrue(self.transport.write.called) - self.assertEqual((b'test',), self.transport.write.call_args[0]) + msg.add_headers(('content-type', 'plain/html')) + self.assertEqual([('CONTENT-TYPE', 'plain/html')], msg.headers) - def test_write_str(self): - self.writer.write_str('test') - self.assertTrue(self.transport.write.called) - self.assertEqual((b'test',), self.transport.write.call_args[0]) + def test_add_headers_length(self): + msg = protocol.Response(self.transport, 200) + self.assertIsNone(msg.length) - def test_write_cunked(self): - self.writer.write_chunked('') - self.assertFalse(self.transport.write.called) + msg.add_headers(('content-length', '200')) + self.assertEqual(200, msg.length) - self.writer.write_chunked('data') + def test_add_headers_upgrade(self): + msg = protocol.Response(self.transport, 200) + self.assertFalse(msg.upgrade) + + msg.add_headers(('connection', 'upgrade')) + self.assertTrue(msg.upgrade) + + def test_add_headers_upgrade_websocket(self): + msg = protocol.Response(self.transport, 200) + + msg.add_headers(('upgrade', 'test')) + self.assertEqual([], msg.headers) + + msg.add_headers(('upgrade', 'websocket')) + self.assertEqual([('UPGRADE', 'websocket')], msg.headers) + + def test_add_headers_connection_keepalive(self): + msg = protocol.Response(self.transport, 200) + + msg.add_headers(('connection', 'keep-alive')) + self.assertEqual([], msg.headers) + self.assertTrue(msg.keepalive) + + msg.add_headers(('connection', 'close')) + self.assertFalse(msg.keepalive) + + def test_add_headers_hop_headers(self): + msg = protocol.Response(self.transport, 200) + + msg.add_headers(('connection', 'test'), ('transfer-encoding', 't')) + self.assertEqual([], msg.headers) + + def test_default_headers(self): + msg = protocol.Response(self.transport, 200) + + headers = [r for r, _ in msg._default_headers()] + self.assertIn('DATE', headers) + self.assertIn('CONNECTION', headers) + + def test_default_headers_server(self): + msg = protocol.Response(self.transport, 200) + + headers = [r for r, _ in msg._default_headers()] + self.assertIn('SERVER', headers) + + def test_default_headers_useragent(self): + msg = protocol.Request(self.transport, 'GET', '/') + + headers = [r for r, _ in msg._default_headers()] + self.assertNotIn('SERVER', headers) + self.assertIn('USER-AGENT', headers) + + def test_default_headers_chunked(self): + msg = protocol.Response(self.transport, 200) + + headers = [r for r, _ in msg._default_headers()] + self.assertNotIn('TRANSFER-ENCODING', headers) + + msg.force_chunked() + + headers = [r for r, _ in msg._default_headers()] + self.assertIn('TRANSFER-ENCODING', headers) + + def test_default_headers_connection_upgrade(self): + msg = protocol.Response(self.transport, 200) + msg.upgrade = True + + headers = [r for r in msg._default_headers() if r[0] == 'CONNECTION'] + self.assertEqual([('CONNECTION', 'upgrade')], headers) + + def test_default_headers_connection_close(self): + msg = protocol.Response(self.transport, 200) + msg.force_close() + + headers = [r for r in msg._default_headers() if r[0] == 'CONNECTION'] + self.assertEqual([('CONNECTION', 'close')], headers) + + def test_default_headers_connection_keep_alive(self): + msg = protocol.Response(self.transport, 200) + msg.keepalive = True + + headers = [r for r in msg._default_headers() if r[0] == 'CONNECTION'] + self.assertEqual([('CONNECTION', 'keep-alive')], headers) + + def test_send_headers(self): + write = self.transport.write = unittest.mock.Mock() + + msg = protocol.Response(self.transport, 200) + msg.add_headers(('content-type', 'plain/html')) + self.assertFalse(msg.is_headers_sent()) + + msg.send_headers() + + content = b''.join([arg[1][0] for arg in list(write.mock_calls)]) + + self.assertTrue(content.startswith(b'HTTP/1.1 200 OK\r\n')) + self.assertIn(b'CONTENT-TYPE: plain/html', content) + self.assertTrue(msg.headers_sent) + self.assertTrue(msg.is_headers_sent()) + + def test_send_headers_nomore_add(self): + msg = protocol.Response(self.transport, 200) + msg.add_headers(('content-type', 'plain/html')) + msg.send_headers() + + self.assertRaises(AssertionError, + msg.add_header, 'content-type', 'plain/html') + + def test_prepare_length(self): + msg = protocol.Response(self.transport, 200) + length = msg._write_length_payload = unittest.mock.Mock() + length.return_value = iter([1, 2, 3]) + + msg.add_headers(('content-length', '200')) + msg.send_headers() + + self.assertTrue(length.called) + self.assertTrue((200,), length.call_args[0]) + + def test_prepare_chunked_force(self): + msg = protocol.Response(self.transport, 200) + msg.force_chunked() + + chunked = msg._write_chunked_payload = unittest.mock.Mock() + chunked.return_value = iter([1, 2, 3]) + + msg.add_headers(('content-length', '200')) + msg.send_headers() + self.assertTrue(chunked.called) + + def test_prepare_chunked_no_length(self): + msg = protocol.Response(self.transport, 200) + + chunked = msg._write_chunked_payload = unittest.mock.Mock() + chunked.return_value = iter([1, 2, 3]) + + msg.send_headers() + self.assertTrue(chunked.called) + + def test_prepare_eof(self): + msg = protocol.Response(self.transport, 200, http_version=(1, 0)) + + eof = msg._write_eof_payload = unittest.mock.Mock() + eof.return_value = iter([1, 2, 3]) + + msg.send_headers() + self.assertTrue(eof.called) + + def test_write_auto_send_headers(self): + msg = protocol.Response(self.transport, 200, http_version=(1, 0)) + msg._send_headers = True + + msg.write(b'data1') + self.assertTrue(msg.headers_sent) + + def test_write_payload_eof(self): + write = self.transport.write = unittest.mock.Mock() + msg = protocol.Response(self.transport, 200, http_version=(1, 0)) + msg.send_headers() + + msg.write(b'data1') + self.assertTrue(msg.headers_sent) + + msg.write(b'data2') + msg.write_eof() + + content = b''.join([c[1][0] for c in list(write.mock_calls)]) + self.assertEqual( + b'data1data2', content.split(b'\r\n\r\n', 1)[-1]) + + def test_write_payload_chunked(self): + write = self.transport.write = unittest.mock.Mock() + + msg = protocol.Response(self.transport, 200) + msg.force_chunked() + msg.send_headers() + + msg.write(b'data') + msg.write_eof() + + content = b''.join([c[1][0] for c in list(write.mock_calls)]) + self.assertEqual( + b'4\r\ndata\r\n0\r\n\r\n', + content.split(b'\r\n\r\n', 1)[-1]) + + def test_write_payload_chunked_multiple(self): + write = self.transport.write = unittest.mock.Mock() + + msg = protocol.Response(self.transport, 200) + msg.force_chunked() + msg.send_headers() + + msg.write(b'data1') + msg.write(b'data2') + msg.write_eof() + + content = b''.join([c[1][0] for c in list(write.mock_calls)]) + self.assertEqual( + b'5\r\ndata1\r\n5\r\ndata2\r\n0\r\n\r\n', + content.split(b'\r\n\r\n', 1)[-1]) + + def test_write_payload_length(self): + write = self.transport.write = unittest.mock.Mock() + + msg = protocol.Response(self.transport, 200) + msg.add_headers(('content-length', '2')) + msg.send_headers() + + msg.write(b'd') + msg.write(b'ata') + msg.write_eof() + + content = b''.join([c[1][0] for c in list(write.mock_calls)]) + self.assertEqual( + b'da', content.split(b'\r\n\r\n', 1)[-1]) + + def test_write_payload_chunked_filter(self): + write = self.transport.write = unittest.mock.Mock() + + msg = protocol.Response(self.transport, 200) + msg.send_headers() + + msg.add_chunking_filter(2) + msg.write(b'data') + msg.write_eof() + + content = b''.join([c[1][0] for c in list(write.mock_calls)]) + self.assertTrue(content.endswith(b'2\r\nda\r\n2\r\nta\r\n0\r\n\r\n')) + + def test_write_payload_chunked_filter_mutiple_chunks(self): + write = self.transport.write = unittest.mock.Mock() + msg = protocol.Response(self.transport, 200) + msg.send_headers() + + msg.add_chunking_filter(2) + msg.write(b'data1') + msg.write(b'data2') + msg.write_eof() + content = b''.join([c[1][0] for c in list(write.mock_calls)]) + self.assertTrue(content.endswith( + b'2\r\nda\r\n2\r\nta\r\n2\r\n1d\r\n2\r\nat\r\n' + b'2\r\na2\r\n0\r\n\r\n')) + + def test_write_payload_chunked_large_chunk(self): + write = self.transport.write = unittest.mock.Mock() + msg = protocol.Response(self.transport, 200) + msg.send_headers() + + msg.add_chunking_filter(1024) + msg.write(b'data') + msg.write_eof() + content = b''.join([c[1][0] for c in list(write.mock_calls)]) + self.assertTrue(content.endswith(b'4\r\ndata\r\n0\r\n\r\n')) + + _comp = zlib.compressobj(wbits=-zlib.MAX_WBITS) + _COMPRESSED = b''.join([_comp.compress(b'data'), _comp.flush()]) + + def test_write_payload_deflate_filter(self): + write = self.transport.write = unittest.mock.Mock() + msg = protocol.Response(self.transport, 200) + msg.add_headers(('content-length', '%s' % len(self._COMPRESSED))) + msg.send_headers() + + msg.add_compression_filter('deflate') + msg.write(b'data') + msg.write_eof() + + content = b''.join([c[1][0] for c in list(write.mock_calls)]) + self.assertEqual( + self._COMPRESSED, content.split(b'\r\n\r\n', 1)[-1]) + + def test_write_payload_deflate_and_chunked(self): + write = self.transport.write = unittest.mock.Mock() + msg = protocol.Response(self.transport, 200) + msg.send_headers() + + msg.add_compression_filter('deflate') + msg.add_chunking_filter(2) + + msg.write(b'data') + msg.write_eof() + + content = b''.join([c[1][0] for c in list(write.mock_calls)]) self.assertEqual( - [(b'4\r\n',), (b'data',), (b'\r\n',)], - [c[0] for c in self.transport.write.call_args_list]) + b'2\r\nKI\r\n2\r\n,I\r\n2\r\n\x04\x00\r\n0\r\n\r\n', + content.split(b'\r\n\r\n', 1)[-1]) + + def test_write_payload_chunked_and_deflate(self): + write = self.transport.write = unittest.mock.Mock() + msg = protocol.Response(self.transport, 200) + msg.add_headers(('content-length', '%s' % len(self._COMPRESSED))) + + msg.add_chunking_filter(2) + msg.add_compression_filter('deflate') + msg.send_headers() - def test_write_eof(self): - self.writer.write_chunked_eof() - self.assertEqual((b'0\r\n\r\n',), self.transport.write.call_args[0]) + msg.write(b'data') + msg.write_eof() + + content = b''.join([c[1][0] for c in list(write.mock_calls)]) + self.assertEqual( + self._COMPRESSED, content.split(b'\r\n\r\n', 1)[-1]) diff --git a/tulip/http/client.py b/tulip/http/client.py index b4db5cc..b65b90a 100644 --- a/tulip/http/client.py +++ b/tulip/http/client.py @@ -10,15 +10,6 @@ Most basic usage: headers['status'] == '200 Ok' # or some such assert isinstance(response, bytes) -However you can also open a stream: - - f, wstream = http_client.open_stream(url, method, headers) - wstream.write(b'abc') - wstream.writelines([b'def', b'ghi']) - wstream.write_eof() - sts, headers, rstream = yield from f - response = yield from rstream.read() - TODO: Reuse email.Message class (or its subclass, http.client.HTTPMessage). TODO: How do we do connection keep alive? Pooling? """ @@ -45,7 +36,7 @@ class HttpClientProtocol: def __init__(self, host, port=None, *, path='/', method='GET', headers=None, ssl=None, - make_body=None, encoding='utf-8', version='1.1', + make_body=None, encoding='utf-8', version=(1, 1), chunked=False): host = self.validate(host, 'host') if ':' in host: @@ -70,7 +61,7 @@ class HttpClientProtocol: self.validate(value, 'header value', True) self.headers[key] = value self.encoding = self.validate(encoding, 'encoding') - self.version = self.validate(version, 'version') + self.version = version self.make_body = make_body self.chunked = chunked self.ssl = ssl @@ -127,22 +118,22 @@ class HttpClientProtocol: def connection_made(self, transport): self.transport = transport self.stream = protocol.HttpStreamReader() - self.wstream = protocol.HttpStreamWriter(transport) - - line = '{} {} HTTP/{}\r\n'.format(self.method, - self.path, - self.version) - self.wstream.write_str(line) - for key, value in self.headers.items(): - self.wstream.write_str('{}: {}\r\n'.format(key, value)) - self.wstream.write(b'\r\n') + + self.request = protocol.Request( + transport, self.method, self.path, self.version) + + self.request.add_headers(*self.headers.items()) + self.request.send_headers() + if self.make_body is not None: if self.chunked: self.make_body( - self.wstream.write_chunked, self.wstream.write_chunked_eof) + self.request.write, self.request.eof) else: self.make_body( - self.wstream.write_str, self.wstream.write_eof) + self.request.write, self.request.eof) + else: + self.request.write_eof() def data_received(self, data): self.stream.feed_data(data) diff --git a/tulip/http/protocol.py b/tulip/http/protocol.py index 2ff7876..1773cab 100644 --- a/tulip/http/protocol.py +++ b/tulip/http/protocol.py @@ -1,12 +1,17 @@ """Http related helper utils.""" -__all__ = ['HttpStreamReader', 'HttpStreamWriter', - 'HttpMessage', 'RequestLine', 'ResponseStatus'] +__all__ = ['HttpStreamReader', + 'HttpMessage', 'Request', 'Response', + 'RawHttpMessage', 'RequestLine', 'ResponseStatus'] import collections +import email.utils import functools import http.client +import http.server +import itertools import re +import sys import zlib import tulip @@ -15,7 +20,7 @@ METHRE = re.compile('[A-Z0-9$-_.]+') VERSRE = re.compile('HTTP/(\d+).(\d+)') HDRRE = re.compile(b"[\x00-\x1F\x7F()<>@,;:\[\]={} \t\\\\\"]") CONTINUATION = (b' ', b'\t') - +RESPONSES = http.server.BaseHTTPRequestHandler.responses RequestLine = collections.namedtuple( 'RequestLine', ['method', 'uri', 'version']) @@ -25,109 +30,8 @@ ResponseStatus = collections.namedtuple( 'ResponseStatus', ['version', 'code', 'reason']) -HttpMessage = collections.namedtuple( - 'HttpMessage', ['headers', 'payload', 'should_close', 'compression']) - - -class StreamEofException(http.client.HTTPException): - """eof received""" - - -def wrap_payload_reader(f): - """wrap_payload_reader wraps payload readers and redirect stream. - payload readers are generator functions, read_chunked_payload, - read_length_payload, read_eof_payload. - payload reader allows to modify data stream and feed data into stream. - - StreamReader instance should be send to generator as first parameter. - This steam is used as destination stream for processed data. - To send data to reader use generator's send() method. - - To indicate eof stream, throw StreamEofException exception into the reader. - In case of errors in incoming stream reader sets exception to - destination stream with StreamReader.set_exception() method. - - Before exit, reader generator returns unprocessed data. - """ - - @functools.wraps(f) - def wrapper(self, *args, **kw): - assert self._reader is None - - rstream = stream = tulip.StreamReader() - - encoding = kw.pop('encoding', None) - if encoding is not None: - if encoding not in ('gzip', 'deflate'): - raise ValueError( - 'Content-Encoding %r is not supported' % encoding) - - stream = DeflateStream(stream, encoding) - - reader = f(self, *args, **kw) - next(reader) - try: - reader.send(stream) - except StopIteration: - pass - else: - # feed buffer - self.line_count = 0 - self.byte_count = 0 - while self.buffer: - try: - reader.send(self.buffer.popleft()) - except StopIteration as exc: - buf = b''.join(self.buffer) - self.buffer.clear() - reader = None - if exc.value: - self.feed_data(exc.value) - - if buf: - self.feed_data(buf) - - break - - if reader is not None: - if self.eof: - try: - reader.throw(StreamEofException()) - except StopIteration as exc: - pass - else: - self._reader = reader - - return rstream - - return wrapper - - -class DeflateStream: - """DeflateStream decomress stream and feed data into specified steram.""" - - def __init__(self, stream, encoding): - self.stream = stream - zlib_mode = (16 + zlib.MAX_WBITS - if encoding == 'gzip' else -zlib.MAX_WBITS) - - self.zlib = zlib.decompressobj(wbits=zlib_mode) - - def feed_data(self, chunk): - try: - chunk = self.zlib.decompress(chunk) - except: - self.stream.set_exception(http.client.IncompleteRead(b'')) - - if chunk: - self.stream.feed_data(chunk) - - def feed_eof(self): - self.stream.feed_data(self.zlib.flush()) - if not self.zlib.eof: - self.stream.set_exception(http.client.IncompleteRead(b'')) - - self.stream.feed_eof() +RawHttpMessage = collections.namedtuple( + 'RawHttpMessage', ['headers', 'payload', 'should_close', 'compression']) class HttpStreamReader(tulip.StreamReader): @@ -135,32 +39,32 @@ class HttpStreamReader(tulip.StreamReader): MAX_HEADERS = 32768 MAX_HEADERFIELD_SIZE = 8190 - # if _reader is set, feed_data and feed_eof sends data into - # _reader instead of self. is it being used as stream redirection for - # read_chunked_payload, read_length_payload and read_eof_payload - _reader = None + # if _parser is set, feed_data and feed_eof sends data into + # _parser instead of self. is it being used as stream redirection for + # _parse_chunked_payload, _parse_length_payload and _parse_eof_payload + _parser = None def feed_data(self, data): - """_reader is a generator, if _reader is set, feed_data sends - incoming data into this generator untile generates stops.""" - if self._reader: + """_parser is a generator, if _parser is set, feed_data sends + incoming data into the generator untile generator stops.""" + if self._parser: try: - self._reader.send(data) + self._parser.send(data) except StopIteration as exc: - self._reader = None + self._parser = None if exc.value: self.feed_data(exc.value) else: super().feed_data(data) def feed_eof(self): - """_reader is a generator, if _reader is set feed_eof throws + """_parser is a generator, if _parser is set feed_eof throws StreamEofException into this generator.""" - if self._reader: + if self._parser: try: - self._reader.throw(StreamEofException()) + self._parser.throw(StreamEofException()) except StopIteration: - self._reader = None + self._parser = None super().feed_eof() @@ -314,9 +218,8 @@ class HttpStreamReader(tulip.StreamReader): return headers - @wrap_payload_reader - def read_chunked_payload(self): - """Read chunked stream.""" + def _parse_chunked_payload(self): + """Chunked transfer encoding parser.""" stream = yield try: @@ -331,7 +234,7 @@ class HttpStreamReader(tulip.StreamReader): line, data = data.split(b'\n', 1) # Read the next chunk size from the file - i = line.find(b";") + i = line.find(b';') if i >= 0: line = line[:i] # strip chunk-extensions try: @@ -375,8 +278,7 @@ class HttpStreamReader(tulip.StreamReader): except http.client.IncompleteRead as exc: stream.set_exception(exc) - @wrap_payload_reader - def read_length_payload(self, length): + def _parse_length_payload(self, length): """Read specified amount of bytes.""" stream = yield @@ -400,8 +302,7 @@ class HttpStreamReader(tulip.StreamReader): except StreamEofException: stream.set_exception(http.client.IncompleteRead(b'')) - @wrap_payload_reader - def read_eof_payload(self): + def _parse_eof_payload(self): """Read all bytes untile eof.""" stream = yield @@ -437,11 +338,11 @@ class HttpStreamReader(tulip.StreamReader): chunked = value.lower() == 'chunked' elif name == 'SEC-WEBSOCKET-KEY1': length = 8 - elif name == "CONNECTION": + elif name == 'CONNECTION': v = value.lower() - if v == "close": + if v == 'close': close_conn = True - elif v == "keep-alive": + elif v == 'keep-alive': close_conn = False elif compression and name == 'CONTENT-ENCODING': enc = value.lower() @@ -451,9 +352,9 @@ class HttpStreamReader(tulip.StreamReader): if close_conn is None: close_conn = version <= (1, 0) - # payload stream + # payload parser if chunked: - payload = self.read_chunked_payload(encoding=encoding) + parser = self._parse_chunked_payload() elif length is not None: try: @@ -464,45 +365,513 @@ class HttpStreamReader(tulip.StreamReader): if length < 0: raise http.client.HTTPException('CONTENT-LENGTH') - payload = self.read_length_payload(length, encoding=encoding) + parser = self._parse_length_payload(length) else: if readall: - payload = self.read_eof_payload(encoding=encoding) + parser = self._parse_eof_payload() + else: + parser = self._parse_length_payload(0) + + next(parser) + + payload = stream = tulip.StreamReader() + + # payload decompression wrapper + if encoding is not None: + stream = DeflateStream(stream, encoding) + + try: + # initialize payload parser with stream, stream is being + # used by parser as destination stream + parser.send(stream) + except StopIteration: + pass + else: + # feed existing buffer to payload parser + self.byte_count = 0 + while self.buffer: + try: + parser.send(self.buffer.popleft()) + except StopIteration as exc: + parser = None + + # parser is done + buf = b''.join(self.buffer) + self.buffer.clear() + + # re-add remaining data back to buffer + if exc.value: + self.feed_data(exc.value) + + if buf: + self.feed_data(buf) + + break + + # parser still require more data + if parser is not None: + if self.eof: + try: + parser.throw(StreamEofException()) + except StopIteration as exc: + pass + else: + self._parser = parser + + return RawHttpMessage(headers, payload, close_conn, encoding) + + +class StreamEofException(http.client.HTTPException): + """eof received""" + + +class DeflateStream: + """DeflateStream decomress stream and feed data into specified stream.""" + + def __init__(self, stream, encoding): + self.stream = stream + zlib_mode = (16 + zlib.MAX_WBITS + if encoding == 'gzip' else -zlib.MAX_WBITS) + + self.zlib = zlib.decompressobj(wbits=zlib_mode) + + def set_exception(self, exc): + self.stream.set_exception(exc) + + def feed_data(self, chunk): + try: + chunk = self.zlib.decompress(chunk) + except: + self.stream.set_exception(http.client.IncompleteRead(b'')) + + if chunk: + self.stream.feed_data(chunk) + + def feed_eof(self): + self.stream.feed_data(self.zlib.flush()) + if not self.zlib.eof: + self.stream.set_exception(http.client.IncompleteRead(b'')) + + self.stream.feed_eof() + + +EOF_MARKER = object() +EOL_MARKER = object() + + +def wrap_payload_filter(func): + """Wraps payload filter and piped filters. + + Filter is a generatator that accepts arbitrary chunks of data, + modify data and emit new stream of data. + + For example we have stream of chunks: ['1', '2', '3', '4', '5'], + we can apply chunking filter to this stream: + + ['1', '2', '3', '4', '5'] + | + response.add_chunking_filter(2) + | + ['12', '34', '5'] + + It is possible to use different filters at the same time. + + For a example to compress incoming stream with 'deflate' encoding + and then split data and emit chunks of 8196 bytes size chunks: + + >> response.add_compression_filter('deflate') + >> response.add_chunking_filter(8196) + + Filters do not alter transfer encoding. + + Filter can receive types types of data, bytes object or EOF_MARKER. + + 1. If filter receives bytes object, it should process data + and yield processed data then yield EOL_MARKER object. + 2. If Filter recevied EOF_MARKER, it should yield remaining + data (buffered) and then yield EOF_MARKER. + """ + @functools.wraps(func) + def wrapper(self, *args, **kw): + new_filter = func(self, *args, **kw) + + filter = self.filter + if filter is not None: + next(new_filter) + self.filter = filter_pipe(filter, new_filter) + else: + self.filter = new_filter + + next(self.filter) + + return wrapper + + +def filter_pipe(filter, filter2): + """Creates pipe between two filters. + + filter_pipe() feeds first filter with incoming data and then + send yielded from first filter data into filter2, results of + filter2 are being emitted. + + 1. If filter_pipe receives bytes object, it sends it to the first filter. + 2. Reads yielded values from the first filter until it receives + EOF_MARKER or EOL_MARKER. + 3. Each of this values is being send to second filter. + 4. Reads yielded values from second filter until it recives EOF_MARKER or + EOL_MARKER. Each of this values yields to writer. + """ + chunk = yield + + while True: + eof = chunk is EOF_MARKER + chunk = filter.send(chunk) + + while chunk is not EOL_MARKER: + chunk = filter2.send(chunk) + + while chunk not in (EOF_MARKER, EOL_MARKER): + yield chunk + chunk = next(filter2) + + if chunk is not EOF_MARKER: + if eof: + chunk = EOF_MARKER + else: + chunk = next(filter) else: - payload = self.read_length_payload(0, encoding=encoding) + break + + chunk = yield EOL_MARKER + + +class HttpMessage: + """HttpMessage allows to write headers and payload to a stream. + + For example, lets say we want to read file then compress it with deflate + compression and then send it with chunked transfer encoding, code may look + like this: + + >> response = tulip.http.Response(transport, 200) + + We have to use deflate compression first: + + >> response.add_compression_filter('deflate') + + Then we want to split output stream into chunks of 1024 bytes size: + + >> response.add_chunking_filter(1024) + + We can add headers to response with add_headers() method. add_headers() + does not send data to transport, send_headers() sends request/response + line and then sends headers: + + >> response.add_headers( + .. ('Content-Disposition', 'attachment; filename="..."')) + >> response.send_headers() + + Now we can use chunked writer to write stream to a network stream. + First call to write() method sends response status line and headers, + add_header() and add_headers() method unavailble at this stage: - return HttpMessage(headers, payload, close_conn, encoding) + >> with open('...', 'rb') as f: + .. chunk = fp.read(8196) + .. while chunk: + .. response.write(chunk) + .. chunk = fp.read(8196) + >> response.write_eof() + """ + + writer = None + + # 'filter' is being used for altering write() bahaviour, + # add_chunking_filter adds deflate/gzip compression and + # add_compression_filter splits incoming data into a chunks. + filter = None + + HOP_HEADERS = None # Must be set by subclass. -class HttpStreamWriter: + SERVER_SOFTWARE = 'Python/{0[0]}.{0[1]} tulip/0.0'.format(sys.version_info) - def __init__(self, transport, encoding='utf-8'): + status = None + status_line = b'' + + # subclass can enable auto sending headers with write() call, + # this is useful for wsgi's start_response implementation. + _send_headers = False + + def __init__(self, transport, version, close): self.transport = transport - self.encoding = encoding - - def encode(self, s): - if isinstance(s, bytes): - return s - return s.encode(self.encoding) - - def decode(self, s): - if isinstance(s, str): - return s - return s.decode(self.encoding) - - def write(self, b): - self.transport.write(b) - - def write_str(self, s): - self.transport.write(self.encode(s)) - - def write_chunked(self, chunk): - if not chunk: - return - data = self.encode(chunk) - self.write_str('{:x}\r\n'.format(len(data))) - self.transport.write(data) - self.transport.write(b'\r\n') - - def write_chunked_eof(self): - self.transport.write(b'0\r\n\r\n') + self.version = version + self.closing = close + self.keepalive = False + + self.chunked = False + self.length = None + self.upgrade = False + self.headers = [] + self.headers_sent = False + + def force_close(self): + self.closing = True + + def force_chunked(self): + self.chunked = True + + def keep_alive(self): + return self.keepalive and not self.closing + + def is_headers_sent(self): + return self.headers_sent + + def add_header(self, name, value): + """Analyze headers. Calculate content length, + removes hop headers, etc.""" + assert not self.headers_sent, 'headers have been sent already' + assert isinstance(name, str), '%r is not a string' % name + + name = name.strip().upper() + + if name == 'CONTENT-LENGTH': + self.length = int(value) + + if name == 'CONNECTION': + val = value.lower().strip() + # handle websocket + if val == 'upgrade': + self.upgrade = True + # connection keep-alive + elif val == 'close': + self.keepalive = False + elif val == 'keep-alive': + self.keepalive = True + + elif name == 'UPGRADE': + if 'websocket' in value.lower(): + self.headers.append((name, value)) + + elif name == 'TRANSFER-ENCODING' and not self.chunked: + self.chunked = value.lower().strip() == 'chunked' + + elif name not in self.HOP_HEADERS: + # ignore hopbyhop headers + self.headers.append((name, value)) + + def add_headers(self, *headers): + """Adds headers to a http message.""" + for name, value in headers: + self.add_header(name, value) + + def send_headers(self): + """Writes headers to a stream. Constructs payload writer.""" + # Chunked response is only for HTTP/1.1 clients or newer + # and there is no Content-Length header is set. + # Do not use chunked responses when the response is guaranteed to + # not have a response body (304, 204). + assert not self.headers_sent, 'headers have been sent already' + self.headers_sent = True + + if (self.chunked is True) or ( + self.length is None and + self.version >= (1, 1) and + self.status not in (304, 204)): + self.chunked = True + self.writer = self._write_chunked_payload() + + elif self.length is not None: + self.writer = self._write_length_payload(self.length) + + else: + self.writer = self._write_eof_payload() + + next(self.writer) + + # status line + self.transport.write(self.status_line.encode('ascii')) + + # send headers + self.transport.write( + ('%s\r\n\r\n' % '\r\n'.join( + ('%s: %s' % (k, v) for k, v in + itertools.chain(self._default_headers(), self.headers))) + ).encode('ascii')) + + def _default_headers(self): + # set the connection header + if self.upgrade: + connection = 'upgrade' + elif self.keep_alive(): + connection = 'keep-alive' + else: + connection = 'close' + + headers = [('CONNECTION', connection)] + + if self.chunked: + headers.append(('TRANSFER-ENCODING', 'chunked')) + + return headers + + def write(self, chunk): + """write() writes chunk of data to a steram by using different writers. + writer uses filter to modify chunk of data. write_eof() indicates + end of stream. writer can't be used after write_eof() method + being called.""" + assert (isinstance(chunk, (bytes, bytearray)) or + chunk is EOF_MARKER), chunk + + if self._send_headers and not self.headers_sent: + self.send_headers() + + assert self.writer is not None, 'send_headers() is not called.' + + if self.filter: + chunk = self.filter.send(chunk) + while chunk not in (EOF_MARKER, EOL_MARKER): + self.writer.send(chunk) + chunk = next(self.filter) + else: + if chunk is not EOF_MARKER: + self.writer.send(chunk) + + def write_eof(self): + self.write(EOF_MARKER) + try: + self.writer.throw(StreamEofException()) + except StopIteration: + pass + + def _write_chunked_payload(self): + """Write data in chunked transfer encoding.""" + while True: + try: + chunk = yield + except StreamEofException: + self.transport.write(b'0\r\n\r\n') + break + + self.transport.write('{:x}\r\n'.format(len(chunk)).encode('ascii')) + self.transport.write(chunk) + self.transport.write(b'\r\n') + + def _write_length_payload(self, length): + """Write specified number of bytes to a stream.""" + while True: + try: + chunk = yield + except StreamEofException: + break + + if length: + l = len(chunk) + if length >= l: + self.transport.write(chunk) + else: + self.transport.write(chunk[:length]) + + length = max(0, length-l) + + def _write_eof_payload(self): + while True: + try: + chunk = yield + except StreamEofException: + break + + self.transport.write(chunk) + + @wrap_payload_filter + def add_chunking_filter(self, chunk_size=16*1024): + """Split incoming stream into chunks.""" + buf = bytearray() + chunk = yield + + while True: + if chunk is EOF_MARKER: + if buf: + yield buf + + yield EOF_MARKER + + else: + buf.extend(chunk) + + while len(buf) >= chunk_size: + chunk, buf = buf[:chunk_size], buf[chunk_size:] + yield chunk + + chunk = yield EOL_MARKER + + @wrap_payload_filter + def add_compression_filter(self, encoding='deflate'): + """Compress incoming stream with deflate or gzip encoding.""" + zlib_mode = (16 + zlib.MAX_WBITS + if encoding == 'gzip' else -zlib.MAX_WBITS) + zcomp = zlib.compressobj(wbits=zlib_mode) + + chunk = yield + while True: + if chunk is EOF_MARKER: + yield zcomp.flush() + chunk = yield EOF_MARKER + + else: + yield zcomp.compress(chunk) + chunk = yield EOL_MARKER + + +class Response(HttpMessage): + """Create http response message. + + Transport is a socket stream transport. status is a response status code, + status has to be integer value. http_version is a tuple that represents + http version, (1, 0) stands for HTTP/1.0 and (1, 1) is for HTTP/1.1 + """ + + HOP_HEADERS = { + 'CONNECTION', + 'KEEP-ALIVE', + 'PROXY-AUTHENTICATE', + 'PROXY-AUTHORIZATION', + 'TE', + 'TRAILERS', + 'TRANSFER-ENCODING', + 'UPGRADE', + 'SERVER', + 'DATE', + } + + def __init__(self, transport, status, http_version=(1, 1), close=False): + super().__init__(transport, http_version, close) + + self.status = status + self.status_line = 'HTTP/{0[0]}.{0[1]} {1} {2}\r\n'.format( + http_version, status, RESPONSES[status][0]) + + def _default_headers(self): + headers = super()._default_headers() + headers.extend((('DATE', email.utils.formatdate()), + ('SERVER', self.SERVER_SOFTWARE))) + + return headers + + +class Request(HttpMessage): + + HOP_HEADERS = () + + def __init__(self, transport, method, uri, + http_version=(1, 1), close=False): + super().__init__(transport, http_version, close) + + self.method = method + self.uri = uri + self.status_line = '{0} {1} HTTP/{2[0]}.{2[1]}\r\n'.format( + method, uri, http_version) + + def _default_headers(self): + headers = super()._default_headers() + headers.append(('USER-AGENT', self.SERVER_SOFTWARE)) + + return headers diff --git a/tulip/selector_events.py b/tulip/selector_events.py index cc7fe33..9bc9c23 100644 --- a/tulip/selector_events.py +++ b/tulip/selector_events.py @@ -352,7 +352,7 @@ class _SelectorSocketTransport(transports.Transport): self._event_loop.call_soon(self._protocol.eof_received) def write(self, data): - assert isinstance(data, bytes) + assert isinstance(data, (bytes, bytearray)), repr(data) assert not self._closing if not data: return -- cgit v1.2.1