Diffstat (limited to 'examples/crawl.py')
-rw-r--r--  examples/crawl.py | 109
1 file changed, 61 insertions(+), 48 deletions(-)
diff --git a/examples/crawl.py b/examples/crawl.py
index 4bb0b4e..7f54059 100644
--- a/examples/crawl.py
+++ b/examples/crawl.py
@@ -1,7 +1,9 @@
-#!/usr/bin/env python3.4
+#!/usr/bin/env python
 
 """A simple web crawler."""
 
+from __future__ import print_function
+
 # TODO:
 # - More organized logging (with task ID or URL?).
 # - Use logging module for Logger.
@@ -15,15 +17,23 @@
 # - Handle out of file descriptors directly?  (How?)
 
 import argparse
-import asyncio
+import trollius as asyncio
+from trollius import From, Return
 import asyncio.locks
 import cgi
-from http.client import BadStatusLine
 import logging
 import re
 import sys
 import time
-import urllib.parse
+try:
+    from httplib import BadStatusLine
+    import urlparse
+    from urllib import splitport as urllib_splitport
+except ImportError:
+    # Python 3
+    from http.client import BadStatusLine
+    from urllib import parse as urlparse
+    from urllib.parse import splitport as urllib_splitport
 
 
 ARGS = argparse.ArgumentParser(description="Web crawler")
@@ -96,7 +106,8 @@ class Logger:
 
     def _log(self, n, args):
         if self.level >= n:
-            print(*args, file=sys.stderr, flush=True)
+            print(*args, file=sys.stderr)
+            sys.stderr.flush()
 
     def log(self, n, *args):
         self._log(n, args)
@@ -133,14 +144,14 @@ class ConnectionPool:
         for conn in conns:
             conn.close()
         self.connections.clear()
-        self.queue.clear()
+        del self.queue[:]
 
     @asyncio.coroutine
     def get_connection(self, host, port, ssl):
         """Create or reuse a connection."""
         port = port or (443 if ssl else 80)
         try:
-            ipaddrs = yield from self.loop.getaddrinfo(host, port)
+            ipaddrs = yield From(self.loop.getaddrinfo(host, port))
         except Exception as exc:
             self.log(0, 'Exception %r for (%r, %r)' % (exc, host, port))
             raise
@@ -148,7 +159,8 @@ class ConnectionPool:
                  (host, ', '.join(ip[4][0] for ip in ipaddrs)))
 
         # Look for a reusable connection.
-        for _, _, _, _, (h, p, *_) in ipaddrs:
+        for _, _, _, _, addr in ipaddrs:
+            h, p = addr[:2]
             key = h, p, ssl
             conn = None
             conns = self.connections.get(key)
@@ -163,13 +175,13 @@ class ConnectionPool:
             else:
                 self.log(1, '* Reusing pooled connection', key,
                          'FD =', conn.fileno())
-                return conn
+                raise Return(conn)
 
         # Create a new connection.
         conn = Connection(self.log, self, host, port, ssl)
-        yield from conn.connect()
+        yield From(conn.connect())
         self.log(1, '* New connection', conn.key, 'FD =', conn.fileno())
-        return conn
+        raise Return(conn)
 
     def recycle_connection(self, conn):
         """Make a connection available for reuse.
@@ -258,8 +270,8 @@ class Connection:
 
     @asyncio.coroutine
     def connect(self):
-        self.reader, self.writer = yield from asyncio.open_connection(
-            self.host, self.port, ssl=self.ssl)
+        self.reader, self.writer = yield From(asyncio.open_connection(
+            self.host, self.port, ssl=self.ssl))
         peername = self.writer.get_extra_info('peername')
         if peername:
             self.host, self.port = peername[:2]
@@ -286,7 +298,7 @@ class Request:
         self.log = log
         self.url = url
         self.pool = pool
-        self.parts = urllib.parse.urlparse(self.url)
+        self.parts = urlparse.urlparse(self.url)
         self.scheme = self.parts.scheme
         assert self.scheme in ('http', 'https'), repr(url)
         self.ssl = self.parts.scheme == 'https'
@@ -311,8 +323,8 @@ class Request:
                  (self.hostname, self.port,
                   'ssl' if self.ssl else 'tcp',
                   self.url))
-        self.conn = yield from self.pool.get_connection(self.hostname,
-                                                        self.port, self.ssl)
+        self.conn = yield From(self.pool.get_connection(self.hostname,
+                                                        self.port, self.ssl))
 
     def close(self, recycle=False):
         """Close the connection, recycle if requested."""
@@ -336,7 +348,7 @@ class Request:
         """Send the request."""
         request_line = '%s %s %s' % (self.method, self.full_path,
                                      self.http_version)
-        yield from self.putline(request_line)
+        yield From(self.putline(request_line))
         # TODO: What if a header is already set?
         self.headers.append(('User-Agent', 'asyncio-example-crawl/0.0'))
         self.headers.append(('Host', self.netloc))
@@ -344,15 +356,15 @@ class Request:
         ##self.headers.append(('Accept-Encoding', 'gzip'))
         for key, value in self.headers:
             line = '%s: %s' % (key, value)
-            yield from self.putline(line)
-        yield from self.putline('')
+            yield From(self.putline(line))
+        yield From(self.putline(''))
 
     @asyncio.coroutine
     def get_response(self):
         """Receive the response."""
         response = Response(self.log, self.conn.reader)
-        yield from response.read_headers()
-        return response
+        yield From(response.read_headers())
+        raise Return(response)
 
 
 class Response:
@@ -374,14 +386,15 @@ class Response:
     @asyncio.coroutine
     def getline(self):
         """Read one line from the connection."""
-        line = (yield from self.reader.readline()).decode('latin-1').rstrip()
+        line = (yield From(self.reader.readline()))
+        line = line.decode('latin-1').rstrip()
         self.log(2, '<', line)
-        return line
+        raise Return(line)
 
     @asyncio.coroutine
     def read_headers(self):
         """Read the response status and the request headers."""
-        status_line = yield from self.getline()
+        status_line = yield From(self.getline())
         status_parts = status_line.split(None, 2)
         if len(status_parts) != 3:
             self.log(0, 'bad status_line', repr(status_line))
@@ -389,7 +402,7 @@ class Response:
         self.http_version, status, self.reason = status_parts
         self.status = int(status)
         while True:
-            header_line = yield from self.getline()
+            header_line = yield From(self.getline())
             if not header_line:
                 break
             # TODO: Continuation lines.
@@ -426,7 +439,7 @@ class Response:
                 self.log(2, 'parsing chunked response')
                 blocks = []
                 while True:
-                    size_header = yield from self.reader.readline()
+                    size_header = yield From(self.reader.readline())
                     if not size_header:
                         self.log(0, 'premature end of chunked response')
                         break
@@ -435,10 +448,10 @@ class Response:
                     size = int(parts[0], 16)
                     if size:
                         self.log(3, 'reading chunk of', size, 'bytes')
-                        block = yield from self.reader.readexactly(size)
+                        block = yield From(self.reader.readexactly(size))
                         assert len(block) == size, (len(block), size)
                         blocks.append(block)
-                    crlf = yield from self.reader.readline()
+                    crlf = yield From(self.reader.readline())
                     assert crlf == b'\r\n', repr(crlf)
                     if not size:
                         break
@@ -447,12 +460,12 @@ class Response:
                          'bytes in', len(blocks), 'blocks')
             else:
                 self.log(3, 'reading until EOF')
-                body = yield from self.reader.read()
+                body = yield From(self.reader.read())
                 # TODO: Should make sure not to recycle the connection
                 # in this case.
         else:
-            body = yield from self.reader.readexactly(nbytes)
-        return body
+            body = yield From(self.reader.readexactly(nbytes))
+        raise Return(body)
 
 
 class Fetcher:
@@ -504,10 +517,10 @@ class Fetcher:
         self.request = None
         try:
             self.request = Request(self.log, self.url, self.crawler.pool)
-            yield from self.request.connect()
-            yield from self.request.send_request()
-            self.response = yield from self.request.get_response()
-            self.body = yield from self.response.read()
+            yield From(self.request.connect())
+            yield From(self.request.send_request())
+            self.response = yield From(self.request.get_response())
+            self.body = yield From(self.response.read())
             h_conn = self.response.get_header('connection').lower()
             if h_conn != 'close':
                 self.request.close(recycle=True)
@@ -531,7 +544,7 @@ class Fetcher:
                 return
             next_url = self.response.get_redirect_url()
             if next_url:
-                self.next_url = urllib.parse.urljoin(self.url, next_url)
+                self.next_url = urlparse.urljoin(self.url, next_url)
                 if self.max_redirect > 0:
                     self.log(1, 'redirect to', self.next_url, 'from', self.url)
                     self.crawler.add_url(self.next_url, self.max_redirect-1)
@@ -556,8 +569,8 @@ class Fetcher:
         self.new_urls = set()
         for url in self.urls:
             url = unescape(url)
-            url = urllib.parse.urljoin(self.url, url)
-            url, frag = urllib.parse.urldefrag(url)
+            url = urlparse.urljoin(self.url, url)
+            url, frag = urlparse.urldefrag(url)
             if self.crawler.add_url(url):
                 self.new_urls.add(url)
 
@@ -657,8 +670,8 @@ class Crawler:
         self.pool = ConnectionPool(self.log, max_pool, max_tasks)
         self.root_domains = set()
         for root in roots:
-            parts = urllib.parse.urlparse(root)
-            host, port = urllib.parse.splitport(parts.netloc)
+            parts = urlparse.urlparse(root)
+            host, port = urllib_splitport(parts.netloc)
             if not host:
                 continue
             if re.match(r'\A[\d\.]*\Z', host):
@@ -731,11 +744,11 @@ class Crawler:
         """Add a URL to the todo list if not seen before."""
         if self.exclude and re.search(self.exclude, url):
             return False
-        parts = urllib.parse.urlparse(url)
+        parts = urlparse.urlparse(url)
         if parts.scheme not in ('http', 'https'):
             self.log(2, 'skipping non-http scheme in', url)
             return False
-        host, port = urllib.parse.splitport(parts.netloc)
+        host, port = urllib_splitport(parts.netloc)
         if not self.host_okay(host):
             self.log(2, 'skipping non-root host in', url)
             return False
@@ -750,7 +763,7 @@ class Crawler:
     @asyncio.coroutine
     def crawl(self):
         """Run the crawler until all finished."""
-        with (yield from self.termination):
+        with (yield From(self.termination)):
             while self.todo or self.busy:
                 if self.todo:
                     url, max_redirect = self.todo.popitem()
@@ -762,7 +775,7 @@ class Crawler:
                     self.busy[url] = fetcher
                     fetcher.task = asyncio.Task(self.fetch(fetcher))
                 else:
-                    yield from self.termination.wait()
+                    yield From(self.termination.wait())
         self.t1 = time.time()
 
     @asyncio.coroutine
@@ -772,13 +785,13 @@ class Crawler:
         Once this returns, move the fetcher from busy to done.
         """
         url = fetcher.url
-        with (yield from self.governor):
+        with (yield From(self.governor)):
             try:
-                yield from fetcher.fetch()  # Fetcher gonna fetch.
+                yield From(fetcher.fetch())  # Fetcher gonna fetch.
             finally:
                 # Force GC of the task, so the error is logged.
                 fetcher.task = None
-        with (yield from self.termination):
+        with (yield From(self.termination)):
             self.done[url] = fetcher
             del self.busy[url]
             self.termination.notify()
@@ -828,7 +841,7 @@ def main():
     log = Logger(args.level)
 
     if args.iocp:
-        from asyncio.windows_events import ProactorEventLoop
+        from trollius.windows_events import ProactorEventLoop
         loop = ProactorEventLoop()
         asyncio.set_event_loop(loop)
     elif args.select:
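
The patch applies one mechanical transformation throughout: the trollius coroutine style. Under trollius, 'yield from expr' becomes 'yield From(expr)', and 'return value' inside a coroutine becomes 'raise Return(value)', because Python 2 generators cannot return a value. Below is a minimal sketch of that pattern, assuming trollius is installed; fetch_status is an illustrative helper, not part of crawl.py:

    from __future__ import print_function

    import trollius as asyncio
    from trollius import From, Return

    @asyncio.coroutine
    def fetch_status(host, port=80):
        # asyncio's 'yield from expr' is spelled 'yield From(expr)'.
        reader, writer = yield From(asyncio.open_connection(host, port))
        writer.write(b'GET / HTTP/1.0\r\nHost: ' + host.encode('ascii') +
                     b'\r\n\r\n')
        status_line = yield From(reader.readline())
        writer.close()
        # A coroutine's 'return value' is spelled 'raise Return(value)',
        # since Python 2 generators cannot return a value.
        raise Return(status_line.decode('latin-1').rstrip())

    loop = asyncio.get_event_loop()
    print(loop.run_until_complete(fetch_status('example.com')))

The same source runs on Python 2 and Python 3, which is why the shebang drops the 3.4 pin and the imports gain a try/except shim for httplib/urlparse versus http.client/urllib.parse.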