summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNikolay Kim <fafhrd91@gmail.com>2013-03-20 10:11:28 -0700
committerNikolay Kim <fafhrd91@gmail.com>2013-03-20 10:11:28 -0700
commit1ca8eed2801f45a570a31476889f5b2cffcd088a (patch)
treee1b40de8b755eb51e5713fa17d43f80a75676c61
parentfeddce0dfce6dc5feb7160ab29d8585d061f5e14 (diff)
downloadtrollius-1ca8eed2801f45a570a31476889f5b2cffcd088a.tar.gz
http Response and Request helpers
-rw-r--r--srv.py65
-rw-r--r--tests/http_protocol_test.py814
-rw-r--r--tulip/http/client.py35
-rw-r--r--tulip/http/protocol.py701
-rw-r--r--tulip/selector_events.py2
5 files changed, 1231 insertions, 386 deletions
diff --git a/srv.py b/srv.py
index 540e63b..710e0e7 100644
--- a/srv.py
+++ b/srv.py
@@ -51,22 +51,36 @@ class HttpServer(tulip.Protocol):
print(hdr, val)
headers.add_header(hdr, val)
- write = self.transport.write
if isdir and not path.endswith('/'):
- bpath = path.encode('ascii')
- write(b'HTTP/1.0 302 Redirected\r\n'
- b'URI: ' + bpath + b'/\r\n'
- b'Location: ' + bpath + b'/\r\n'
- b'\r\n')
+ path = path + '/'
+ response = tulip.http.Response(self.transport, 302)
+ response.add_headers(
+ ('URI', path),
+ ('Location', path))
+ response.send_headers()
+ response.write_eof()
+ self.transport.close()
return
- write(b'HTTP/1.0 200 Ok\r\n')
- if isdir:
- write(b'Content-type: text/html\r\n')
- else:
- write(b'Content-type: text/plain\r\n')
- write(b'\r\n')
+
+ response = tulip.http.Response(self.transport, 200)
+ response.add_header('Transfer-Encoding', 'chunked')
+
+ # content encoding
+ accept_encoding = headers.get('accept-encoding', '').lower()
+ if 'deflate' in accept_encoding:
+ response.add_header('Content-Encoding', 'deflate')
+ response.add_compression_filter('deflate')
+ elif 'gzip' in accept_encoding:
+ response.add_header('Content-Encoding', 'gzip')
+ response.add_compression_filter('gzip')
+
+ response.add_chunking_filter(1025)
+
if isdir:
- write(b'<ul>\r\n')
+ response.add_header('Content-type', 'text/html')
+ response.send_headers()
+
+ response.write(b'<ul>\r\n')
for name in sorted(os.listdir(path)):
if name.isprintable() and not name.startswith('.'):
try:
@@ -75,18 +89,27 @@ class HttpServer(tulip.Protocol):
pass
else:
if os.path.isdir(os.path.join(path, name)):
- write(b'<li><a href="' + bname +
- b'/">' + bname + b'/</a></li>\r\n')
+ response.write(b'<li><a href="' + bname +
+ b'/">' + bname + b'/</a></li>\r\n')
else:
- write(b'<li><a href="' + bname +
- b'">' + bname + b'</a></li>\r\n')
- write(b'</ul>')
+ response.write(b'<li><a href="' + bname +
+ b'">' + bname + b'</a></li>\r\n')
+ response.write(b'</ul>')
else:
+ response.add_header('Content-type', 'text/plain')
+ response.send_headers()
+
try:
- with open(path, 'rb') as f:
- write(f.read())
+ with open(path, 'rb') as fp:
+ chunk = fp.read(8196)
+ while chunk:
+ if not response.write(chunk):
+ break
+ chunk = fp.read(8196)
except OSError:
- write(b'Cannot open\r\n')
+ response.write(b'Cannot open')
+
+ response.write_eof()
self.transport.close()
def connection_made(self, transport):
diff --git a/tests/http_protocol_test.py b/tests/http_protocol_test.py
index c095228..408bfc7 100644
--- a/tests/http_protocol_test.py
+++ b/tests/http_protocol_test.py
@@ -203,151 +203,6 @@ class HttpStreamReaderTests(LogTrackingTestCase):
self.assertIn("limit request headers fields size", str(cm.exception))
- def test_read_payload_unknown_encoding(self):
- self.assertRaises(
- ValueError, self.stream.read_length_payload, encoding='unknown')
-
- def test_read_payload(self):
- self.stream.feed_data(b'da')
- self.stream.feed_data(b't')
- self.stream.feed_data(b'ali')
- self.stream.feed_data(b'ne')
-
- stream = self.stream.read_length_payload(4)
- self.assertIsInstance(stream, tulip.StreamReader)
-
- data = self.loop.run_until_complete(tulip.Task(stream.read()))
- self.assertEqual(b'data', data)
- self.assertEqual(b'line', b''.join(self.stream.buffer))
-
- def test_read_payload_eof(self):
- self.stream.feed_data(b'da')
- self.stream.feed_eof()
- stream = self.stream.read_length_payload(4)
-
- self.assertRaises(
- http.client.IncompleteRead,
- self.loop.run_until_complete, tulip.Task(stream.read()))
-
- def test_read_payload_eof_exc(self):
- self.stream.feed_data(b'da')
- stream = self.stream.read_length_payload(4)
-
- def eof():
- yield from []
- self.stream.feed_eof()
-
- t1 = tulip.Task(stream.read())
- t2 = tulip.Task(eof())
-
- self.loop.run_until_complete(tulip.Task(tulip.wait([t1, t2])))
- self.assertRaises(http.client.IncompleteRead, t1.result)
- self.assertIsNone(self.stream._reader)
-
- def test_read_payload_deflate(self):
- comp = zlib.compressobj(wbits=-zlib.MAX_WBITS)
-
- data = b''.join([comp.compress(b'data'), comp.flush()])
- stream = self.stream.read_length_payload(len(data), encoding='deflate')
-
- self.stream.feed_data(data)
-
- data = self.loop.run_until_complete(tulip.Task(stream.read()))
- self.assertEqual(b'data', data)
-
- def _test_read_payload_compress_error(self):
- data = b'123123123datadatadata'
- reader = protocol.length_reader(4)
- self.stream.feed_data(data)
- stream = self.stream.read_payload(reader, 'deflate')
-
- self.assertRaises(
- http.client.IncompleteRead,
- self.loop.run_until_complete, tulip.Task(stream.read()))
-
- def test_read_chunked_payload(self):
- stream = self.stream.read_chunked_payload()
- self.stream.feed_data(b'4\r\ndata\r\n4\r\nline\r\n0\r\ntest\r\n\r\n')
-
- data = self.loop.run_until_complete(tulip.Task(stream.read()))
- self.assertEqual(b'dataline', data)
-
- def test_read_chunked_payload_chunks(self):
- stream = self.stream.read_chunked_payload()
-
- self.stream.feed_data(b'4\r\ndata\r')
- self.stream.feed_data(b'\n4')
- self.stream.feed_data(b'\r')
- self.stream.feed_data(b'\n')
- self.stream.feed_data(b'line\r\n0\r\n')
- self.stream.feed_data(b'test\r\n\r\n')
-
- data = self.loop.run_until_complete(tulip.Task(stream.read()))
- self.assertEqual(b'dataline', data)
-
- def test_read_chunked_payload_incomplete(self):
- stream = self.stream.read_chunked_payload()
-
- self.stream.feed_data(b'4\r\ndata\r\n')
- self.stream.feed_eof()
-
- self.assertRaises(
- http.client.IncompleteRead,
- self.loop.run_until_complete, tulip.Task(stream.read()))
-
- def test_read_chunked_payload_extension(self):
- stream = self.stream.read_chunked_payload()
-
- self.stream.feed_data(
- b'4;test\r\ndata\r\n4\r\nline\r\n0\r\ntest\r\n\r\n')
-
- data = self.loop.run_until_complete(tulip.Task(stream.read()))
- self.assertEqual(b'dataline', data)
-
- def test_read_chunked_payload_size_error(self):
- stream = self.stream.read_chunked_payload()
-
- self.stream.feed_data(b'blah\r\n')
- self.assertRaises(
- http.client.IncompleteRead,
- self.loop.run_until_complete, tulip.Task(stream.read()))
-
- def test_read_length_payload(self):
- stream = self.stream.read_length_payload(8)
-
- self.stream.feed_data(b'data')
- self.stream.feed_data(b'data')
-
- data = self.loop.run_until_complete(tulip.Task(stream.read()))
- self.assertEqual(b'datadata', data)
-
- def test_read_length_payload_zero(self):
- stream = self.stream.read_length_payload(0)
-
- self.stream.feed_data(b'data')
-
- data = self.loop.run_until_complete(tulip.Task(stream.read()))
- self.assertEqual(b'', data)
-
- def test_read_length_payload_incomplete(self):
- stream = self.stream.read_length_payload(8)
-
- self.stream.feed_data(b'data')
- self.stream.feed_eof()
-
- self.assertRaises(
- http.client.IncompleteRead,
- self.loop.run_until_complete, tulip.Task(stream.read()))
-
- def test_read_eof_payload(self):
- stream = self.stream.read_eof_payload()
-
- self.stream.feed_data(b'data')
- self.stream.feed_eof()
-
- data = self.loop.run_until_complete(tulip.Task(stream.read()))
- self.assertEqual(b'data', data)
-
def test_read_message_should_close(self):
self.stream.feed_data(
b'Host: example.com\r\nConnection: close\r\n\r\n')
@@ -477,7 +332,7 @@ class HttpStreamReaderTests(LogTrackingTestCase):
payload = self.loop.run_until_complete(tulip.Task(msg.payload.read()))
self.assertEqual(b'dataline', payload)
- def test_read_message_readall(self):
+ def test_read_message_readall_eof(self):
self.stream.feed_data(
b'Host: example.com\r\n\r\n')
self.stream.feed_data(b'data')
@@ -490,46 +345,653 @@ class HttpStreamReaderTests(LogTrackingTestCase):
payload = self.loop.run_until_complete(tulip.Task(msg.payload.read()))
self.assertEqual(b'dataline', payload)
+ def test_read_message_payload(self):
+ self.stream.feed_data(
+ b'Host: example.com\r\n'
+ b'Content-Length: 8\r\n\r\n')
+ self.stream.feed_data(b'data')
+ self.stream.feed_data(b'data')
+
+ msg = self.loop.run_until_complete(
+ tulip.Task(self.stream.read_message(readall=True)))
+
+ data = self.loop.run_until_complete(tulip.Task(msg.payload.read()))
+ self.assertEqual(b'datadata', data)
+
+ def test_read_message_payload_eof(self):
+ self.stream.feed_data(
+ b'Host: example.com\r\n'
+ b'Content-Length: 4\r\n\r\n')
+ self.stream.feed_data(b'da')
+ self.stream.feed_eof()
+
+ msg = self.loop.run_until_complete(
+ tulip.Task(self.stream.read_message(readall=True)))
+
+ self.assertRaises(
+ http.client.IncompleteRead,
+ self.loop.run_until_complete, tulip.Task(msg.payload.read()))
+
+ def test_read_message_length_payload_zero(self):
+ self.stream.feed_data(
+ b'Host: example.com\r\n'
+ b'Content-Length: 0\r\n\r\n')
+ self.stream.feed_data(b'data')
+
+ msg = self.loop.run_until_complete(
+ tulip.Task(self.stream.read_message()))
+
+ data = self.loop.run_until_complete(tulip.Task(msg.payload.read()))
+ self.assertEqual(b'', data)
+
+ def test_read_message_length_payload_incomplete(self):
+ self.stream.feed_data(
+ b'Host: example.com\r\n'
+ b'Content-Length: 8\r\n\r\n')
+
+ msg = self.loop.run_until_complete(
+ tulip.Task(self.stream.read_message()))
+
+ def coro():
+ self.stream.feed_data(b'data')
+ self.stream.feed_eof()
+ return (yield from msg.payload.read())
+
+ self.assertRaises(
+ http.client.IncompleteRead,
+ self.loop.run_until_complete, tulip.Task(coro()))
-class HttpStreamWriterTests(unittest.TestCase):
+ def test_read_message_eof_payload(self):
+ self.stream.feed_data(b'Host: example.com\r\n\r\n')
+
+ msg = self.loop.run_until_complete(
+ tulip.Task(self.stream.read_message(readall=True)))
+
+ def coro():
+ self.stream.feed_data(b'data')
+ self.stream.feed_eof()
+ return (yield from msg.payload.read())
+
+ data = self.loop.run_until_complete(tulip.Task(coro()))
+ self.assertEqual(b'data', data)
+
+ def test_read_message_length_payload(self):
+ self.stream.feed_data(
+ b'Host: example.com\r\n'
+ b'Content-Length: 4\r\n\r\n')
+ self.stream.feed_data(b'da')
+ self.stream.feed_data(b't')
+ self.stream.feed_data(b'ali')
+ self.stream.feed_data(b'ne')
+
+ msg = self.loop.run_until_complete(
+ tulip.Task(self.stream.read_message(readall=True)))
+
+ self.assertIsInstance(msg.payload, tulip.StreamReader)
+
+ data = self.loop.run_until_complete(tulip.Task(msg.payload.read()))
+ self.assertEqual(b'data', data)
+ self.assertEqual(b'line', b''.join(self.stream.buffer))
+
+ def test_read_message_length_payload_extra(self):
+ self.stream.feed_data(
+ b'Host: example.com\r\n'
+ b'Content-Length: 4\r\n\r\n')
+
+ msg = self.loop.run_until_complete(
+ tulip.Task(self.stream.read_message()))
+
+ def coro():
+ self.stream.feed_data(b'da')
+ self.stream.feed_data(b't')
+ self.stream.feed_data(b'ali')
+ self.stream.feed_data(b'ne')
+ return (yield from msg.payload.read())
+
+ data = self.loop.run_until_complete(tulip.Task(coro()))
+ self.assertEqual(b'data', data)
+ self.assertEqual(b'line', b''.join(self.stream.buffer))
+
+ def test_parse_length_payload_eof_exc(self):
+ parser = self.stream._parse_length_payload(4)
+ next(parser)
+
+ stream = tulip.StreamReader()
+ parser.send(stream)
+ self.stream._parser = parser
+ self.stream.feed_data(b'da')
+
+ def eof():
+ yield from []
+ self.stream.feed_eof()
+
+ t1 = tulip.Task(stream.read())
+ t2 = tulip.Task(eof())
+
+ self.loop.run_until_complete(tulip.Task(tulip.wait([t1, t2])))
+ self.assertRaises(http.client.IncompleteRead, t1.result)
+ self.assertIsNone(self.stream._parser)
+
+ def test_read_message_deflate_payload(self):
+ comp = zlib.compressobj(wbits=-zlib.MAX_WBITS)
+
+ data = b''.join([comp.compress(b'data'), comp.flush()])
+
+ self.stream.feed_data(
+ b'Host: example.com\r\n'
+ b'Content-Encoding: deflate\r\n' +
+ ('Content-Length: %s\r\n\r\n' % len(data)).encode())
+
+ msg = self.loop.run_until_complete(
+ tulip.Task(self.stream.read_message(readall=True)))
+
+ def coro():
+ self.stream.feed_data(data)
+ return (yield from msg.payload.read())
+
+ data = self.loop.run_until_complete(tulip.Task(coro()))
+ self.assertEqual(b'data', data)
+
+ def test_read_message_chunked_payload(self):
+ self.stream.feed_data(
+ b'Host: example.com\r\n'
+ b'Transfer-Encoding: chunked\r\n\r\n')
+
+ msg = self.loop.run_until_complete(
+ tulip.Task(self.stream.read_message()))
+
+ def coro():
+ self.stream.feed_data(
+ b'4\r\ndata\r\n4\r\nline\r\n0\r\ntest\r\n\r\n')
+ return (yield from msg.payload.read())
+
+ data = self.loop.run_until_complete(tulip.Task(coro()))
+ self.assertEqual(b'dataline', data)
+
+ def test_read_message_chunked_payload_chunks(self):
+ self.stream.feed_data(
+ b'Host: example.com\r\n'
+ b'Transfer-Encoding: chunked\r\n\r\n')
+
+ msg = self.loop.run_until_complete(
+ tulip.Task(self.stream.read_message()))
+
+ def coro():
+ self.stream.feed_data(b'4\r\ndata\r')
+ self.stream.feed_data(b'\n4')
+ self.stream.feed_data(b'\r')
+ self.stream.feed_data(b'\n')
+ self.stream.feed_data(b'line\r\n0\r\n')
+ self.stream.feed_data(b'test\r\n\r\n')
+ return (yield from msg.payload.read())
+
+ data = self.loop.run_until_complete(tulip.Task(coro()))
+ self.assertEqual(b'dataline', data)
+
+ def test_read_message_chunked_payload_incomplete(self):
+ self.stream.feed_data(
+ b'Host: example.com\r\n'
+ b'Transfer-Encoding: chunked\r\n\r\n')
+
+ msg = self.loop.run_until_complete(
+ tulip.Task(self.stream.read_message()))
+
+ def coro():
+ self.stream.feed_data(b'4\r\ndata\r\n')
+ self.stream.feed_eof()
+ return (yield from msg.payload.read())
+
+ self.assertRaises(
+ http.client.IncompleteRead,
+ self.loop.run_until_complete, tulip.Task(coro()))
+
+ def test_read_message_chunked_payload_extension(self):
+ self.stream.feed_data(
+ b'Host: example.com\r\n'
+ b'Transfer-Encoding: chunked\r\n\r\n')
+
+ msg = self.loop.run_until_complete(
+ tulip.Task(self.stream.read_message()))
+
+ def coro():
+ self.stream.feed_data(
+ b'4;test\r\ndata\r\n4\r\nline\r\n0\r\ntest\r\n\r\n')
+ return (yield from msg.payload.read())
+
+ data = self.loop.run_until_complete(tulip.Task(coro()))
+ self.assertEqual(b'dataline', data)
+
+ def test_read_message_chunked_payload_size_error(self):
+ self.stream.feed_data(
+ b'Host: example.com\r\n'
+ b'Transfer-Encoding: chunked\r\n\r\n')
+
+ msg = self.loop.run_until_complete(
+ tulip.Task(self.stream.read_message()))
+
+ def coro():
+ self.stream.feed_data(b'blah\r\n')
+ return (yield from msg.payload.read())
+
+ self.assertRaises(
+ http.client.IncompleteRead,
+ self.loop.run_until_complete, tulip.Task(coro()))
+
+ def test_deflate_stream_set_exception(self):
+ stream = tulip.StreamReader()
+ dstream = protocol.DeflateStream(stream, 'deflate')
+
+ exc = ValueError()
+ dstream.set_exception(exc)
+ self.assertIs(exc, stream.exception())
+
+ def test_deflate_stream_feed_data(self):
+ stream = tulip.StreamReader()
+ dstream = protocol.DeflateStream(stream, 'deflate')
+
+ dstream.zlib = unittest.mock.Mock()
+ dstream.zlib.decompress.return_value = b'line'
+
+ dstream.feed_data(b'data')
+ self.assertEqual([b'line'], list(stream.buffer))
+
+ def test_deflate_stream_feed_data_err(self):
+ stream = tulip.StreamReader()
+ dstream = protocol.DeflateStream(stream, 'deflate')
+
+ exc = ValueError()
+ dstream.zlib = unittest.mock.Mock()
+ dstream.zlib.decompress.side_effect = exc
+
+ dstream.feed_data(b'data')
+ self.assertIsInstance(stream.exception(), http.client.IncompleteRead)
+
+ def test_deflate_stream_feed_eof(self):
+ stream = tulip.StreamReader()
+ dstream = protocol.DeflateStream(stream, 'deflate')
+
+ dstream.zlib = unittest.mock.Mock()
+ dstream.zlib.flush.return_value = b'line'
+
+ dstream.feed_eof()
+ self.assertEqual([b'line'], list(stream.buffer))
+ self.assertTrue(stream.eof)
+
+ def test_deflate_stream_feed_eof_err(self):
+ stream = tulip.StreamReader()
+ dstream = protocol.DeflateStream(stream, 'deflate')
+
+ dstream.zlib = unittest.mock.Mock()
+ dstream.zlib.flush.return_value = b'line'
+ dstream.zlib.eof = False
+
+ dstream.feed_eof()
+ self.assertIsInstance(stream.exception(), http.client.IncompleteRead)
+
+
+class HttpMessageTests(unittest.TestCase):
def setUp(self):
self.transport = unittest.mock.Mock()
- self.writer = protocol.HttpStreamWriter(self.transport)
- def test_ctor(self):
- transport = unittest.mock.Mock()
- writer = protocol.HttpStreamWriter(transport, 'latin-1')
- self.assertIs(writer.transport, transport)
- self.assertEqual(writer.encoding, 'latin-1')
+ def test_start_request(self):
+ msg = protocol.Request(
+ self.transport, 'GET', '/index.html', close=True)
+
+ self.assertIs(msg.transport, self.transport)
+ self.assertIsNone(msg.status)
+ self.assertTrue(msg.closing)
+ self.assertEqual(msg.status_line, 'GET /index.html HTTP/1.1\r\n')
+
+ def test_start_response(self):
+ msg = protocol.Response(self.transport, 200, close=True)
+
+ self.assertIs(msg.transport, self.transport)
+ self.assertEqual(msg.status, 200)
+ self.assertTrue(msg.closing)
+ self.assertEqual(msg.status_line, 'HTTP/1.1 200 OK\r\n')
+
+ def test_force_close(self):
+ msg = protocol.Response(self.transport, 200)
+ self.assertFalse(msg.closing)
+ msg.force_close()
+ self.assertTrue(msg.closing)
+
+ def test_force_chunked(self):
+ msg = protocol.Response(self.transport, 200)
+ self.assertFalse(msg.chunked)
+ msg.force_chunked()
+ self.assertTrue(msg.chunked)
+
+ def test_keep_alive(self):
+ msg = protocol.Response(self.transport, 200)
+ self.assertFalse(msg.keep_alive())
+ msg.keepalive = True
+ self.assertTrue(msg.keep_alive())
+
+ msg.force_close()
+ self.assertFalse(msg.keep_alive())
+
+ def test_add_header(self):
+ msg = protocol.Response(self.transport, 200)
+ self.assertEqual([], msg.headers)
- def test_encode(self):
- self.assertEqual(b'test', self.writer.encode('test'))
- self.assertEqual(b'test', self.writer.encode(b'test'))
+ msg.add_header('content-type', 'plain/html')
+ self.assertEqual([('CONTENT-TYPE', 'plain/html')], msg.headers)
- def test_decode(self):
- self.assertEqual('test', self.writer.decode('test'))
- self.assertEqual('test', self.writer.decode(b'test'))
+ def test_add_headers(self):
+ msg = protocol.Response(self.transport, 200)
+ self.assertEqual([], msg.headers)
- def test_write(self):
- self.writer.write(b'test')
- self.assertTrue(self.transport.write.called)
- self.assertEqual((b'test',), self.transport.write.call_args[0])
+ msg.add_headers(('content-type', 'plain/html'))
+ self.assertEqual([('CONTENT-TYPE', 'plain/html')], msg.headers)
- def test_write_str(self):
- self.writer.write_str('test')
- self.assertTrue(self.transport.write.called)
- self.assertEqual((b'test',), self.transport.write.call_args[0])
+ def test_add_headers_length(self):
+ msg = protocol.Response(self.transport, 200)
+ self.assertIsNone(msg.length)
- def test_write_cunked(self):
- self.writer.write_chunked('')
- self.assertFalse(self.transport.write.called)
+ msg.add_headers(('content-length', '200'))
+ self.assertEqual(200, msg.length)
- self.writer.write_chunked('data')
+ def test_add_headers_upgrade(self):
+ msg = protocol.Response(self.transport, 200)
+ self.assertFalse(msg.upgrade)
+
+ msg.add_headers(('connection', 'upgrade'))
+ self.assertTrue(msg.upgrade)
+
+ def test_add_headers_upgrade_websocket(self):
+ msg = protocol.Response(self.transport, 200)
+
+ msg.add_headers(('upgrade', 'test'))
+ self.assertEqual([], msg.headers)
+
+ msg.add_headers(('upgrade', 'websocket'))
+ self.assertEqual([('UPGRADE', 'websocket')], msg.headers)
+
+ def test_add_headers_connection_keepalive(self):
+ msg = protocol.Response(self.transport, 200)
+
+ msg.add_headers(('connection', 'keep-alive'))
+ self.assertEqual([], msg.headers)
+ self.assertTrue(msg.keepalive)
+
+ msg.add_headers(('connection', 'close'))
+ self.assertFalse(msg.keepalive)
+
+ def test_add_headers_hop_headers(self):
+ msg = protocol.Response(self.transport, 200)
+
+ msg.add_headers(('connection', 'test'), ('transfer-encoding', 't'))
+ self.assertEqual([], msg.headers)
+
+ def test_default_headers(self):
+ msg = protocol.Response(self.transport, 200)
+
+ headers = [r for r, _ in msg._default_headers()]
+ self.assertIn('DATE', headers)
+ self.assertIn('CONNECTION', headers)
+
+ def test_default_headers_server(self):
+ msg = protocol.Response(self.transport, 200)
+
+ headers = [r for r, _ in msg._default_headers()]
+ self.assertIn('SERVER', headers)
+
+ def test_default_headers_useragent(self):
+ msg = protocol.Request(self.transport, 'GET', '/')
+
+ headers = [r for r, _ in msg._default_headers()]
+ self.assertNotIn('SERVER', headers)
+ self.assertIn('USER-AGENT', headers)
+
+ def test_default_headers_chunked(self):
+ msg = protocol.Response(self.transport, 200)
+
+ headers = [r for r, _ in msg._default_headers()]
+ self.assertNotIn('TRANSFER-ENCODING', headers)
+
+ msg.force_chunked()
+
+ headers = [r for r, _ in msg._default_headers()]
+ self.assertIn('TRANSFER-ENCODING', headers)
+
+ def test_default_headers_connection_upgrade(self):
+ msg = protocol.Response(self.transport, 200)
+ msg.upgrade = True
+
+ headers = [r for r in msg._default_headers() if r[0] == 'CONNECTION']
+ self.assertEqual([('CONNECTION', 'upgrade')], headers)
+
+ def test_default_headers_connection_close(self):
+ msg = protocol.Response(self.transport, 200)
+ msg.force_close()
+
+ headers = [r for r in msg._default_headers() if r[0] == 'CONNECTION']
+ self.assertEqual([('CONNECTION', 'close')], headers)
+
+ def test_default_headers_connection_keep_alive(self):
+ msg = protocol.Response(self.transport, 200)
+ msg.keepalive = True
+
+ headers = [r for r in msg._default_headers() if r[0] == 'CONNECTION']
+ self.assertEqual([('CONNECTION', 'keep-alive')], headers)
+
+ def test_send_headers(self):
+ write = self.transport.write = unittest.mock.Mock()
+
+ msg = protocol.Response(self.transport, 200)
+ msg.add_headers(('content-type', 'plain/html'))
+ self.assertFalse(msg.is_headers_sent())
+
+ msg.send_headers()
+
+ content = b''.join([arg[1][0] for arg in list(write.mock_calls)])
+
+ self.assertTrue(content.startswith(b'HTTP/1.1 200 OK\r\n'))
+ self.assertIn(b'CONTENT-TYPE: plain/html', content)
+ self.assertTrue(msg.headers_sent)
+ self.assertTrue(msg.is_headers_sent())
+
+ def test_send_headers_nomore_add(self):
+ msg = protocol.Response(self.transport, 200)
+ msg.add_headers(('content-type', 'plain/html'))
+ msg.send_headers()
+
+ self.assertRaises(AssertionError,
+ msg.add_header, 'content-type', 'plain/html')
+
+ def test_prepare_length(self):
+ msg = protocol.Response(self.transport, 200)
+ length = msg._write_length_payload = unittest.mock.Mock()
+ length.return_value = iter([1, 2, 3])
+
+ msg.add_headers(('content-length', '200'))
+ msg.send_headers()
+
+ self.assertTrue(length.called)
+ self.assertTrue((200,), length.call_args[0])
+
+ def test_prepare_chunked_force(self):
+ msg = protocol.Response(self.transport, 200)
+ msg.force_chunked()
+
+ chunked = msg._write_chunked_payload = unittest.mock.Mock()
+ chunked.return_value = iter([1, 2, 3])
+
+ msg.add_headers(('content-length', '200'))
+ msg.send_headers()
+ self.assertTrue(chunked.called)
+
+ def test_prepare_chunked_no_length(self):
+ msg = protocol.Response(self.transport, 200)
+
+ chunked = msg._write_chunked_payload = unittest.mock.Mock()
+ chunked.return_value = iter([1, 2, 3])
+
+ msg.send_headers()
+ self.assertTrue(chunked.called)
+
+ def test_prepare_eof(self):
+ msg = protocol.Response(self.transport, 200, http_version=(1, 0))
+
+ eof = msg._write_eof_payload = unittest.mock.Mock()
+ eof.return_value = iter([1, 2, 3])
+
+ msg.send_headers()
+ self.assertTrue(eof.called)
+
+ def test_write_auto_send_headers(self):
+ msg = protocol.Response(self.transport, 200, http_version=(1, 0))
+ msg._send_headers = True
+
+ msg.write(b'data1')
+ self.assertTrue(msg.headers_sent)
+
+ def test_write_payload_eof(self):
+ write = self.transport.write = unittest.mock.Mock()
+ msg = protocol.Response(self.transport, 200, http_version=(1, 0))
+ msg.send_headers()
+
+ msg.write(b'data1')
+ self.assertTrue(msg.headers_sent)
+
+ msg.write(b'data2')
+ msg.write_eof()
+
+ content = b''.join([c[1][0] for c in list(write.mock_calls)])
+ self.assertEqual(
+ b'data1data2', content.split(b'\r\n\r\n', 1)[-1])
+
+ def test_write_payload_chunked(self):
+ write = self.transport.write = unittest.mock.Mock()
+
+ msg = protocol.Response(self.transport, 200)
+ msg.force_chunked()
+ msg.send_headers()
+
+ msg.write(b'data')
+ msg.write_eof()
+
+ content = b''.join([c[1][0] for c in list(write.mock_calls)])
+ self.assertEqual(
+ b'4\r\ndata\r\n0\r\n\r\n',
+ content.split(b'\r\n\r\n', 1)[-1])
+
+ def test_write_payload_chunked_multiple(self):
+ write = self.transport.write = unittest.mock.Mock()
+
+ msg = protocol.Response(self.transport, 200)
+ msg.force_chunked()
+ msg.send_headers()
+
+ msg.write(b'data1')
+ msg.write(b'data2')
+ msg.write_eof()
+
+ content = b''.join([c[1][0] for c in list(write.mock_calls)])
+ self.assertEqual(
+ b'5\r\ndata1\r\n5\r\ndata2\r\n0\r\n\r\n',
+ content.split(b'\r\n\r\n', 1)[-1])
+
+ def test_write_payload_length(self):
+ write = self.transport.write = unittest.mock.Mock()
+
+ msg = protocol.Response(self.transport, 200)
+ msg.add_headers(('content-length', '2'))
+ msg.send_headers()
+
+ msg.write(b'd')
+ msg.write(b'ata')
+ msg.write_eof()
+
+ content = b''.join([c[1][0] for c in list(write.mock_calls)])
+ self.assertEqual(
+ b'da', content.split(b'\r\n\r\n', 1)[-1])
+
+ def test_write_payload_chunked_filter(self):
+ write = self.transport.write = unittest.mock.Mock()
+
+ msg = protocol.Response(self.transport, 200)
+ msg.send_headers()
+
+ msg.add_chunking_filter(2)
+ msg.write(b'data')
+ msg.write_eof()
+
+ content = b''.join([c[1][0] for c in list(write.mock_calls)])
+ self.assertTrue(content.endswith(b'2\r\nda\r\n2\r\nta\r\n0\r\n\r\n'))
+
+ def test_write_payload_chunked_filter_mutiple_chunks(self):
+ write = self.transport.write = unittest.mock.Mock()
+ msg = protocol.Response(self.transport, 200)
+ msg.send_headers()
+
+ msg.add_chunking_filter(2)
+ msg.write(b'data1')
+ msg.write(b'data2')
+ msg.write_eof()
+ content = b''.join([c[1][0] for c in list(write.mock_calls)])
+ self.assertTrue(content.endswith(
+ b'2\r\nda\r\n2\r\nta\r\n2\r\n1d\r\n2\r\nat\r\n'
+ b'2\r\na2\r\n0\r\n\r\n'))
+
+ def test_write_payload_chunked_large_chunk(self):
+ write = self.transport.write = unittest.mock.Mock()
+ msg = protocol.Response(self.transport, 200)
+ msg.send_headers()
+
+ msg.add_chunking_filter(1024)
+ msg.write(b'data')
+ msg.write_eof()
+ content = b''.join([c[1][0] for c in list(write.mock_calls)])
+ self.assertTrue(content.endswith(b'4\r\ndata\r\n0\r\n\r\n'))
+
+ _comp = zlib.compressobj(wbits=-zlib.MAX_WBITS)
+ _COMPRESSED = b''.join([_comp.compress(b'data'), _comp.flush()])
+
+ def test_write_payload_deflate_filter(self):
+ write = self.transport.write = unittest.mock.Mock()
+ msg = protocol.Response(self.transport, 200)
+ msg.add_headers(('content-length', '%s' % len(self._COMPRESSED)))
+ msg.send_headers()
+
+ msg.add_compression_filter('deflate')
+ msg.write(b'data')
+ msg.write_eof()
+
+ content = b''.join([c[1][0] for c in list(write.mock_calls)])
+ self.assertEqual(
+ self._COMPRESSED, content.split(b'\r\n\r\n', 1)[-1])
+
+ def test_write_payload_deflate_and_chunked(self):
+ write = self.transport.write = unittest.mock.Mock()
+ msg = protocol.Response(self.transport, 200)
+ msg.send_headers()
+
+ msg.add_compression_filter('deflate')
+ msg.add_chunking_filter(2)
+
+ msg.write(b'data')
+ msg.write_eof()
+
+ content = b''.join([c[1][0] for c in list(write.mock_calls)])
self.assertEqual(
- [(b'4\r\n',), (b'data',), (b'\r\n',)],
- [c[0] for c in self.transport.write.call_args_list])
+ b'2\r\nKI\r\n2\r\n,I\r\n2\r\n\x04\x00\r\n0\r\n\r\n',
+ content.split(b'\r\n\r\n', 1)[-1])
+
+ def test_write_payload_chunked_and_deflate(self):
+ write = self.transport.write = unittest.mock.Mock()
+ msg = protocol.Response(self.transport, 200)
+ msg.add_headers(('content-length', '%s' % len(self._COMPRESSED)))
+
+ msg.add_chunking_filter(2)
+ msg.add_compression_filter('deflate')
+ msg.send_headers()
- def test_write_eof(self):
- self.writer.write_chunked_eof()
- self.assertEqual((b'0\r\n\r\n',), self.transport.write.call_args[0])
+ msg.write(b'data')
+ msg.write_eof()
+
+ content = b''.join([c[1][0] for c in list(write.mock_calls)])
+ self.assertEqual(
+ self._COMPRESSED, content.split(b'\r\n\r\n', 1)[-1])
diff --git a/tulip/http/client.py b/tulip/http/client.py
index b4db5cc..b65b90a 100644
--- a/tulip/http/client.py
+++ b/tulip/http/client.py
@@ -10,15 +10,6 @@ Most basic usage:
headers['status'] == '200 Ok' # or some such
assert isinstance(response, bytes)
-However you can also open a stream:
-
- f, wstream = http_client.open_stream(url, method, headers)
- wstream.write(b'abc')
- wstream.writelines([b'def', b'ghi'])
- wstream.write_eof()
- sts, headers, rstream = yield from f
- response = yield from rstream.read()
-
TODO: Reuse email.Message class (or its subclass, http.client.HTTPMessage).
TODO: How do we do connection keep alive? Pooling?
"""
@@ -45,7 +36,7 @@ class HttpClientProtocol:
def __init__(self, host, port=None, *,
path='/', method='GET', headers=None, ssl=None,
- make_body=None, encoding='utf-8', version='1.1',
+ make_body=None, encoding='utf-8', version=(1, 1),
chunked=False):
host = self.validate(host, 'host')
if ':' in host:
@@ -70,7 +61,7 @@ class HttpClientProtocol:
self.validate(value, 'header value', True)
self.headers[key] = value
self.encoding = self.validate(encoding, 'encoding')
- self.version = self.validate(version, 'version')
+ self.version = version
self.make_body = make_body
self.chunked = chunked
self.ssl = ssl
@@ -127,22 +118,22 @@ class HttpClientProtocol:
def connection_made(self, transport):
self.transport = transport
self.stream = protocol.HttpStreamReader()
- self.wstream = protocol.HttpStreamWriter(transport)
-
- line = '{} {} HTTP/{}\r\n'.format(self.method,
- self.path,
- self.version)
- self.wstream.write_str(line)
- for key, value in self.headers.items():
- self.wstream.write_str('{}: {}\r\n'.format(key, value))
- self.wstream.write(b'\r\n')
+
+ self.request = protocol.Request(
+ transport, self.method, self.path, self.version)
+
+ self.request.add_headers(*self.headers.items())
+ self.request.send_headers()
+
if self.make_body is not None:
if self.chunked:
self.make_body(
- self.wstream.write_chunked, self.wstream.write_chunked_eof)
+ self.request.write, self.request.eof)
else:
self.make_body(
- self.wstream.write_str, self.wstream.write_eof)
+ self.request.write, self.request.eof)
+ else:
+ self.request.write_eof()
def data_received(self, data):
self.stream.feed_data(data)
diff --git a/tulip/http/protocol.py b/tulip/http/protocol.py
index 2ff7876..1773cab 100644
--- a/tulip/http/protocol.py
+++ b/tulip/http/protocol.py
@@ -1,12 +1,17 @@
"""Http related helper utils."""
-__all__ = ['HttpStreamReader', 'HttpStreamWriter',
- 'HttpMessage', 'RequestLine', 'ResponseStatus']
+__all__ = ['HttpStreamReader',
+ 'HttpMessage', 'Request', 'Response',
+ 'RawHttpMessage', 'RequestLine', 'ResponseStatus']
import collections
+import email.utils
import functools
import http.client
+import http.server
+import itertools
import re
+import sys
import zlib
import tulip
@@ -15,7 +20,7 @@ METHRE = re.compile('[A-Z0-9$-_.]+')
VERSRE = re.compile('HTTP/(\d+).(\d+)')
HDRRE = re.compile(b"[\x00-\x1F\x7F()<>@,;:\[\]={} \t\\\\\"]")
CONTINUATION = (b' ', b'\t')
-
+RESPONSES = http.server.BaseHTTPRequestHandler.responses
RequestLine = collections.namedtuple(
'RequestLine', ['method', 'uri', 'version'])
@@ -25,109 +30,8 @@ ResponseStatus = collections.namedtuple(
'ResponseStatus', ['version', 'code', 'reason'])
-HttpMessage = collections.namedtuple(
- 'HttpMessage', ['headers', 'payload', 'should_close', 'compression'])
-
-
-class StreamEofException(http.client.HTTPException):
- """eof received"""
-
-
-def wrap_payload_reader(f):
- """wrap_payload_reader wraps payload readers and redirect stream.
- payload readers are generator functions, read_chunked_payload,
- read_length_payload, read_eof_payload.
- payload reader allows to modify data stream and feed data into stream.
-
- StreamReader instance should be send to generator as first parameter.
- This steam is used as destination stream for processed data.
- To send data to reader use generator's send() method.
-
- To indicate eof stream, throw StreamEofException exception into the reader.
- In case of errors in incoming stream reader sets exception to
- destination stream with StreamReader.set_exception() method.
-
- Before exit, reader generator returns unprocessed data.
- """
-
- @functools.wraps(f)
- def wrapper(self, *args, **kw):
- assert self._reader is None
-
- rstream = stream = tulip.StreamReader()
-
- encoding = kw.pop('encoding', None)
- if encoding is not None:
- if encoding not in ('gzip', 'deflate'):
- raise ValueError(
- 'Content-Encoding %r is not supported' % encoding)
-
- stream = DeflateStream(stream, encoding)
-
- reader = f(self, *args, **kw)
- next(reader)
- try:
- reader.send(stream)
- except StopIteration:
- pass
- else:
- # feed buffer
- self.line_count = 0
- self.byte_count = 0
- while self.buffer:
- try:
- reader.send(self.buffer.popleft())
- except StopIteration as exc:
- buf = b''.join(self.buffer)
- self.buffer.clear()
- reader = None
- if exc.value:
- self.feed_data(exc.value)
-
- if buf:
- self.feed_data(buf)
-
- break
-
- if reader is not None:
- if self.eof:
- try:
- reader.throw(StreamEofException())
- except StopIteration as exc:
- pass
- else:
- self._reader = reader
-
- return rstream
-
- return wrapper
-
-
-class DeflateStream:
- """DeflateStream decomress stream and feed data into specified steram."""
-
- def __init__(self, stream, encoding):
- self.stream = stream
- zlib_mode = (16 + zlib.MAX_WBITS
- if encoding == 'gzip' else -zlib.MAX_WBITS)
-
- self.zlib = zlib.decompressobj(wbits=zlib_mode)
-
- def feed_data(self, chunk):
- try:
- chunk = self.zlib.decompress(chunk)
- except:
- self.stream.set_exception(http.client.IncompleteRead(b''))
-
- if chunk:
- self.stream.feed_data(chunk)
-
- def feed_eof(self):
- self.stream.feed_data(self.zlib.flush())
- if not self.zlib.eof:
- self.stream.set_exception(http.client.IncompleteRead(b''))
-
- self.stream.feed_eof()
+RawHttpMessage = collections.namedtuple(
+ 'RawHttpMessage', ['headers', 'payload', 'should_close', 'compression'])
class HttpStreamReader(tulip.StreamReader):
@@ -135,32 +39,32 @@ class HttpStreamReader(tulip.StreamReader):
MAX_HEADERS = 32768
MAX_HEADERFIELD_SIZE = 8190
- # if _reader is set, feed_data and feed_eof sends data into
- # _reader instead of self. is it being used as stream redirection for
- # read_chunked_payload, read_length_payload and read_eof_payload
- _reader = None
+ # if _parser is set, feed_data and feed_eof sends data into
+ # _parser instead of self. is it being used as stream redirection for
+ # _parse_chunked_payload, _parse_length_payload and _parse_eof_payload
+ _parser = None
def feed_data(self, data):
- """_reader is a generator, if _reader is set, feed_data sends
- incoming data into this generator untile generates stops."""
- if self._reader:
+ """_parser is a generator, if _parser is set, feed_data sends
+ incoming data into the generator untile generator stops."""
+ if self._parser:
try:
- self._reader.send(data)
+ self._parser.send(data)
except StopIteration as exc:
- self._reader = None
+ self._parser = None
if exc.value:
self.feed_data(exc.value)
else:
super().feed_data(data)
def feed_eof(self):
- """_reader is a generator, if _reader is set feed_eof throws
+ """_parser is a generator, if _parser is set feed_eof throws
StreamEofException into this generator."""
- if self._reader:
+ if self._parser:
try:
- self._reader.throw(StreamEofException())
+ self._parser.throw(StreamEofException())
except StopIteration:
- self._reader = None
+ self._parser = None
super().feed_eof()
@@ -314,9 +218,8 @@ class HttpStreamReader(tulip.StreamReader):
return headers
- @wrap_payload_reader
- def read_chunked_payload(self):
- """Read chunked stream."""
+ def _parse_chunked_payload(self):
+ """Chunked transfer encoding parser."""
stream = yield
try:
@@ -331,7 +234,7 @@ class HttpStreamReader(tulip.StreamReader):
line, data = data.split(b'\n', 1)
# Read the next chunk size from the file
- i = line.find(b";")
+ i = line.find(b';')
if i >= 0:
line = line[:i] # strip chunk-extensions
try:
@@ -375,8 +278,7 @@ class HttpStreamReader(tulip.StreamReader):
except http.client.IncompleteRead as exc:
stream.set_exception(exc)
- @wrap_payload_reader
- def read_length_payload(self, length):
+ def _parse_length_payload(self, length):
"""Read specified amount of bytes."""
stream = yield
@@ -400,8 +302,7 @@ class HttpStreamReader(tulip.StreamReader):
except StreamEofException:
stream.set_exception(http.client.IncompleteRead(b''))
- @wrap_payload_reader
- def read_eof_payload(self):
+ def _parse_eof_payload(self):
"""Read all bytes untile eof."""
stream = yield
@@ -437,11 +338,11 @@ class HttpStreamReader(tulip.StreamReader):
chunked = value.lower() == 'chunked'
elif name == 'SEC-WEBSOCKET-KEY1':
length = 8
- elif name == "CONNECTION":
+ elif name == 'CONNECTION':
v = value.lower()
- if v == "close":
+ if v == 'close':
close_conn = True
- elif v == "keep-alive":
+ elif v == 'keep-alive':
close_conn = False
elif compression and name == 'CONTENT-ENCODING':
enc = value.lower()
@@ -451,9 +352,9 @@ class HttpStreamReader(tulip.StreamReader):
if close_conn is None:
close_conn = version <= (1, 0)
- # payload stream
+ # payload parser
if chunked:
- payload = self.read_chunked_payload(encoding=encoding)
+ parser = self._parse_chunked_payload()
elif length is not None:
try:
@@ -464,45 +365,513 @@ class HttpStreamReader(tulip.StreamReader):
if length < 0:
raise http.client.HTTPException('CONTENT-LENGTH')
- payload = self.read_length_payload(length, encoding=encoding)
+ parser = self._parse_length_payload(length)
else:
if readall:
- payload = self.read_eof_payload(encoding=encoding)
+ parser = self._parse_eof_payload()
+ else:
+ parser = self._parse_length_payload(0)
+
+ next(parser)
+
+ payload = stream = tulip.StreamReader()
+
+ # payload decompression wrapper
+ if encoding is not None:
+ stream = DeflateStream(stream, encoding)
+
+ try:
+ # initialize payload parser with stream, stream is being
+ # used by parser as destination stream
+ parser.send(stream)
+ except StopIteration:
+ pass
+ else:
+ # feed existing buffer to payload parser
+ self.byte_count = 0
+ while self.buffer:
+ try:
+ parser.send(self.buffer.popleft())
+ except StopIteration as exc:
+ parser = None
+
+ # parser is done
+ buf = b''.join(self.buffer)
+ self.buffer.clear()
+
+ # re-add remaining data back to buffer
+ if exc.value:
+ self.feed_data(exc.value)
+
+ if buf:
+ self.feed_data(buf)
+
+ break
+
+ # parser still require more data
+ if parser is not None:
+ if self.eof:
+ try:
+ parser.throw(StreamEofException())
+ except StopIteration as exc:
+ pass
+ else:
+ self._parser = parser
+
+ return RawHttpMessage(headers, payload, close_conn, encoding)
+
+
+class StreamEofException(http.client.HTTPException):
+ """eof received"""
+
+
+class DeflateStream:
+ """DeflateStream decomress stream and feed data into specified stream."""
+
+ def __init__(self, stream, encoding):
+ self.stream = stream
+ zlib_mode = (16 + zlib.MAX_WBITS
+ if encoding == 'gzip' else -zlib.MAX_WBITS)
+
+ self.zlib = zlib.decompressobj(wbits=zlib_mode)
+
+ def set_exception(self, exc):
+ self.stream.set_exception(exc)
+
+ def feed_data(self, chunk):
+ try:
+ chunk = self.zlib.decompress(chunk)
+ except:
+ self.stream.set_exception(http.client.IncompleteRead(b''))
+
+ if chunk:
+ self.stream.feed_data(chunk)
+
+ def feed_eof(self):
+ self.stream.feed_data(self.zlib.flush())
+ if not self.zlib.eof:
+ self.stream.set_exception(http.client.IncompleteRead(b''))
+
+ self.stream.feed_eof()
+
+
+EOF_MARKER = object()
+EOL_MARKER = object()
+
+
+def wrap_payload_filter(func):
+ """Wraps payload filter and piped filters.
+
+ Filter is a generatator that accepts arbitrary chunks of data,
+ modify data and emit new stream of data.
+
+ For example we have stream of chunks: ['1', '2', '3', '4', '5'],
+ we can apply chunking filter to this stream:
+
+ ['1', '2', '3', '4', '5']
+ |
+ response.add_chunking_filter(2)
+ |
+ ['12', '34', '5']
+
+ It is possible to use different filters at the same time.
+
+ For a example to compress incoming stream with 'deflate' encoding
+ and then split data and emit chunks of 8196 bytes size chunks:
+
+ >> response.add_compression_filter('deflate')
+ >> response.add_chunking_filter(8196)
+
+ Filters do not alter transfer encoding.
+
+ Filter can receive types types of data, bytes object or EOF_MARKER.
+
+ 1. If filter receives bytes object, it should process data
+ and yield processed data then yield EOL_MARKER object.
+ 2. If Filter recevied EOF_MARKER, it should yield remaining
+ data (buffered) and then yield EOF_MARKER.
+ """
+ @functools.wraps(func)
+ def wrapper(self, *args, **kw):
+ new_filter = func(self, *args, **kw)
+
+ filter = self.filter
+ if filter is not None:
+ next(new_filter)
+ self.filter = filter_pipe(filter, new_filter)
+ else:
+ self.filter = new_filter
+
+ next(self.filter)
+
+ return wrapper
+
+
+def filter_pipe(filter, filter2):
+ """Creates pipe between two filters.
+
+ filter_pipe() feeds first filter with incoming data and then
+ send yielded from first filter data into filter2, results of
+ filter2 are being emitted.
+
+ 1. If filter_pipe receives bytes object, it sends it to the first filter.
+ 2. Reads yielded values from the first filter until it receives
+ EOF_MARKER or EOL_MARKER.
+ 3. Each of this values is being send to second filter.
+ 4. Reads yielded values from second filter until it recives EOF_MARKER or
+ EOL_MARKER. Each of this values yields to writer.
+ """
+ chunk = yield
+
+ while True:
+ eof = chunk is EOF_MARKER
+ chunk = filter.send(chunk)
+
+ while chunk is not EOL_MARKER:
+ chunk = filter2.send(chunk)
+
+ while chunk not in (EOF_MARKER, EOL_MARKER):
+ yield chunk
+ chunk = next(filter2)
+
+ if chunk is not EOF_MARKER:
+ if eof:
+ chunk = EOF_MARKER
+ else:
+ chunk = next(filter)
else:
- payload = self.read_length_payload(0, encoding=encoding)
+ break
+
+ chunk = yield EOL_MARKER
+
+
+class HttpMessage:
+ """HttpMessage allows to write headers and payload to a stream.
+
+ For example, lets say we want to read file then compress it with deflate
+ compression and then send it with chunked transfer encoding, code may look
+ like this:
+
+ >> response = tulip.http.Response(transport, 200)
+
+ We have to use deflate compression first:
+
+ >> response.add_compression_filter('deflate')
+
+ Then we want to split output stream into chunks of 1024 bytes size:
+
+ >> response.add_chunking_filter(1024)
+
+ We can add headers to response with add_headers() method. add_headers()
+ does not send data to transport, send_headers() sends request/response
+ line and then sends headers:
+
+ >> response.add_headers(
+ .. ('Content-Disposition', 'attachment; filename="..."'))
+ >> response.send_headers()
+
+ Now we can use chunked writer to write stream to a network stream.
+ First call to write() method sends response status line and headers,
+ add_header() and add_headers() method unavailble at this stage:
- return HttpMessage(headers, payload, close_conn, encoding)
+ >> with open('...', 'rb') as f:
+ .. chunk = fp.read(8196)
+ .. while chunk:
+ .. response.write(chunk)
+ .. chunk = fp.read(8196)
+ >> response.write_eof()
+ """
+
+ writer = None
+
+ # 'filter' is being used for altering write() bahaviour,
+ # add_chunking_filter adds deflate/gzip compression and
+ # add_compression_filter splits incoming data into a chunks.
+ filter = None
+
+ HOP_HEADERS = None # Must be set by subclass.
-class HttpStreamWriter:
+ SERVER_SOFTWARE = 'Python/{0[0]}.{0[1]} tulip/0.0'.format(sys.version_info)
- def __init__(self, transport, encoding='utf-8'):
+ status = None
+ status_line = b''
+
+ # subclass can enable auto sending headers with write() call,
+ # this is useful for wsgi's start_response implementation.
+ _send_headers = False
+
+ def __init__(self, transport, version, close):
self.transport = transport
- self.encoding = encoding
-
- def encode(self, s):
- if isinstance(s, bytes):
- return s
- return s.encode(self.encoding)
-
- def decode(self, s):
- if isinstance(s, str):
- return s
- return s.decode(self.encoding)
-
- def write(self, b):
- self.transport.write(b)
-
- def write_str(self, s):
- self.transport.write(self.encode(s))
-
- def write_chunked(self, chunk):
- if not chunk:
- return
- data = self.encode(chunk)
- self.write_str('{:x}\r\n'.format(len(data)))
- self.transport.write(data)
- self.transport.write(b'\r\n')
-
- def write_chunked_eof(self):
- self.transport.write(b'0\r\n\r\n')
+ self.version = version
+ self.closing = close
+ self.keepalive = False
+
+ self.chunked = False
+ self.length = None
+ self.upgrade = False
+ self.headers = []
+ self.headers_sent = False
+
+ def force_close(self):
+ self.closing = True
+
+ def force_chunked(self):
+ self.chunked = True
+
+ def keep_alive(self):
+ return self.keepalive and not self.closing
+
+ def is_headers_sent(self):
+ return self.headers_sent
+
+ def add_header(self, name, value):
+ """Analyze headers. Calculate content length,
+ removes hop headers, etc."""
+ assert not self.headers_sent, 'headers have been sent already'
+ assert isinstance(name, str), '%r is not a string' % name
+
+ name = name.strip().upper()
+
+ if name == 'CONTENT-LENGTH':
+ self.length = int(value)
+
+ if name == 'CONNECTION':
+ val = value.lower().strip()
+ # handle websocket
+ if val == 'upgrade':
+ self.upgrade = True
+ # connection keep-alive
+ elif val == 'close':
+ self.keepalive = False
+ elif val == 'keep-alive':
+ self.keepalive = True
+
+ elif name == 'UPGRADE':
+ if 'websocket' in value.lower():
+ self.headers.append((name, value))
+
+ elif name == 'TRANSFER-ENCODING' and not self.chunked:
+ self.chunked = value.lower().strip() == 'chunked'
+
+ elif name not in self.HOP_HEADERS:
+ # ignore hopbyhop headers
+ self.headers.append((name, value))
+
+ def add_headers(self, *headers):
+ """Adds headers to a http message."""
+ for name, value in headers:
+ self.add_header(name, value)
+
+ def send_headers(self):
+ """Writes headers to a stream. Constructs payload writer."""
+ # Chunked response is only for HTTP/1.1 clients or newer
+ # and there is no Content-Length header is set.
+ # Do not use chunked responses when the response is guaranteed to
+ # not have a response body (304, 204).
+ assert not self.headers_sent, 'headers have been sent already'
+ self.headers_sent = True
+
+ if (self.chunked is True) or (
+ self.length is None and
+ self.version >= (1, 1) and
+ self.status not in (304, 204)):
+ self.chunked = True
+ self.writer = self._write_chunked_payload()
+
+ elif self.length is not None:
+ self.writer = self._write_length_payload(self.length)
+
+ else:
+ self.writer = self._write_eof_payload()
+
+ next(self.writer)
+
+ # status line
+ self.transport.write(self.status_line.encode('ascii'))
+
+ # send headers
+ self.transport.write(
+ ('%s\r\n\r\n' % '\r\n'.join(
+ ('%s: %s' % (k, v) for k, v in
+ itertools.chain(self._default_headers(), self.headers)))
+ ).encode('ascii'))
+
+ def _default_headers(self):
+ # set the connection header
+ if self.upgrade:
+ connection = 'upgrade'
+ elif self.keep_alive():
+ connection = 'keep-alive'
+ else:
+ connection = 'close'
+
+ headers = [('CONNECTION', connection)]
+
+ if self.chunked:
+ headers.append(('TRANSFER-ENCODING', 'chunked'))
+
+ return headers
+
+ def write(self, chunk):
+ """write() writes chunk of data to a steram by using different writers.
+ writer uses filter to modify chunk of data. write_eof() indicates
+ end of stream. writer can't be used after write_eof() method
+ being called."""
+ assert (isinstance(chunk, (bytes, bytearray)) or
+ chunk is EOF_MARKER), chunk
+
+ if self._send_headers and not self.headers_sent:
+ self.send_headers()
+
+ assert self.writer is not None, 'send_headers() is not called.'
+
+ if self.filter:
+ chunk = self.filter.send(chunk)
+ while chunk not in (EOF_MARKER, EOL_MARKER):
+ self.writer.send(chunk)
+ chunk = next(self.filter)
+ else:
+ if chunk is not EOF_MARKER:
+ self.writer.send(chunk)
+
+ def write_eof(self):
+ self.write(EOF_MARKER)
+ try:
+ self.writer.throw(StreamEofException())
+ except StopIteration:
+ pass
+
+ def _write_chunked_payload(self):
+ """Write data in chunked transfer encoding."""
+ while True:
+ try:
+ chunk = yield
+ except StreamEofException:
+ self.transport.write(b'0\r\n\r\n')
+ break
+
+ self.transport.write('{:x}\r\n'.format(len(chunk)).encode('ascii'))
+ self.transport.write(chunk)
+ self.transport.write(b'\r\n')
+
+ def _write_length_payload(self, length):
+ """Write specified number of bytes to a stream."""
+ while True:
+ try:
+ chunk = yield
+ except StreamEofException:
+ break
+
+ if length:
+ l = len(chunk)
+ if length >= l:
+ self.transport.write(chunk)
+ else:
+ self.transport.write(chunk[:length])
+
+ length = max(0, length-l)
+
+ def _write_eof_payload(self):
+ while True:
+ try:
+ chunk = yield
+ except StreamEofException:
+ break
+
+ self.transport.write(chunk)
+
+ @wrap_payload_filter
+ def add_chunking_filter(self, chunk_size=16*1024):
+ """Split incoming stream into chunks."""
+ buf = bytearray()
+ chunk = yield
+
+ while True:
+ if chunk is EOF_MARKER:
+ if buf:
+ yield buf
+
+ yield EOF_MARKER
+
+ else:
+ buf.extend(chunk)
+
+ while len(buf) >= chunk_size:
+ chunk, buf = buf[:chunk_size], buf[chunk_size:]
+ yield chunk
+
+ chunk = yield EOL_MARKER
+
+ @wrap_payload_filter
+ def add_compression_filter(self, encoding='deflate'):
+ """Compress incoming stream with deflate or gzip encoding."""
+ zlib_mode = (16 + zlib.MAX_WBITS
+ if encoding == 'gzip' else -zlib.MAX_WBITS)
+ zcomp = zlib.compressobj(wbits=zlib_mode)
+
+ chunk = yield
+ while True:
+ if chunk is EOF_MARKER:
+ yield zcomp.flush()
+ chunk = yield EOF_MARKER
+
+ else:
+ yield zcomp.compress(chunk)
+ chunk = yield EOL_MARKER
+
+
+class Response(HttpMessage):
+ """Create http response message.
+
+ Transport is a socket stream transport. status is a response status code,
+ status has to be integer value. http_version is a tuple that represents
+ http version, (1, 0) stands for HTTP/1.0 and (1, 1) is for HTTP/1.1
+ """
+
+ HOP_HEADERS = {
+ 'CONNECTION',
+ 'KEEP-ALIVE',
+ 'PROXY-AUTHENTICATE',
+ 'PROXY-AUTHORIZATION',
+ 'TE',
+ 'TRAILERS',
+ 'TRANSFER-ENCODING',
+ 'UPGRADE',
+ 'SERVER',
+ 'DATE',
+ }
+
+ def __init__(self, transport, status, http_version=(1, 1), close=False):
+ super().__init__(transport, http_version, close)
+
+ self.status = status
+ self.status_line = 'HTTP/{0[0]}.{0[1]} {1} {2}\r\n'.format(
+ http_version, status, RESPONSES[status][0])
+
+ def _default_headers(self):
+ headers = super()._default_headers()
+ headers.extend((('DATE', email.utils.formatdate()),
+ ('SERVER', self.SERVER_SOFTWARE)))
+
+ return headers
+
+
+class Request(HttpMessage):
+
+ HOP_HEADERS = ()
+
+ def __init__(self, transport, method, uri,
+ http_version=(1, 1), close=False):
+ super().__init__(transport, http_version, close)
+
+ self.method = method
+ self.uri = uri
+ self.status_line = '{0} {1} HTTP/{2[0]}.{2[1]}\r\n'.format(
+ method, uri, http_version)
+
+ def _default_headers(self):
+ headers = super()._default_headers()
+ headers.append(('USER-AGENT', self.SERVER_SOFTWARE))
+
+ return headers
diff --git a/tulip/selector_events.py b/tulip/selector_events.py
index cc7fe33..9bc9c23 100644
--- a/tulip/selector_events.py
+++ b/tulip/selector_events.py
@@ -352,7 +352,7 @@ class _SelectorSocketTransport(transports.Transport):
self._event_loop.call_soon(self._protocol.eof_received)
def write(self, data):
- assert isinstance(data, bytes)
+ assert isinstance(data, (bytes, bytearray)), repr(data)
assert not self._closing
if not data:
return