From 9db62b92fa21b955eca4a2672e191c56f98eb6ff Mon Sep 17 00:00:00 2001 From: Daniel Silverstone Date: Mon, 4 Aug 2014 14:35:24 +0100 Subject: Debian bits and a README to mention where it comes from --- README | 11 + debian/__init__.py | 3 + debian/debian_support.py | 648 +++++++++++++++++++++++ debian/deprecation.py | 38 ++ debian/foo/arfile.py | 350 ++++++++++++ debian/foo/changelog.py | 609 +++++++++++++++++++++ debian/foo/deb822.py | 1318 ++++++++++++++++++++++++++++++++++++++++++++++ debian/foo/debfile.py | 325 ++++++++++++ debian/foo/debtags.py | 513 ++++++++++++++++++ debian/foo/doc-debtags | 104 ++++ 10 files changed, 3919 insertions(+) create mode 100644 README create mode 100644 debian/__init__.py create mode 100644 debian/debian_support.py create mode 100644 debian/deprecation.py create mode 100644 debian/foo/arfile.py create mode 100644 debian/foo/changelog.py create mode 100644 debian/foo/deb822.py create mode 100644 debian/foo/debfile.py create mode 100644 debian/foo/debtags.py create mode 100755 debian/foo/doc-debtags diff --git a/README b/README new file mode 100644 index 0000000..1b89f16 --- /dev/null +++ b/README @@ -0,0 +1,11 @@ +Firehose +======== + +*TODO*: Explain Firehose + + +Debian Version Comparison +========================= + +The `debian/` directory contains a subset of the `python-debian` module which +is under the GPL and has its authors listed in the files themselves. diff --git a/debian/__init__.py b/debian/__init__.py new file mode 100644 index 0000000..b28b04f --- /dev/null +++ b/debian/__init__.py @@ -0,0 +1,3 @@ + + + diff --git a/debian/debian_support.py b/debian/debian_support.py new file mode 100644 index 0000000..8a72d63 --- /dev/null +++ b/debian/debian_support.py @@ -0,0 +1,648 @@ +# debian_support.py -- Python module for Debian metadata +# Copyright (C) 2005 Florian Weimer +# Copyright (C) 2010 John Wright +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +"""This module implements facilities to deal with Debian-specific metadata.""" + +from __future__ import absolute_import, print_function + +import os +import re +import hashlib +import types + +from debian.deprecation import function_deprecated_by + +try: + import apt_pkg + apt_pkg.init() + _have_apt_pkg = True +except ImportError: + _have_apt_pkg = False + +class ParseError(Exception): + """An exception which is used to signal a parse failure. 
+
+    Attributes:
+
+    filename - name of the file
+    lineno - line number in the file
+    msg - error message
+
+    """
+
+    def __init__(self, filename, lineno, msg):
+        assert isinstance(lineno, int)
+        self.filename = filename
+        self.lineno = lineno
+        self.msg = msg
+
+    def __str__(self):
+        return self.msg
+
+    def __repr__(self):
+        return "ParseError(%r, %d, %r)" % (self.filename,
+                                           self.lineno,
+                                           self.msg)
+
+    def print_out(self, file):
+        """Writes a machine-parsable error message to file."""
+        file.write("%s:%d: %s\n" % (self.filename, self.lineno, self.msg))
+        file.flush()
+
+    printOut = function_deprecated_by(print_out)
+
+class BaseVersion(object):
+    """Base class for classes representing Debian versions
+
+    It doesn't implement any comparison, but it does check for valid versions
+    according to Section 5.6.12 in the Debian Policy Manual.  Since splitting
+    the version into epoch, upstream_version, and debian_revision components
+    is pretty much free with the validation, it sets those fields as
+    properties of the object, and sets the raw version to the full_version
+    property.  A missing epoch or debian_revision results in the respective
+    property set to None.  Setting any of the properties results in the
+    full_version being recomputed and the rest of the properties set from
+    that.
+
+    It also implements __str__, just returning the raw version given to the
+    initializer.
+    """
+
+    re_valid_version = re.compile(
+            r"^((?P<epoch>\d+):)?"
+             "(?P<upstream_version>[A-Za-z0-9.+:~-]+?)"
+             "(-(?P<debian_revision>[A-Za-z0-9+.~]+))?$")
+    magic_attrs = ('full_version', 'epoch', 'upstream_version',
+                   'debian_revision', 'debian_version')
+
+    def __init__(self, version):
+        self.full_version = version
+
+    def _set_full_version(self, version):
+        m = self.re_valid_version.match(version)
+        if not m:
+            raise ValueError("Invalid version string %r" % version)
+        # If there is no epoch ("1:..."), then the upstream version cannot
+        # contain a colon.
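+        # A worked example (illustrative): "1:1.4.1-1" splits into epoch
+        # "1", upstream_version "1.4.1" and debian_revision "1", while
+        # "7u3" has neither epoch nor revision (both groups are None).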
+ if (m.group("epoch") is None and ":" in m.group("upstream_version")): + raise ValueError("Invalid version string %r" % version) + + self.__full_version = version + self.__epoch = m.group("epoch") + self.__upstream_version = m.group("upstream_version") + self.__debian_revision = m.group("debian_revision") + + def __setattr__(self, attr, value): + if attr not in self.magic_attrs: + super(BaseVersion, self).__setattr__(attr, value) + return + + # For compatibility with the old changelog.Version class + if attr == "debian_version": + attr = "debian_revision" + + if attr == "full_version": + self._set_full_version(str(value)) + else: + if value is not None: + value = str(value) + private = "_BaseVersion__%s" % attr + old_value = getattr(self, private) + setattr(self, private, value) + try: + self._update_full_version() + except ValueError: + # Don't leave it in an invalid state + setattr(self, private, old_value) + self._update_full_version() + raise ValueError("Setting %s to %r results in invalid version" + % (attr, value)) + + def __getattr__(self, attr): + if attr not in self.magic_attrs: + return super(BaseVersion, self).__getattribute__(attr) + + # For compatibility with the old changelog.Version class + if attr == "debian_version": + attr = "debian_revision" + + private = "_BaseVersion__%s" % attr + return getattr(self, private) + + def _update_full_version(self): + version = "" + if self.__epoch is not None: + version += self.__epoch + ":" + version += self.__upstream_version + if self.__debian_revision: + version += "-" + self.__debian_revision + self.full_version = version + + def __str__(self): + return self.full_version + + def __repr__(self): + return "%s('%s')" % (self.__class__.__name__, self) + + def _compare(self, other): + raise NotImplementedError + + # TODO: Once we support only Python >= 2.7, we can simplify this using + # @functools.total_ordering. + + def __lt__(self, other): + return self._compare(other) < 0 + + def __le__(self, other): + return self._compare(other) <= 0 + + def __eq__(self, other): + return self._compare(other) == 0 + + def __ne__(self, other): + return self._compare(other) != 0 + + def __ge__(self, other): + return self._compare(other) >= 0 + + def __gt__(self, other): + return self._compare(other) > 0 + + def __hash__(self): + return hash(str(self)) + +class AptPkgVersion(BaseVersion): + """Represents a Debian package version, using apt_pkg.VersionCompare""" + + def __init__(self, version): + if not _have_apt_pkg: + raise NotImplementedError("apt_pkg not available; install the " + "python-apt package") + super(AptPkgVersion, self).__init__(version) + + def _compare(self, other): + return apt_pkg.version_compare(str(self), str(other)) + +# NativeVersion based on the DpkgVersion class by Raphael Hertzog in +# svn://svn.debian.org/qa/trunk/pts/www/bin/common.py r2361 +class NativeVersion(BaseVersion): + """Represents a Debian package version, with native Python comparison""" + + re_all_digits_or_not = re.compile("\d+|\D+") + re_digits = re.compile("\d+") + re_digit = re.compile("\d") + re_alpha = re.compile("[A-Za-z]") + + def _compare(self, other): + # Convert other into an instance of BaseVersion if it's not already. + # (All we need is epoch, upstream_version, and debian_revision + # attributes, which BaseVersion gives us.) Requires other's string + # representation to be the raw version. 
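+        # Illustrative expectations (not from the original source): under
+        # dpkg ordering "~" sorts before everything, even the empty string,
+        # so e.g.
+        #   NativeVersion("1.0~rc1") < NativeVersion("1.0")
+        #   NativeVersion("1:0.9")   > NativeVersion("2.0")    (epoch wins)
+        #   NativeVersion("2.0-1")   < NativeVersion("2.0-1.1")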
+ if not isinstance(other, BaseVersion): + try: + other = BaseVersion(str(other)) + except ValueError as e: + raise ValueError("Couldn't convert %r to BaseVersion: %s" + % (other, e)) + + lepoch = int(self.epoch or "0") + repoch = int(other.epoch or "0") + if lepoch < repoch: + return -1 + elif lepoch > repoch: + return 1 + res = self._version_cmp_part(self.upstream_version, + other.upstream_version) + if res != 0: + return res + return self._version_cmp_part(self.debian_revision or "0", + other.debian_revision or "0") + + @classmethod + def _order(cls, x): + """Return an integer value for character x""" + if x == '~': + return -1 + elif cls.re_digit.match(x): + return int(x) + 1 + elif cls.re_alpha.match(x): + return ord(x) + else: + return ord(x) + 256 + + @classmethod + def _version_cmp_string(cls, va, vb): + la = [cls._order(x) for x in va] + lb = [cls._order(x) for x in vb] + while la or lb: + a = 0 + b = 0 + if la: + a = la.pop(0) + if lb: + b = lb.pop(0) + if a < b: + return -1 + elif a > b: + return 1 + return 0 + + @classmethod + def _version_cmp_part(cls, va, vb): + la = cls.re_all_digits_or_not.findall(va) + lb = cls.re_all_digits_or_not.findall(vb) + while la or lb: + a = "0" + b = "0" + if la: + a = la.pop(0) + if lb: + b = lb.pop(0) + if cls.re_digits.match(a) and cls.re_digits.match(b): + a = int(a) + b = int(b) + if a < b: + return -1 + elif a > b: + return 1 + else: + res = cls._version_cmp_string(a, b) + if res != 0: + return res + return 0 + +if _have_apt_pkg: + class Version(AptPkgVersion): + pass +else: + class Version(NativeVersion): + pass + +def version_compare(a, b): + va = Version(a) + vb = Version(b) + if va < vb: + return -1 + elif va > vb: + return 1 + else: + return 0 + +class PackageFile: + """A Debian package file. + + Objects of this class can be used to read Debian's Source and + Packages files.""" + + re_field = re.compile(r'^([A-Za-z][A-Za-z0-9-]+):(?:\s*(.*?))?\s*$') + re_continuation = re.compile(r'^\s+(?:\.|(\S.*?)\s*)$') + + def __init__(self, name, file_obj=None): + """Creates a new package file object. + + name - the name of the file the data comes from + file_obj - an alternate data source; the default is to open the + file with the indicated name. 
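+
+        A typical use (illustrative; the file name is hypothetical)::
+
+            for stanza in PackageFile('Packages'):
+                fields = dict(stanza)
+                print(fields['Package'])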
+ """ + if file_obj is None: + file_obj = open(name) + self.name = name + self.file = file_obj + self.lineno = 0 + + def __iter__(self): + line = self.file.readline() + self.lineno += 1 + pkg = [] + while line: + if line.strip(' \t') == '\n': + if len(pkg) == 0: + self.raise_syntax_error('expected package record') + yield pkg + pkg = [] + line = self.file.readline() + self.lineno += 1 + continue + + match = self.re_field.match(line) + if not match: + self.raise_syntax_error("expected package field") + (name, contents) = match.groups() + contents = contents or '' + + while True: + line = self.file.readline() + self.lineno += 1 + match = self.re_continuation.match(line) + if match: + (ncontents,) = match.groups() + if ncontents is None: + ncontents = "" + contents = "%s\n%s" % (contents, ncontents) + else: + break + pkg.append((name, contents)) + if pkg: + yield pkg + + def raise_syntax_error(self, msg, lineno=None): + if lineno is None: + lineno = self.lineno + raise ParseError(self.name, lineno, msg) + + raiseSyntaxError = function_deprecated_by(raise_syntax_error) + +class PseudoEnum: + """A base class for types which resemble enumeration types.""" + def __init__(self, name, order): + self._name = name + self._order = order + def __repr__(self): + return '%s(%r)' % (self.__class__._name__, self._name) + def __str__(self): + return self._name + # TODO: Once we support only Python >= 2.7, we can simplify this using + # @functools.total_ordering. + def __lt__(self, other): + return self._order < other._order + def __le__(self, other): + return self._order <= other._order + def __eq__(self, other): + return self._order == other._order + def __ne__(self, other): + return self._order != other._order + def __ge__(self, other): + return self._order >= other._order + def __gt__(self, other): + return self._order > other._order + def __hash__(self): + return hash(self._order) + +class Release(PseudoEnum): pass + +def list_releases(): + releases = {} + rels = ("potato", "woody", "sarge", "etch", "lenny", "sid") + for r in range(len(rels)): + releases[rels[r]] = Release(rels[r], r) + Release.releases = releases + return releases + +listReleases = function_deprecated_by(list_releases) + +def intern_release(name, releases=list_releases()): + return releases.get(name) + +internRelease = function_deprecated_by(intern_release) + +del listReleases +del list_releases + +def read_lines_sha1(lines): + m = hashlib.sha1() + for l in lines: + if isinstance(l, bytes): + m.update(l) + else: + m.update(l.encode("UTF-8")) + return m.hexdigest() + +readLinesSHA1 = function_deprecated_by(read_lines_sha1) + +def patches_from_ed_script(source, + re_cmd=re.compile(r'^(\d+)(?:,(\d+))?([acd])$')): + """Converts source to a stream of patches. + + Patches are triples of line indexes: + + - number of the first line to be replaced + - one plus the number of the last line to be replaced + - list of line replacements + + This is enough to model arbitrary additions, deletions and + replacements. 
+ """ + + i = iter(source) + + for line in i: + match = re_cmd.match(line) + if match is None: + raise ValueError("invalid patch command: %r" % line) + + (first, last, cmd) = match.groups() + first = int(first) + if last is not None: + last = int(last) + + if cmd == 'd': + first = first - 1 + if last is None: + last = first + 1 + yield (first, last, []) + continue + + if cmd == 'a': + if last is not None: + raise ValueError("invalid patch argument: %r" % line) + last = first + else: # cmd == c + first = first - 1 + if last is None: + last = first + 1 + + lines = [] + for l in i: + if l == '': + raise ValueError("end of stream in command: %r" % line) + if l == '.\n' or l == '.': + break + lines.append(l) + yield (first, last, lines) + +patchesFromEdScript = function_deprecated_by(patches_from_ed_script) + +def patch_lines(lines, patches): + """Applies patches to lines. Updates lines in place.""" + for (first, last, args) in patches: + lines[first:last] = args + +patchLines = function_deprecated_by(patch_lines) + +def replace_file(lines, local): + + import os.path + + local_new = local + '.new' + new_file = open(local_new, 'w+') + + try: + for l in lines: + new_file.write(l) + new_file.close() + os.rename(local_new, local) + finally: + if os.path.exists(local_new): + os.unlink(local_new) + +replaceFile = function_deprecated_by(replace_file) + +def download_gunzip_lines(remote): + """Downloads a file from a remote location and gunzips it. + + Returns the lines in the file.""" + + # The implementation is rather crude, but it seems that the gzip + # module needs a real file for input. + + import gzip + import tempfile + import urllib + + (handle, fname) = tempfile.mkstemp() + try: + os.close(handle) + (filename, headers) = urllib.urlretrieve(remote, fname) + gfile = gzip.GzipFile(filename) + lines = gfile.readlines() + gfile.close() + finally: + os.unlink(fname) + return lines + +downloadGunzipLines = function_deprecated_by(download_gunzip_lines) + +def download_file(remote, local): + """Copies a gzipped remote file to the local system. + + remote - URL, without the .gz suffix + local - name of the local file + """ + + lines = download_gunzip_lines(remote + '.gz') + replace_file(lines, local) + return lines + +downloadFile = function_deprecated_by(download_file) + +def update_file(remote, local, verbose=None): + """Updates the local file by downloading a remote patch. + + Returns a list of lines in the local file. + """ + + try: + local_file = open(local) + except IOError: + if verbose: + print("update_file: no local copy, downloading full file") + return download_file(remote, local) + + lines = local_file.readlines() + local_file.close() + local_hash = read_lines_sha1(lines) + patches_to_apply = [] + patch_hashes = {} + + import urllib + index_name = remote + '.diff/Index' + + re_whitespace=re.compile('\s+') + + try: + index_url = urllib.urlopen(index_name) + index_fields = list(PackageFile(index_name, index_url)) + except ParseError: + # FIXME: urllib does not raise a proper exception, so we parse + # the error message. 
+ if verbose: + print("update_file: could not interpret patch index file") + return download_file(remote, local) + except IOError: + if verbose: + print("update_file: could not download patch index file") + return download_file(remote, local) + + for fields in index_fields: + for (field, value) in fields: + if field == 'SHA1-Current': + (remote_hash, remote_size) = re_whitespace.split(value) + if local_hash == remote_hash: + if verbose: + print("update_file: local file is up-to-date") + return lines + continue + + if field =='SHA1-History': + for entry in value.splitlines(): + if entry == '': + continue + (hist_hash, hist_size, patch_name) \ + = re_whitespace.split(entry) + + # After the first patch, we have to apply all + # remaining patches. + if patches_to_apply or hist_hash == local_hash: + patches_to_apply.append(patch_name) + + continue + + if field == 'SHA1-Patches': + for entry in value.splitlines(): + if entry == '': + continue + (patch_hash, patch_size, patch_name) \ + = re_whitespace.split(entry) + patch_hashes[patch_name] = patch_hash + continue + + if verbose: + print("update_file: field %r ignored" % field) + + if not patches_to_apply: + if verbose: + print("update_file: could not find historic entry", local_hash) + return download_file(remote, local) + + for patch_name in patches_to_apply: + print("update_file: downloading patch %r" % patch_name) + patch_contents = download_gunzip_lines(remote + '.diff/' + patch_name + + '.gz') + if read_lines_sha1(patch_contents ) != patch_hashes[patch_name]: + raise ValueError("patch %r was garbled" % patch_name) + patch_lines(lines, patches_from_ed_script(patch_contents)) + + new_hash = read_lines_sha1(lines) + if new_hash != remote_hash: + raise ValueError("patch failed, got %s instead of %s" + % (new_hash, remote_hash)) + + replace_file(lines, local) + return lines + +updateFile = function_deprecated_by(update_file) + +def merge_as_sets(*args): + """Create an order set (represented as a list) of the objects in + the sequences passed as arguments.""" + s = {} + for x in args: + for y in x: + s[y] = True + return sorted(s) + +mergeAsSets = function_deprecated_by(merge_as_sets) diff --git a/debian/deprecation.py b/debian/deprecation.py new file mode 100644 index 0000000..b9e4c09 --- /dev/null +++ b/debian/deprecation.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- vim: fileencoding=utf-8 : +# +# debian/deprecation.py +# Utility module to deprecate features +# +# Copyright © Ben Finney +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation, either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +""" Utility module to deprecate features """ + +import warnings + +def function_deprecated_by(func): + """ Return a function that warns it is deprecated by another function. + + Returns a new function that warns it is deprecated by function + ``func``, then acts as a pass-through wrapper for ``func``. 
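+
+    Usage sketch (illustrative, mirroring how this module's own camelCase
+    aliases are kept alive)::
+
+        def print_out(self, file):
+            ...
+
+        printOut = function_deprecated_by(print_out)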
+ + """ + func_name = func.__name__ + warn_msg = "Use %(func_name)s instead" % vars() + def deprecated_func(*args, **kwargs): + warnings.warn(warn_msg, DeprecationWarning, stacklevel=2) + return func(*args, **kwargs) + return deprecated_func diff --git a/debian/foo/arfile.py b/debian/foo/arfile.py new file mode 100644 index 0000000..fe935f3 --- /dev/null +++ b/debian/foo/arfile.py @@ -0,0 +1,350 @@ +# ArFile: a Python representation of ar (as in "man 1 ar") archives. +# Copyright (C) 2007 Stefano Zacchiroli +# Copyright (C) 2007 Filippo Giunchedi +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import print_function + +import sys + +GLOBAL_HEADER = b"!\n" +GLOBAL_HEADER_LENGTH = len(GLOBAL_HEADER) + +FILE_HEADER_LENGTH = 60 +FILE_MAGIC = b"`\n" + +class ArError(Exception): + pass + +class ArFile(object): + """ Representation of an ar archive, see man 1 ar. + + The interface of this class tries to mimick that of the TarFile module in + the standard library. + + ArFile objects have the following (read-only) properties: + - members same as getmembers() + """ + + def __init__(self, filename=None, mode='r', fileobj=None, + encoding=None, errors=None): + """ Build an ar file representation starting from either a filename or + an existing file object. The only supported mode is 'r'. + + In Python 3, the encoding and errors parameters control how member + names are decoded into Unicode strings. Like tarfile, the default + encoding is sys.getfilesystemencoding() and the default error handling + scheme is 'surrogateescape' (>= 3.2) or 'strict' (< 3.2). + """ + + self.__members = [] + self.__members_dict = {} + self.__fname = filename + self.__fileobj = fileobj + if encoding is None: + encoding = sys.getfilesystemencoding() + self.__encoding = encoding + if errors is None: + if sys.version >= '3.2': + errors = 'surrogateescape' + else: + errors = 'strict' + self.__errors = errors + + if mode == "r": + self.__index_archive() + pass # TODO write support + + def __index_archive(self): + if self.__fname: + fp = open(self.__fname, "rb") + elif self.__fileobj: + fp = self.__fileobj + else: + raise ArError("Unable to open valid file") + + if fp.read(GLOBAL_HEADER_LENGTH) != GLOBAL_HEADER: + raise ArError("Unable to find global header") + + while True: + newmember = ArMember.from_file(fp, self.__fname, + encoding=self.__encoding, + errors=self.__errors) + if not newmember: + break + self.__members.append(newmember) + self.__members_dict[newmember.name] = newmember + if newmember.size % 2 == 0: # even, no padding + fp.seek(newmember.size, 1) # skip to next header + else: + fp.seek(newmember.size + 1 , 1) # skip to next header + + if self.__fname: + fp.close() + + def getmember(self, name): + """ Return the (last occurrence of a) member in the archive whose name + is 'name'. Raise KeyError if no member matches the given name. 
+ + Note that in case of name collisions the only way to retrieve all + members matching a given name is to use getmembers. """ + + return self.__members_dict[name] + + def getmembers(self): + """ Return a list of all members contained in the archive. + + The list has the same order of members in the archive and can contain + duplicate members (i.e. members with the same name) if they are + duplicate in the archive itself. """ + + return self.__members + + members = property(getmembers) + + def getnames(self): + """ Return a list of all member names in the archive. """ + + return [f.name for f in self.__members] + + def extractall(): + """ Not (yet) implemented. """ + + raise NotImplementedError # TODO + + def extract(self, member, path): + """ Not (yet) implemented. """ + + raise NotImplementedError # TODO + + def extractfile(self, member): + """ Return a file object corresponding to the requested member. A member + can be specified either as a string (its name) or as a ArMember + instance. """ + + for m in self.__members: + if isinstance(member, ArMember) and m.name == member.name: + return m + elif member == m.name: + return m + else: + return None + + # container emulation + + def __iter__(self): + """ Iterate over the members of the present ar archive. """ + + return iter(self.__members) + + def __getitem__(self, name): + """ Same as .getmember(name). """ + + return self.getmember(name) + + +class ArMember(object): + """ Member of an ar archive. + + Implements most of a file object interface: read, readline, next, + readlines, seek, tell, close. + + ArMember objects have the following (read-only) properties: + - name member name in an ar archive + - mtime modification time + - owner owner user + - group owner group + - fmode file permissions + - size size in bytes + - fname file name""" + + def __init__(self): + self.__name = None # member name (i.e. filename) in the archive + self.__mtime = None # last modification time + self.__owner = None # owner user + self.__group = None # owner group + self.__fmode = None # permissions + self.__size = None # member size in bytes + self.__fname = None # file name associated with this member + self.__fp = None # file pointer + self.__offset = None # start-of-data offset + self.__end = None # end-of-data offset + + def from_file(fp, fname, encoding=None, errors=None): + """fp is an open File object positioned on a valid file header inside + an ar archive. Return a new ArMember on success, None otherwise. 
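+
+        Illustrative: for a .deb package opened as an ar archive, successive
+        calls yield members such as 'debian-binary', 'control.tar.gz' and
+        'data.tar.gz', in archive order.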
""" + + buf = fp.read(FILE_HEADER_LENGTH) + + if not buf: + return None + + # sanity checks + if len(buf) < FILE_HEADER_LENGTH: + raise IOError("Incorrect header length") + + if buf[58:60] != FILE_MAGIC: + raise IOError("Incorrect file magic") + + if sys.version >= '3': + if encoding is None: + encoding = sys.getfilesystemencoding() + if errors is None: + if sys.version >= '3.2': + errors = 'surrogateescape' + else: + errors = 'strict' + + # http://en.wikipedia.org/wiki/Ar_(Unix) + #from to Name Format + #0 15 File name ASCII + #16 27 File modification date Decimal + #28 33 Owner ID Decimal + #34 39 Group ID Decimal + #40 47 File mode Octal + #48 57 File size in bytes Decimal + #58 59 File magic \140\012 + + # XXX struct.unpack can be used as well here + f = ArMember() + f.__name = buf[0:16].split(b"/")[0].strip() + if sys.version >= '3': + f.__name = f.__name.decode(encoding, errors) + f.__mtime = int(buf[16:28]) + f.__owner = int(buf[28:34]) + f.__group = int(buf[34:40]) + f.__fmode = buf[40:48] # XXX octal value + f.__size = int(buf[48:58]) + + f.__fname = fname + f.__offset = fp.tell() # start-of-data + f.__end = f.__offset + f.__size + + return f + + from_file = staticmethod(from_file) + + # file interface + + # XXX this is not a sequence like file objects + def read(self, size=0): + if self.__fp is None: + self.__fp = open(self.__fname, "rb") + self.__fp.seek(self.__offset) + + cur = self.__fp.tell() + + if size > 0 and size <= self.__end - cur: # there's room + return self.__fp.read(size) + + if cur >= self.__end or cur < self.__offset: + return b'' + + return self.__fp.read(self.__end - cur) + + def readline(self, size=None): + if self.__fp is None: + self.__fp = open(self.__fname, "rb") + self.__fp.seek(self.__offset) + + if size is not None: + buf = self.__fp.readline(size) + if self.__fp.tell() > self.__end: + return b'' + + return buf + + buf = self.__fp.readline() + if self.__fp.tell() > self.__end: + return b'' + else: + return buf + + def readlines(self, sizehint=0): + if self.__fp is None: + self.__fp = open(self.__fname, "rb") + self.__fp.seek(self.__offset) + + buf = None + lines = [] + while True: + buf = self.readline() + if not buf: + break + lines.append(buf) + + return lines + + def seek(self, offset, whence=0): + if self.__fp is None: + self.__fp = open(self.__fname, "rb") + self.__fp.seek(self.__offset) + + if self.__fp.tell() < self.__offset: + self.__fp.seek(self.__offset) + + if whence < 2 and offset + self.__fp.tell() < self.__offset: + raise IOError("Can't seek at %d" % offset) + + if whence == 1: + self.__fp.seek(offset, 1) + elif whence == 0: + self.__fp.seek(self.__offset + offset, 0) + elif whence == 2: + self.__fp.seek(self.__end + offset, 0) + + def tell(self): + if self.__fp is None: + self.__fp = open(self.__fname, "rb") + self.__fp.seek(self.__offset) + + cur = self.__fp.tell() + + if cur < self.__offset: + return 0 + else: + return cur - self.__offset + + def seekable(self): + return True + + def close(self): + if self.__fp is not None: + self.__fp.close() + + def next(self): + return self.readline() + + def __iter__(self): + def nextline(): + line = self.readline() + if line: + yield line + + return iter(nextline()) + + name = property(lambda self: self.__name) + mtime = property(lambda self: self.__mtime) + owner = property(lambda self: self.__owner) + group = property(lambda self: self.__group) + fmode = property(lambda self: self.__fmode) + size = property(lambda self: self.__size) + fname = property(lambda self: self.__fname) + +if __name__ == 
'__main__': + # test + # ar r test.ar .. + a = ArFile("test.ar") + print("\n".join(a.getnames())) diff --git a/debian/foo/changelog.py b/debian/foo/changelog.py new file mode 100644 index 0000000..e99c06a --- /dev/null +++ b/debian/foo/changelog.py @@ -0,0 +1,609 @@ +# changelog.py -- Python module for Debian changelogs +# Copyright (C) 2006-7 James Westby +# Copyright (C) 2008 Canonical Ltd. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +# The parsing code is based on that from dpkg which is: +# Copyright 1996 Ian Jackson +# Copyright 2005 Frank Lichtenheld +# and licensed under the same license as above. + +"""This module implements facilities to deal with Debian changelogs.""" + +from __future__ import absolute_import + +import os +import pwd +import re +import socket +import warnings +import sys + +import six + +from debian import debian_support + +# Python 3 doesn't have StandardError, but let's avoid changing our +# exception inheritance hierarchy for Python 2. +try: + _base_exception_class = StandardError +except NameError: + _base_exception_class = Exception + +class ChangelogParseError(_base_exception_class): + """Indicates that the changelog could not be parsed""" + is_user_error = True + + def __init__(self, line): + self._line=line + + def __str__(self): + return "Could not parse changelog: "+self._line + +class ChangelogCreateError(_base_exception_class): + """Indicates that changelog could not be created, as all the information + required was not given""" + +class VersionError(_base_exception_class): + """Indicates that the version does not conform to the required format""" + + is_user_error = True + + def __init__(self, version): + self._version=version + + def __str__(self): + return "Could not parse version: "+self._version + +# TODO(jsw): Remove this in favor of using debian_support.Version directly. I +# don't think we gain anything by using this empty subclass. 
+class Version(debian_support.Version): + """Represents a version of a Debian package.""" + # debian_support.Version now has all the functionality we need + +class ChangeBlock(object): + """Holds all the information about one block from the changelog.""" + + def __init__(self, package=None, version=None, distributions=None, + urgency=None, urgency_comment=None, changes=None, + author=None, date=None, other_pairs=None, encoding='utf-8'): + self._raw_version = None + self._set_version(version) + self.package = package + self.distributions = distributions + self.urgency = urgency or "unknown" + self.urgency_comment = urgency_comment or '' + self._changes = changes + self.author = author + self.date = date + self._trailing = [] + self.other_pairs = other_pairs or {} + self._encoding = encoding + self._no_trailer = False + self._trailer_separator = " " + + def _set_version(self, version): + if version is not None: + self._raw_version = str(version) + + def _get_version(self): + return Version(self._raw_version) + + version = property(_get_version, _set_version) + + def other_keys_normalised(self): + norm_dict = {} + for (key, value) in other_pairs.items(): + key = key[0].upper() + key[1:].lower() + m = xbcs_re.match(key) + if m is None: + key = "XS-%s" % key + norm_dict[key] = value + return norm_dict + + def changes(self): + return self._changes + + def add_trailing_line(self, line): + self._trailing.append(line) + + def add_change(self, change): + if self._changes is None: + self._changes = [change] + else: + #Bit of trickery to keep the formatting nicer with a blank + #line at the end if there is one + changes = self._changes + changes.reverse() + added = False + for i in range(len(changes)): + m = blankline.match(changes[i]) + if m is None: + changes.insert(i, change) + added = True + break + changes.reverse() + if not added: + changes.append(change) + self._changes = changes + + def _format(self): + # TODO(jsw): Switch to StringIO or a list to join at the end. 
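+        # The rendered block has the familiar changelog shape, e.g.
+        # (illustrative):
+        #   mypkg (1.2-1) unstable; urgency=low
+        #
+        #     * Some change.
+        #
+        #    -- Jane Doe <jane@example.org>  Mon, 04 Aug 2014 14:35:24 +0100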
+ block = "" + if self.package is None: + raise ChangelogCreateError("Package not specified") + block += self.package + " " + if self._raw_version is None: + raise ChangelogCreateError("Version not specified") + block += "(" + self._raw_version + ") " + if self.distributions is None: + raise ChangelogCreateError("Distribution not specified") + block += self.distributions + "; " + if self.urgency is None: + raise ChangelogCreateError("Urgency not specified") + block += "urgency=" + self.urgency + self.urgency_comment + for (key, value) in self.other_pairs.items(): + block += ", %s=%s" % (key, value) + block += '\n' + if self.changes() is None: + raise ChangelogCreateError("Changes not specified") + for change in self.changes(): + block += change + "\n" + if not self._no_trailer: + if self.author is None: + raise ChangelogCreateError("Author not specified") + if self.date is None: + raise ChangelogCreateError("Date not specified") + block += " -- " + self.author + self._trailer_separator \ + + self.date + "\n" + for line in self._trailing: + block += line + "\n" + return block + + if sys.version >= '3': + __str__ = _format + + def __bytes__(self): + return str(self).encode(self._encoding) + else: + __unicode__ = _format + + def __str__(self): + return unicode(self).encode(self._encoding) + +topline = re.compile(r'^(\w%(name_chars)s*) \(([^\(\) \t]+)\)' + '((\s+%(name_chars)s+)+)\;' + % {'name_chars': '[-+0-9a-z.]'}, + re.IGNORECASE) +blankline = re.compile('^\s*$') +change = re.compile('^\s\s+.*$') +endline = re.compile('^ -- (.*) <(.*)>( ?)((\w+\,\s*)?\d{1,2}\s+\w+\s+' + '\d{4}\s+\d{1,2}:\d\d:\d\d\s+[-+]\d{4}(\s+\([^\\\(\)]\))?\s*)$') +endline_nodetails = re.compile('^ --(?: (.*) <(.*)>( ?)((\w+\,\s*)?\d{1,2}' + '\s+\w+\s+\d{4}\s+\d{1,2}:\d\d:\d\d\s+[-+]\d{4}' + '(\s+\([^\\\(\)]\))?))?\s*$') +keyvalue= re.compile('^([-0-9a-z]+)=\s*(.*\S)$', re.IGNORECASE) +value_re = re.compile('^([-0-9a-z]+)((\s+.*)?)$', re.IGNORECASE) +xbcs_re = re.compile('^X[BCS]+-', re.IGNORECASE) +emacs_variables = re.compile('^(;;\s*)?Local variables:', re.IGNORECASE) +vim_variables = re.compile('^vim:', re.IGNORECASE) +cvs_keyword = re.compile('^\$\w+:.*\$') +comments = re.compile('^\# ') +more_comments = re.compile('^/\*.*\*/') + +old_format_re1 = re.compile('^(\w+\s+\w+\s+\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}' + '\s+[\w\s]*\d{4})\s+(.*)\s+(<|\()(.*)(\)|>)') +old_format_re2 = re.compile('^(\w+\s+\w+\s+\d{1,2},?\s*\d{4})\s+(.*)' + '\s+(<|\()(.*)(\)|>)') +old_format_re3 = re.compile('^(\w[-+0-9a-z.]*) \(([^\(\) \t]+)\)\;?', + re.IGNORECASE) +old_format_re4 = re.compile('^([\w.+-]+)(-| )(\S+) Debian (\S+)', + re.IGNORECASE) +old_format_re5 = re.compile('^Changes from version (.*) to (.*):', + re.IGNORECASE) +old_format_re6 = re.compile('^Changes for [\w.+-]+-[\w.+-]+:?\s*$', + re.IGNORECASE) +old_format_re7 = re.compile('^Old Changelog:\s*$', re.IGNORECASE) +old_format_re8 = re.compile('^(?:\d+:)?\w[\w.+~-]*:?\s*$') + + +class Changelog(object): + """Represents a debian/changelog file.""" + + # TODO(jsw): Avoid masking the 'file' built-in. + def __init__(self, file=None, max_blocks=None, + allow_empty_author=False, strict=True, encoding='utf-8'): + """Initializer. + + Args: + file: The contents of the changelog, either as a str, unicode object, + or an iterator of lines (each of which is either a str or unicode) + max_blocks: The maximum number of blocks to parse from the input. + (Default: no limit) + allow_empty_author: Whether to allow an empty author in the trailer + line of a change block. 
(Default: False) + strict: Whether to raise an exception if there are errors. (Default: + use a warning) + encoding: If the input is a str or iterator of str, the encoding to + use when interpreting the input. + """ + self._encoding = encoding + self._blocks = [] + self.initial_blank_lines = [] + if file is not None: + try: + self.parse_changelog(file, max_blocks=max_blocks, + allow_empty_author=allow_empty_author, + strict=strict) + except ChangelogParseError: + pass + + def _parse_error(self, message, strict): + if strict: + raise ChangelogParseError(message) + else: + warnings.warn(message) + + def parse_changelog(self, file, max_blocks=None, + allow_empty_author=False, strict=True, encoding=None): + first_heading = "first heading" + next_heading_or_eof = "next heading of EOF" + start_of_change_data = "start of change data" + more_changes_or_trailer = "more change data or trailer" + slurp_to_end = "slurp to end" + + encoding = encoding or self._encoding + + if file is None: + self._parse_error('Empty changelog file.', strict) + return + + self._blocks = [] + self.initial_blank_lines = [] + + current_block = ChangeBlock(encoding=encoding) + changes = [] + + state = first_heading + old_state = None + if isinstance(file, bytes): + file = file.decode(encoding) + if isinstance(file, six.string_types): + # Make sure the changelog file is not empty. + if len(file.strip()) == 0: + self._parse_error('Empty changelog file.', strict) + return + + file = file.splitlines() + for line in file: + if not isinstance(line, six.text_type): + line = line.decode(encoding) + # Support both lists of lines without the trailing newline and + # those with trailing newlines (e.g. when given a file object + # directly) + line = line.rstrip('\n') + if state == first_heading or state == next_heading_or_eof: + top_match = topline.match(line) + blank_match = blankline.match(line) + if top_match is not None: + if (max_blocks is not None + and len(self._blocks) >= max_blocks): + return + current_block.package = top_match.group(1) + current_block._raw_version = top_match.group(2) + current_block.distributions = top_match.group(3).lstrip() + + pairs = line.split(";", 1)[1] + all_keys = {} + other_pairs = {} + for pair in pairs.split(','): + pair = pair.strip() + kv_match = keyvalue.match(pair) + if kv_match is None: + self._parse_error("Invalid key-value " + "pair after ';': %s" % pair, strict) + continue + key = kv_match.group(1) + value = kv_match.group(2) + if key.lower() in all_keys: + self._parse_error("Repeated key-value: " + "%s" % key.lower(), strict) + all_keys[key.lower()] = value + if key.lower() == "urgency": + val_match = value_re.match(value) + if val_match is None: + self._parse_error("Badly formatted " + "urgency value: %s" % value, strict) + else: + current_block.urgency = val_match.group(1) + comment = val_match.group(2) + if comment is not None: + current_block.urgency_comment = comment + else: + other_pairs[key] = value + current_block.other_pairs = other_pairs + state = start_of_change_data + elif blank_match is not None: + if state == first_heading: + self.initial_blank_lines.append(line) + else: + self._blocks[-1].add_trailing_line(line) + else: + emacs_match = emacs_variables.match(line) + vim_match = vim_variables.match(line) + cvs_match = cvs_keyword.match(line) + comments_match = comments.match(line) + more_comments_match = more_comments.match(line) + if ((emacs_match is not None or vim_match is not None) + and state != first_heading): + self._blocks[-1].add_trailing_line(line) + old_state = 
state + state = slurp_to_end + continue + if (cvs_match is not None or comments_match is not None + or more_comments_match is not None): + if state == first_heading: + self.initial_blank_lines.append(line) + else: + self._blocks[-1].add_trailing_line(line) + continue + if ((old_format_re1.match(line) is not None + or old_format_re2.match(line) is not None + or old_format_re3.match(line) is not None + or old_format_re4.match(line) is not None + or old_format_re5.match(line) is not None + or old_format_re6.match(line) is not None + or old_format_re7.match(line) is not None + or old_format_re8.match(line) is not None) + and state != first_heading): + self._blocks[-1].add_trailing_line(line) + old_state = state + state = slurp_to_end + continue + self._parse_error("Unexpected line while looking " + "for %s: %s" % (state, line), strict) + if state == first_heading: + self.initial_blank_lines.append(line) + else: + self._blocks[-1].add_trailing_line(line) + elif (state == start_of_change_data + or state == more_changes_or_trailer): + change_match = change.match(line) + end_match = endline.match(line) + end_no_details_match = endline_nodetails.match(line) + blank_match = blankline.match(line) + if change_match is not None: + changes.append(line) + state = more_changes_or_trailer + elif end_match is not None: + if end_match.group(3) != ' ': + self._parse_error("Badly formatted trailer " + "line: %s" % line, strict) + current_block._trailer_separator = end_match.group(3) + current_block.author = "%s <%s>" \ + % (end_match.group(1), end_match.group(2)) + current_block.date = end_match.group(4) + current_block._changes = changes + self._blocks.append(current_block) + changes = [] + current_block = ChangeBlock(encoding=encoding) + state = next_heading_or_eof + elif end_no_details_match is not None: + if not allow_empty_author: + self._parse_error("Badly formatted trailer " + "line: %s" % line, strict) + continue + current_block._changes = changes + self._blocks.append(current_block) + changes = [] + current_block = ChangeBlock(encoding=encoding) + state = next_heading_or_eof + elif blank_match is not None: + changes.append(line) + else: + cvs_match = cvs_keyword.match(line) + comments_match = comments.match(line) + more_comments_match = more_comments.match(line) + if (cvs_match is not None or comments_match is not None + or more_comments_match is not None): + changes.append(line) + continue + self._parse_error("Unexpected line while looking " + "for %s: %s" % (state, line), strict) + changes.append(line) + elif state == slurp_to_end: + if old_state == next_heading_or_eof: + self._blocks[-1].add_trailing_line(line) + else: + changes.append(line) + else: + assert False, "Unknown state: %s" % state + + if ((state != next_heading_or_eof and state != slurp_to_end) + or (state == slurp_to_end and old_state != next_heading_or_eof)): + self._parse_error("Found eof where expected %s" % state, + strict) + current_block._changes = changes + current_block._no_trailer = True + self._blocks.append(current_block) + + def get_version(self): + """Return a Version object for the last version""" + return self._blocks[0].version + + def set_version(self, version): + """Set the version of the last changelog block + + version can be a full version string, or a Version object + """ + self._blocks[0].version = Version(version) + + version = property(get_version, set_version, + doc="Version object for last changelog block""") + + ### For convenience, let's expose some of the version properties + full_version = property(lambda 
self: self.version.full_version) + epoch = property(lambda self: self.version.epoch) + debian_version = property(lambda self: self.version.debian_revision) + debian_revision = property(lambda self: self.version.debian_revision) + upstream_version = property(lambda self: self.version.upstream_version) + + def get_package(self): + """Returns the name of the package in the last version.""" + return self._blocks[0].package + + def set_package(self, package): + self._blocks[0].package = package + + package = property(get_package, set_package, + doc="Name of the package in the last version") + + def get_versions(self): + """Returns a list of version objects that the package went through.""" + return [block.version for block in self._blocks] + + versions = property(get_versions, + doc="List of version objects the package went through") + + def _raw_versions(self): + return [block._raw_version for block in self._blocks] + + def _format(self): + pieces = [] + pieces.append(six.u('\n').join(self.initial_blank_lines)) + for block in self._blocks: + pieces.append(six.text_type(block)) + return six.u('').join(pieces) + + if sys.version >= '3': + __str__ = _format + + def __bytes__(self): + return str(self).encode(self._encoding) + else: + __unicode__ = _format + + def __str__(self): + return unicode(self).encode(self._encoding) + + def __iter__(self): + return iter(self._blocks) + + def __len__(self): + return len(self._blocks) + + def set_distributions(self, distributions): + self._blocks[0].distributions = distributions + distributions = property(lambda self: self._blocks[0].distributions, + set_distributions) + + def set_urgency(self, urgency): + self._blocks[0].urgency = urgency + urgency = property(lambda self: self._blocks[0].urgency, set_urgency) + + def add_change(self, change): + self._blocks[0].add_change(change) + + def set_author(self, author): + self._blocks[0].author = author + author = property(lambda self: self._blocks[0].author, set_author) + + def set_date(self, date): + self._blocks[0].date = date + date = property(lambda self: self._blocks[0].date, set_date) + + def new_block(self, **kwargs): + kwargs.setdefault('encoding', self._encoding) + block = ChangeBlock(**kwargs) + block.add_trailing_line('') + self._blocks.insert(0, block) + + def write_to_open_file(self, file): + file.write(self.__str__()) + + +def get_maintainer(): + """Get the maintainer information in the same manner as dch. + + This function gets the information about the current user for + the maintainer field using environment variables of gecos + informations as approriate. + + It uses the same methods as dch to get the information, namely + DEBEMAIL, DEBFULLNAME, EMAIL, NAME, /etc/mailname and gecos. + + :returns: a tuple of the full name, email pair as strings. + Either of the pair may be None if that value couldn't + be determined. 
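+
+    Example (illustrative)::
+
+        os.environ['DEBFULLNAME'] = 'Jane Doe'
+        os.environ['DEBEMAIL'] = 'jane@example.org'
+        get_maintainer()    # -> ('Jane Doe', 'jane@example.org')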
+ """ + env = os.environ + regex = re.compile(r"^(.*)\s+<(.*)>$") + + # Split email and name + if 'DEBEMAIL' in env: + match_obj = regex.match(env['DEBEMAIL']) + if match_obj: + if not 'DEBFULLNAME' in env: + env['DEBFULLNAME'] = match_obj.group(1) + env['DEBEMAIL'] = match_obj.group(2) + if 'DEBEMAIL' not in env or 'DEBFULLNAME' not in env: + if 'EMAIL' in env: + match_obj = regex.match(env['EMAIL']) + if match_obj: + if not 'DEBFULLNAME' in env: + env['DEBFULLNAME'] = match_obj.group(1) + env['EMAIL'] = match_obj.group(2) + + # Get maintainer's name + if 'DEBFULLNAME' in env: + maintainer = env['DEBFULLNAME'] + elif 'NAME' in env: + maintainer = env['NAME'] + else: + # Use password database if no data in environment variables + try: + maintainer = re.sub(r',.*', '', pwd.getpwuid(os.getuid()).pw_gecos) + except (KeyError, AttributeError): + maintainer = None + + # Get maintainer's mail address + if 'DEBEMAIL' in env: + email = env['DEBEMAIL'] + elif 'EMAIL' in env: + email = env['EMAIL'] + else: + addr = None + if os.path.exists('/etc/mailname'): + f = open('/etc/mailname') + try: + addr = f.readline().strip() + finally: + f.close() + if not addr: + addr = socket.getfqdn() + if addr: + user = pwd.getpwuid(os.getuid()).pw_name + if not user: + addr = None + else: + addr = "%s@%s" % (user, addr) + + if addr: + email = addr + else: + email = None + + return (maintainer, email) diff --git a/debian/foo/deb822.py b/debian/foo/deb822.py new file mode 100644 index 0000000..bd910d6 --- /dev/null +++ b/debian/foo/deb822.py @@ -0,0 +1,1318 @@ +# vim: fileencoding=utf-8 +# +# A python interface for various rfc822-like formatted files used by Debian +# (.changes, .dsc, Packages, Sources, etc) +# +# Copyright (C) 2005-2006 dann frazier +# Copyright (C) 2006-2010 John Wright +# Copyright (C) 2006 Adeodato Simó +# Copyright (C) 2008 Stefano Zacchiroli +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation, either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +from __future__ import absolute_import, print_function + +from debian.deprecation import function_deprecated_by + +try: + import apt_pkg + # This module uses apt_pkg only for its TagFile interface. 
+ apt_pkg.TagFile + _have_apt_pkg = True +except (ImportError, AttributeError): + _have_apt_pkg = False + +import chardet +import os +import re +import subprocess +import sys +import warnings + +try: + from StringIO import StringIO + BytesIO = StringIO +except ImportError: + from io import BytesIO, StringIO +try: + from collections import Mapping, MutableMapping + _mapping_mixin = Mapping + _mutable_mapping_mixin = MutableMapping +except ImportError: + from UserDict import DictMixin + _mapping_mixin = DictMixin + _mutable_mapping_mixin = DictMixin + +import six + +if sys.version >= '3': + import io + def _is_real_file(f): + if not isinstance(f, io.IOBase): + return False + try: + f.fileno() + return True + except (AttributeError, io.UnsupportedOperation): + return False +else: + def _is_real_file(f): + return isinstance(f, file) and hasattr(f, 'fileno') + + +GPGV_DEFAULT_KEYRINGS = frozenset(['/usr/share/keyrings/debian-keyring.gpg']) +GPGV_EXECUTABLE = '/usr/bin/gpgv' + + +class TagSectionWrapper(_mapping_mixin, object): + """Wrap a TagSection object, using its find_raw method to get field values + + This allows us to pick which whitespace to strip off the beginning and end + of the data, so we don't lose leading newlines. + """ + + def __init__(self, section): + self.__section = section + + def __iter__(self): + for key in self.__section.keys(): + if not key.startswith('#'): + yield key + + def __len__(self): + return len([key for key in self.__section.keys() + if not key.startswith('#')]) + + def __getitem__(self, key): + s = self.__section.find_raw(key) + + if s is None: + raise KeyError(key) + + # Get just the stuff after the first ':' + # Could use s.partition if we only supported python >= 2.5 + data = s[s.find(b':')+1:] + + # Get rid of spaces and tabs after the ':', but not newlines, and strip + # off any newline at the end of the data. + return data.lstrip(b' \t').rstrip(b'\n') + + +class OrderedSet(object): + """A set-like object that preserves order when iterating over it + + We use this to keep track of keys in Deb822Dict, because it's much faster + to look up if a key is in a set than in a list. + """ + + def __init__(self, iterable=[]): + self.__set = set() + self.__order = [] + for item in iterable: + self.add(item) + + def add(self, item): + if item not in self: + # set.add will raise TypeError if something's unhashable, so we + # don't have to handle that ourselves + self.__set.add(item) + self.__order.append(item) + + def remove(self, item): + # set.remove will raise KeyError, so we don't need to handle that + # ourselves + self.__set.remove(item) + self.__order.remove(item) + + def __iter__(self): + # Return an iterator of items in the order they were added + return iter(self.__order) + + def __len__(self): + return len(self.__order) + + def __contains__(self, item): + # This is what makes OrderedSet faster than using a list to keep track + # of keys. Lookup in a set is O(1) instead of O(n) for a list. + return item in self.__set + + ### list-like methods + append = add + + def extend(self, iterable): + for item in iterable: + self.add(item) + ### + + +class Deb822Dict(_mutable_mapping_mixin, object): + # Subclassing _mutable_mapping_mixin because we're overriding so much + # dict functionality that subclassing dict requires overriding many more + # than the methods that _mutable_mapping_mixin requires. + """A dictionary-like object suitable for storing RFC822-like data. 
+ + Deb822Dict behaves like a normal dict, except: + - key lookup is case-insensitive + - key order is preserved + - if initialized with a _parsed parameter, it will pull values from + that dictionary-like object as needed (rather than making a copy). + The _parsed dict is expected to be able to handle case-insensitive + keys. + + If _parsed is not None, an optional _fields parameter specifies which keys + in the _parsed dictionary are exposed. + """ + + # See the end of the file for the definition of _strI + + def __init__(self, _dict=None, _parsed=None, _fields=None, + encoding="utf-8"): + self.__dict = {} + self.__keys = OrderedSet() + self.__parsed = None + self.encoding = encoding + + if _dict is not None: + # _dict may be a dict or a list of two-sized tuples + if hasattr(_dict, 'items'): + items = _dict.items() + else: + items = list(_dict) + + try: + for k, v in items: + self[k] = v + except ValueError: + this = len(self.__keys) + len_ = len(items[this]) + raise ValueError('dictionary update sequence element #%d has ' + 'length %d; 2 is required' % (this, len_)) + + if _parsed is not None: + self.__parsed = _parsed + if _fields is None: + self.__keys.extend([ _strI(k) for k in self.__parsed ]) + else: + self.__keys.extend([ _strI(f) for f in _fields if f in self.__parsed ]) + + def _detect_encoding(self, value): + """If value is not already Unicode, decode it intelligently.""" + if isinstance(value, bytes): + try: + return value.decode(self.encoding) + except UnicodeDecodeError as e: + # Evidently, the value wasn't encoded with the encoding the + # user specified. Try detecting it. + warnings.warn('decoding from %s failed; attempting to detect ' + 'the true encoding' % self.encoding, + UnicodeWarning) + result = chardet.detect(value) + try: + return value.decode(result['encoding']) + except UnicodeDecodeError: + raise e + else: + # Assume the rest of the paragraph is in this encoding as + # well (there's no sense in repeating this exercise for + # every field). + self.encoding = result['encoding'] + else: + return value + + ### BEGIN _mutable_mapping_mixin methods + + def __iter__(self): + for key in self.__keys: + yield str(key) + + def __len__(self): + return len(self.__keys) + + def __setitem__(self, key, value): + key = _strI(key) + self.__keys.add(key) + self.__dict[key] = value + + def __getitem__(self, key): + key = _strI(key) + try: + value = self.__dict[key] + except KeyError: + if self.__parsed is not None and key in self.__keys: + value = self.__parsed[key] + else: + raise + + return self._detect_encoding(value) + + def __delitem__(self, key): + key = _strI(key) + self.__keys.remove(key) + try: + del self.__dict[key] + except KeyError: + # If we got this far, the key was in self.__keys, so it must have + # only been in the self.__parsed dict. + pass + + def __contains__(self, key): + key = _strI(key) + return key in self.__keys + + if sys.version < '3': + has_key = __contains__ + + ### END _mutable_mapping_mixin methods + + def __repr__(self): + return '{%s}' % ', '.join(['%r: %r' % (k, v) for k, v in self.items()]) + + def __eq__(self, other): + mykeys = sorted(self) + otherkeys = sorted(other) + if not mykeys == otherkeys: + return False + + for key in mykeys: + if self[key] != other[key]: + return False + + # If we got here, everything matched + return True + + # Overriding __eq__ blocks inheritance of __hash__ in Python 3, and + # instances of this class are not sensibly hashable anyway. 
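+    # (Added note: assigning None here also makes hash() raise TypeError on
+    # Python 2.6+, matching Python 3's behaviour for classes that define
+    # __eq__ without __hash__.)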
+ __hash__ = None + + def copy(self): + # Use self.__class__ so this works as expected for subclasses + copy = self.__class__(self) + return copy + + # TODO implement __str__() and make dump() use that? + + +class Deb822(Deb822Dict): + + def __init__(self, sequence=None, fields=None, _parsed=None, + encoding="utf-8"): + """Create a new Deb822 instance. + + :param sequence: a string, or any any object that returns a line of + input each time, normally a file. Alternately, sequence can + be a dict that contains the initial key-value pairs. + + :param fields: if given, it is interpreted as a list of fields that + should be parsed (the rest will be discarded). + + :param _parsed: internal parameter. + + :param encoding: When parsing strings, interpret them in this encoding. + (All values are given back as unicode objects, so an encoding is + necessary in order to properly interpret the strings.) + """ + + if hasattr(sequence, 'items'): + _dict = sequence + sequence = None + else: + _dict = None + Deb822Dict.__init__(self, _dict=_dict, _parsed=_parsed, _fields=fields, + encoding=encoding) + + if sequence is not None: + try: + self._internal_parser(sequence, fields) + except EOFError: + pass + + self.gpg_info = None + + def iter_paragraphs(cls, sequence, fields=None, use_apt_pkg=True, + shared_storage=False, encoding="utf-8"): + """Generator that yields a Deb822 object for each paragraph in sequence. + + :param sequence: same as in __init__. + + :param fields: likewise. + + :param use_apt_pkg: if sequence is a file, apt_pkg will be used + if available to parse the file, since it's much much faster. Set + this parameter to False to disable using apt_pkg. + :param shared_storage: not used, here for historical reasons. Deb822 + objects never use shared storage anymore. + :param encoding: Interpret the paragraphs in this encoding. + (All values are given back as unicode objects, so an encoding is + necessary in order to properly interpret the strings.) + """ + + if _have_apt_pkg and use_apt_pkg and _is_real_file(sequence): + kwargs = {} + if sys.version >= '3': + # bytes=True is supported for both Python 2 and 3, but we + # only actually need it for Python 3, so this saves us from + # having to require a newer version of python-apt for Python + # 2 as well. This allows us to apply our own encoding + # handling, which is more tolerant of mixed-encoding files. + kwargs['bytes'] = True + parser = apt_pkg.TagFile(sequence, **kwargs) + for section in parser: + paragraph = cls(fields=fields, + _parsed=TagSectionWrapper(section), + encoding=encoding) + if paragraph: + yield paragraph + + else: + iterable = iter(sequence) + x = cls(iterable, fields, encoding=encoding) + while len(x) != 0: + yield x + x = cls(iterable, fields, encoding=encoding) + + iter_paragraphs = classmethod(iter_paragraphs) + + ### + + @staticmethod + def _skip_useless_lines(sequence): + """Yields only lines that do not begin with '#'. + + Also skips any blank lines at the beginning of the input. + """ + at_beginning = True + for line in sequence: + # The bytes/str polymorphism required here to support Python 3 + # is unpleasant, but fortunately limited. We need this because + # at this point we might have been given either bytes or + # Unicode, and we haven't yet got to the point where we can try + # to decode a whole paragraph and detect its encoding. 
+            if isinstance(line, bytes):
+                if line.startswith(b'#'):
+                    continue
+            else:
+                if line.startswith('#'):
+                    continue
+            if at_beginning:
+                if isinstance(line, bytes):
+                    if not line.rstrip(b'\r\n'):
+                        continue
+                else:
+                    if not line.rstrip('\r\n'):
+                        continue
+                at_beginning = False
+            yield line
+
+    def _internal_parser(self, sequence, fields=None):
+        # The key is non-whitespace, non-colon characters before any colon.
+        key_part = r"^(?P<key>[^: \t\n\r\f\v]+)\s*:\s*"
+        single = re.compile(key_part + r"(?P<data>\S.*?)\s*$")
+        multi = re.compile(key_part + r"$")
+        multidata = re.compile(r"^\s(?P<data>.+?)\s*$")
+
+        wanted_field = lambda f: fields is None or f in fields
+
+        if isinstance(sequence, (six.string_types, bytes)):
+            sequence = sequence.splitlines()
+
+        curkey = None
+        content = ""
+
+        for line in self.gpg_stripped_paragraph(
+                self._skip_useless_lines(sequence)):
+            line = self._detect_encoding(line)
+
+            m = single.match(line)
+            if m:
+                if curkey:
+                    self[curkey] = content
+
+                if not wanted_field(m.group('key')):
+                    curkey = None
+                    continue
+
+                curkey = m.group('key')
+                content = m.group('data')
+                continue
+
+            m = multi.match(line)
+            if m:
+                if curkey:
+                    self[curkey] = content
+
+                if not wanted_field(m.group('key')):
+                    curkey = None
+                    continue
+
+                curkey = m.group('key')
+                content = ""
+                continue
+
+            m = multidata.match(line)
+            if m:
+                content += '\n' + line  # XXX not m.group('data')?
+                continue
+
+        if curkey:
+            self[curkey] = content
+
+    def __str__(self):
+        return self.dump()
+
+    def __unicode__(self):
+        return self.dump()
+
+    if sys.version >= '3':
+        def __bytes__(self):
+            return self.dump().encode(self.encoding)
+
+    # __repr__ is handled by Deb822Dict
+
+    def get_as_string(self, key):
+        """Return the value of self[key] as a string (or unicode)
+
+        The default implementation just returns unicode(self[key]); however,
+        this can be overridden in subclasses (e.g. _multivalued) that can take
+        special values.
+        """
+        return six.text_type(self[key])
+
+    def dump(self, fd=None, encoding=None):
+        """Dump the contents in the original format
+
+        If fd is None, return a unicode object.
+
+        If fd is not None, attempt to encode the output to the encoding the
+        object was initialized with, or the value of the encoding argument if
+        it is not None.  This will raise UnicodeEncodeError if the encoding
+        can't support all the characters in the Deb822Dict values.
+        """
+
+        if fd is None:
+            fd = StringIO()
+            return_string = True
+        else:
+            return_string = False
+
+        if encoding is None:
+            # Use the encoding we've been using to decode strings with if none
+            # was explicitly specified
+            encoding = self.encoding
+
+        for key in self:
+            value = self.get_as_string(key)
+            if not value or value[0] == '\n':
+                # Avoid trailing whitespace after "Field:" if it's on its own
+                # line or the value is empty.  We don't have to worry about the
+                # case where value == '\n', since we ensure that is not the
+                # case in __setitem__.
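+                # Illustration (hypothetical values, not from the original
+                # sources): a multi-line value such as "\n abc123 1024 foo.deb"
+                # dumps as
+                #     Files:
+                #      abc123 1024 foo.deb
+                # while an ordinary value dumps as "Field: value".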
+                entry = '%s:%s\n' % (key, value)
+            else:
+                entry = '%s: %s\n' % (key, value)
+            if not return_string:
+                fd.write(entry.encode(encoding))
+            else:
+                fd.write(entry)
+        if return_string:
+            return fd.getvalue()
+
+    ###
+
+    def is_single_line(self, s):
+        if s.count("\n"):
+            return False
+        else:
+            return True
+
+    isSingleLine = function_deprecated_by(is_single_line)
+
+    def is_multi_line(self, s):
+        return not self.is_single_line(s)
+
+    isMultiLine = function_deprecated_by(is_multi_line)
+
+    def _merge_fields(self, s1, s2):
+        if not s2:
+            return s1
+        if not s1:
+            return s2
+
+        if self.is_single_line(s1) and self.is_single_line(s2):
+            ## some fields are delimited by a single space, others by
+            ## a comma followed by a space.  this heuristic assumes
+            ## that there are multiple items in one of the string fields
+            ## so that we can pick up on the delimiter being used
+            delim = ' '
+            if (s1 + s2).count(', '):
+                delim = ', '
+
+            L = sorted((s1 + delim + s2).split(delim))
+
+            prev = merged = L[0]
+
+            for item in L[1:]:
+                ## skip duplicate entries
+                if item == prev:
+                    continue
+                merged = merged + delim + item
+                prev = item
+            return merged
+
+        if self.is_multi_line(s1) and self.is_multi_line(s2):
+            for item in s2.splitlines(True):
+                if item not in s1.splitlines(True):
+                    s1 = s1 + "\n" + item
+            return s1
+
+        raise ValueError("cannot merge a single-line field with a "
+                         "multi-line field")
+
+    _mergeFields = function_deprecated_by(_merge_fields)
+
+    def merge_fields(self, key, d1, d2=None):
+        ## this method can work in two ways - abstract that away
+        if d2 is None:
+            x1 = self
+            x2 = d1
+        else:
+            x1 = d1
+            x2 = d2
+
+        ## we only have to do work if both objects contain our key
+        ## otherwise, we just take the one that does, or raise an
+        ## exception if neither does
+        if key in x1 and key in x2:
+            merged = self._mergeFields(x1[key], x2[key])
+        elif key in x1:
+            merged = x1[key]
+        elif key in x2:
+            merged = x2[key]
+        else:
+            raise KeyError
+
+        ## back to the two different ways - if this method was called
+        ## upon an object, update that object in place.
+        ## return nothing in this case, to make the author notice a
+        ## problem if she assumes the object itself will not be modified
+        if d2 is None:
+            self[key] = merged
+            return None
+
+        return merged
+
+    mergeFields = function_deprecated_by(merge_fields)
+
+    def split_gpg_and_payload(sequence):
+        """Return a (gpg_pre, payload, gpg_post) tuple
+
+        Each element of the returned tuple is a list of lines (with trailing
+        whitespace stripped).
+        """
+
+        gpg_pre_lines = []
+        lines = []
+        gpg_post_lines = []
+        state = b'SAFE'
+        gpgre = re.compile(
+            br'^-----(?P<action>BEGIN|END) PGP (?P<what>[^-]+)-----$')
+        blank_line = re.compile(b'^$')
+        first_line = True
+
+        for line in sequence:
+            # Some consumers of this method require bytes (encoding
+            # detection and signature checking).  However, we might have
+            # been given a file opened in text mode, in which case it's
+            # simplest to encode to bytes.
+            if sys.version >= '3' and isinstance(line, str):
+                line = line.encode()
+
+            line = line.strip(b'\r\n')
+
+            # skip initial blank lines, if any
+            if first_line:
+                if blank_line.match(line):
+                    continue
+                else:
+                    first_line = False
+
+            m = gpgre.match(line)
+
+            if not m:
+                if state == b'SAFE':
+                    if not blank_line.match(line):
+                        lines.append(line)
+                    else:
+                        if not gpg_pre_lines:
+                            # There's no gpg signature, so we should stop at
+                            # this blank line
+                            break
+                elif state == b'SIGNED MESSAGE':
+                    if blank_line.match(line):
+                        state = b'SAFE'
+                    else:
+                        gpg_pre_lines.append(line)
+                elif state == b'SIGNATURE':
+                    gpg_post_lines.append(line)
+            else:
+                if m.group('action') == b'BEGIN':
+                    state = m.group('what')
+                elif m.group('action') == b'END':
+                    gpg_post_lines.append(line)
+                    break
+                if not blank_line.match(line):
+                    if not lines:
+                        gpg_pre_lines.append(line)
+                    else:
+                        gpg_post_lines.append(line)
+
+        if len(lines):
+            return (gpg_pre_lines, lines, gpg_post_lines)
+        else:
+            raise EOFError('only blank lines found in input')
+
+    split_gpg_and_payload = staticmethod(split_gpg_and_payload)
+
+    def gpg_stripped_paragraph(cls, sequence):
+        return cls.split_gpg_and_payload(sequence)[1]
+
+    gpg_stripped_paragraph = classmethod(gpg_stripped_paragraph)
+
+    def get_gpg_info(self, keyrings=None):
+        """Return a GpgInfo object with GPG signature information
+
+        This method will raise ValueError if the signature is not available
+        (e.g. the original text cannot be found).
+
+        :param keyrings: list of keyrings to use (see GpgInfo.from_sequence)
+        """
+
+        # raw_text is saved (as a string) only for Changes and Dsc (see
+        # _gpg_multivalued.__init__) which is small compared to Packages or
+        # Sources which contain no signature
+        if not hasattr(self, 'raw_text'):
+            raise ValueError("original text cannot be found")
+
+        if self.gpg_info is None:
+            self.gpg_info = GpgInfo.from_sequence(self.raw_text,
+                                                  keyrings=keyrings)
+
+        return self.gpg_info
+
+    def validate_input(self, key, value):
+        """Raise ValueError if value is not a valid value for key
+
+        Subclasses that do interesting things for different keys may wish to
+        override this method.
+        """
+
+        # The value cannot end in a newline (if it did, dumping the object
+        # would result in multiple stanzas)
+        if value.endswith('\n'):
+            raise ValueError("value must not end in '\\n'")
+
+        # Make sure there are no blank lines (actually, the first one is
+        # allowed to be blank, but no others), and each subsequent line starts
+        # with whitespace
+        for line in value.splitlines()[1:]:
+            if not line:
+                raise ValueError("value must not have blank lines")
+            if not line[0].isspace():
+                raise ValueError("each line must start with whitespace")
+
+    def __setitem__(self, key, value):
+        self.validate_input(key, value)
+        Deb822Dict.__setitem__(self, key, value)
+
+
+# XXX check what happens if input contains more than one signature
+class GpgInfo(dict):
+    """A wrapper around gnupg parsable output obtained via --status-fd
+
+    This class is really a dictionary containing parsed output from gnupg plus
+    some methods to make sense of the data.
+    Keys are keywords and values are the arguments, suitably split.
+    See /usr/share/doc/gnupg/DETAILS.gz"""
+
+    # keys with format "key keyid uid"
+    uidkeys = ('GOODSIG', 'EXPSIG', 'EXPKEYSIG', 'REVKEYSIG', 'BADSIG')
+
+    def valid(self):
+        """Is the signature valid?"""
+        return 'GOODSIG' in self or 'VALIDSIG' in self
+
+# XXX implement as a property?
+# XXX handle utf-8 %-encoding
+    def uid(self):
+        """Return the primary ID of the signee key, None if not available"""
+        pass
+
+    @classmethod
+    def from_output(cls, out, err=None):
+        """Create a new GpgInfo object from gpg(v) --status-fd output (out)
+        and optionally collect stderr as well (err).
+
+        Both out and err can be sequences of newline-terminated lines or
+        regular strings."""
+
+        n = cls()
+
+        if isinstance(out, six.string_types):
+            out = out.split('\n')
+        if isinstance(err, six.string_types):
+            err = err.split('\n')
+
+        n.out = out
+        n.err = err
+
+        header = '[GNUPG:] '
+        for l in out:
+            if not l.startswith(header):
+                continue
+
+            l = l[len(header):]
+            l = l.strip('\n')
+
+            # str.partition() would be better, 2.5 only though
+            s = l.find(' ')
+            key = l[:s]
+            if key in cls.uidkeys:
+                # value is "keyid UID", don't split UID
+                value = l[s+1:].split(' ', 1)
+            else:
+                value = l[s+1:].split(' ')
+
+            n[key] = value
+        return n
+
+    @classmethod
+    def from_sequence(cls, sequence, keyrings=None, executable=None):
+        """Create a new GpgInfo object from the given sequence.
+
+        :param sequence: sequence of lines of bytes or a single byte string
+
+        :param keyrings: list of keyrings to use (default:
+            ['/usr/share/keyrings/debian-keyring.gpg'])
+
+        :param executable: list of args for subprocess.Popen, the first element
+            being the gpgv executable (default: ['/usr/bin/gpgv'])
+        """
+
+        keyrings = keyrings or GPGV_DEFAULT_KEYRINGS
+        executable = executable or [GPGV_EXECUTABLE]
+
+        # XXX check for gpg as well and use --verify accordingly?
+        args = list(executable)
+        #args.extend(["--status-fd", "1", "--no-default-keyring"])
+        args.extend(["--status-fd", "1"])
+        for k in keyrings:
+            args.extend(["--keyring", k])
+
+        if "--keyring" not in args:
+            raise IOError("cannot access any of the given keyrings")
+
+        p = subprocess.Popen(args, stdin=subprocess.PIPE,
+                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+                             universal_newlines=False)
+        # XXX what to do with exit code?
+
+        if isinstance(sequence, bytes):
+            inp = sequence
+        else:
+            inp = cls._get_full_bytes(sequence)
+        out, err = p.communicate(inp)
+
+        return cls.from_output(out.decode('utf-8'),
+                               err.decode('utf-8'))
+
+    @staticmethod
+    def _get_full_bytes(sequence):
+        """Return a byte string from a sequence of lines of bytes.
+
+        This method detects if the sequence's lines are newline-terminated,
+        and constructs the byte string appropriately.
+        """
+        # Peek at the first line to see if it's newline-terminated.
+        sequence_iter = iter(sequence)
+        try:
+            first_line = next(sequence_iter)
+        except StopIteration:
+            return b""
+        join_str = b'\n'
+        if first_line.endswith(b'\n'):
+            join_str = b''
+        return first_line + join_str + join_str.join(sequence_iter)
+
+    @classmethod
+    def from_file(cls, target, *args, **kwargs):
+        """Create a new GpgInfo object from the given file.
+
+        See GpgInfo.from_sequence.
+        """
+        with open(target, 'rb') as target_file:
+            return cls.from_sequence(target_file, *args, **kwargs)
+
+
+class PkgRelation(object):
+    """Inter-package relationships
+
+    Structured representation of the relationships of a package to another,
+    i.e. of what can appear in a Deb822 field like Depends, Recommends,
+    Suggests, ... (see Debian Policy 7.1).
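+
+    For illustration (hypothetical input, not taken from the original
+    sources), parse_relations("python (>= 2.6), python-apt | python3-apt")
+    returns:
+
+        [[{'name': 'python', 'version': ('>=', '2.6'), 'arch': None}],
+         [{'name': 'python-apt', 'version': None, 'arch': None},
+          {'name': 'python3-apt', 'version': None, 'arch': None}]]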
+ """ + + # XXX *NOT* a real dependency parser, and that is not even a goal here, we + # just parse as much as we need to split the various parts composing a + # dependency, checking their correctness wrt policy is out of scope + __dep_RE = re.compile( \ + r'^\s*(?P[a-zA-Z0-9.+\-]{2,})(\s*\(\s*(?P[>=<]+)\s*(?P[0-9a-zA-Z:\-+~.]+)\s*\))?(\s*\[(?P[\s!\w\-]+)\])?\s*$') + __comma_sep_RE = re.compile(r'\s*,\s*') + __pipe_sep_RE = re.compile(r'\s*\|\s*') + __blank_sep_RE = re.compile(r'\s*') + + @classmethod + def parse_relations(cls, raw): + """Parse a package relationship string (i.e. the value of a field like + Depends, Recommends, Build-Depends ...) + """ + def parse_archs(raw): + # assumption: no space beween '!' and architecture name + archs = [] + for arch in cls.__blank_sep_RE.split(raw.strip()): + if len(arch) and arch[0] == '!': + archs.append((False, arch[1:])) + else: + archs.append((True, arch)) + return archs + + def parse_rel(raw): + match = cls.__dep_RE.match(raw) + if match: + parts = match.groupdict() + d = { 'name': parts['name'] } + if not (parts['relop'] is None or parts['version'] is None): + d['version'] = (parts['relop'], parts['version']) + else: + d['version'] = None + if parts['archs'] is None: + d['arch'] = None + else: + d['arch'] = parse_archs(parts['archs']) + return d + else: + print('deb822.py: WARNING: cannot parse package' \ + ' relationship "%s", returning it raw' % raw, + file=sys.stderr) + return { 'name': raw, 'version': None, 'arch': None } + + tl_deps = cls.__comma_sep_RE.split(raw.strip()) # top-level deps + cnf = map(cls.__pipe_sep_RE.split, tl_deps) + return [[parse_rel(or_dep) for or_dep in or_deps] for or_deps in cnf] + + @staticmethod + def str(rels): + """Format to string structured inter-package relationships + + Perform the inverse operation of parse_relations, returning a string + suitable to be written in a package stanza. + """ + def pp_arch(arch_spec): + (excl, arch) = arch_spec + if excl: + return arch + else: + return '!' + arch + + def pp_atomic_dep(dep): + s = dep['name'] + if dep.get('version') is not None: + s += ' (%s %s)' % dep['version'] + if dep.get('arch') is not None: + s += ' [%s]' % ' '.join(map(pp_arch, dep['arch'])) + return s + + pp_or_dep = lambda deps: ' | '.join(map(pp_atomic_dep, deps)) + return ', '.join(map(pp_or_dep, rels)) + + +class _lowercase_dict(dict): + """Dictionary wrapper which lowercase keys upon lookup.""" + + def __getitem__(self, key): + return dict.__getitem__(self, key.lower()) + + +class _PkgRelationMixin(object): + """Package relationship mixin + + Inheriting from this mixin you can extend a Deb882 object with attributes + letting you access inter-package relationship in a structured way, rather + than as strings. For example, while you can usually use pkg['depends'] to + obtain the Depends string of package pkg, mixing in with this class you + gain pkg.depends to access Depends as a Pkgrel instance + + To use, subclass _PkgRelationMixin from a class with a _relationship_fields + attribute. It should be a list of field names for which structured access + is desired; for each of them a method wild be added to the inherited class. + The method name will be the lowercase version of field name; '-' will be + mangled as '_'. The method would return relationships in the same format of + the PkgRelation' relations property. + + See Packages and Sources as examples. 
+ """ + + def __init__(self, *args, **kwargs): + self.__relations = _lowercase_dict({}) + self.__parsed_relations = False + for name in self._relationship_fields: + # To avoid reimplementing Deb822 key lookup logic we use a really + # simple dict subclass which just lowercase keys upon lookup. Since + # dictionary building happens only here, we ensure that all keys + # are in fact lowercase. + # With this trick we enable users to use the same key (i.e. field + # name) of Deb822 objects on the dictionary returned by the + # relations property. + keyname = name.lower() + if name in self: + self.__relations[keyname] = None # lazy value + # all lazy values will be expanded before setting + # __parsed_relations to True + else: + self.__relations[keyname] = [] + + @property + def relations(self): + """Return a dictionary of inter-package relationships among the current + and other packages. + + Dictionary keys depend on the package kind. Binary packages have keys + like 'depends', 'recommends', ... while source packages have keys like + 'build-depends', 'build-depends-indep' and so on. See the Debian policy + for the comprehensive field list. + + Dictionary values are package relationships returned as lists of lists + of dictionaries (see below for some examples). + + The encoding of package relationships is as follows: + - the top-level lists corresponds to the comma-separated list of + Deb822, their components form a conjuction, i.e. they have to be + AND-ed together + - the inner lists corresponds to the pipe-separated list of Deb822, + their components form a disjunction, i.e. they have to be OR-ed + together + - member of the inner lists are dictionaries with the following keys: + - name: package (or virtual package) name + - version: A pair if the relationship is + versioned, None otherwise. operator is one of "<<", + "<=", "=", ">=", ">>"; version is the given version as + a string. + - arch: A list of pairs if the + relationship is architecture specific, None otherwise. + Polarity is a boolean (false if the architecture is + negated with "!", true otherwise), architecture the + Debian archtiecture name as a string. + + Examples: + + "emacs | emacsen, make, debianutils (>= 1.7)" becomes + [ [ {'name': 'emacs'}, {'name': 'emacsen'} ], + [ {'name': 'make'} ], + [ {'name': 'debianutils', 'version': ('>=', '1.7')} ] ] + + "tcl8.4-dev, procps [!hurd-i386]" becomes + [ [ {'name': 'tcl8.4-dev'} ], + [ {'name': 'procps', 'arch': (false, 'hurd-i386')} ] ] + """ + if not self.__parsed_relations: + lazy_rels = filter(lambda n: self.__relations[n] is None, + self.__relations.keys()) + for n in lazy_rels: + self.__relations[n] = PkgRelation.parse_relations(self[n]) + self.__parsed_relations = True + return self.__relations + + +class _multivalued(Deb822): + """A class with (R/W) support for multivalued fields. + + To use, create a subclass with a _multivalued_fields attribute. It should + be a dictionary with *lower-case* keys, with lists of human-readable + identifiers of the fields as the values. Please see Dsc, Changes, and + PdiffIndex as examples. 
+ """ + + def __init__(self, *args, **kwargs): + Deb822.__init__(self, *args, **kwargs) + + for field, fields in self._multivalued_fields.items(): + try: + contents = self[field] + except KeyError: + continue + + if self.is_multi_line(contents): + self[field] = [] + updater_method = self[field].append + else: + self[field] = Deb822Dict() + updater_method = self[field].update + + for line in filter(None, contents.splitlines()): + updater_method(Deb822Dict(zip(fields, line.split()))) + + def validate_input(self, key, value): + if key.lower() in self._multivalued_fields: + # It's difficult to write a validator for multivalued fields, and + # basically futile, since we allow mutable lists. In any case, + # with sanity checking in get_as_string, we shouldn't ever output + # unparseable data. + pass + else: + Deb822.validate_input(self, key, value) + + def get_as_string(self, key): + keyl = key.lower() + if keyl in self._multivalued_fields: + fd = StringIO() + if hasattr(self[key], 'keys'): # single-line + array = [ self[key] ] + else: # multi-line + fd.write("\n") + array = self[key] + + order = self._multivalued_fields[keyl] + try: + field_lengths = self._fixed_field_lengths + except AttributeError: + field_lengths = {} + for item in array: + for x in order: + raw_value = six.text_type(item[x]) + try: + length = field_lengths[keyl][x] + except KeyError: + value = raw_value + else: + value = (length - len(raw_value)) * " " + raw_value + if "\n" in value: + raise ValueError("'\\n' not allowed in component of " + "multivalued field %s" % key) + fd.write(" %s" % value) + fd.write("\n") + return fd.getvalue().rstrip("\n") + else: + return Deb822.get_as_string(self, key) + + +class _gpg_multivalued(_multivalued): + """A _multivalued class that can support gpg signed objects + + This class's feature is that it stores the raw text before parsing so that + gpg can verify the signature. Use it just like you would use the + _multivalued class. + + This class only stores raw text if it is given a raw string, or if it + detects a gpg signature when given a file or sequence of lines (see + Deb822.split_gpg_and_payload for details). + """ + + def __init__(self, *args, **kwargs): + try: + sequence = args[0] + except IndexError: + sequence = kwargs.get("sequence", None) + + if sequence is not None: + if isinstance(sequence, bytes): + self.raw_text = sequence + elif isinstance(sequence, six.string_types): + # If the file is really in some other encoding, then this + # probably won't verify correctly, but this is the best we + # can reasonably manage. For accurate verification, the + # file should be opened in binary mode. + self.raw_text = sequence.encode('utf-8') + elif hasattr(sequence, "items"): + # sequence is actually a dict(-like) object, so we don't have + # the raw text. 
+ pass + else: + try: + gpg_pre_lines, lines, gpg_post_lines = \ + self.split_gpg_and_payload(sequence) + except EOFError: + # Empty input + gpg_pre_lines = lines = gpg_post_lines = [] + if gpg_pre_lines and gpg_post_lines: + raw_text = BytesIO() + raw_text.write(b"\n".join(gpg_pre_lines)) + raw_text.write(b"\n\n") + raw_text.write(b"\n".join(lines)) + raw_text.write(b"\n\n") + raw_text.write(b"\n".join(gpg_post_lines)) + self.raw_text = raw_text.getvalue() + try: + args = list(args) + args[0] = lines + except IndexError: + kwargs["sequence"] = lines + + _multivalued.__init__(self, *args, **kwargs) + + +class Dsc(_gpg_multivalued): + _multivalued_fields = { + "files": [ "md5sum", "size", "name" ], + "checksums-sha1": ["sha1", "size", "name"], + "checksums-sha256": ["sha256", "size", "name"], + } + + +class Changes(_gpg_multivalued): + _multivalued_fields = { + "files": [ "md5sum", "size", "section", "priority", "name" ], + "checksums-sha1": ["sha1", "size", "name"], + "checksums-sha256": ["sha256", "size", "name"], + } + + def get_pool_path(self): + """Return the path in the pool where the files would be installed""" + + # This is based on the section listed for the first file. While + # it is possible, I think, for a package to provide files in multiple + # sections, I haven't seen it in practice. In any case, this should + # probably detect such a situation and complain, or return a list... + + s = self['files'][0]['section'] + + try: + section, subsection = s.split('/') + except ValueError: + # main is implicit + section = 'main' + + if self['source'].startswith('lib'): + subdir = self['source'][:4] + else: + subdir = self['source'][0] + + return 'pool/%s/%s/%s' % (section, subdir, self['source']) + + +class PdiffIndex(_multivalued): + _multivalued_fields = { + "sha1-current": [ "SHA1", "size" ], + "sha1-history": [ "SHA1", "size", "date" ], + "sha1-patches": [ "SHA1", "size", "date" ], + } + + @property + def _fixed_field_lengths(self): + fixed_field_lengths = {} + for key in self._multivalued_fields: + if hasattr(self[key], 'keys'): + # Not multi-line -- don't need to compute the field length for + # this one + continue + length = self._get_size_field_length(key) + fixed_field_lengths[key] = {"size": length} + return fixed_field_lengths + + def _get_size_field_length(self, key): + lengths = [len(str(item['size'])) for item in self[key]] + return max(lengths) + + +class Release(_multivalued): + """Represents a Release file + + Set the size_field_behavior attribute to "dak" to make the size field + length only as long as the longest actual value. The default, + "apt-ftparchive" makes the field 16 characters long regardless. + """ + # FIXME: Add support for detecting the behavior of the input, if + # constructed from actual 822 text. 
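+    # A hypothetical usage sketch (illustrative only, not taken from the
+    # original sources):
+    #
+    #     release = Release(open('Release'))
+    #     release.size_field_behavior = "dak"
+    #     print(release.dump())   # size columns padded only as needed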
+
+    _multivalued_fields = {
+        "md5sum": [ "md5sum", "size", "name" ],
+        "sha1": [ "sha1", "size", "name" ],
+        "sha256": [ "sha256", "size", "name" ],
+    }
+
+    __size_field_behavior = "apt-ftparchive"
+    def set_size_field_behavior(self, value):
+        if value not in ["apt-ftparchive", "dak"]:
+            raise ValueError("size_field_behavior must be either "
+                             "'apt-ftparchive' or 'dak'")
+        else:
+            self.__size_field_behavior = value
+    size_field_behavior = property(lambda self: self.__size_field_behavior,
+                                   set_size_field_behavior)
+
+    @property
+    def _fixed_field_lengths(self):
+        fixed_field_lengths = {}
+        for key in self._multivalued_fields:
+            length = self._get_size_field_length(key)
+            fixed_field_lengths[key] = {"size": length}
+        return fixed_field_lengths
+
+    def _get_size_field_length(self, key):
+        if self.size_field_behavior == "apt-ftparchive":
+            return 16
+        elif self.size_field_behavior == "dak":
+            lengths = [len(str(item['size'])) for item in self[key]]
+            return max(lengths)
+
+
+class Sources(Dsc, _PkgRelationMixin):
+    """Represent an APT source package list"""
+
+    _relationship_fields = [ 'build-depends', 'build-depends-indep',
+            'build-conflicts', 'build-conflicts-indep', 'binary' ]
+
+    def __init__(self, *args, **kwargs):
+        Dsc.__init__(self, *args, **kwargs)
+        _PkgRelationMixin.__init__(self, *args, **kwargs)
+
+
+class Packages(Deb822, _PkgRelationMixin):
+    """Represent an APT binary package list"""
+
+    _relationship_fields = [ 'depends', 'pre-depends', 'recommends',
+            'suggests', 'breaks', 'conflicts', 'provides', 'replaces',
+            'enhances' ]
+
+    def __init__(self, *args, **kwargs):
+        Deb822.__init__(self, *args, **kwargs)
+        _PkgRelationMixin.__init__(self, *args, **kwargs)
+
+
+class _CaseInsensitiveString(str):
+    """Case insensitive string.
+    """
+
+    def __new__(cls, str_):
+        s = str.__new__(cls, str_)
+        s.str_lower = str_.lower()
+        s.str_lower_hash = hash(s.str_lower)
+        return s
+
+    def __hash__(self):
+        return self.str_lower_hash
+
+    def __eq__(self, other):
+        return self.str_lower == other.lower()
+
+    def lower(self):
+        return self.str_lower
+
+
+_strI = _CaseInsensitiveString
diff --git a/debian/foo/debfile.py b/debian/foo/debfile.py
new file mode 100644
index 0000000..a728a77
--- /dev/null
+++ b/debian/foo/debfile.py
@@ -0,0 +1,325 @@
+# DebFile: a Python representation of Debian .deb binary packages.
+# Copyright (C) 2007-2008 Stefano Zacchiroli <zack@debian.org>
+# Copyright (C) 2007      Filippo Giunchedi <filippo@debian.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ +from __future__ import absolute_import, print_function + +import gzip +import tarfile +import sys + +from debian.arfile import ArFile, ArError +from debian.changelog import Changelog +from debian.deb822 import Deb822 + +DATA_PART = 'data.tar' # w/o extension +CTRL_PART = 'control.tar' +PART_EXTS = ['gz', 'bz2'] # possible extensions +INFO_PART = 'debian-binary' +MAINT_SCRIPTS = ['preinst', 'postinst', 'prerm', 'postrm', 'config'] + +CONTROL_FILE = 'control' +CHANGELOG_NATIVE = 'usr/share/doc/%s/changelog.gz' # with package stem +CHANGELOG_DEBIAN = 'usr/share/doc/%s/changelog.Debian.gz' +MD5_FILE = 'md5sums' + + +class DebError(ArError): + pass + + +class DebPart(object): + """'Part' of a .deb binary package. + + A .deb package is considered as made of 2 parts: a 'data' part + (corresponding to the 'data.tar.gz' archive embedded in a .deb) and a + 'control' part (the 'control.tar.gz' archive). Each of them is represented + by an instance of this class. Each archive should be a compressed tar + archive; supported compression formats are: .tar.gz, .tar.bz2 . + + When referring to file members of the underlying .tar.gz archive, file + names can be specified in one of 3 formats "file", "./file", "/file". In + all cases the file is considered relative to the root of the archive. For + the control part the preferred mechanism is the first one (as in + deb.control.get_content('control') ); for the data part the preferred + mechanism is the third one (as in deb.data.get_file('/etc/vim/vimrc') ). + """ + + def __init__(self, member): + self.__member = member # arfile.ArMember file member + self.__tgz = None + + def tgz(self): + """Return a TarFile object corresponding to this part of a .deb + package. + + Despite the name, this method gives access to various kind of + compressed tar archives, not only gzipped ones. + """ + + if self.__tgz is None: + name = self.__member.name + if name.endswith('.gz'): + gz = gzip.GzipFile(fileobj=self.__member, mode='r') + self.__tgz = tarfile.TarFile(fileobj=gz, mode='r') + elif name.endswith('.bz2'): + # Tarfile's __init__ doesn't allow for r:bz2 modes, but the + # open() classmethod does ... + self.__tgz = tarfile.open(fileobj=self.__member, mode='r:bz2') + else: + raise DebError("part '%s' has unexpected extension" % name) + return self.__tgz + + @staticmethod + def __normalize_member(fname): + """ try (not so hard) to obtain a member file name in a form relative + to the .tar.gz root and with no heading '.' """ + + if fname.startswith('./'): + fname = fname[2:] + elif fname.startswith('/'): + fname = fname[1:] + return fname + + # XXX in some of the following methods, compatibility among >= 2.5 and << + # 2.5 python versions had to be taken into account. TarFile << 2.5 indeed + # was buggied and returned member file names with an heading './' only for + # the *first* file member. TarFile >= 2.5 fixed this and has the heading + # './' for all file members. + + def has_file(self, fname): + """Check if this part contains a given file name.""" + + fname = DebPart.__normalize_member(fname) + names = self.tgz().getnames() + return (('./' + fname in names) \ + or (fname in names)) # XXX python << 2.5 TarFile compatibility + + def get_file(self, fname, encoding=None, errors=None): + """Return a file object corresponding to a given file name. + + If encoding is given, then the file object will return Unicode data; + otherwise, it will return binary data. 
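+
+        A hypothetical call (for illustration only, not from the original
+        sources):
+
+            vimrc = deb.data.get_file('/etc/vim/vimrc', encoding='utf-8')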
+ """ + + fname = DebPart.__normalize_member(fname) + try: + fobj = self.tgz().extractfile('./' + fname) + except KeyError: # XXX python << 2.5 TarFile compatibility + fobj = self.tgz().extractfile(fname) + if encoding is not None: + if sys.version >= '3': + import io + if not hasattr(fobj, 'flush'): + # XXX http://bugs.python.org/issue13815 + fobj.flush = lambda: None + return io.TextIOWrapper(fobj, encoding=encoding, errors=errors) + else: + import codecs + if errors is None: + errors = 'strict' + return codecs.EncodedFile(fobj, encoding, errors=errors) + else: + return fobj + + def get_content(self, fname, encoding=None, errors=None): + """Return the string content of a given file, or None (e.g. for + directories). + + If encoding is given, then the content will be a Unicode object; + otherwise, it will contain binary data. + """ + + f = self.get_file(fname, encoding=encoding, errors=errors) + content = None + if f: # can be None for non regular or link files + content = f.read() + f.close() + return content + + # container emulation + + def __iter__(self): + return iter(self.tgz().getnames()) + + def __contains__(self, fname): + return self.has_file(fname) + + if sys.version < '3': + def has_key(self, fname): + return self.has_file(fname) + + def __getitem__(self, fname): + return self.get_content(fname) + + def close(self): + self.__member.close() + + +class DebData(DebPart): + + pass + + +class DebControl(DebPart): + + def scripts(self): + """ Return a dictionary of maintainer scripts (postinst, prerm, ...) + mapping script names to script text. """ + + scripts = {} + for fname in MAINT_SCRIPTS: + if self.has_file(fname): + scripts[fname] = self.get_content(fname) + + return scripts + + def debcontrol(self): + """ Return the debian/control as a Deb822 (a Debian-specific dict-like + class) object. + + For a string representation of debian/control try + .get_content('control') """ + + return Deb822(self.get_content(CONTROL_FILE)) + + def md5sums(self, encoding=None, errors=None): + """ Return a dictionary mapping filenames (of the data part) to + md5sums. Fails if the control part does not contain a 'md5sum' file. + + Keys of the returned dictionary are the left-hand side values of lines + in the md5sums member of control.tar.gz, usually file names relative to + the file system root (without heading '/' or './'). + + The returned keys are Unicode objects if an encoding is specified, + otherwise binary. 
The returned values are always Unicode.""" + + if not self.has_file(MD5_FILE): + raise DebError("'%s' file not found, can't list MD5 sums" % + MD5_FILE) + + md5_file = self.get_file(MD5_FILE, encoding=encoding, errors=errors) + sums = {} + if encoding is None: + newline = b'\r\n' + else: + newline = '\r\n' + for line in md5_file.readlines(): + # we need to support spaces in filenames, .split() is not enough + md5, fname = line.rstrip(newline).split(None, 1) + if sys.version >= '3' and isinstance(md5, bytes): + sums[fname] = md5.decode() + else: + sums[fname] = md5 + md5_file.close() + return sums + + +class DebFile(ArFile): + """Representation of a .deb file (a Debian binary package) + + DebFile objects have the following (read-only) properties: + - version debian .deb file format version (not related with the + contained package version), 2.0 at the time of writing + for all .deb packages in the Debian archive + - data DebPart object corresponding to the data.tar.gz (or + other compressed tar) archive contained in the .deb + file + - control DebPart object corresponding to the control.tar.gz (or + other compressed tar) archive contained in the .deb + file + """ + + def __init__(self, filename=None, mode='r', fileobj=None): + ArFile.__init__(self, filename, mode, fileobj) + actual_names = set(self.getnames()) + + def compressed_part_name(basename): + global PART_EXTS + candidates = [ '%s.%s' % (basename, ext) for ext in PART_EXTS ] + parts = actual_names.intersection(set(candidates)) + if not parts: + raise DebError("missing required part in given .deb" \ + " (expected one of: %s)" % candidates) + elif len(parts) > 1: + raise DebError("too many parts in given .deb" \ + " (was looking for only one of: %s)" % candidates) + else: # singleton list + return list(parts)[0] + + if not INFO_PART in actual_names: + raise DebError("missing required part in given .deb" \ + " (expected: '%s')" % INFO_PART) + + self.__parts = {} + self.__parts[CTRL_PART] = DebControl(self.getmember( + compressed_part_name(CTRL_PART))) + self.__parts[DATA_PART] = DebData(self.getmember( + compressed_part_name(DATA_PART))) + self.__pkgname = None # updated lazily by __updatePkgName + + f = self.getmember(INFO_PART) + self.__version = f.read().strip() + f.close() + + def __updatePkgName(self): + self.__pkgname = self.debcontrol()['package'] + + version = property(lambda self: self.__version) + data = property(lambda self: self.__parts[DATA_PART]) + control = property(lambda self: self.__parts[CTRL_PART]) + + # proxy methods for the appropriate parts + + def debcontrol(self): + """ See .control.debcontrol() """ + return self.control.debcontrol() + + def scripts(self): + """ See .control.scripts() """ + return self.control.scripts() + + def md5sums(self, encoding=None, errors=None): + """ See .control.md5sums() """ + return self.control.md5sums(encoding=encoding, errors=errors) + + def changelog(self): + """ Return a Changelog object for the changelog.Debian.gz of the + present .deb package. Return None if no changelog can be found. 
""" + + if self.__pkgname is None: + self.__updatePkgName() + + for fname in [ CHANGELOG_DEBIAN % self.__pkgname, + CHANGELOG_NATIVE % self.__pkgname ]: + if self.data.has_file(fname): + gz = gzip.GzipFile(fileobj=self.data.get_file(fname)) + raw_changelog = gz.read() + gz.close() + return Changelog(raw_changelog) + return None + + def close(self): + self.control.close() + self.data.close() + + +if __name__ == '__main__': + import sys + deb = DebFile(filename=sys.argv[1]) + tgz = deb.control.tgz() + print(tgz.getmember('control')) + diff --git a/debian/foo/debtags.py b/debian/foo/debtags.py new file mode 100644 index 0000000..9dca3e9 --- /dev/null +++ b/debian/foo/debtags.py @@ -0,0 +1,513 @@ + +# debtags.py -- Access and manipulate Debtags information +# Copyright (C) 2006-2007 Enrico Zini +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import absolute_import, print_function + +import re +try: + import cPickle as pickle +except ImportError: + import pickle + +import six + +from debian.deprecation import function_deprecated_by + +def parse_tags(input): + lre = re.compile(r"^(.+?)(?::?\s*|:\s+(.+?)\s*)$") + for line in input: + # Is there a way to remove the last character of a line that does not + # make a copy of the entire line? 
+ m = lre.match(line) + pkgs = set(m.group(1).split(', ')) + if m.group(2): + tags = set(m.group(2).split(', ')) + else: + tags = set() + yield pkgs, tags + +parseTags = function_deprecated_by(parse_tags) + +def read_tag_database(input): + "Read the tag database, returning a pkg->tags dictionary" + db = {} + for pkgs, tags in parse_tags(input): + # Create the tag set using the native set + for p in pkgs: + db[p] = tags.copy() + return db; + +readTagDatabase = function_deprecated_by(read_tag_database) + +def read_tag_database_reversed(input): + "Read the tag database, returning a tag->pkgs dictionary" + db = {} + for pkgs, tags in parse_tags(input): + # Create the tag set using the native set + for tag in tags: + if tag in db: + db[tag] |= pkgs + else: + db[tag] = pkgs.copy() + return db; + +readTagDatabaseReversed = function_deprecated_by(read_tag_database_reversed) + +def read_tag_database_both_ways(input, tag_filter = None): + "Read the tag database, returning a pkg->tags and a tag->pkgs dictionary" + db = {} + dbr = {} + for pkgs, tags in parse_tags(input): + # Create the tag set using the native set + if tag_filter == None: + tags = set(tags) + else: + tags = set(filter(tag_filter, tags)) + for pkg in pkgs: + db[pkg] = tags.copy() + for tag in tags: + if tag in dbr: + dbr[tag] |= pkgs + else: + dbr[tag] = pkgs.copy() + return db, dbr; + +readTagDatabaseBothWays = function_deprecated_by(read_tag_database_both_ways) + +def reverse(db): + "Reverse a tag database, from package -> tags to tag->packages" + res = {} + for pkg, tags in db.items(): + for tag in tags: + if tag not in res: + res[tag] = set() + res[tag].add(pkg) + return res + + +def output(db): + "Write the tag database" + for pkg, tags in db.items(): + # Using % here seems awkward to me, but if I use calls to + # sys.stdout.write it becomes a bit slower + print("%s:" % (pkg), ", ".join(tags)) + + +def relevance_index_function(full, sub): + #return (float(sub.card(tag)) / float(sub.tag_count())) / \ + # (float(full.card(tag)) / float(full.tag_count())) + #return sub.card(tag) * full.card(tag) / sub.tag_count() + + # New cardinality divided by the old cardinality + #return float(sub.card(tag)) / float(full.card(tag)) + + ## Same as before, but weighted by the relevance the tag had in the + ## full collection, to downplay the importance of rare tags + #return float(sub.card(tag) * full.card(tag)) / float(full.card(tag) * full.tag_count()) + # Simplified version: + #return float(sub.card(tag)) / float(full.tag_count()) + + # Weighted by the square root of the relevance, to downplay the very + # common tags a bit + #return lambda tag: float(sub.card(tag)) / float(full.card(tag)) * math.sqrt(full.card(tag) / float(full.tag_count())) + #return lambda tag: float(sub.card(tag)) / float(full.card(tag)) * math.sqrt(full.card(tag) / float(full.package_count())) + # One useless factor removed, and simplified further, thanks to Benjamin Mesing + return lambda tag: float(sub.card(tag)**2) / float(full.card(tag)) + + # The difference between how many packages are in and how many packages are out + # (problems: tags that mean many different things can be very much out + # as well. In the case of 'image editor', for example, there will be + # lots of editors not for images in the outside group. + # It is very, very good for nonambiguous keywords like 'image'. 
+    #return lambda tag: 2 * sub.card(tag) - full.card(tag)
+    # Same but it tries to downplay the 'how many are out' value in the
+    # case of popular tags, to mitigate the 'there will always be popular
+    # tags left out' cases.  Does not seem to be much of an improvement.
+    #return lambda tag: sub.card(tag) - float(full.card(tag) - sub.card(tag))/(math.sin(float(full.card(tag))*3.1415/full.package_count())/4 + 0.75)
+
+relevanceIndexFunction = function_deprecated_by(relevance_index_function)
+
+class DB:
+    """
+    In-memory database mapping packages to tags and tags to packages.
+    """
+
+    def __init__(self):
+        self.db = {}
+        self.rdb = {}
+
+    def read(self, input, tag_filter=None):
+        """
+        Read the database from a file.
+
+        Example::
+            # Read the system Debtags database
+            db.read(open("/var/lib/debtags/package-tags", "r"))
+        """
+        self.db, self.rdb = read_tag_database_both_ways(input, tag_filter)
+
+    def qwrite(self, file):
+        "Quickly write the data to a pickled file"
+        pickle.dump(self.db, file)
+        pickle.dump(self.rdb, file)
+
+    def qread(self, file):
+        "Quickly read the data from a pickled file"
+        self.db = pickle.load(file)
+        self.rdb = pickle.load(file)
+
+    def insert(self, pkg, tags):
+        self.db[pkg] = tags.copy()
+        for tag in tags:
+            if tag in self.rdb:
+                self.rdb[tag].add(pkg)
+            else:
+                # one-element tuple, so the new set contains pkg itself
+                # rather than its characters
+                self.rdb[tag] = set((pkg,))
+
+    def dump(self):
+        output(self.db)
+
+    def dump_reverse(self):
+        output(self.rdb)
+
+    dumpReverse = function_deprecated_by(dump_reverse)
+
+    def reverse(self):
+        "Return the reverse collection, sharing tagsets with this one"
+        res = DB()
+        res.db = self.rdb
+        res.rdb = self.db
+        return res
+
+    def facet_collection(self):
+        """
+        Return a copy of this collection, but replaces the tag names
+        with only their facets.
+        """
+        fcoll = DB()
+        tofacet = re.compile(r"^([^:]+).+")
+        for pkg, tags in self.iter_packages_tags():
+            ftags = set([tofacet.sub(r"\1", t) for t in tags])
+            fcoll.insert(pkg, ftags)
+        return fcoll
+
+    facetCollection = function_deprecated_by(facet_collection)
+
+    def copy(self):
+        """
+        Return a copy of this collection, with the tagsets copied as
+        well.
+        """
+        res = DB()
+        res.db = self.db.copy()
+        res.rdb = self.rdb.copy()
+        return res
+
+    def reverse_copy(self):
+        """
+        Return the reverse collection, with a copy of the tagsets of
+        this one.
+        """
+        res = DB()
+        res.db = self.rdb.copy()
+        res.rdb = self.db.copy()
+        return res
+
+    reverseCopy = function_deprecated_by(reverse_copy)
+
+    def choose_packages(self, package_iter):
+        """
+        Return a collection with only the packages in package_iter,
+        sharing tagsets with this one
+        """
+        res = DB()
+        db = {}
+        for pkg in package_iter:
+            if pkg in self.db:
+                db[pkg] = self.db[pkg]
+        res.db = db
+        res.rdb = reverse(db)
+        return res
+
+    choosePackages = function_deprecated_by(choose_packages)
+
+    def choose_packages_copy(self, package_iter):
+        """
+        Return a collection with only the packages in package_iter,
+        with a copy of the tagsets of this one
+        """
+        res = DB()
+        db = {}
+        for pkg in package_iter:
+            db[pkg] = self.db[pkg]
+        res.db = db
+        res.rdb = reverse(db)
+        return res
+
+    choosePackagesCopy = function_deprecated_by(choose_packages_copy)
+
+    def filter_packages(self, package_filter):
+        """
+        Return a collection with only those packages that match a
+        filter, sharing tagsets with this one.  The filter will match
+        on the package.
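+
+        For instance (illustrative only),
+        db.filter_packages(lambda pkg: pkg.startswith('lib')) keeps only
+        the library packages, without copying their tag sets.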
+ """ + res = DB() + db = {} + for pkg in filter(package_filter, six.iterkeys(self.db)): + db[pkg] = self.db[pkg] + res.db = db + res.rdb = reverse(db) + return res + + filterPackages = function_deprecated_by(filter_packages) + + def filter_packages_copy(self, filter): + """ + Return a collection with only those packages that match a + filter, with a copy of the tagsets of this one. The filter + will match on the package. + """ + res = DB() + db = {} + for pkg in filter(filter, six.iterkeys(self.db)): + db[pkg] = self.db[pkg].copy() + res.db = db + res.rdb = reverse(db) + return res + + filterPackagesCopy = function_deprecated_by(filter_packages_copy) + + def filter_packages_tags(self, package_tag_filter): + """ + Return a collection with only those packages that match a + filter, sharing tagsets with this one. The filter will match + on (package, tags). + """ + res = DB() + db = {} + for pkg, tags in filter(package_tag_filter, six.iteritems(self.db)): + db[pkg] = self.db[pkg] + res.db = db + res.rdb = reverse(db) + return res + + filterPackagesTags = function_deprecated_by(filter_packages_tags) + + def filter_packages_tags_copy(self, package_tag_filter): + """ + Return a collection with only those packages that match a + filter, with a copy of the tagsets of this one. The filter + will match on (package, tags). + """ + res = DB() + db = {} + for pkg, tags in filter(package_tag_filter, six.iteritems(self.db)): + db[pkg] = self.db[pkg].copy() + res.db = db + res.rdb = reverse(db) + return res + + filterPackagesTagsCopy = function_deprecated_by(filter_packages_tags_copy) + + def filter_tags(self, tag_filter): + """ + Return a collection with only those tags that match a + filter, sharing package sets with this one. The filter will match + on the tag. + """ + res = DB() + rdb = {} + for tag in filter(tag_filter, six.iterkeys(self.rdb)): + rdb[tag] = self.rdb[tag] + res.rdb = rdb + res.db = reverse(rdb) + return res + + filterTags = function_deprecated_by(filter_tags) + + def filter_tags_copy(self, tag_filter): + """ + Return a collection with only those tags that match a + filter, with a copy of the package sets of this one. The + filter will match on the tag. 
+ """ + res = DB() + rdb = {} + for tag in filter(tag_filter, six.iterkeys(self.rdb)): + rdb[tag] = self.rdb[tag].copy() + res.rdb = rdb + res.db = reverse(rdb) + return res + + filterTagsCopy = function_deprecated_by(filter_tags_copy) + + def has_package(self, pkg): + """Check if the collection contains the given package""" + return pkg in self.db + + hasPackage = function_deprecated_by(has_package) + + def has_tag(self, tag): + """Check if the collection contains packages tagged with tag""" + return tag in self.rdb + + hasTag = function_deprecated_by(has_tag) + + def tags_of_package(self, pkg): + """Return the tag set of a package""" + return pkg in self.db and self.db[pkg] or set() + + tagsOfPackage = function_deprecated_by(tags_of_package) + + def packages_of_tag(self, tag): + """Return the package set of a tag""" + return tag in self.rdb and self.rdb[tag] or set() + + packagesOfTag = function_deprecated_by(packages_of_tag) + + def tags_of_packages(self, pkgs): + """Return the set of tags that have all the packages in pkgs""" + res = None + for p in pkgs: + if res == None: + res = set(self.tags_of_package(p)) + else: + res &= self.tags_of_package(p) + return res + + tagsOfPackages = function_deprecated_by(tags_of_packages) + + def packages_of_tags(self, tags): + """Return the set of packages that have all the tags in tags""" + res = None + for t in tags: + if res == None: + res = set(self.packages_of_tag(t)) + else: + res &= self.packages_of_tag(t) + return res + + packagesOfTags = function_deprecated_by(packages_of_tags) + + def card(self, tag): + """ + Return the cardinality of a tag + """ + return tag in self.rdb and len(self.rdb[tag]) or 0 + + def discriminance(self, tag): + """ + Return the discriminance index if the tag. + + Th discriminance index of the tag is defined as the minimum + number of packages that would be eliminated by selecting only + those tagged with this tag or only those not tagged with this + tag. + """ + n = self.card(tag) + tot = self.package_count() + return min(n, tot - n) + + def iter_packages(self): + """Iterate over the packages""" + return six.iterkeys(self.db) + + iterPackages = function_deprecated_by(iter_packages) + + def iter_tags(self): + """Iterate over the tags""" + return six.iterkeys(self.rdb) + + iterTags = function_deprecated_by(iter_tags) + + def iter_packages_tags(self): + """Iterate over 2-tuples of (pkg, tags)""" + return six.iteritems(self.db) + + iterPackagesTags = function_deprecated_by(iter_packages_tags) + + def iter_tags_packages(self): + """Iterate over 2-tuples of (tag, pkgs)""" + return six.iteritems(self.rdb) + + iterTagsPackages = function_deprecated_by(iter_tags_packages) + + def package_count(self): + """Return the number of packages""" + return len(self.db) + + packageCount = function_deprecated_by(package_count) + + def tag_count(self): + """Return the number of tags""" + return len(self.rdb) + + tagCount = function_deprecated_by(tag_count) + + def ideal_tagset(self, tags): + """ + Return an ideal selection of the top tags in a list of tags. + + Return the tagset made of the highest number of tags taken in + consecutive sequence from the beginning of the given vector, + that would intersecate with the tagset of a comfortable amount + of packages. + + Comfortable is defined in terms of how far it is from 7. + """ + + # TODO: the scoring function is quite ok, but may need more + # tuning. 
I also center it on 15 instead of 7 since we're + # setting a starting point for the search, not a target point + def score_fun(x): + return float((x-15)*(x-15))/x + + hits = [] + tagset = set() + min_score = 3 + for i in range(len(tags)): + pkgs = self.packages_of_tags(tags[:i+1]) + card = len(pkgs) + if card == 0: break; + score = score_fun(card) + if score < min_score: + min_score = score + tagset = set(tags[:i+1]) + + # Return always at least the first tag + if len(tagset) == 0: + return set(tags[:1]) + else: + return tagset + + idealTagset = function_deprecated_by(ideal_tagset) + + def correlations(self): + """ + Generate the list of correlation as a tuple (hastag, hasalsotag, score). + + Every touple will indicate that the tag 'hastag' tends to also + have 'hasalsotag' with a score of 'score'. + """ + for pivot in self.iter_tags(): + with_ = self.filter_packages_tags(lambda pt: pivot in pt[1]) + without = self.filter_packages_tags(lambda pt: pivot not in pt[1]) + for tag in with_.iter_tags(): + if tag == pivot: continue + has = float(with_.card(tag)) / float(with_.package_count()) + hasnt = float(without.card(tag)) / float(without.package_count()) + yield pivot, tag, has - hasnt diff --git a/debian/foo/doc-debtags b/debian/foo/doc-debtags new file mode 100755 index 0000000..366f1bf --- /dev/null +++ b/debian/foo/doc-debtags @@ -0,0 +1,104 @@ +#!/usr/bin/python + +from __future__ import absolute_import, print_function + +import sys +import os +import inspect + +sys.path.insert(0, os.path.join(sys.path[0], os.pardir)) + +from debian import debtags + +def print_indented (spaces, string): + for line in string.split("\n"): + for i in range(1,spaces): + sys.stdout.write(" ") + sys.stdout.write(line) + sys.stdout.write("\n") + +def document (callable): + if callable.__doc__ != None: + print_indented(2, callable.__name__) + print_indented(4, inspect.getdoc(callable)) + print() + + +print("""debtags.py README +================= + +The Debtags python module provides support for accessing and manipulating +Debtags tag data. + +The module provides a single class, debtags.DB, which implements various kinds +of tag operations on an in-memory tag database. + +The database can be queried both as a database of packages with associated tags +and as a database of tags with associated packages. Performance are good in +both ways: querying the tags of a package has the same peed as querying the +packages having a tag. + +debtags.DB allows both simple queries and more complex algorithms to be +implemented easily and efficiently. Have a look at the Sample usage section +below for some examples. 
+ + +Classes +======= + +There is only one class: debtags.DB: +""") + +document (debtags.DB) + +print(""" +The methods of debtags.DB are: +""") + +for m in dir(debtags.DB): + if m[0:2] != '__' and callable(getattr(debtags.DB, m)): + document(getattr(debtags.DB, m)) + +print("""Iteration +========= + +debtags.DB provides various iteration methods to iterate the collection either +in a package-centered or in a tag-centered way: +""") + +document(debtags.DB.iter_packages) +document(debtags.DB.iter_packages_tags) +document(debtags.DB.iter_tags) +document(debtags.DB.iter_tags_packages) + + +print("""Sample usage +============ + +This example reads the system debtags database and performs a simple tag +search:: + + import debtags + + db = debtags.DB() + db.read(open("/var/lib/debtags/package-tags", "r")) + print(db.package_count(), "packages in the database") + print("Image editors:") + for pkg in db.packages_of_tags(set(("use::editing", "works-with::image:raster"))): + print(" *", pkg) + +This example computes the set of tags that belong to all the packages in a +list, then shows all the other packages that have those tags: + + import debtags + + db = debtags.DB() + db.read(open("/var/lib/debtags/package-tags", "r")) + tags = db.tags_of_packages(("gimp", "krita")) + print("Common tags:") + for tag in tags: + print(" *", tag) + print("Packages similar to gimp and krita:") + for pkg in db.packages_of_tags(tags): + print(" *", pkg) +""") -- cgit v1.2.1