Diffstat (limited to 'examples/crawl.py')
-rw-r--r--    examples/crawl.py    109
1 file changed, 61 insertions(+), 48 deletions(-)
diff --git a/examples/crawl.py b/examples/crawl.py
index 4bb0b4e..7f54059 100644
--- a/examples/crawl.py
+++ b/examples/crawl.py
@@ -1,7 +1,9 @@
-#!/usr/bin/env python3.4
+#!/usr/bin/env python
"""A simple web crawler."""
+from __future__ import print_function
+
# TODO:
# - More organized logging (with task ID or URL?).
# - Use logging module for Logger.
@@ -15,15 +17,23 @@
# - Handle out of file descriptors directly? (How?)
import argparse
-import asyncio
+import trollius as asyncio
+from trollius import From, Return
import asyncio.locks
import cgi
-from http.client import BadStatusLine
import logging
import re
import sys
import time
-import urllib.parse
+try:
+ from httplib import BadStatusLine
+ import urlparse
+ from urllib import splitport as urllib_splitport
+except ImportError:
+ # Python 3
+ from http.client import BadStatusLine
+ from urllib import parse as urlparse
+ from urllib.parse import splitport as urllib_splitport
ARGS = argparse.ArgumentParser(description="Web crawler")
@@ -96,7 +106,8 @@ class Logger:
def _log(self, n, args):
if self.level >= n:
- print(*args, file=sys.stderr, flush=True)
+ print(*args, file=sys.stderr)
+ sys.stderr.flush()
def log(self, n, *args):
self._log(n, args)
@@ -133,14 +144,14 @@ class ConnectionPool:
for conn in conns:
conn.close()
self.connections.clear()
- self.queue.clear()
+ del self.queue[:]
@asyncio.coroutine
def get_connection(self, host, port, ssl):
"""Create or reuse a connection."""
port = port or (443 if ssl else 80)
try:
- ipaddrs = yield from self.loop.getaddrinfo(host, port)
+ ipaddrs = yield From(self.loop.getaddrinfo(host, port))
except Exception as exc:
self.log(0, 'Exception %r for (%r, %r)' % (exc, host, port))
raise
@@ -148,7 +159,8 @@ class ConnectionPool:
(host, ', '.join(ip[4][0] for ip in ipaddrs)))
# Look for a reusable connection.
- for _, _, _, _, (h, p, *_) in ipaddrs:
+ for _, _, _, _, addr in ipaddrs:
+ h, p = addr[:2]
key = h, p, ssl
conn = None
conns = self.connections.get(key)
@@ -163,13 +175,13 @@ class ConnectionPool:
else:
self.log(1, '* Reusing pooled connection', key,
'FD =', conn.fileno())
- return conn
+ raise Return(conn)
# Create a new connection.
conn = Connection(self.log, self, host, port, ssl)
- yield from conn.connect()
+ yield From(conn.connect())
self.log(1, '* New connection', conn.key, 'FD =', conn.fileno())
- return conn
+ raise Return(conn)
def recycle_connection(self, conn):
"""Make a connection available for reuse.
@@ -258,8 +270,8 @@ class Connection:
@asyncio.coroutine
def connect(self):
- self.reader, self.writer = yield from asyncio.open_connection(
- self.host, self.port, ssl=self.ssl)
+ self.reader, self.writer = yield From(asyncio.open_connection(
+ self.host, self.port, ssl=self.ssl))
peername = self.writer.get_extra_info('peername')
if peername:
self.host, self.port = peername[:2]
@@ -286,7 +298,7 @@ class Request:
self.log = log
self.url = url
self.pool = pool
- self.parts = urllib.parse.urlparse(self.url)
+ self.parts = urlparse.urlparse(self.url)
self.scheme = self.parts.scheme
assert self.scheme in ('http', 'https'), repr(url)
self.ssl = self.parts.scheme == 'https'
@@ -311,8 +323,8 @@ class Request:
(self.hostname, self.port,
'ssl' if self.ssl else 'tcp',
self.url))
- self.conn = yield from self.pool.get_connection(self.hostname,
- self.port, self.ssl)
+ self.conn = yield From(self.pool.get_connection(self.hostname,
+ self.port, self.ssl))
def close(self, recycle=False):
"""Close the connection, recycle if requested."""
@@ -336,7 +348,7 @@ class Request:
"""Send the request."""
request_line = '%s %s %s' % (self.method, self.full_path,
self.http_version)
- yield from self.putline(request_line)
+ yield From(self.putline(request_line))
# TODO: What if a header is already set?
self.headers.append(('User-Agent', 'asyncio-example-crawl/0.0'))
self.headers.append(('Host', self.netloc))
@@ -344,15 +356,15 @@ class Request:
##self.headers.append(('Accept-Encoding', 'gzip'))
for key, value in self.headers:
line = '%s: %s' % (key, value)
- yield from self.putline(line)
- yield from self.putline('')
+ yield From(self.putline(line))
+ yield From(self.putline(''))
@asyncio.coroutine
def get_response(self):
"""Receive the response."""
response = Response(self.log, self.conn.reader)
- yield from response.read_headers()
- return response
+ yield From(response.read_headers())
+ raise Return(response)
class Response:
@@ -374,14 +386,15 @@ class Response:
@asyncio.coroutine
def getline(self):
"""Read one line from the connection."""
- line = (yield from self.reader.readline()).decode('latin-1').rstrip()
+ line = (yield From(self.reader.readline()))
+ line = line.decode('latin-1').rstrip()
self.log(2, '<', line)
- return line
+ raise Return(line)
@asyncio.coroutine
def read_headers(self):
"""Read the response status and the request headers."""
- status_line = yield from self.getline()
+ status_line = yield From(self.getline())
status_parts = status_line.split(None, 2)
if len(status_parts) != 3:
self.log(0, 'bad status_line', repr(status_line))
@@ -389,7 +402,7 @@ class Response:
self.http_version, status, self.reason = status_parts
self.status = int(status)
while True:
- header_line = yield from self.getline()
+ header_line = yield From(self.getline())
if not header_line:
break
# TODO: Continuation lines.
@@ -426,7 +439,7 @@ class Response:
self.log(2, 'parsing chunked response')
blocks = []
while True:
- size_header = yield from self.reader.readline()
+ size_header = yield From(self.reader.readline())
if not size_header:
self.log(0, 'premature end of chunked response')
break
@@ -435,10 +448,10 @@ class Response:
size = int(parts[0], 16)
if size:
self.log(3, 'reading chunk of', size, 'bytes')
- block = yield from self.reader.readexactly(size)
+ block = yield From(self.reader.readexactly(size))
assert len(block) == size, (len(block), size)
blocks.append(block)
- crlf = yield from self.reader.readline()
+ crlf = yield From(self.reader.readline())
assert crlf == b'\r\n', repr(crlf)
if not size:
break
@@ -447,12 +460,12 @@ class Response:
'bytes in', len(blocks), 'blocks')
else:
self.log(3, 'reading until EOF')
- body = yield from self.reader.read()
+ body = yield From(self.reader.read())
# TODO: Should make sure not to recycle the connection
# in this case.
else:
- body = yield from self.reader.readexactly(nbytes)
- return body
+ body = yield From(self.reader.readexactly(nbytes))
+ raise Return(body)
class Fetcher:
@@ -504,10 +517,10 @@ class Fetcher:
self.request = None
try:
self.request = Request(self.log, self.url, self.crawler.pool)
- yield from self.request.connect()
- yield from self.request.send_request()
- self.response = yield from self.request.get_response()
- self.body = yield from self.response.read()
+ yield From(self.request.connect())
+ yield From(self.request.send_request())
+ self.response = yield From(self.request.get_response())
+ self.body = yield From(self.response.read())
h_conn = self.response.get_header('connection').lower()
if h_conn != 'close':
self.request.close(recycle=True)
@@ -531,7 +544,7 @@ class Fetcher:
return
next_url = self.response.get_redirect_url()
if next_url:
- self.next_url = urllib.parse.urljoin(self.url, next_url)
+ self.next_url = urlparse.urljoin(self.url, next_url)
if self.max_redirect > 0:
self.log(1, 'redirect to', self.next_url, 'from', self.url)
self.crawler.add_url(self.next_url, self.max_redirect-1)
@@ -556,8 +569,8 @@ class Fetcher:
self.new_urls = set()
for url in self.urls:
url = unescape(url)
- url = urllib.parse.urljoin(self.url, url)
- url, frag = urllib.parse.urldefrag(url)
+ url = urlparse.urljoin(self.url, url)
+ url, frag = urlparse.urldefrag(url)
if self.crawler.add_url(url):
self.new_urls.add(url)
@@ -657,8 +670,8 @@ class Crawler:
self.pool = ConnectionPool(self.log, max_pool, max_tasks)
self.root_domains = set()
for root in roots:
- parts = urllib.parse.urlparse(root)
- host, port = urllib.parse.splitport(parts.netloc)
+ parts = urlparse.urlparse(root)
+ host, port = urllib_splitport(parts.netloc)
if not host:
continue
if re.match(r'\A[\d\.]*\Z', host):
@@ -731,11 +744,11 @@ class Crawler:
"""Add a URL to the todo list if not seen before."""
if self.exclude and re.search(self.exclude, url):
return False
- parts = urllib.parse.urlparse(url)
+ parts = urlparse.urlparse(url)
if parts.scheme not in ('http', 'https'):
self.log(2, 'skipping non-http scheme in', url)
return False
- host, port = urllib.parse.splitport(parts.netloc)
+ host, port = urllib_splitport(parts.netloc)
if not self.host_okay(host):
self.log(2, 'skipping non-root host in', url)
return False
@@ -750,7 +763,7 @@ class Crawler:
@asyncio.coroutine
def crawl(self):
"""Run the crawler until all finished."""
- with (yield from self.termination):
+ with (yield From(self.termination)):
while self.todo or self.busy:
if self.todo:
url, max_redirect = self.todo.popitem()
@@ -762,7 +775,7 @@ class Crawler:
self.busy[url] = fetcher
fetcher.task = asyncio.Task(self.fetch(fetcher))
else:
- yield from self.termination.wait()
+ yield From(self.termination.wait())
self.t1 = time.time()
@asyncio.coroutine
@@ -772,13 +785,13 @@ class Crawler:
Once this returns, move the fetcher from busy to done.
"""
url = fetcher.url
- with (yield from self.governor):
+ with (yield From(self.governor)):
try:
- yield from fetcher.fetch() # Fetcher gonna fetch.
+ yield From(fetcher.fetch()) # Fetcher gonna fetch.
finally:
# Force GC of the task, so the error is logged.
fetcher.task = None
- with (yield from self.termination):
+ with (yield From(self.termination)):
self.done[url] = fetcher
del self.busy[url]
self.termination.notify()
@@ -828,7 +841,7 @@ def main():
log = Logger(args.level)
if args.iocp:
- from asyncio.windows_events import ProactorEventLoop
+ from trollius.windows_events import ProactorEventLoop
loop = ProactorEventLoop()
asyncio.set_event_loop(loop)
elif args.select:
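
The conversion above is mechanical: every `yield from expr` becomes `yield From(expr)`, and every `return value` inside a coroutine becomes `raise Return(value)`, because `return` with a value is a SyntaxError inside a Python 2 generator. A minimal, self-contained sketch of the same idiom (assuming trollius is installed; `fetch_length` and its arguments are illustrative, not part of crawl.py):

#!/usr/bin/env python
"""Sketch of the trollius coroutine idiom applied throughout this diff."""
from __future__ import print_function

import trollius as asyncio
from trollius import From, Return


@asyncio.coroutine
def fetch_length(host, port=80):
    # `yield From(...)` stands in for Python 3's `yield from ...`.
    reader, writer = yield From(asyncio.open_connection(host, port))
    writer.write(b'HEAD / HTTP/1.0\r\nHost: ' +
                 host.encode('ascii') + b'\r\n\r\n')
    data = yield From(reader.read())
    writer.close()
    # `raise Return(...)` stands in for `return ...`, which is illegal
    # in a Python 2 generator.
    raise Return(len(data))


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    n = loop.run_until_complete(fetch_length('example.com'))
    print('read', n, 'bytes')
    loop.close()

Written this way, the same source runs under both Python 2 and Python 3, which is the point of the port.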