diff options
Diffstat (limited to 'debian/foo/deb822.py')
-rw-r--r-- | debian/foo/deb822.py | 1318 |
1 files changed, 1318 insertions, 0 deletions
diff --git a/debian/foo/deb822.py b/debian/foo/deb822.py new file mode 100644 index 0000000..bd910d6 --- /dev/null +++ b/debian/foo/deb822.py @@ -0,0 +1,1318 @@ +# vim: fileencoding=utf-8 +# +# A python interface for various rfc822-like formatted files used by Debian +# (.changes, .dsc, Packages, Sources, etc) +# +# Copyright (C) 2005-2006 dann frazier <dannf@dannf.org> +# Copyright (C) 2006-2010 John Wright <john@johnwright.org> +# Copyright (C) 2006 Adeodato Simó <dato@net.com.org.es> +# Copyright (C) 2008 Stefano Zacchiroli <zack@upsilon.cc> +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation, either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +from __future__ import absolute_import, print_function + +from debian.deprecation import function_deprecated_by + +try: + import apt_pkg + # This module uses apt_pkg only for its TagFile interface. + apt_pkg.TagFile + _have_apt_pkg = True +except (ImportError, AttributeError): + _have_apt_pkg = False + +import chardet +import os +import re +import subprocess +import sys +import warnings + +try: + from StringIO import StringIO + BytesIO = StringIO +except ImportError: + from io import BytesIO, StringIO +try: + from collections import Mapping, MutableMapping + _mapping_mixin = Mapping + _mutable_mapping_mixin = MutableMapping +except ImportError: + from UserDict import DictMixin + _mapping_mixin = DictMixin + _mutable_mapping_mixin = DictMixin + +import six + +if sys.version >= '3': + import io + def _is_real_file(f): + if not isinstance(f, io.IOBase): + return False + try: + f.fileno() + return True + except (AttributeError, io.UnsupportedOperation): + return False +else: + def _is_real_file(f): + return isinstance(f, file) and hasattr(f, 'fileno') + + +GPGV_DEFAULT_KEYRINGS = frozenset(['/usr/share/keyrings/debian-keyring.gpg']) +GPGV_EXECUTABLE = '/usr/bin/gpgv' + + +class TagSectionWrapper(_mapping_mixin, object): + """Wrap a TagSection object, using its find_raw method to get field values + + This allows us to pick which whitespace to strip off the beginning and end + of the data, so we don't lose leading newlines. + """ + + def __init__(self, section): + self.__section = section + + def __iter__(self): + for key in self.__section.keys(): + if not key.startswith('#'): + yield key + + def __len__(self): + return len([key for key in self.__section.keys() + if not key.startswith('#')]) + + def __getitem__(self, key): + s = self.__section.find_raw(key) + + if s is None: + raise KeyError(key) + + # Get just the stuff after the first ':' + # Could use s.partition if we only supported python >= 2.5 + data = s[s.find(b':')+1:] + + # Get rid of spaces and tabs after the ':', but not newlines, and strip + # off any newline at the end of the data. + return data.lstrip(b' \t').rstrip(b'\n') + + +class OrderedSet(object): + """A set-like object that preserves order when iterating over it + + We use this to keep track of keys in Deb822Dict, because it's much faster + to look up if a key is in a set than in a list. + """ + + def __init__(self, iterable=[]): + self.__set = set() + self.__order = [] + for item in iterable: + self.add(item) + + def add(self, item): + if item not in self: + # set.add will raise TypeError if something's unhashable, so we + # don't have to handle that ourselves + self.__set.add(item) + self.__order.append(item) + + def remove(self, item): + # set.remove will raise KeyError, so we don't need to handle that + # ourselves + self.__set.remove(item) + self.__order.remove(item) + + def __iter__(self): + # Return an iterator of items in the order they were added + return iter(self.__order) + + def __len__(self): + return len(self.__order) + + def __contains__(self, item): + # This is what makes OrderedSet faster than using a list to keep track + # of keys. Lookup in a set is O(1) instead of O(n) for a list. + return item in self.__set + + ### list-like methods + append = add + + def extend(self, iterable): + for item in iterable: + self.add(item) + ### + + +class Deb822Dict(_mutable_mapping_mixin, object): + # Subclassing _mutable_mapping_mixin because we're overriding so much + # dict functionality that subclassing dict requires overriding many more + # than the methods that _mutable_mapping_mixin requires. + """A dictionary-like object suitable for storing RFC822-like data. + + Deb822Dict behaves like a normal dict, except: + - key lookup is case-insensitive + - key order is preserved + - if initialized with a _parsed parameter, it will pull values from + that dictionary-like object as needed (rather than making a copy). + The _parsed dict is expected to be able to handle case-insensitive + keys. + + If _parsed is not None, an optional _fields parameter specifies which keys + in the _parsed dictionary are exposed. + """ + + # See the end of the file for the definition of _strI + + def __init__(self, _dict=None, _parsed=None, _fields=None, + encoding="utf-8"): + self.__dict = {} + self.__keys = OrderedSet() + self.__parsed = None + self.encoding = encoding + + if _dict is not None: + # _dict may be a dict or a list of two-sized tuples + if hasattr(_dict, 'items'): + items = _dict.items() + else: + items = list(_dict) + + try: + for k, v in items: + self[k] = v + except ValueError: + this = len(self.__keys) + len_ = len(items[this]) + raise ValueError('dictionary update sequence element #%d has ' + 'length %d; 2 is required' % (this, len_)) + + if _parsed is not None: + self.__parsed = _parsed + if _fields is None: + self.__keys.extend([ _strI(k) for k in self.__parsed ]) + else: + self.__keys.extend([ _strI(f) for f in _fields if f in self.__parsed ]) + + def _detect_encoding(self, value): + """If value is not already Unicode, decode it intelligently.""" + if isinstance(value, bytes): + try: + return value.decode(self.encoding) + except UnicodeDecodeError as e: + # Evidently, the value wasn't encoded with the encoding the + # user specified. Try detecting it. + warnings.warn('decoding from %s failed; attempting to detect ' + 'the true encoding' % self.encoding, + UnicodeWarning) + result = chardet.detect(value) + try: + return value.decode(result['encoding']) + except UnicodeDecodeError: + raise e + else: + # Assume the rest of the paragraph is in this encoding as + # well (there's no sense in repeating this exercise for + # every field). + self.encoding = result['encoding'] + else: + return value + + ### BEGIN _mutable_mapping_mixin methods + + def __iter__(self): + for key in self.__keys: + yield str(key) + + def __len__(self): + return len(self.__keys) + + def __setitem__(self, key, value): + key = _strI(key) + self.__keys.add(key) + self.__dict[key] = value + + def __getitem__(self, key): + key = _strI(key) + try: + value = self.__dict[key] + except KeyError: + if self.__parsed is not None and key in self.__keys: + value = self.__parsed[key] + else: + raise + + return self._detect_encoding(value) + + def __delitem__(self, key): + key = _strI(key) + self.__keys.remove(key) + try: + del self.__dict[key] + except KeyError: + # If we got this far, the key was in self.__keys, so it must have + # only been in the self.__parsed dict. + pass + + def __contains__(self, key): + key = _strI(key) + return key in self.__keys + + if sys.version < '3': + has_key = __contains__ + + ### END _mutable_mapping_mixin methods + + def __repr__(self): + return '{%s}' % ', '.join(['%r: %r' % (k, v) for k, v in self.items()]) + + def __eq__(self, other): + mykeys = sorted(self) + otherkeys = sorted(other) + if not mykeys == otherkeys: + return False + + for key in mykeys: + if self[key] != other[key]: + return False + + # If we got here, everything matched + return True + + # Overriding __eq__ blocks inheritance of __hash__ in Python 3, and + # instances of this class are not sensibly hashable anyway. + __hash__ = None + + def copy(self): + # Use self.__class__ so this works as expected for subclasses + copy = self.__class__(self) + return copy + + # TODO implement __str__() and make dump() use that? + + +class Deb822(Deb822Dict): + + def __init__(self, sequence=None, fields=None, _parsed=None, + encoding="utf-8"): + """Create a new Deb822 instance. + + :param sequence: a string, or any any object that returns a line of + input each time, normally a file. Alternately, sequence can + be a dict that contains the initial key-value pairs. + + :param fields: if given, it is interpreted as a list of fields that + should be parsed (the rest will be discarded). + + :param _parsed: internal parameter. + + :param encoding: When parsing strings, interpret them in this encoding. + (All values are given back as unicode objects, so an encoding is + necessary in order to properly interpret the strings.) + """ + + if hasattr(sequence, 'items'): + _dict = sequence + sequence = None + else: + _dict = None + Deb822Dict.__init__(self, _dict=_dict, _parsed=_parsed, _fields=fields, + encoding=encoding) + + if sequence is not None: + try: + self._internal_parser(sequence, fields) + except EOFError: + pass + + self.gpg_info = None + + def iter_paragraphs(cls, sequence, fields=None, use_apt_pkg=True, + shared_storage=False, encoding="utf-8"): + """Generator that yields a Deb822 object for each paragraph in sequence. + + :param sequence: same as in __init__. + + :param fields: likewise. + + :param use_apt_pkg: if sequence is a file, apt_pkg will be used + if available to parse the file, since it's much much faster. Set + this parameter to False to disable using apt_pkg. + :param shared_storage: not used, here for historical reasons. Deb822 + objects never use shared storage anymore. + :param encoding: Interpret the paragraphs in this encoding. + (All values are given back as unicode objects, so an encoding is + necessary in order to properly interpret the strings.) + """ + + if _have_apt_pkg and use_apt_pkg and _is_real_file(sequence): + kwargs = {} + if sys.version >= '3': + # bytes=True is supported for both Python 2 and 3, but we + # only actually need it for Python 3, so this saves us from + # having to require a newer version of python-apt for Python + # 2 as well. This allows us to apply our own encoding + # handling, which is more tolerant of mixed-encoding files. + kwargs['bytes'] = True + parser = apt_pkg.TagFile(sequence, **kwargs) + for section in parser: + paragraph = cls(fields=fields, + _parsed=TagSectionWrapper(section), + encoding=encoding) + if paragraph: + yield paragraph + + else: + iterable = iter(sequence) + x = cls(iterable, fields, encoding=encoding) + while len(x) != 0: + yield x + x = cls(iterable, fields, encoding=encoding) + + iter_paragraphs = classmethod(iter_paragraphs) + + ### + + @staticmethod + def _skip_useless_lines(sequence): + """Yields only lines that do not begin with '#'. + + Also skips any blank lines at the beginning of the input. + """ + at_beginning = True + for line in sequence: + # The bytes/str polymorphism required here to support Python 3 + # is unpleasant, but fortunately limited. We need this because + # at this point we might have been given either bytes or + # Unicode, and we haven't yet got to the point where we can try + # to decode a whole paragraph and detect its encoding. + if isinstance(line, bytes): + if line.startswith(b'#'): + continue + else: + if line.startswith('#'): + continue + if at_beginning: + if isinstance(line, bytes): + if not line.rstrip(b'\r\n'): + continue + else: + if not line.rstrip('\r\n'): + continue + at_beginning = False + yield line + + def _internal_parser(self, sequence, fields=None): + # The key is non-whitespace, non-colon characters before any colon. + key_part = r"^(?P<key>[^: \t\n\r\f\v]+)\s*:\s*" + single = re.compile(key_part + r"(?P<data>\S.*?)\s*$") + multi = re.compile(key_part + r"$") + multidata = re.compile(r"^\s(?P<data>.+?)\s*$") + + wanted_field = lambda f: fields is None or f in fields + + if isinstance(sequence, (six.string_types, bytes)): + sequence = sequence.splitlines() + + curkey = None + content = "" + + for line in self.gpg_stripped_paragraph( + self._skip_useless_lines(sequence)): + line = self._detect_encoding(line) + + m = single.match(line) + if m: + if curkey: + self[curkey] = content + + if not wanted_field(m.group('key')): + curkey = None + continue + + curkey = m.group('key') + content = m.group('data') + continue + + m = multi.match(line) + if m: + if curkey: + self[curkey] = content + + if not wanted_field(m.group('key')): + curkey = None + continue + + curkey = m.group('key') + content = "" + continue + + m = multidata.match(line) + if m: + content += '\n' + line # XXX not m.group('data')? + continue + + if curkey: + self[curkey] = content + + def __str__(self): + return self.dump() + + def __unicode__(self): + return self.dump() + + if sys.version >= '3': + def __bytes__(self): + return self.dump().encode(self.encoding) + + # __repr__ is handled by Deb822Dict + + def get_as_string(self, key): + """Return the self[key] as a string (or unicode) + + The default implementation just returns unicode(self[key]); however, + this can be overridden in subclasses (e.g. _multivalued) that can take + special values. + """ + return six.text_type(self[key]) + + def dump(self, fd=None, encoding=None): + """Dump the the contents in the original format + + If fd is None, return a unicode object. + + If fd is not None, attempt to encode the output to the encoding the + object was initialized with, or the value of the encoding argument if + it is not None. This will raise UnicodeEncodeError if the encoding + can't support all the characters in the Deb822Dict values. + """ + + if fd is None: + fd = StringIO() + return_string = True + else: + return_string = False + + if encoding is None: + # Use the encoding we've been using to decode strings with if none + # was explicitly specified + encoding = self.encoding + + for key in self: + value = self.get_as_string(key) + if not value or value[0] == '\n': + # Avoid trailing whitespace after "Field:" if it's on its own + # line or the value is empty. We don't have to worry about the + # case where value == '\n', since we ensure that is not the + # case in __setitem__. + entry = '%s:%s\n' % (key, value) + else: + entry = '%s: %s\n' % (key, value) + if not return_string: + fd.write(entry.encode(encoding)) + else: + fd.write(entry) + if return_string: + return fd.getvalue() + + ### + + def is_single_line(self, s): + if s.count("\n"): + return False + else: + return True + + isSingleLine = function_deprecated_by(is_single_line) + + def is_multi_line(self, s): + return not self.is_single_line(s) + + isMultiLine = function_deprecated_by(is_multi_line) + + def _merge_fields(self, s1, s2): + if not s2: + return s1 + if not s1: + return s2 + + if self.is_single_line(s1) and self.is_single_line(s2): + ## some fields are delimited by a single space, others + ## a comma followed by a space. this heuristic assumes + ## that there are multiple items in one of the string fields + ## so that we can pick up on the delimiter being used + delim = ' ' + if (s1 + s2).count(', '): + delim = ', ' + + L = sorted((s1 + delim + s2).split(delim)) + + prev = merged = L[0] + + for item in L[1:]: + ## skip duplicate entries + if item == prev: + continue + merged = merged + delim + item + prev = item + return merged + + if self.is_multi_line(s1) and self.is_multi_line(s2): + for item in s2.splitlines(True): + if item not in s1.splitlines(True): + s1 = s1 + "\n" + item + return s1 + + raise ValueError + + _mergeFields = function_deprecated_by(_merge_fields) + + def merge_fields(self, key, d1, d2=None): + ## this method can work in two ways - abstract that away + if d2 == None: + x1 = self + x2 = d1 + else: + x1 = d1 + x2 = d2 + + ## we only have to do work if both objects contain our key + ## otherwise, we just take the one that does, or raise an + ## exception if neither does + if key in x1 and key in x2: + merged = self._mergeFields(x1[key], x2[key]) + elif key in x1: + merged = x1[key] + elif key in x2: + merged = x2[key] + else: + raise KeyError + + ## back to the two different ways - if this method was called + ## upon an object, update that object in place. + ## return nothing in this case, to make the author notice a + ## problem if she assumes the object itself will not be modified + if d2 == None: + self[key] = merged + return None + + return merged + + mergeFields = function_deprecated_by(merge_fields) + + def split_gpg_and_payload(sequence): + """Return a (gpg_pre, payload, gpg_post) tuple + + Each element of the returned tuple is a list of lines (with trailing + whitespace stripped). + """ + + gpg_pre_lines = [] + lines = [] + gpg_post_lines = [] + state = b'SAFE' + gpgre = re.compile(br'^-----(?P<action>BEGIN|END) PGP (?P<what>[^-]+)-----$') + blank_line = re.compile(b'^$') + first_line = True + + for line in sequence: + # Some consumers of this method require bytes (encoding + # detection and signature checking). However, we might have + # been given a file opened in text mode, in which case it's + # simplest to encode to bytes. + if sys.version >= '3' and isinstance(line, str): + line = line.encode() + + line = line.strip(b'\r\n') + + # skip initial blank lines, if any + if first_line: + if blank_line.match(line): + continue + else: + first_line = False + + m = gpgre.match(line) + + if not m: + if state == b'SAFE': + if not blank_line.match(line): + lines.append(line) + else: + if not gpg_pre_lines: + # There's no gpg signature, so we should stop at + # this blank line + break + elif state == b'SIGNED MESSAGE': + if blank_line.match(line): + state = b'SAFE' + else: + gpg_pre_lines.append(line) + elif state == b'SIGNATURE': + gpg_post_lines.append(line) + else: + if m.group('action') == b'BEGIN': + state = m.group('what') + elif m.group('action') == b'END': + gpg_post_lines.append(line) + break + if not blank_line.match(line): + if not lines: + gpg_pre_lines.append(line) + else: + gpg_post_lines.append(line) + + if len(lines): + return (gpg_pre_lines, lines, gpg_post_lines) + else: + raise EOFError('only blank lines found in input') + + split_gpg_and_payload = staticmethod(split_gpg_and_payload) + + def gpg_stripped_paragraph(cls, sequence): + return cls.split_gpg_and_payload(sequence)[1] + + gpg_stripped_paragraph = classmethod(gpg_stripped_paragraph) + + def get_gpg_info(self, keyrings=None): + """Return a GpgInfo object with GPG signature information + + This method will raise ValueError if the signature is not available + (e.g. the original text cannot be found). + + :param keyrings: list of keyrings to use (see GpgInfo.from_sequence) + """ + + # raw_text is saved (as a string) only for Changes and Dsc (see + # _gpg_multivalued.__init__) which is small compared to Packages or + # Sources which contain no signature + if not hasattr(self, 'raw_text'): + raise ValueError("original text cannot be found") + + if self.gpg_info is None: + self.gpg_info = GpgInfo.from_sequence(self.raw_text, + keyrings=keyrings) + + return self.gpg_info + + def validate_input(self, key, value): + """Raise ValueError if value is not a valid value for key + + Subclasses that do interesting things for different keys may wish to + override this method. + """ + + # The value cannot end in a newline (if it did, dumping the object + # would result in multiple stanzas) + if value.endswith('\n'): + raise ValueError("value must not end in '\\n'") + + # Make sure there are no blank lines (actually, the first one is + # allowed to be blank, but no others), and each subsequent line starts + # with whitespace + for line in value.splitlines()[1:]: + if not line: + raise ValueError("value must not have blank lines") + if not line[0].isspace(): + raise ValueError("each line must start with whitespace") + + def __setitem__(self, key, value): + self.validate_input(key, value) + Deb822Dict.__setitem__(self, key, value) + + +# XXX check what happens if input contains more that one signature +class GpgInfo(dict): + """A wrapper around gnupg parsable output obtained via --status-fd + + This class is really a dictionary containing parsed output from gnupg plus + some methods to make sense of the data. + Keys are keywords and values are arguments suitably splitted. + See /usr/share/doc/gnupg/DETAILS.gz""" + + # keys with format "key keyid uid" + uidkeys = ('GOODSIG', 'EXPSIG', 'EXPKEYSIG', 'REVKEYSIG', 'BADSIG') + + def valid(self): + """Is the signature valid?""" + return 'GOODSIG' in self or 'VALIDSIG' in self + +# XXX implement as a property? +# XXX handle utf-8 %-encoding + def uid(self): + """Return the primary ID of the signee key, None is not available""" + pass + + @classmethod + def from_output(cls, out, err=None): + """Create a new GpgInfo object from gpg(v) --status-fd output (out) and + optionally collect stderr as well (err). + + Both out and err can be lines in newline-terminated sequence or regular strings.""" + + n = cls() + + if isinstance(out, six.string_types): + out = out.split('\n') + if isinstance(err, six.string_types): + err = err.split('\n') + + n.out = out + n.err = err + + header = '[GNUPG:] ' + for l in out: + if not l.startswith(header): + continue + + l = l[len(header):] + l = l.strip('\n') + + # str.partition() would be better, 2.5 only though + s = l.find(' ') + key = l[:s] + if key in cls.uidkeys: + # value is "keyid UID", don't split UID + value = l[s+1:].split(' ', 1) + else: + value = l[s+1:].split(' ') + + n[key] = value + return n + + @classmethod + def from_sequence(cls, sequence, keyrings=None, executable=None): + """Create a new GpgInfo object from the given sequence. + + :param sequence: sequence of lines of bytes or a single byte string + + :param keyrings: list of keyrings to use (default: + ['/usr/share/keyrings/debian-keyring.gpg']) + + :param executable: list of args for subprocess.Popen, the first element + being the gpgv executable (default: ['/usr/bin/gpgv']) + """ + + keyrings = keyrings or GPGV_DEFAULT_KEYRINGS + executable = executable or [GPGV_EXECUTABLE] + + # XXX check for gpg as well and use --verify accordingly? + args = list(executable) + #args.extend(["--status-fd", "1", "--no-default-keyring"]) + args.extend(["--status-fd", "1"]) + for k in keyrings: + args.extend(["--keyring", k]) + + if "--keyring" not in args: + raise IOError("cannot access any of the given keyrings") + + p = subprocess.Popen(args, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + universal_newlines=False) + # XXX what to do with exit code? + + if isinstance(sequence, bytes): + inp = sequence + else: + inp = cls._get_full_bytes(sequence) + out, err = p.communicate(inp) + + return cls.from_output(out.decode('utf-8'), + err.decode('utf-8')) + + @staticmethod + def _get_full_bytes(sequence): + """Return a byte string from a sequence of lines of bytes. + + This method detects if the sequence's lines are newline-terminated, and + constructs the byte string appropriately. + """ + # Peek at the first line to see if it's newline-terminated. + sequence_iter = iter(sequence) + try: + first_line = next(sequence_iter) + except StopIteration: + return b"" + join_str = b'\n' + if first_line.endswith(b'\n'): + join_str = b'' + return first_line + join_str + join_str.join(sequence_iter) + + @classmethod + def from_file(cls, target, *args, **kwargs): + """Create a new GpgInfo object from the given file. + + See GpgInfo.from_sequence. + """ + with open(target, 'rb') as target_file: + return cls.from_sequence(target_file, *args, **kwargs) + + +class PkgRelation(object): + """Inter-package relationships + + Structured representation of the relationships of a package to another, + i.e. of what can appear in a Deb882 field like Depends, Recommends, + Suggests, ... (see Debian Policy 7.1). + """ + + # XXX *NOT* a real dependency parser, and that is not even a goal here, we + # just parse as much as we need to split the various parts composing a + # dependency, checking their correctness wrt policy is out of scope + __dep_RE = re.compile( \ + r'^\s*(?P<name>[a-zA-Z0-9.+\-]{2,})(\s*\(\s*(?P<relop>[>=<]+)\s*(?P<version>[0-9a-zA-Z:\-+~.]+)\s*\))?(\s*\[(?P<archs>[\s!\w\-]+)\])?\s*$') + __comma_sep_RE = re.compile(r'\s*,\s*') + __pipe_sep_RE = re.compile(r'\s*\|\s*') + __blank_sep_RE = re.compile(r'\s*') + + @classmethod + def parse_relations(cls, raw): + """Parse a package relationship string (i.e. the value of a field like + Depends, Recommends, Build-Depends ...) + """ + def parse_archs(raw): + # assumption: no space beween '!' and architecture name + archs = [] + for arch in cls.__blank_sep_RE.split(raw.strip()): + if len(arch) and arch[0] == '!': + archs.append((False, arch[1:])) + else: + archs.append((True, arch)) + return archs + + def parse_rel(raw): + match = cls.__dep_RE.match(raw) + if match: + parts = match.groupdict() + d = { 'name': parts['name'] } + if not (parts['relop'] is None or parts['version'] is None): + d['version'] = (parts['relop'], parts['version']) + else: + d['version'] = None + if parts['archs'] is None: + d['arch'] = None + else: + d['arch'] = parse_archs(parts['archs']) + return d + else: + print('deb822.py: WARNING: cannot parse package' \ + ' relationship "%s", returning it raw' % raw, + file=sys.stderr) + return { 'name': raw, 'version': None, 'arch': None } + + tl_deps = cls.__comma_sep_RE.split(raw.strip()) # top-level deps + cnf = map(cls.__pipe_sep_RE.split, tl_deps) + return [[parse_rel(or_dep) for or_dep in or_deps] for or_deps in cnf] + + @staticmethod + def str(rels): + """Format to string structured inter-package relationships + + Perform the inverse operation of parse_relations, returning a string + suitable to be written in a package stanza. + """ + def pp_arch(arch_spec): + (excl, arch) = arch_spec + if excl: + return arch + else: + return '!' + arch + + def pp_atomic_dep(dep): + s = dep['name'] + if dep.get('version') is not None: + s += ' (%s %s)' % dep['version'] + if dep.get('arch') is not None: + s += ' [%s]' % ' '.join(map(pp_arch, dep['arch'])) + return s + + pp_or_dep = lambda deps: ' | '.join(map(pp_atomic_dep, deps)) + return ', '.join(map(pp_or_dep, rels)) + + +class _lowercase_dict(dict): + """Dictionary wrapper which lowercase keys upon lookup.""" + + def __getitem__(self, key): + return dict.__getitem__(self, key.lower()) + + +class _PkgRelationMixin(object): + """Package relationship mixin + + Inheriting from this mixin you can extend a Deb882 object with attributes + letting you access inter-package relationship in a structured way, rather + than as strings. For example, while you can usually use pkg['depends'] to + obtain the Depends string of package pkg, mixing in with this class you + gain pkg.depends to access Depends as a Pkgrel instance + + To use, subclass _PkgRelationMixin from a class with a _relationship_fields + attribute. It should be a list of field names for which structured access + is desired; for each of them a method wild be added to the inherited class. + The method name will be the lowercase version of field name; '-' will be + mangled as '_'. The method would return relationships in the same format of + the PkgRelation' relations property. + + See Packages and Sources as examples. + """ + + def __init__(self, *args, **kwargs): + self.__relations = _lowercase_dict({}) + self.__parsed_relations = False + for name in self._relationship_fields: + # To avoid reimplementing Deb822 key lookup logic we use a really + # simple dict subclass which just lowercase keys upon lookup. Since + # dictionary building happens only here, we ensure that all keys + # are in fact lowercase. + # With this trick we enable users to use the same key (i.e. field + # name) of Deb822 objects on the dictionary returned by the + # relations property. + keyname = name.lower() + if name in self: + self.__relations[keyname] = None # lazy value + # all lazy values will be expanded before setting + # __parsed_relations to True + else: + self.__relations[keyname] = [] + + @property + def relations(self): + """Return a dictionary of inter-package relationships among the current + and other packages. + + Dictionary keys depend on the package kind. Binary packages have keys + like 'depends', 'recommends', ... while source packages have keys like + 'build-depends', 'build-depends-indep' and so on. See the Debian policy + for the comprehensive field list. + + Dictionary values are package relationships returned as lists of lists + of dictionaries (see below for some examples). + + The encoding of package relationships is as follows: + - the top-level lists corresponds to the comma-separated list of + Deb822, their components form a conjuction, i.e. they have to be + AND-ed together + - the inner lists corresponds to the pipe-separated list of Deb822, + their components form a disjunction, i.e. they have to be OR-ed + together + - member of the inner lists are dictionaries with the following keys: + - name: package (or virtual package) name + - version: A pair <operator, version> if the relationship is + versioned, None otherwise. operator is one of "<<", + "<=", "=", ">=", ">>"; version is the given version as + a string. + - arch: A list of pairs <polarity, architecture> if the + relationship is architecture specific, None otherwise. + Polarity is a boolean (false if the architecture is + negated with "!", true otherwise), architecture the + Debian archtiecture name as a string. + + Examples: + + "emacs | emacsen, make, debianutils (>= 1.7)" becomes + [ [ {'name': 'emacs'}, {'name': 'emacsen'} ], + [ {'name': 'make'} ], + [ {'name': 'debianutils', 'version': ('>=', '1.7')} ] ] + + "tcl8.4-dev, procps [!hurd-i386]" becomes + [ [ {'name': 'tcl8.4-dev'} ], + [ {'name': 'procps', 'arch': (false, 'hurd-i386')} ] ] + """ + if not self.__parsed_relations: + lazy_rels = filter(lambda n: self.__relations[n] is None, + self.__relations.keys()) + for n in lazy_rels: + self.__relations[n] = PkgRelation.parse_relations(self[n]) + self.__parsed_relations = True + return self.__relations + + +class _multivalued(Deb822): + """A class with (R/W) support for multivalued fields. + + To use, create a subclass with a _multivalued_fields attribute. It should + be a dictionary with *lower-case* keys, with lists of human-readable + identifiers of the fields as the values. Please see Dsc, Changes, and + PdiffIndex as examples. + """ + + def __init__(self, *args, **kwargs): + Deb822.__init__(self, *args, **kwargs) + + for field, fields in self._multivalued_fields.items(): + try: + contents = self[field] + except KeyError: + continue + + if self.is_multi_line(contents): + self[field] = [] + updater_method = self[field].append + else: + self[field] = Deb822Dict() + updater_method = self[field].update + + for line in filter(None, contents.splitlines()): + updater_method(Deb822Dict(zip(fields, line.split()))) + + def validate_input(self, key, value): + if key.lower() in self._multivalued_fields: + # It's difficult to write a validator for multivalued fields, and + # basically futile, since we allow mutable lists. In any case, + # with sanity checking in get_as_string, we shouldn't ever output + # unparseable data. + pass + else: + Deb822.validate_input(self, key, value) + + def get_as_string(self, key): + keyl = key.lower() + if keyl in self._multivalued_fields: + fd = StringIO() + if hasattr(self[key], 'keys'): # single-line + array = [ self[key] ] + else: # multi-line + fd.write("\n") + array = self[key] + + order = self._multivalued_fields[keyl] + try: + field_lengths = self._fixed_field_lengths + except AttributeError: + field_lengths = {} + for item in array: + for x in order: + raw_value = six.text_type(item[x]) + try: + length = field_lengths[keyl][x] + except KeyError: + value = raw_value + else: + value = (length - len(raw_value)) * " " + raw_value + if "\n" in value: + raise ValueError("'\\n' not allowed in component of " + "multivalued field %s" % key) + fd.write(" %s" % value) + fd.write("\n") + return fd.getvalue().rstrip("\n") + else: + return Deb822.get_as_string(self, key) + + +class _gpg_multivalued(_multivalued): + """A _multivalued class that can support gpg signed objects + + This class's feature is that it stores the raw text before parsing so that + gpg can verify the signature. Use it just like you would use the + _multivalued class. + + This class only stores raw text if it is given a raw string, or if it + detects a gpg signature when given a file or sequence of lines (see + Deb822.split_gpg_and_payload for details). + """ + + def __init__(self, *args, **kwargs): + try: + sequence = args[0] + except IndexError: + sequence = kwargs.get("sequence", None) + + if sequence is not None: + if isinstance(sequence, bytes): + self.raw_text = sequence + elif isinstance(sequence, six.string_types): + # If the file is really in some other encoding, then this + # probably won't verify correctly, but this is the best we + # can reasonably manage. For accurate verification, the + # file should be opened in binary mode. + self.raw_text = sequence.encode('utf-8') + elif hasattr(sequence, "items"): + # sequence is actually a dict(-like) object, so we don't have + # the raw text. + pass + else: + try: + gpg_pre_lines, lines, gpg_post_lines = \ + self.split_gpg_and_payload(sequence) + except EOFError: + # Empty input + gpg_pre_lines = lines = gpg_post_lines = [] + if gpg_pre_lines and gpg_post_lines: + raw_text = BytesIO() + raw_text.write(b"\n".join(gpg_pre_lines)) + raw_text.write(b"\n\n") + raw_text.write(b"\n".join(lines)) + raw_text.write(b"\n\n") + raw_text.write(b"\n".join(gpg_post_lines)) + self.raw_text = raw_text.getvalue() + try: + args = list(args) + args[0] = lines + except IndexError: + kwargs["sequence"] = lines + + _multivalued.__init__(self, *args, **kwargs) + + +class Dsc(_gpg_multivalued): + _multivalued_fields = { + "files": [ "md5sum", "size", "name" ], + "checksums-sha1": ["sha1", "size", "name"], + "checksums-sha256": ["sha256", "size", "name"], + } + + +class Changes(_gpg_multivalued): + _multivalued_fields = { + "files": [ "md5sum", "size", "section", "priority", "name" ], + "checksums-sha1": ["sha1", "size", "name"], + "checksums-sha256": ["sha256", "size", "name"], + } + + def get_pool_path(self): + """Return the path in the pool where the files would be installed""" + + # This is based on the section listed for the first file. While + # it is possible, I think, for a package to provide files in multiple + # sections, I haven't seen it in practice. In any case, this should + # probably detect such a situation and complain, or return a list... + + s = self['files'][0]['section'] + + try: + section, subsection = s.split('/') + except ValueError: + # main is implicit + section = 'main' + + if self['source'].startswith('lib'): + subdir = self['source'][:4] + else: + subdir = self['source'][0] + + return 'pool/%s/%s/%s' % (section, subdir, self['source']) + + +class PdiffIndex(_multivalued): + _multivalued_fields = { + "sha1-current": [ "SHA1", "size" ], + "sha1-history": [ "SHA1", "size", "date" ], + "sha1-patches": [ "SHA1", "size", "date" ], + } + + @property + def _fixed_field_lengths(self): + fixed_field_lengths = {} + for key in self._multivalued_fields: + if hasattr(self[key], 'keys'): + # Not multi-line -- don't need to compute the field length for + # this one + continue + length = self._get_size_field_length(key) + fixed_field_lengths[key] = {"size": length} + return fixed_field_lengths + + def _get_size_field_length(self, key): + lengths = [len(str(item['size'])) for item in self[key]] + return max(lengths) + + +class Release(_multivalued): + """Represents a Release file + + Set the size_field_behavior attribute to "dak" to make the size field + length only as long as the longest actual value. The default, + "apt-ftparchive" makes the field 16 characters long regardless. + """ + # FIXME: Add support for detecting the behavior of the input, if + # constructed from actual 822 text. + + _multivalued_fields = { + "md5sum": [ "md5sum", "size", "name" ], + "sha1": [ "sha1", "size", "name" ], + "sha256": [ "sha256", "size", "name" ], + } + + __size_field_behavior = "apt-ftparchive" + def set_size_field_behavior(self, value): + if value not in ["apt-ftparchive", "dak"]: + raise ValueError("size_field_behavior must be either " + "'apt-ftparchive' or 'dak'") + else: + self.__size_field_behavior = value + size_field_behavior = property(lambda self: self.__size_field_behavior, + set_size_field_behavior) + + @property + def _fixed_field_lengths(self): + fixed_field_lengths = {} + for key in self._multivalued_fields: + length = self._get_size_field_length(key) + fixed_field_lengths[key] = {"size": length} + return fixed_field_lengths + + def _get_size_field_length(self, key): + if self.size_field_behavior == "apt-ftparchive": + return 16 + elif self.size_field_behavior == "dak": + lengths = [len(str(item['size'])) for item in self[key]] + return max(lengths) + + +class Sources(Dsc, _PkgRelationMixin): + """Represent an APT source package list""" + + _relationship_fields = [ 'build-depends', 'build-depends-indep', + 'build-conflicts', 'build-conflicts-indep', 'binary' ] + + def __init__(self, *args, **kwargs): + Dsc.__init__(self, *args, **kwargs) + _PkgRelationMixin.__init__(self, *args, **kwargs) + + +class Packages(Deb822, _PkgRelationMixin): + """Represent an APT binary package list""" + + _relationship_fields = [ 'depends', 'pre-depends', 'recommends', + 'suggests', 'breaks', 'conflicts', 'provides', 'replaces', + 'enhances' ] + + def __init__(self, *args, **kwargs): + Deb822.__init__(self, *args, **kwargs) + _PkgRelationMixin.__init__(self, *args, **kwargs) + + +class _CaseInsensitiveString(str): + """Case insensitive string. + """ + + def __new__(cls, str_): + s = str.__new__(cls, str_) + s.str_lower = str_.lower() + s.str_lower_hash = hash(s.str_lower) + return s + + def __hash__(self): + return self.str_lower_hash + + def __eq__(self, other): + return self.str_lower == other.lower() + + def lower(self): + return self.str_lower + + +_strI = _CaseInsensitiveString |