""" robotparser.py Copyright (C) 2000 Bastian Kleineidam You can choose between two licenses when using this package: 1) GNU GPLv2 2) PSF license for Python 2.2 The robots.txt Exclusion Protocol is implemented as specified in http://www.robotstxt.org/norobots-rfc.txt """ import collections import urllib.parse import urllib.request __all__ = ["RobotFileParser"] RequestRate = collections.namedtuple("RequestRate", "requests seconds") class RobotFileParser: """ This class provides a set of methods to read, parse and answer questions about a single robots.txt file. """ def __init__(self, url=''): self.entries = [] self.sitemaps = [] self.default_entry = None self.disallow_all = False self.allow_all = False self.set_url(url) self.last_checked = 0 def mtime(self): """Returns the time the robots.txt file was last fetched. This is useful for long-running web spiders that need to check for new robots.txt files periodically. """ return self.last_checked def modified(self): """Sets the time the robots.txt file was last fetched to the current time. """ import time self.last_checked = time.time() def set_url(self, url): """Sets the URL referring to a robots.txt file.""" self.url = url self.host, self.path = urllib.parse.urlparse(url)[1:3] def read(self): """Reads the robots.txt URL and feeds it to the parser.""" try: f = urllib.request.urlopen(self.url) except urllib.error.HTTPError as err: if err.code in (401, 403): self.disallow_all = True elif err.code >= 400 and err.code < 500: self.allow_all = True else: raw = f.read() self.parse(raw.decode("utf-8").splitlines()) def _add_entry(self, entry): if "*" in entry.useragents: # the default entry is considered last if self.default_entry is None: # the first default entry wins self.default_entry = entry else: self.entries.append(entry) def parse(self, lines): """Parse the input lines from a robots.txt file. We allow that a user-agent: line is not preceded by one or more blank lines. """ # states: # 0: start state # 1: saw user-agent line # 2: saw an allow or disallow line state = 0 entry = Entry() self.modified() for line in lines: if not line: if state == 1: entry = Entry() state = 0 elif state == 2: self._add_entry(entry) entry = Entry() state = 0 # remove optional comment and strip line i = line.find('#') if i >= 0: line = line[:i] line = line.strip() if not line: continue line = line.split(':', 1) if len(line) == 2: line[0] = line[0].strip().lower() line[1] = urllib.parse.unquote(line[1].strip()) if line[0] == "user-agent": if state == 2: self._add_entry(entry) entry = Entry() entry.useragents.append(line[1]) state = 1 elif line[0] == "disallow": if state != 0: entry.rulelines.append(RuleLine(line[1], False)) state = 2 elif line[0] == "allow": if state != 0: entry.rulelines.append(RuleLine(line[1], True)) state = 2 elif line[0] == "crawl-delay": if state != 0: # before trying to convert to int we need to make # sure that robots.txt has valid syntax otherwise # it will crash if line[1].strip().isdigit(): entry.delay = int(line[1]) state = 2 elif line[0] == "request-rate": if state != 0: numbers = line[1].split('/') # check if all values are sane if (len(numbers) == 2 and numbers[0].strip().isdigit() and numbers[1].strip().isdigit()): entry.req_rate = RequestRate(int(numbers[0]), int(numbers[1])) state = 2 elif line[0] == "sitemap": # According to http://www.sitemaps.org/protocol.html # "This directive is independent of the user-agent line, # so it doesn't matter where you place it in your file." # Therefore we do not change the state of the parser. 
self.sitemaps.append(line[1]) if state == 2: self._add_entry(entry) def can_fetch(self, useragent, url): """using the parsed robots.txt decide if useragent can fetch url""" if self.disallow_all: return False if self.allow_all: return True # Until the robots.txt file has been read or found not # to exist, we must assume that no url is allowable. # This prevents false positives when a user erroneously # calls can_fetch() before calling read(). if not self.last_checked: return False # search for given user agent matches # the first match counts parsed_url = urllib.parse.urlparse(urllib.parse.unquote(url)) url = urllib.parse.urlunparse(('','',parsed_url.path, parsed_url.params,parsed_url.query, parsed_url.fragment)) url = urllib.parse.quote(url) if not url: url = "/" for entry in self.entries: if entry.applies_to(useragent): return entry.allowance(url) # try the default entry last if self.default_entry: return self.default_entry.allowance(url) # agent not found ==> access granted return True def crawl_delay(self, useragent): if not self.mtime(): return None for entry in self.entries: if entry.applies_to(useragent): return entry.delay if self.default_entry: return self.default_entry.delay return None def request_rate(self, useragent): if not self.mtime(): return None for entry in self.entries: if entry.applies_to(useragent): return entry.req_rate if self.default_entry: return self.default_entry.req_rate return None def site_maps(self): if not self.sitemaps: return None return self.sitemaps def __str__(self): entries = self.entries if self.default_entry is not None: entries = entries + [self.default_entry] return '\n\n'.join(map(str, entries)) class RuleLine: """A rule line is a single "Allow:" (allowance==True) or "Disallow:" (allowance==False) followed by a path.""" def __init__(self, path, allowance): if path == '' and not allowance: # an empty value means allow all allowance = True path = urllib.parse.urlunparse(urllib.parse.urlparse(path)) self.path = urllib.parse.quote(path) self.allowance = allowance def applies_to(self, filename): return self.path == "*" or filename.startswith(self.path) def __str__(self): return ("Allow" if self.allowance else "Disallow") + ": " + self.path class Entry: """An entry has one or more user-agents and zero or more rulelines""" def __init__(self): self.useragents = [] self.rulelines = [] self.delay = None self.req_rate = None def __str__(self): ret = [] for agent in self.useragents: ret.append(f"User-agent: {agent}") if self.delay is not None: ret.append(f"Crawl-delay: {self.delay}") if self.req_rate is not None: rate = self.req_rate ret.append(f"Request-rate: {rate.requests}/{rate.seconds}") ret.extend(map(str, self.rulelines)) return '\n'.join(ret) def applies_to(self, useragent): """check if this entry applies to the specified agent""" # split the name token and make it lower case useragent = useragent.split("/")[0].lower() for agent in self.useragents: if agent == '*': # we have the catch-all agent return True agent = agent.lower() if agent in useragent: return True return False def allowance(self, filename): """Preconditions: - our agent applies to this entry - filename is URL decoded""" for line in self.rulelines: if line.applies_to(filename): return line.allowance return True
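

if __name__ == "__main__":
    # Usage sketch (an addition, not part of the original module): exercises
    # the parser on an in-memory robots.txt via parse(), which avoids the
    # network round trip that read() would perform.  The agent name
    # "ExampleBot" and the example.com URLs below are illustrative
    # assumptions only.
    sample = [
        "User-agent: *",
        "Disallow: /private/",
        "Crawl-delay: 2",
        "Sitemap: https://www.example.com/sitemap.xml",
    ]
    rp = RobotFileParser()
    rp.parse(sample)
    print(rp.can_fetch("ExampleBot", "https://www.example.com/index.html"))  # True
    print(rp.can_fetch("ExampleBot", "https://www.example.com/private/x"))   # False
    print(rp.crawl_delay("ExampleBot"))  # 2
    print(rp.site_maps())                # ['https://www.example.com/sitemap.xml']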