diff options
Diffstat (limited to 'debian/debian_support.py')
-rw-r--r-- | debian/debian_support.py | 648 |
1 files changed, 648 insertions, 0 deletions
diff --git a/debian/debian_support.py b/debian/debian_support.py new file mode 100644 index 0000000..8a72d63 --- /dev/null +++ b/debian/debian_support.py @@ -0,0 +1,648 @@ +# debian_support.py -- Python module for Debian metadata +# Copyright (C) 2005 Florian Weimer <fw@deneb.enyo.de> +# Copyright (C) 2010 John Wright <jsw@debian.org> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +"""This module implements facilities to deal with Debian-specific metadata.""" + +from __future__ import absolute_import, print_function + +import os +import re +import hashlib +import types + +from debian.deprecation import function_deprecated_by + +try: + import apt_pkg + apt_pkg.init() + _have_apt_pkg = True +except ImportError: + _have_apt_pkg = False + +class ParseError(Exception): + """An exception which is used to signal a parse failure. + + Attributes: + + filename - name of the file + lineno - line number in the file + msg - error message + + """ + + def __init__(self, filename, lineno, msg): + assert type(lineno) == types.IntType + self.filename = filename + self.lineno = lineno + self.msg = msg + + def __str__(self): + return self.msg + + def __repr__(self): + return "ParseError(%r, %d, %r)" % (self.filename, + self.lineno, + self.msg) + + def print_out(self, file): + """Writes a machine-parsable error message to file.""" + file.write("%s:%d: %s\n" % (self.filename, self.lineno, self.msg)) + file.flush() + + printOut = function_deprecated_by(print_out) + +class BaseVersion(object): + """Base class for classes representing Debian versions + + It doesn't implement any comparison, but it does check for valid versions + according to Section 5.6.12 in the Debian Policy Manual. Since splitting + the version into epoch, upstream_version, and debian_revision components is + pretty much free with the validation, it sets those fields as properties of + the object, and sets the raw version to the full_version property. A + missing epoch or debian_revision results in the respective property set to + None. Setting any of the properties results in the full_version being + recomputed and the rest of the properties set from that. + + It also implements __str__, just returning the raw version given to the + initializer. + """ + + re_valid_version = re.compile( + r"^((?P<epoch>\d+):)?" + "(?P<upstream_version>[A-Za-z0-9.+:~-]+?)" + "(-(?P<debian_revision>[A-Za-z0-9+.~]+))?$") + magic_attrs = ('full_version', 'epoch', 'upstream_version', + 'debian_revision', 'debian_version') + + def __init__(self, version): + self.full_version = version + + def _set_full_version(self, version): + m = self.re_valid_version.match(version) + if not m: + raise ValueError("Invalid version string %r" % version) + # If there no epoch ("1:..."), then the upstream version can not + # contain a :. + if (m.group("epoch") is None and ":" in m.group("upstream_version")): + raise ValueError("Invalid version string %r" % version) + + self.__full_version = version + self.__epoch = m.group("epoch") + self.__upstream_version = m.group("upstream_version") + self.__debian_revision = m.group("debian_revision") + + def __setattr__(self, attr, value): + if attr not in self.magic_attrs: + super(BaseVersion, self).__setattr__(attr, value) + return + + # For compatibility with the old changelog.Version class + if attr == "debian_version": + attr = "debian_revision" + + if attr == "full_version": + self._set_full_version(str(value)) + else: + if value is not None: + value = str(value) + private = "_BaseVersion__%s" % attr + old_value = getattr(self, private) + setattr(self, private, value) + try: + self._update_full_version() + except ValueError: + # Don't leave it in an invalid state + setattr(self, private, old_value) + self._update_full_version() + raise ValueError("Setting %s to %r results in invalid version" + % (attr, value)) + + def __getattr__(self, attr): + if attr not in self.magic_attrs: + return super(BaseVersion, self).__getattribute__(attr) + + # For compatibility with the old changelog.Version class + if attr == "debian_version": + attr = "debian_revision" + + private = "_BaseVersion__%s" % attr + return getattr(self, private) + + def _update_full_version(self): + version = "" + if self.__epoch is not None: + version += self.__epoch + ":" + version += self.__upstream_version + if self.__debian_revision: + version += "-" + self.__debian_revision + self.full_version = version + + def __str__(self): + return self.full_version + + def __repr__(self): + return "%s('%s')" % (self.__class__.__name__, self) + + def _compare(self, other): + raise NotImplementedError + + # TODO: Once we support only Python >= 2.7, we can simplify this using + # @functools.total_ordering. + + def __lt__(self, other): + return self._compare(other) < 0 + + def __le__(self, other): + return self._compare(other) <= 0 + + def __eq__(self, other): + return self._compare(other) == 0 + + def __ne__(self, other): + return self._compare(other) != 0 + + def __ge__(self, other): + return self._compare(other) >= 0 + + def __gt__(self, other): + return self._compare(other) > 0 + + def __hash__(self): + return hash(str(self)) + +class AptPkgVersion(BaseVersion): + """Represents a Debian package version, using apt_pkg.VersionCompare""" + + def __init__(self, version): + if not _have_apt_pkg: + raise NotImplementedError("apt_pkg not available; install the " + "python-apt package") + super(AptPkgVersion, self).__init__(version) + + def _compare(self, other): + return apt_pkg.version_compare(str(self), str(other)) + +# NativeVersion based on the DpkgVersion class by Raphael Hertzog in +# svn://svn.debian.org/qa/trunk/pts/www/bin/common.py r2361 +class NativeVersion(BaseVersion): + """Represents a Debian package version, with native Python comparison""" + + re_all_digits_or_not = re.compile("\d+|\D+") + re_digits = re.compile("\d+") + re_digit = re.compile("\d") + re_alpha = re.compile("[A-Za-z]") + + def _compare(self, other): + # Convert other into an instance of BaseVersion if it's not already. + # (All we need is epoch, upstream_version, and debian_revision + # attributes, which BaseVersion gives us.) Requires other's string + # representation to be the raw version. + if not isinstance(other, BaseVersion): + try: + other = BaseVersion(str(other)) + except ValueError as e: + raise ValueError("Couldn't convert %r to BaseVersion: %s" + % (other, e)) + + lepoch = int(self.epoch or "0") + repoch = int(other.epoch or "0") + if lepoch < repoch: + return -1 + elif lepoch > repoch: + return 1 + res = self._version_cmp_part(self.upstream_version, + other.upstream_version) + if res != 0: + return res + return self._version_cmp_part(self.debian_revision or "0", + other.debian_revision or "0") + + @classmethod + def _order(cls, x): + """Return an integer value for character x""" + if x == '~': + return -1 + elif cls.re_digit.match(x): + return int(x) + 1 + elif cls.re_alpha.match(x): + return ord(x) + else: + return ord(x) + 256 + + @classmethod + def _version_cmp_string(cls, va, vb): + la = [cls._order(x) for x in va] + lb = [cls._order(x) for x in vb] + while la or lb: + a = 0 + b = 0 + if la: + a = la.pop(0) + if lb: + b = lb.pop(0) + if a < b: + return -1 + elif a > b: + return 1 + return 0 + + @classmethod + def _version_cmp_part(cls, va, vb): + la = cls.re_all_digits_or_not.findall(va) + lb = cls.re_all_digits_or_not.findall(vb) + while la or lb: + a = "0" + b = "0" + if la: + a = la.pop(0) + if lb: + b = lb.pop(0) + if cls.re_digits.match(a) and cls.re_digits.match(b): + a = int(a) + b = int(b) + if a < b: + return -1 + elif a > b: + return 1 + else: + res = cls._version_cmp_string(a, b) + if res != 0: + return res + return 0 + +if _have_apt_pkg: + class Version(AptPkgVersion): + pass +else: + class Version(NativeVersion): + pass + +def version_compare(a, b): + va = Version(a) + vb = Version(b) + if va < vb: + return -1 + elif va > vb: + return 1 + else: + return 0 + +class PackageFile: + """A Debian package file. + + Objects of this class can be used to read Debian's Source and + Packages files.""" + + re_field = re.compile(r'^([A-Za-z][A-Za-z0-9-]+):(?:\s*(.*?))?\s*$') + re_continuation = re.compile(r'^\s+(?:\.|(\S.*?)\s*)$') + + def __init__(self, name, file_obj=None): + """Creates a new package file object. + + name - the name of the file the data comes from + file_obj - an alternate data source; the default is to open the + file with the indicated name. + """ + if file_obj is None: + file_obj = open(name) + self.name = name + self.file = file_obj + self.lineno = 0 + + def __iter__(self): + line = self.file.readline() + self.lineno += 1 + pkg = [] + while line: + if line.strip(' \t') == '\n': + if len(pkg) == 0: + self.raise_syntax_error('expected package record') + yield pkg + pkg = [] + line = self.file.readline() + self.lineno += 1 + continue + + match = self.re_field.match(line) + if not match: + self.raise_syntax_error("expected package field") + (name, contents) = match.groups() + contents = contents or '' + + while True: + line = self.file.readline() + self.lineno += 1 + match = self.re_continuation.match(line) + if match: + (ncontents,) = match.groups() + if ncontents is None: + ncontents = "" + contents = "%s\n%s" % (contents, ncontents) + else: + break + pkg.append((name, contents)) + if pkg: + yield pkg + + def raise_syntax_error(self, msg, lineno=None): + if lineno is None: + lineno = self.lineno + raise ParseError(self.name, lineno, msg) + + raiseSyntaxError = function_deprecated_by(raise_syntax_error) + +class PseudoEnum: + """A base class for types which resemble enumeration types.""" + def __init__(self, name, order): + self._name = name + self._order = order + def __repr__(self): + return '%s(%r)' % (self.__class__._name__, self._name) + def __str__(self): + return self._name + # TODO: Once we support only Python >= 2.7, we can simplify this using + # @functools.total_ordering. + def __lt__(self, other): + return self._order < other._order + def __le__(self, other): + return self._order <= other._order + def __eq__(self, other): + return self._order == other._order + def __ne__(self, other): + return self._order != other._order + def __ge__(self, other): + return self._order >= other._order + def __gt__(self, other): + return self._order > other._order + def __hash__(self): + return hash(self._order) + +class Release(PseudoEnum): pass + +def list_releases(): + releases = {} + rels = ("potato", "woody", "sarge", "etch", "lenny", "sid") + for r in range(len(rels)): + releases[rels[r]] = Release(rels[r], r) + Release.releases = releases + return releases + +listReleases = function_deprecated_by(list_releases) + +def intern_release(name, releases=list_releases()): + return releases.get(name) + +internRelease = function_deprecated_by(intern_release) + +del listReleases +del list_releases + +def read_lines_sha1(lines): + m = hashlib.sha1() + for l in lines: + if isinstance(l, bytes): + m.update(l) + else: + m.update(l.encode("UTF-8")) + return m.hexdigest() + +readLinesSHA1 = function_deprecated_by(read_lines_sha1) + +def patches_from_ed_script(source, + re_cmd=re.compile(r'^(\d+)(?:,(\d+))?([acd])$')): + """Converts source to a stream of patches. + + Patches are triples of line indexes: + + - number of the first line to be replaced + - one plus the number of the last line to be replaced + - list of line replacements + + This is enough to model arbitrary additions, deletions and + replacements. + """ + + i = iter(source) + + for line in i: + match = re_cmd.match(line) + if match is None: + raise ValueError("invalid patch command: %r" % line) + + (first, last, cmd) = match.groups() + first = int(first) + if last is not None: + last = int(last) + + if cmd == 'd': + first = first - 1 + if last is None: + last = first + 1 + yield (first, last, []) + continue + + if cmd == 'a': + if last is not None: + raise ValueError("invalid patch argument: %r" % line) + last = first + else: # cmd == c + first = first - 1 + if last is None: + last = first + 1 + + lines = [] + for l in i: + if l == '': + raise ValueError("end of stream in command: %r" % line) + if l == '.\n' or l == '.': + break + lines.append(l) + yield (first, last, lines) + +patchesFromEdScript = function_deprecated_by(patches_from_ed_script) + +def patch_lines(lines, patches): + """Applies patches to lines. Updates lines in place.""" + for (first, last, args) in patches: + lines[first:last] = args + +patchLines = function_deprecated_by(patch_lines) + +def replace_file(lines, local): + + import os.path + + local_new = local + '.new' + new_file = open(local_new, 'w+') + + try: + for l in lines: + new_file.write(l) + new_file.close() + os.rename(local_new, local) + finally: + if os.path.exists(local_new): + os.unlink(local_new) + +replaceFile = function_deprecated_by(replace_file) + +def download_gunzip_lines(remote): + """Downloads a file from a remote location and gunzips it. + + Returns the lines in the file.""" + + # The implementation is rather crude, but it seems that the gzip + # module needs a real file for input. + + import gzip + import tempfile + import urllib + + (handle, fname) = tempfile.mkstemp() + try: + os.close(handle) + (filename, headers) = urllib.urlretrieve(remote, fname) + gfile = gzip.GzipFile(filename) + lines = gfile.readlines() + gfile.close() + finally: + os.unlink(fname) + return lines + +downloadGunzipLines = function_deprecated_by(download_gunzip_lines) + +def download_file(remote, local): + """Copies a gzipped remote file to the local system. + + remote - URL, without the .gz suffix + local - name of the local file + """ + + lines = download_gunzip_lines(remote + '.gz') + replace_file(lines, local) + return lines + +downloadFile = function_deprecated_by(download_file) + +def update_file(remote, local, verbose=None): + """Updates the local file by downloading a remote patch. + + Returns a list of lines in the local file. + """ + + try: + local_file = open(local) + except IOError: + if verbose: + print("update_file: no local copy, downloading full file") + return download_file(remote, local) + + lines = local_file.readlines() + local_file.close() + local_hash = read_lines_sha1(lines) + patches_to_apply = [] + patch_hashes = {} + + import urllib + index_name = remote + '.diff/Index' + + re_whitespace=re.compile('\s+') + + try: + index_url = urllib.urlopen(index_name) + index_fields = list(PackageFile(index_name, index_url)) + except ParseError: + # FIXME: urllib does not raise a proper exception, so we parse + # the error message. + if verbose: + print("update_file: could not interpret patch index file") + return download_file(remote, local) + except IOError: + if verbose: + print("update_file: could not download patch index file") + return download_file(remote, local) + + for fields in index_fields: + for (field, value) in fields: + if field == 'SHA1-Current': + (remote_hash, remote_size) = re_whitespace.split(value) + if local_hash == remote_hash: + if verbose: + print("update_file: local file is up-to-date") + return lines + continue + + if field =='SHA1-History': + for entry in value.splitlines(): + if entry == '': + continue + (hist_hash, hist_size, patch_name) \ + = re_whitespace.split(entry) + + # After the first patch, we have to apply all + # remaining patches. + if patches_to_apply or hist_hash == local_hash: + patches_to_apply.append(patch_name) + + continue + + if field == 'SHA1-Patches': + for entry in value.splitlines(): + if entry == '': + continue + (patch_hash, patch_size, patch_name) \ + = re_whitespace.split(entry) + patch_hashes[patch_name] = patch_hash + continue + + if verbose: + print("update_file: field %r ignored" % field) + + if not patches_to_apply: + if verbose: + print("update_file: could not find historic entry", local_hash) + return download_file(remote, local) + + for patch_name in patches_to_apply: + print("update_file: downloading patch %r" % patch_name) + patch_contents = download_gunzip_lines(remote + '.diff/' + patch_name + + '.gz') + if read_lines_sha1(patch_contents ) != patch_hashes[patch_name]: + raise ValueError("patch %r was garbled" % patch_name) + patch_lines(lines, patches_from_ed_script(patch_contents)) + + new_hash = read_lines_sha1(lines) + if new_hash != remote_hash: + raise ValueError("patch failed, got %s instead of %s" + % (new_hash, remote_hash)) + + replace_file(lines, local) + return lines + +updateFile = function_deprecated_by(update_file) + +def merge_as_sets(*args): + """Create an order set (represented as a list) of the objects in + the sequences passed as arguments.""" + s = {} + for x in args: + for y in x: + s[y] = True + return sorted(s) + +mergeAsSets = function_deprecated_by(merge_as_sets) |