summaryrefslogtreecommitdiff
path: root/debian/foo/deb822.py
diff options
context:
space:
mode:
Diffstat (limited to 'debian/foo/deb822.py')
-rw-r--r--debian/foo/deb822.py1318
1 files changed, 1318 insertions, 0 deletions
diff --git a/debian/foo/deb822.py b/debian/foo/deb822.py
new file mode 100644
index 0000000..bd910d6
--- /dev/null
+++ b/debian/foo/deb822.py
@@ -0,0 +1,1318 @@
+# vim: fileencoding=utf-8
+#
+# A python interface for various rfc822-like formatted files used by Debian
+# (.changes, .dsc, Packages, Sources, etc)
+#
+# Copyright (C) 2005-2006 dann frazier <dannf@dannf.org>
+# Copyright (C) 2006-2010 John Wright <john@johnwright.org>
+# Copyright (C) 2006 Adeodato Simó <dato@net.com.org.es>
+# Copyright (C) 2008 Stefano Zacchiroli <zack@upsilon.cc>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation, either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+from __future__ import absolute_import, print_function
+
+from debian.deprecation import function_deprecated_by
+
+try:
+ import apt_pkg
+ # This module uses apt_pkg only for its TagFile interface.
+ apt_pkg.TagFile
+ _have_apt_pkg = True
+except (ImportError, AttributeError):
+ _have_apt_pkg = False
+
+import chardet
+import os
+import re
+import subprocess
+import sys
+import warnings
+
+try:
+ from StringIO import StringIO
+ BytesIO = StringIO
+except ImportError:
+ from io import BytesIO, StringIO
+try:
+ from collections import Mapping, MutableMapping
+ _mapping_mixin = Mapping
+ _mutable_mapping_mixin = MutableMapping
+except ImportError:
+ from UserDict import DictMixin
+ _mapping_mixin = DictMixin
+ _mutable_mapping_mixin = DictMixin
+
+import six
+
if sys.version >= '3':
    import io
    def _is_real_file(f):
        # Return True only for streams backed by an OS-level file
        # descriptor: apt_pkg.TagFile can only parse real files, so
        # in-memory streams must fall back to the pure-Python parser.
        if not isinstance(f, io.IOBase):
            return False
        try:
            f.fileno()
            return True
        except (AttributeError, io.UnsupportedOperation):
            # e.g. io.StringIO/io.BytesIO raise UnsupportedOperation here.
            return False
else:
    def _is_real_file(f):
        # Python 2: 'file' is the builtin file type; file-like objects
        # (StringIO etc.) are not instances of it and are rejected.
        return isinstance(f, file) and hasattr(f, 'fileno')
+
+
# Keyring(s) consulted by GpgInfo.from_sequence when the caller supplies none.
GPGV_DEFAULT_KEYRINGS = frozenset(['/usr/share/keyrings/debian-keyring.gpg'])
# Path of the gpgv binary used to verify OpenPGP signatures.
GPGV_EXECUTABLE = '/usr/bin/gpgv'
+
+
class TagSectionWrapper(_mapping_mixin, object):
    """Present an apt_pkg TagSection as a read-only mapping.

    Field values are pulled out with find_raw so that we control exactly
    which whitespace is stripped: the spaces/tabs right after the colon
    and trailing newlines go away, but a leading newline (an empty first
    line of a multi-line field) is preserved.
    """

    def __init__(self, section):
        self.__section = section

    def __iter__(self):
        # Keys starting with '#' hold comment lines; hide them from callers.
        return (k for k in self.__section.keys() if not k.startswith('#'))

    def __len__(self):
        # Count only the non-comment keys, matching __iter__.
        return sum(1 for k in self.__section.keys() if not k.startswith('#'))

    def __getitem__(self, key):
        raw = self.__section.find_raw(key)
        if raw is None:
            raise KeyError(key)

        # The value is everything after the first colon.
        data = raw[raw.find(b':') + 1:]

        # Strip spaces/tabs following the colon and the trailing newline,
        # but keep an initial newline so multi-line fields round-trip.
        return data.lstrip(b' \t').rstrip(b'\n')
+
+
class OrderedSet(object):
    """A set-like object that preserves order when iterating over it

    We use this to keep track of keys in Deb822Dict, because it's much faster
    to look up if a key is in a set than in a list.
    """

    def __init__(self, iterable=()):
        # The default used to be a shared mutable list ([]); an immutable
        # tuple is equivalent for iteration and avoids the
        # mutable-default-argument pitfall.
        self.__set = set()
        self.__order = []
        for item in iterable:
            self.add(item)

    def add(self, item):
        """Add item if not already present, remembering insertion order."""
        if item not in self:
            # set.add will raise TypeError if something's unhashable, so we
            # don't have to handle that ourselves
            self.__set.add(item)
            self.__order.append(item)

    def remove(self, item):
        """Remove item; raises KeyError if it is not present."""
        # set.remove will raise KeyError, so we don't need to handle that
        # ourselves
        self.__set.remove(item)
        self.__order.remove(item)

    def __iter__(self):
        # Return an iterator of items in the order they were added
        return iter(self.__order)

    def __len__(self):
        return len(self.__order)

    def __contains__(self, item):
        # This is what makes OrderedSet faster than using a list to keep track
        # of keys.  Lookup in a set is O(1) instead of O(n) for a list.
        return item in self.__set

    ### list-like methods
    append = add

    def extend(self, iterable):
        """Add every item of iterable, preserving first-seen order."""
        for item in iterable:
            self.add(item)
    ###
+
+
class Deb822Dict(_mutable_mapping_mixin, object):
    # Subclassing _mutable_mapping_mixin because we're overriding so much
    # dict functionality that subclassing dict requires overriding many more
    # than the methods that _mutable_mapping_mixin requires.
    """A dictionary-like object suitable for storing RFC822-like data.

    Deb822Dict behaves like a normal dict, except:
        - key lookup is case-insensitive
        - key order is preserved
        - if initialized with a _parsed parameter, it will pull values from
          that dictionary-like object as needed (rather than making a copy).
          The _parsed dict is expected to be able to handle case-insensitive
          keys.

    If _parsed is not None, an optional _fields parameter specifies which keys
    in the _parsed dictionary are exposed.
    """

    # See the end of the file for the definition of _strI

    def __init__(self, _dict=None, _parsed=None, _fields=None,
                 encoding="utf-8"):
        self.__dict = {}
        self.__keys = OrderedSet()
        self.__parsed = None
        self.encoding = encoding

        if _dict is not None:
            # _dict may be a dict or a list of two-sized tuples
            if hasattr(_dict, 'items'):
                items = _dict.items()
            else:
                items = list(_dict)

            try:
                for k, v in items:
                    self[k] = v
            except ValueError:
                # Report the offending element, mimicking dict()'s own error.
                this = len(self.__keys)
                len_ = len(items[this])
                raise ValueError('dictionary update sequence element #%d has '
                                 'length %d; 2 is required' % (this, len_))

        if _parsed is not None:
            self.__parsed = _parsed
            if _fields is None:
                self.__keys.extend([_strI(k) for k in self.__parsed])
            else:
                self.__keys.extend([_strI(f) for f in _fields
                                    if f in self.__parsed])

    def _detect_encoding(self, value):
        """If value is not already Unicode, decode it intelligently."""
        if isinstance(value, bytes):
            try:
                return value.decode(self.encoding)
            except UnicodeDecodeError as e:
                # Evidently, the value wasn't encoded with the encoding the
                # user specified.  Try detecting it.
                warnings.warn('decoding from %s failed; attempting to detect '
                              'the true encoding' % self.encoding,
                              UnicodeWarning)
                result = chardet.detect(value)
                if result['encoding'] is None:
                    # chardet could not come up with a guess at all; report
                    # the original decoding failure.
                    raise e
                try:
                    decoded = value.decode(result['encoding'])
                except UnicodeDecodeError:
                    raise e
                else:
                    # Assume the rest of the paragraph is in this encoding as
                    # well (there's no sense in repeating this exercise for
                    # every field).
                    #
                    # NOTE: this assignment must happen before we return; the
                    # previous code returned directly from the 'try' block,
                    # which made this 'else' clause dead code and the cached
                    # encoding was never actually updated.
                    self.encoding = result['encoding']
                    return decoded
        else:
            return value

    ### BEGIN _mutable_mapping_mixin methods

    def __iter__(self):
        for key in self.__keys:
            yield str(key)

    def __len__(self):
        return len(self.__keys)

    def __setitem__(self, key, value):
        key = _strI(key)
        self.__keys.add(key)
        self.__dict[key] = value

    def __getitem__(self, key):
        key = _strI(key)
        try:
            value = self.__dict[key]
        except KeyError:
            # Fall back to the lazily-parsed backing store, if any.
            if self.__parsed is not None and key in self.__keys:
                value = self.__parsed[key]
            else:
                raise

        return self._detect_encoding(value)

    def __delitem__(self, key):
        key = _strI(key)
        self.__keys.remove(key)
        try:
            del self.__dict[key]
        except KeyError:
            # If we got this far, the key was in self.__keys, so it must have
            # only been in the self.__parsed dict.
            pass

    def __contains__(self, key):
        key = _strI(key)
        return key in self.__keys

    if sys.version < '3':
        has_key = __contains__

    ### END _mutable_mapping_mixin methods

    def __repr__(self):
        return '{%s}' % ', '.join(['%r: %r' % (k, v) for k, v in self.items()])

    def __eq__(self, other):
        # Equal iff both mappings expose the same keys with equal values;
        # key comparison is case-insensitive via _strI.
        mykeys = sorted(self)
        otherkeys = sorted(other)
        if not mykeys == otherkeys:
            return False

        for key in mykeys:
            if self[key] != other[key]:
                return False

        # If we got here, everything matched
        return True

    # Overriding __eq__ blocks inheritance of __hash__ in Python 3, and
    # instances of this class are not sensibly hashable anyway.
    __hash__ = None

    def copy(self):
        # Use self.__class__ so this works as expected for subclasses
        copy = self.__class__(self)
        return copy

    # TODO implement __str__() and make dump() use that?
+
+
class Deb822(Deb822Dict):
    """A single RFC822-like paragraph of "Field: value" pairs.

    Supports parsing from strings, files and line sequences (including
    OpenPGP-signed input, whose armor lines are stripped), merging fields
    between paragraphs, and dumping back to text.  Use iter_paragraphs to
    process multi-paragraph files such as Packages or Sources.
    """

    def __init__(self, sequence=None, fields=None, _parsed=None,
                 encoding="utf-8"):
        """Create a new Deb822 instance.

        :param sequence: a string, or any object that returns a line of
            input each time, normally a file.  Alternately, sequence can
            be a dict that contains the initial key-value pairs.

        :param fields: if given, it is interpreted as a list of fields that
            should be parsed (the rest will be discarded).

        :param _parsed: internal parameter.

        :param encoding: When parsing strings, interpret them in this encoding.
            (All values are given back as unicode objects, so an encoding is
            necessary in order to properly interpret the strings.)
        """

        if hasattr(sequence, 'items'):
            _dict = sequence
            sequence = None
        else:
            _dict = None
        Deb822Dict.__init__(self, _dict=_dict, _parsed=_parsed, _fields=fields,
                            encoding=encoding)

        if sequence is not None:
            try:
                self._internal_parser(sequence, fields)
            except EOFError:
                # Input contained nothing but blank lines; leave the
                # paragraph empty rather than failing.
                pass

        self.gpg_info = None

    @classmethod
    def iter_paragraphs(cls, sequence, fields=None, use_apt_pkg=True,
                        shared_storage=False, encoding="utf-8"):
        """Generator that yields a Deb822 object for each paragraph in sequence.

        :param sequence: same as in __init__.

        :param fields: likewise.

        :param use_apt_pkg: if sequence is a file, apt_pkg will be used
            if available to parse the file, since it's much much faster.  Set
            this parameter to False to disable using apt_pkg.
        :param shared_storage: not used, here for historical reasons.  Deb822
            objects never use shared storage anymore.
        :param encoding: Interpret the paragraphs in this encoding.
            (All values are given back as unicode objects, so an encoding is
            necessary in order to properly interpret the strings.)
        """

        if _have_apt_pkg and use_apt_pkg and _is_real_file(sequence):
            kwargs = {}
            if sys.version >= '3':
                # bytes=True is supported for both Python 2 and 3, but we
                # only actually need it for Python 3, so this saves us from
                # having to require a newer version of python-apt for Python
                # 2 as well.  This allows us to apply our own encoding
                # handling, which is more tolerant of mixed-encoding files.
                kwargs['bytes'] = True
            parser = apt_pkg.TagFile(sequence, **kwargs)
            for section in parser:
                paragraph = cls(fields=fields,
                                _parsed=TagSectionWrapper(section),
                                encoding=encoding)
                if paragraph:
                    yield paragraph

        else:
            # Pure-Python fallback: keep constructing paragraphs from the
            # same iterator until one comes back empty (end of input).
            iterable = iter(sequence)
            x = cls(iterable, fields, encoding=encoding)
            while len(x) != 0:
                yield x
                x = cls(iterable, fields, encoding=encoding)

    ###

    @staticmethod
    def _skip_useless_lines(sequence):
        """Yields only lines that do not begin with '#'.

        Also skips any blank lines at the beginning of the input.
        """
        at_beginning = True
        for line in sequence:
            # The bytes/str polymorphism required here to support Python 3
            # is unpleasant, but fortunately limited.  We need this because
            # at this point we might have been given either bytes or
            # Unicode, and we haven't yet got to the point where we can try
            # to decode a whole paragraph and detect its encoding.
            if isinstance(line, bytes):
                if line.startswith(b'#'):
                    continue
            else:
                if line.startswith('#'):
                    continue
            if at_beginning:
                if isinstance(line, bytes):
                    if not line.rstrip(b'\r\n'):
                        continue
                else:
                    if not line.rstrip('\r\n'):
                        continue
                at_beginning = False
            yield line

    def _internal_parser(self, sequence, fields=None):
        """Parse sequence into self, optionally keeping only `fields`.

        Recognizes three kinds of lines: "Key: value" (single-line value),
        "Key:" (start of a multi-line value), and continuation lines that
        begin with whitespace.
        """
        # The key is non-whitespace, non-colon characters before any colon.
        key_part = r"^(?P<key>[^: \t\n\r\f\v]+)\s*:\s*"
        single = re.compile(key_part + r"(?P<data>\S.*?)\s*$")
        multi = re.compile(key_part + r"$")
        multidata = re.compile(r"^\s(?P<data>.+?)\s*$")

        wanted_field = lambda f: fields is None or f in fields

        if isinstance(sequence, (six.string_types, bytes)):
            sequence = sequence.splitlines()

        curkey = None
        content = ""

        for line in self.gpg_stripped_paragraph(
                self._skip_useless_lines(sequence)):
            line = self._detect_encoding(line)

            m = single.match(line)
            if m:
                # A new single-line field: flush whatever we were building.
                if curkey:
                    self[curkey] = content

                if not wanted_field(m.group('key')):
                    curkey = None
                    continue

                curkey = m.group('key')
                content = m.group('data')
                continue

            m = multi.match(line)
            if m:
                # A new multi-line field: flush and start with empty content.
                if curkey:
                    self[curkey] = content

                if not wanted_field(m.group('key')):
                    curkey = None
                    continue

                curkey = m.group('key')
                content = ""
                continue

            m = multidata.match(line)
            if m:
                content += '\n' + line # XXX not m.group('data')?
                continue

        # Flush the last field being accumulated, if any.
        if curkey:
            self[curkey] = content

    def __str__(self):
        return self.dump()

    def __unicode__(self):
        return self.dump()

    if sys.version >= '3':
        def __bytes__(self):
            return self.dump().encode(self.encoding)

    # __repr__ is handled by Deb822Dict

    def get_as_string(self, key):
        """Return the self[key] as a string (or unicode)

        The default implementation just returns unicode(self[key]); however,
        this can be overridden in subclasses (e.g. _multivalued) that can take
        special values.
        """
        return six.text_type(self[key])

    def dump(self, fd=None, encoding=None):
        """Dump the contents in the original format

        If fd is None, return a unicode object.

        If fd is not None, attempt to encode the output to the encoding the
        object was initialized with, or the value of the encoding argument if
        it is not None.  This will raise UnicodeEncodeError if the encoding
        can't support all the characters in the Deb822Dict values.
        """

        if fd is None:
            fd = StringIO()
            return_string = True
        else:
            return_string = False

        if encoding is None:
            # Use the encoding we've been using to decode strings with if none
            # was explicitly specified
            encoding = self.encoding

        for key in self:
            value = self.get_as_string(key)
            if not value or value[0] == '\n':
                # Avoid trailing whitespace after "Field:" if it's on its own
                # line or the value is empty.  We don't have to worry about the
                # case where value == '\n', since we ensure that is not the
                # case in __setitem__.
                entry = '%s:%s\n' % (key, value)
            else:
                entry = '%s: %s\n' % (key, value)
            if not return_string:
                fd.write(entry.encode(encoding))
            else:
                fd.write(entry)
        if return_string:
            return fd.getvalue()

    ###

    def is_single_line(self, s):
        """Return True if s contains no newline (i.e. fits on one line)."""
        return "\n" not in s

    isSingleLine = function_deprecated_by(is_single_line)

    def is_multi_line(self, s):
        """Return True if s spans more than one line."""
        return not self.is_single_line(s)

    isMultiLine = function_deprecated_by(is_multi_line)

    def _merge_fields(self, s1, s2):
        """Merge two values of the same field, deduplicating items.

        Both values must be single-line, or both multi-line; mixing the two
        raises ValueError.
        """
        if not s2:
            return s1
        if not s1:
            return s2

        if self.is_single_line(s1) and self.is_single_line(s2):
            ## some fields are delimited by a single space, others
            ## a comma followed by a space.  this heuristic assumes
            ## that there are multiple items in one of the string fields
            ## so that we can pick up on the delimiter being used
            delim = ' '
            if (s1 + s2).count(', '):
                delim = ', '

            L = sorted((s1 + delim + s2).split(delim))

            prev = merged = L[0]

            for item in L[1:]:
                ## skip duplicate entries
                if item == prev:
                    continue
                merged = merged + delim + item
                prev = item
            return merged

        if self.is_multi_line(s1) and self.is_multi_line(s2):
            for item in s2.splitlines(True):
                if item not in s1.splitlines(True):
                    s1 = s1 + "\n" + item
            return s1

        raise ValueError

    _mergeFields = function_deprecated_by(_merge_fields)

    def merge_fields(self, key, d1, d2=None):
        """Merge the value of key from d1 (and optionally d2).

        With two mappings given, return the merged value.  With one, merge
        d1[key] into self[key] in place and return None.  Raises KeyError if
        neither mapping contains key.
        """
        ## this method can work in two ways - abstract that away
        if d2 is None:
            x1 = self
            x2 = d1
        else:
            x1 = d1
            x2 = d2

        ## we only have to do work if both objects contain our key
        ## otherwise, we just take the one that does, or raise an
        ## exception if neither does
        if key in x1 and key in x2:
            # Call the non-deprecated implementation directly instead of the
            # _mergeFields alias, which exists only for API compatibility.
            merged = self._merge_fields(x1[key], x2[key])
        elif key in x1:
            merged = x1[key]
        elif key in x2:
            merged = x2[key]
        else:
            raise KeyError

        ## back to the two different ways - if this method was called
        ## upon an object, update that object in place.
        ## return nothing in this case, to make the author notice a
        ## problem if she assumes the object itself will not be modified
        if d2 is None:
            self[key] = merged
            return None

        return merged

    mergeFields = function_deprecated_by(merge_fields)

    @staticmethod
    def split_gpg_and_payload(sequence):
        """Return a (gpg_pre, payload, gpg_post) tuple

        Each element of the returned tuple is a list of lines (with trailing
        whitespace stripped).
        """

        gpg_pre_lines = []
        lines = []
        gpg_post_lines = []
        state = b'SAFE'
        gpgre = re.compile(br'^-----(?P<action>BEGIN|END) PGP (?P<what>[^-]+)-----$')
        blank_line = re.compile(b'^$')
        first_line = True

        for line in sequence:
            # Some consumers of this method require bytes (encoding
            # detection and signature checking).  However, we might have
            # been given a file opened in text mode, in which case it's
            # simplest to encode to bytes.
            if sys.version >= '3' and isinstance(line, str):
                line = line.encode()

            line = line.strip(b'\r\n')

            # skip initial blank lines, if any
            if first_line:
                if blank_line.match(line):
                    continue
                else:
                    first_line = False

            m = gpgre.match(line)

            if not m:
                if state == b'SAFE':
                    if not blank_line.match(line):
                        lines.append(line)
                    else:
                        if not gpg_pre_lines:
                            # There's no gpg signature, so we should stop at
                            # this blank line
                            break
                elif state == b'SIGNED MESSAGE':
                    if blank_line.match(line):
                        # The blank line terminates the armor headers; the
                        # actual payload follows.
                        state = b'SAFE'
                    else:
                        gpg_pre_lines.append(line)
                elif state == b'SIGNATURE':
                    gpg_post_lines.append(line)
            else:
                # An armor marker line; its 'what' group ("SIGNED MESSAGE",
                # "SIGNATURE") becomes the new state on BEGIN.
                if m.group('action') == b'BEGIN':
                    state = m.group('what')
                elif m.group('action') == b'END':
                    gpg_post_lines.append(line)
                    break
                if not blank_line.match(line):
                    if not lines:
                        gpg_pre_lines.append(line)
                    else:
                        gpg_post_lines.append(line)

        if len(lines):
            return (gpg_pre_lines, lines, gpg_post_lines)
        else:
            raise EOFError('only blank lines found in input')

    @classmethod
    def gpg_stripped_paragraph(cls, sequence):
        """Return only the payload lines of a possibly-signed paragraph."""
        return cls.split_gpg_and_payload(sequence)[1]

    def get_gpg_info(self, keyrings=None):
        """Return a GpgInfo object with GPG signature information

        This method will raise ValueError if the signature is not available
        (e.g. the original text cannot be found).

        :param keyrings: list of keyrings to use (see GpgInfo.from_sequence)
        """

        # raw_text is saved (as a string) only for Changes and Dsc (see
        # _gpg_multivalued.__init__) which is small compared to Packages or
        # Sources which contain no signature
        if not hasattr(self, 'raw_text'):
            raise ValueError("original text cannot be found")

        if self.gpg_info is None:
            self.gpg_info = GpgInfo.from_sequence(self.raw_text,
                                                  keyrings=keyrings)

        return self.gpg_info

    def validate_input(self, key, value):
        """Raise ValueError if value is not a valid value for key

        Subclasses that do interesting things for different keys may wish to
        override this method.
        """

        # The value cannot end in a newline (if it did, dumping the object
        # would result in multiple stanzas)
        if value.endswith('\n'):
            raise ValueError("value must not end in '\\n'")

        # Make sure there are no blank lines (actually, the first one is
        # allowed to be blank, but no others), and each subsequent line starts
        # with whitespace
        for line in value.splitlines()[1:]:
            if not line:
                raise ValueError("value must not have blank lines")
            if not line[0].isspace():
                raise ValueError("each line must start with whitespace")

    def __setitem__(self, key, value):
        self.validate_input(key, value)
        Deb822Dict.__setitem__(self, key, value)
+
+
# XXX check what happens if input contains more that one signature
class GpgInfo(dict):
    """A wrapper around gnupg parsable output obtained via --status-fd

    This class is really a dictionary containing parsed output from gnupg plus
    some methods to make sense of the data.
    Keys are keywords and values are arguments suitably splitted.
    See /usr/share/doc/gnupg/DETAILS.gz"""

    # keys with format "key keyid uid"
    uidkeys = ('GOODSIG', 'EXPSIG', 'EXPKEYSIG', 'REVKEYSIG', 'BADSIG')

    def valid(self):
        """Is the signature valid?"""
        return 'GOODSIG' in self or 'VALIDSIG' in self

# XXX implement as a property?
# XXX handle utf-8 %-encoding
    def uid(self):
        """Return the primary ID of the signee key, None if not available"""
        pass

    @classmethod
    def from_output(cls, out, err=None):
        """Create a new GpgInfo object from gpg(v) --status-fd output (out) and
        optionally collect stderr as well (err).

        Both out and err can be lines in newline-terminated sequence or regular strings."""

        n = cls()

        if isinstance(out, six.string_types):
            out = out.split('\n')
        if isinstance(err, six.string_types):
            err = err.split('\n')

        # Keep the raw streams around for callers that want to inspect them.
        n.out = out
        n.err = err

        header = '[GNUPG:] '
        for l in out:
            if not l.startswith(header):
                continue

            l = l[len(header):]
            l = l.strip('\n')

            # str.partition() would be better, 2.5 only though
            s = l.find(' ')
            if s == -1:
                # Status keyword with no arguments (e.g. NEWSIG).  The old
                # slicing code chopped the last character off the keyword in
                # this case, since find() returned -1.
                n[l] = []
                continue
            key = l[:s]
            if key in cls.uidkeys:
                # value is "keyid UID", don't split UID
                value = l[s+1:].split(' ', 1)
            else:
                value = l[s+1:].split(' ')

            n[key] = value
        return n

    @classmethod
    def from_sequence(cls, sequence, keyrings=None, executable=None):
        """Create a new GpgInfo object from the given sequence.

        :param sequence: sequence of lines of bytes or a single byte string

        :param keyrings: list of keyrings to use (default:
            ['/usr/share/keyrings/debian-keyring.gpg'])

        :param executable: list of args for subprocess.Popen, the first element
            being the gpgv executable (default: ['/usr/bin/gpgv'])

        Raises IOError if none of the requested keyrings exists on disk.
        """

        keyrings = keyrings or GPGV_DEFAULT_KEYRINGS
        executable = executable or [GPGV_EXECUTABLE]

        # XXX check for gpg as well and use --verify accordingly?
        args = list(executable)
        #args.extend(["--status-fd", "1", "--no-default-keyring"])
        args.extend(["--status-fd", "1"])
        for k in keyrings:
            # Only pass keyrings that actually exist; gpgv fails hard on a
            # missing keyring file, and this check also makes the IOError
            # below reachable (it was dead code when every requested keyring
            # was appended unconditionally).
            if os.path.isfile(k):
                args.extend(["--keyring", k])

        if "--keyring" not in args:
            raise IOError("cannot access any of the given keyrings")

        p = subprocess.Popen(args, stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             universal_newlines=False)
        # XXX what to do with exit code?

        if isinstance(sequence, bytes):
            inp = sequence
        else:
            inp = cls._get_full_bytes(sequence)
        out, err = p.communicate(inp)

        return cls.from_output(out.decode('utf-8'),
                               err.decode('utf-8'))

    @staticmethod
    def _get_full_bytes(sequence):
        """Return a byte string from a sequence of lines of bytes.

        This method detects if the sequence's lines are newline-terminated, and
        constructs the byte string appropriately.
        """
        # Peek at the first line to see if it's newline-terminated.
        sequence_iter = iter(sequence)
        try:
            first_line = next(sequence_iter)
        except StopIteration:
            return b""
        join_str = b'\n'
        if first_line.endswith(b'\n'):
            join_str = b''
        return first_line + join_str + join_str.join(sequence_iter)

    @classmethod
    def from_file(cls, target, *args, **kwargs):
        """Create a new GpgInfo object from the given file.

        See GpgInfo.from_sequence.
        """
        with open(target, 'rb') as target_file:
            return cls.from_sequence(target_file, *args, **kwargs)
+
+
class PkgRelation(object):
    """Inter-package relationships

    Structured representation of the relationships of a package to another,
    i.e. of what can appear in a Deb822 field like Depends, Recommends,
    Suggests, ... (see Debian Policy 7.1).
    """

    # XXX *NOT* a real dependency parser, and that is not even a goal here, we
    # just parse as much as we need to split the various parts composing a
    # dependency, checking their correctness wrt policy is out of scope
    __dep_RE = re.compile(
        r'^\s*(?P<name>[a-zA-Z0-9.+\-]{2,})(\s*\(\s*(?P<relop>[>=<]+)\s*(?P<version>[0-9a-zA-Z:\-+~.]+)\s*\))?(\s*\[(?P<archs>[\s!\w\-]+)\])?\s*$')
    __comma_sep_RE = re.compile(r'\s*,\s*')
    __pipe_sep_RE = re.compile(r'\s*\|\s*')
    # Architecture names inside [...] are separated by runs of whitespace.
    # This must be \s+ (one or more): the previous r'\s*' could match the
    # empty string, which makes re.split() split between every character on
    # Python >= 3.7 (and warn on 3.5/3.6), mangling the architecture list.
    __blank_sep_RE = re.compile(r'\s+')

    @classmethod
    def parse_relations(cls, raw):
        """Parse a package relationship string (i.e. the value of a field like
        Depends, Recommends, Build-Depends ...)

        Returns a list (AND-ed clauses) of lists (OR-ed alternatives) of
        dicts with 'name', 'version' and 'arch' keys.
        """
        def parse_archs(raw):
            # assumption: no space between '!' and architecture name
            archs = []
            for arch in cls.__blank_sep_RE.split(raw.strip()):
                if len(arch) and arch[0] == '!':
                    archs.append((False, arch[1:]))
                else:
                    archs.append((True, arch))
            return archs

        def parse_rel(raw):
            match = cls.__dep_RE.match(raw)
            if match:
                parts = match.groupdict()
                d = { 'name': parts['name'] }
                if not (parts['relop'] is None or parts['version'] is None):
                    d['version'] = (parts['relop'], parts['version'])
                else:
                    d['version'] = None
                if parts['archs'] is None:
                    d['arch'] = None
                else:
                    d['arch'] = parse_archs(parts['archs'])
                return d
            else:
                # Pass unparseable input through so callers do not lose data.
                print('deb822.py: WARNING: cannot parse package' \
                      ' relationship "%s", returning it raw' % raw,
                      file=sys.stderr)
                return { 'name': raw, 'version': None, 'arch': None }

        tl_deps = cls.__comma_sep_RE.split(raw.strip()) # top-level deps
        cnf = map(cls.__pipe_sep_RE.split, tl_deps)
        return [[parse_rel(or_dep) for or_dep in or_deps] for or_deps in cnf]

    @staticmethod
    def str(rels):
        """Format to string structured inter-package relationships

        Perform the inverse operation of parse_relations, returning a string
        suitable to be written in a package stanza.
        """
        def pp_arch(arch_spec):
            # (polarity, name) -> "name" or "!name"
            (excl, arch) = arch_spec
            if excl:
                return arch
            else:
                return '!' + arch

        def pp_atomic_dep(dep):
            s = dep['name']
            if dep.get('version') is not None:
                s += ' (%s %s)' % dep['version']
            if dep.get('arch') is not None:
                s += ' [%s]' % ' '.join(map(pp_arch, dep['arch']))
            return s

        pp_or_dep = lambda deps: ' | '.join(map(pp_atomic_dep, deps))
        return ', '.join(map(pp_or_dep, rels))
+
+
+class _lowercase_dict(dict):
+ """Dictionary wrapper which lowercase keys upon lookup."""
+
+ def __getitem__(self, key):
+ return dict.__getitem__(self, key.lower())
+
+
class _PkgRelationMixin(object):
    """Mixin adding structured access to inter-package relationship fields.

    Mix this class into a Deb822(-like) class that defines a
    _relationship_fields attribute: a list of field names (e.g. 'Depends',
    'Build-Depends') for which structured access is wanted.  The parsed
    relationships are then exposed through the relations property, keyed by
    the lower-cased field name, in the format produced by
    PkgRelation.parse_relations.

    See Packages and Sources as examples.
    """

    def __init__(self, *args, **kwargs):
        # Keys are stored lower-cased, and _lowercase_dict folds lookup keys
        # too, so callers may use the same (mixed-case) field names on this
        # dictionary that they would use on the Deb822 object itself.
        self.__relations = _lowercase_dict({})
        self.__parsed_relations = False
        for field in self._relationship_fields:
            # None marks a field that is present in the paragraph but not
            # parsed yet; it is expanded lazily by the relations property.
            # Fields absent from the paragraph get an empty relation list.
            self.__relations[field.lower()] = None if field in self else []

    @property
    def relations(self):
        """Dictionary of structured inter-package relationships.

        Keys are lower-cased relationship field names: binary packages have
        keys like 'depends' and 'recommends', source packages keys like
        'build-depends' and 'build-depends-indep' (see the Debian policy for
        the full list).

        Values are in the format returned by PkgRelation.parse_relations: a
        list of AND-ed groups; each group is a list of OR-ed alternatives;
        each alternative is a dict with keys:
          - name: package (or virtual package) name
          - version: None, or an (operator, version) pair where operator is
            one of "<<", "<=", "=", ">=", ">>" and version is a string
          - arch: None, or a list of (polarity, architecture) pairs where
            polarity is False when the architecture is negated with "!"

        For example "emacs | emacsen, debianutils (>= 1.7)" maps to
        [ [ {'name': 'emacs'}, {'name': 'emacsen'} ],
          [ {'name': 'debianutils', 'version': ('>=', '1.7')} ] ]
        (None-valued version/arch entries omitted here for brevity).
        """
        if not self.__parsed_relations:
            # Expand every still-lazy (None) entry exactly once.
            pending = [name for name, rel in self.__relations.items()
                       if rel is None]
            for name in pending:
                self.__relations[name] = PkgRelation.parse_relations(self[name])
            self.__parsed_relations = True
        return self.__relations
+
+
class _multivalued(Deb822):
    """A class with (R/W) support for multivalued fields.

    To use, create a subclass with a _multivalued_fields attribute.  It should
    be a dictionary with *lower-case* keys, with lists of human-readable
    identifiers of the fields as the values.  Please see Dsc, Changes, and
    PdiffIndex as examples.
    """

    def __init__(self, *args, **kwargs):
        Deb822.__init__(self, *args, **kwargs)

        # Convert the raw string value of each declared multivalued field
        # into structured data: a list of Deb822Dicts (one per line) for
        # multi-line values, or a single Deb822Dict for one-line values.
        for field, fields in self._multivalued_fields.items():
            try:
                contents = self[field]
            except KeyError:
                # Field not present in this paragraph; nothing to convert.
                continue

            if self.is_multi_line(contents):
                self[field] = []
                updater_method = self[field].append
            else:
                self[field] = Deb822Dict()
                updater_method = self[field].update

            # Each non-empty line carries whitespace-separated values that
            # pair up positionally with the column names in `fields`.
            for line in filter(None, contents.splitlines()):
                updater_method(Deb822Dict(zip(fields, line.split())))

    def validate_input(self, key, value):
        """Accept any value for multivalued fields; defer others to Deb822."""
        if key.lower() in self._multivalued_fields:
            # It's difficult to write a validator for multivalued fields, and
            # basically futile, since we allow mutable lists.  In any case,
            # with sanity checking in get_as_string, we shouldn't ever output
            # unparseable data.
            pass
        else:
            Deb822.validate_input(self, key, value)

    def get_as_string(self, key):
        """Serialize key's structured value back to its string form.

        Multivalued fields are rendered one item per line (with a leading
        newline for multi-line values), columns ordered per
        _multivalued_fields and right-aligned per _fixed_field_lengths when
        present.  Other fields defer to Deb822.get_as_string.
        """
        keyl = key.lower()
        if keyl in self._multivalued_fields:
            fd = StringIO()
            if hasattr(self[key], 'keys'): # single-line
                array = [ self[key] ]
            else: # multi-line
                fd.write("\n")
                array = self[key]

            order = self._multivalued_fields[keyl]
            try:
                field_lengths = self._fixed_field_lengths
            except AttributeError:
                field_lengths = {}
            for item in array:
                for x in order:
                    raw_value = six.text_type(item[x])
                    try:
                        length = field_lengths[keyl][x]
                    except KeyError:
                        value = raw_value
                    else:
                        # Right-align the column to its fixed width.
                        value = (length - len(raw_value)) * " " + raw_value
                    if "\n" in value:
                        raise ValueError("'\\n' not allowed in component of "
                                         "multivalued field %s" % key)
                    fd.write(" %s" % value)
                fd.write("\n")
            return fd.getvalue().rstrip("\n")
        else:
            return Deb822.get_as_string(self, key)
+
+
class _gpg_multivalued(_multivalued):
    """A _multivalued class that can support gpg signed objects

    This class's feature is that it stores the raw text before parsing
    so that gpg can verify the signature.  Use it just like you would
    use the _multivalued class.

    This class only stores raw text if it is given a raw string, or if
    it detects a gpg signature when given a file or sequence of lines
    (see Deb822.split_gpg_and_payload for details).
    """

    def __init__(self, *args, **kwargs):
        sequence = args[0] if args else kwargs.get("sequence", None)

        if sequence is not None:
            if isinstance(sequence, bytes):
                # Already raw bytes: keep them verbatim for verification.
                self.raw_text = sequence
            elif isinstance(sequence, six.string_types):
                # If the file is really in some other encoding, then this
                # probably won't verify correctly, but this is the best we
                # can reasonably manage.  For accurate verification, the
                # file should be opened in binary mode.
                self.raw_text = sequence.encode('utf-8')
            elif hasattr(sequence, "items"):
                # sequence is actually a dict(-like) object, so we don't
                # have the raw text.
                pass
            else:
                # A file or sequence of lines: split off any gpg armor.
                try:
                    gpg_pre, payload, gpg_post = \
                        self.split_gpg_and_payload(sequence)
                except EOFError:
                    # Empty input
                    gpg_pre = payload = gpg_post = []
                if gpg_pre and gpg_post:
                    # Reassemble the signed text as the three original
                    # blank-line-separated sections.
                    self.raw_text = b"\n\n".join(
                        [b"\n".join(gpg_pre),
                         b"\n".join(payload),
                         b"\n".join(gpg_post)])
                # Hand only the payload lines on for parsing.
                if args:
                    args = [payload] + list(args[1:])
                else:
                    kwargs["sequence"] = payload

        _multivalued.__init__(self, *args, **kwargs)
+
+
class Dsc(_gpg_multivalued):
    """Represent a .dsc (Debian source package control) file."""

    # Each entry of these fields is "<hash> <size> <filename>".
    _multivalued_fields = {
        "files": ["md5sum", "size", "name"],
        "checksums-sha1": ["sha1", "size", "name"],
        "checksums-sha256": ["sha256", "size", "name"],
    }
+
+
class Changes(_gpg_multivalued):
    """Represent a .changes (Debian upload control) file."""

    _multivalued_fields = {
        "files": ["md5sum", "size", "section", "priority", "name"],
        "checksums-sha1": ["sha1", "size", "name"],
        "checksums-sha256": ["sha256", "size", "name"],
    }

    def get_pool_path(self):
        """Return the path in the pool where the files would be installed"""

        # This is based on the section listed for the first file.  While
        # it is possible, I think, for a package to provide files in
        # multiple sections, I haven't seen it in practice.  In any case,
        # this should probably detect such a situation and complain, or
        # return a list...
        first_section = self['files'][0]['section']

        try:
            section, _ = first_section.split('/')
        except ValueError:
            # No "section/subsection" form: main is implicit
            section = 'main'

        source = self['source']
        # Library packages are pooled under a "lib<initial>" directory.
        subdir = source[:4] if source.startswith('lib') else source[0]

        return 'pool/%s/%s/%s' % (section, subdir, source)
+
+
class PdiffIndex(_multivalued):
    """Represent an apt pdiff Index file."""

    _multivalued_fields = {
        "sha1-current": ["SHA1", "size"],
        "sha1-history": ["SHA1", "size", "date"],
        "sha1-patches": ["SHA1", "size", "date"],
    }

    @property
    def _fixed_field_lengths(self):
        # Right-justification width of the "size" column for each
        # multi-line field; single-line fields (stored as a dict) need
        # no padding and are skipped.
        widths = {}
        for key in self._multivalued_fields:
            if hasattr(self[key], 'keys'):
                continue
            widths[key] = {"size": self._get_size_field_length(key)}
        return widths

    def _get_size_field_length(self, key):
        # Width of the widest size value among this field's rows.
        return max(len(str(row['size'])) for row in self[key])
+
+
class Release(_multivalued):
    """Represents a Release file

    Set the size_field_behavior attribute to "dak" to make the size field
    length only as long as the longest actual value.  The default,
    "apt-ftparchive" makes the field 16 characters long regardless.
    """
    # FIXME: Add support for detecting the behavior of the input, if
    # constructed from actual 822 text.

    _multivalued_fields = {
        "md5sum": ["md5sum", "size", "name"],
        "sha1": ["sha1", "size", "name"],
        "sha256": ["sha256", "size", "name"],
    }

    # Private (name-mangled to _Release__size_field_behavior) so callers
    # must go through the validating property below.
    __size_field_behavior = "apt-ftparchive"

    def set_size_field_behavior(self, value):
        if value in ("apt-ftparchive", "dak"):
            self.__size_field_behavior = value
        else:
            raise ValueError("size_field_behavior must be either "
                             "'apt-ftparchive' or 'dak'")
    size_field_behavior = property(lambda self: self.__size_field_behavior,
                                   set_size_field_behavior)

    @property
    def _fixed_field_lengths(self):
        # Right-justification width of the "size" column per field.
        return dict((key, {"size": self._get_size_field_length(key)})
                    for key in self._multivalued_fields)

    def _get_size_field_length(self, key):
        if self.size_field_behavior == "apt-ftparchive":
            # apt-ftparchive pads the size column to 16 characters
            # no matter what the values are.
            return 16
        elif self.size_field_behavior == "dak":
            # dak pads only as far as the widest actual value.
            return max(len(str(row['size'])) for row in self[key])
+
+
class Sources(Dsc, _PkgRelationMixin):
    """Represent an APT source package list"""

    # Fields whose values are parsed lazily as package relationships
    # by _PkgRelationMixin.
    _relationship_fields = ['build-depends', 'build-depends-indep',
                            'build-conflicts', 'build-conflicts-indep',
                            'binary']

    def __init__(self, *args, **kwargs):
        # Initialise both bases explicitly: Dsc parses the paragraph,
        # _PkgRelationMixin sets up the lazy relation cache.
        Dsc.__init__(self, *args, **kwargs)
        _PkgRelationMixin.__init__(self, *args, **kwargs)
+
+
class Packages(Deb822, _PkgRelationMixin):
    """Represent an APT binary package list"""

    # Fields whose values are parsed lazily as package relationships
    # by _PkgRelationMixin.
    _relationship_fields = ['depends', 'pre-depends', 'recommends',
                            'suggests', 'breaks', 'conflicts',
                            'provides', 'replaces', 'enhances']

    def __init__(self, *args, **kwargs):
        # Initialise both bases explicitly: Deb822 parses the paragraph,
        # _PkgRelationMixin sets up the lazy relation cache.
        Deb822.__init__(self, *args, **kwargs)
        _PkgRelationMixin.__init__(self, *args, **kwargs)
+
+
+class _CaseInsensitiveString(str):
+ """Case insensitive string.
+ """
+
+ def __new__(cls, str_):
+ s = str.__new__(cls, str_)
+ s.str_lower = str_.lower()
+ s.str_lower_hash = hash(s.str_lower)
+ return s
+
+ def __hash__(self):
+ return self.str_lower_hash
+
+ def __eq__(self, other):
+ return self.str_lower == other.lower()
+
+ def lower(self):
+ return self.str_lower
+
+
+_strI = _CaseInsensitiveString