# Copyright (C) 2014-2015  Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

# This plugin is used as part of the Baserock automated release process.
#
# See: for more information.
# NOTE(review): the link that followed "See:" is missing from this file;
# restore it if the original reference can be found.


import re

import cliapp
import yaml

import morphlib


class CVECheckPlugin(cliapp.Plugin):

    '''Plugin providing the `cve-check` subcommand.'''

    def enable(self):
        '''Register the `cve-check` subcommand with the application.'''
        self.app.add_subcommand(
            'cve-check', self.cve_check,
            arg_synopsis='REPO REF MORPH [MORPH]...')

    def disable(self):
        '''Nothing needs to be undone when the plugin is disabled.'''
        pass

    def cve_check(self, args):
        '''Check given system definitions for known vulnerabilities.

        Command line arguments:

        * `REPO` is a git repository URL.
        * `REF` is a branch or other commit reference in that repository.
        * `MORPH` is a system morphology name at that ref.

        '''

        # REPO, REF and at least one MORPH are required.
        if len(args) < 3:
            raise cliapp.AppException(
                'Wrong number of arguments to cve-check command '
                '(see help)')

        repo, ref = args[0], args[1]
        system_filenames = [morphlib.util.sanitise_morphology_path(name)
                            for name in args[2:]]

        # Shared state used by certify_system() for every system checked.
        self.lrc, self.rrc = morphlib.util.new_repo_caches(self.app)
        self.resolver = morphlib.artifactresolver.ArtifactResolver()
        self.cve_db = CVEDataBase(self.lrc)

        for system_filename in system_filenames:
            self.certify_system(repo, ref, system_filename)

    def certify_system(self, repo, ref, system_filename):
        '''Check system for vulnerabilities.

        Resolves the artifacts of `system_filename` at `repo`/`ref` and
        passes every distinct source of kind 'chunk' to the CVE database.
        Relies on `self.lrc`, `self.rrc`, `self.resolver` and `self.cve_db`
        having been set by cve_check().

        Raises ValueError if no root artifact matches `system_filename`.

        '''

        self.app.status(
            msg='Creating source pool for %s' % system_filename, chatty=True)
        source_pool = morphlib.sourceresolver.create_source_pool(
            self.lrc, self.rrc, repo, ref, system_filename,
            cachedir=self.app.settings['cachedir'],
            update_repos=not self.app.settings['no-git-update'],
            status_cb=self.app.status)

        self.app.status(
            msg='Resolving artifacts for %s' % system_filename, chatty=True)
        root_artifacts = self.resolver.resolve_root_artifacts(source_pool)

        def find_artifact_by_name(artifacts_list, filename):
            for a in artifacts_list:
                if a.source.filename == filename:
                    return a
            raise ValueError(
                'No root artifact found for %s' % filename)

        system_artifact = find_artifact_by_name(root_artifacts,
                                                system_filename)

        # Several artifacts can share one source; check each source once.
        sources = set(a.source for a in system_artifact.walk())
        for source in sources:
            if source.morphology['kind'] != 'chunk':
                continue
            self.cve_db.check_vulnerability(source)


class CVEDetail(object):

    """A single CVE: an identifier plus the version ranges it affects."""

    def __init__(self, id, ranges):
        # `ranges` is a sequence of (first, last) version pairs; both ends
        # are treated as inclusive by check_vulnerability().
        self.id = id
        self.ranges = ranges

    def check_vulnerability(self, version):
        """Print a report for every range of this CVE containing `version`.

        `version` and the range bounds are parsed with `Version`, which
        raises ValueError for strings it does not accept.

        """
        v = Version(version)
        for r in self.ranges:
            first = Version(r[0])
            last = Version(r[1])
            if first <= v <= last:
                print(' {}:'.format(self.id))
                print(' version is {}; vulnerable range is: {} to {}'.
                      format(version, r[0], r[1]))


class CVESoftware(object):

    """A piece of software we track CVEs for."""

    def __init__(self, name, filters):
        # `filters` is a sequence of (regex, replacement) pairs applied, in
        # order, to a version string before it is compared.
        self.name = name
        self.filters = filters
        self.cves = []

    def add_cve(self, id, ranges):
        """Record a CVE with identifier `id` affecting `ranges`."""
        self.cves.append(CVEDetail(id, ranges))

    def check_vulnerability(self, version):
        """Check `version` against every CVE known for this software.

        The version is first rewritten by each filter in turn.

        """
        filtered_version = version
        for tag_filter in self.filters:
            pattern, replacement = tag_filter[0], tag_filter[1]
            filtered_version = re.sub(pattern, replacement, filtered_version)

        for cve in self.cves:
            cve.check_vulnerability(filtered_version)


class CVEDataBase(object):

    """Provides CVE checking functionality."""

    def __init__(self, lrc, db_path='cve.yaml'):
        """Load the CVE database.

        `lrc` is the repo cache used to look up chunk repositories; this
        class calls its has_repo(), cache_repo() and get_repo() methods.
        `db_path` is the multi-document YAML file holding the CVE data; it
        defaults to 'cve.yaml' in the current working directory.

        Raises cliapp.AppException if the header document is malformed.

        """
        # TODO: In the future this will be loaded from a remote server
        # For now, we have a local YAML file, containing CVE data
        self.db = []
        self.lrc = lrc
        self.version_guesser = VersionGuesser()
        self._load(db_path)

    def _load(self, db_path):
        """Read every YAML document in `db_path` into the database."""
        with open(db_path) as f:
            # safe_load_all is used instead of load_all: the data consumed
            # here is only mappings, lists and scalars, and load_all can
            # construct arbitrary Python objects (and needs an explicit
            # Loader on recent PyYAML releases).
            for i, doc in enumerate(yaml.safe_load_all(f)):
                if not doc:
                    continue
                # The first document is the header; the rest each describe
                # one piece of software.
                if i == 0:
                    self._handle_header(doc)
                else:
                    self._handle_software(doc)

    @staticmethod
    def _handle_header(doc):
        """Validate the header document of the database."""
        if 'stream' not in doc or 'version' not in doc:
            # The original raised an undefined name here; AppException
            # matches the error style used by the cve-check command.
            raise cliapp.AppException('Bad header')

    def _handle_software(self, doc):
        """Add the software described by the mapping `doc` to the database."""
        software = doc.get('software')
        # `or ()` tolerates keys that are present but empty in the YAML.
        cves = [[vuln['id'], vuln['ranges']]
                for vuln in doc.get('vulnerabilities') or ()]
        filters = [[str(tag_filter['match'] or ''),
                    str(tag_filter['replacement'] or '')]
                   for tag_filter in doc.get('tag-filters') or ()]
        self._add_software(software, filters, cves)

    def _add_software(self, name, filters, cves):
        """Append software `name` with its filters and (id, ranges) CVEs."""
        sw = CVESoftware(name, filters)
        for v in cves:
            sw.add_cve(v[0], v[1])
        self.db.append(sw)

    def _get_version(self, source):
        """Return a guessed version string for the repository of `source`."""
        # Ensure we have a cache of the repo
        if not self.lrc.has_repo(source.repo_name):
            self.lrc.cache_repo(source.repo_name)
        ref = source.original_ref
        cached = self.lrc.get_repo(source.repo_name)
        return self.version_guesser.guess_version(cached, ref)

    def check_vulnerability(self, source):
        """Report known CVEs affecting the chunk `source`.

        Only the first database entry whose name equals the chunk's
        morphology name is consulted; chunks with no entry are skipped
        silently.

        """
        name = source.morphology['name']
        for s in self.db:
            if s.name != name:
                continue
            print('Checking chunk: {}'.format(name))
            s.check_vulnerability(self._get_version(source))
            break


class ProjectVersionGuesser(object):

    """Base class for guessers that inspect well-known project files."""

    def __init__(self, interesting_files):
        self.interesting_files = interesting_files

    def file_contents(self, cached, ref, tree):
        """Yield (filename, contents) for each interesting file in `tree`.

        Contents are read from `cached` at `ref` via its read_file() method.

        """
        filenames = [x for x in self.interesting_files if x in tree]
        for filename in filenames:
            yield filename, cached.read_file(filename, ref)


class AutotoolsVersionGuesser(ProjectVersionGuesser):

    """Guess a project version from autoconf/automake input files."""

    def __init__(self):
        ProjectVersionGuesser.__init__(self, [
            'configure.ac',
            'configure.in',
            'configure.ac.in',
            'configure.in.in',
        ])

    def guess_version(self, cached, ref, tree):
        """Return the first version found in the configure files, or None."""
        for filename, data in self.file_contents(cached, ref, tree):
            # First, try to grep for AC_INIT()
            version = self._check_ac_init(data)
            if version:
                return version
        return None

    def _check_ac_init(self, data):
        """Extract a version from an AC_INIT or AM_INIT_AUTOMAKE call.

        Returns the version string if one is found that starts with a
        digit, otherwise None.

        """
        # Join lines so a macro call split over several lines still matches.
        data = data.replace('\n', ' ')
        for macro in ['AC_INIT', 'AM_INIT_AUTOMAKE']:
            match = re.match(r'.*%s\((.*?)\).*' % macro, data)
            if not match:
                continue
            acinit = match.group(1)
            if not acinit:
                continue

            fields = acinit.split(',')
            if len(fields) == 1:
                # A single-argument AM_INIT_AUTOMAKE carries no version.
                if macro == 'AM_INIT_AUTOMAKE':
                    continue
                version = fields[0]
            else:
                version = fields[1]

            # Drop m4 quoting brackets and keep the first word only.
            words = re.sub(r'[\[\]]', '', version).split()
            if not words:
                # Empty version field, e.g. AC_INIT([name], []).
                continue
            version = words[0]
            if version[0].isdigit():
                return version
        return None


class VersionGuesser(object):

    """Try each known guesser in turn to find a version for a repository."""

    def __init__(self):
        self.guessers = [
            AutotoolsVersionGuesser()
        ]

    def guess_version(self, cached, ref):
        """Return a version string for `ref` of the cached repo `cached`."""
        tree = cached.list_files(ref, recurse=False)
        for guesser in self.guessers:
            version = guesser.guess_version(cached, ref, tree)
            if version:
                return version
        # Fall back to `git describe` which always gives something
        return cached.version_guess(ref)


#------------------------------------------------------------------------------#
# Beyond here is code from the `python-debian` module.
#------------------------------------------------------------------------------#

# debian_support.py -- Python module for Debian metadata
# Copyright (C) 2005 Florian Weimer
# Copyright (C) 2010 John Wright

"""This module implements facilities to deal with Debian-specific metadata."""


# From python-debian (GPL2)
class BaseVersion(object):
    """Base class for classes representing Debian versions

    It doesn't implement any comparison, but it does check for valid versions
    according to Section 5.6.12 in the Debian Policy Manual.  Since splitting
    the version into epoch, upstream_version, and debian_revision components is
    pretty much free with the validation, it sets those fields as properties of
    the object, and sets the raw version to the full_version property.  A
    missing epoch or debian_revision results in the respective property set to
    None.  Setting any of the properties results in the full_version being
    recomputed and the rest of the properties set from that.

    It also implements __str__, just returning the raw version given to the
    initializer.
    """

    # The group names are required: _set_full_version() reads the match by
    # name ("epoch", "upstream_version", "debian_revision").
    re_valid_version = re.compile(
        r"^((?P<epoch>\d+):)?"
        r"(?P<upstream_version>[A-Za-z0-9.+:~-]+?)"
        r"(-(?P<debian_revision>[A-Za-z0-9+.~]+))?$")
    magic_attrs = ('full_version', 'epoch', 'upstream_version',
                   'debian_revision', 'debian_version')

    def __init__(self, version):
        # Routed through __setattr__, which validates and splits the string.
        self.full_version = version

    def _set_full_version(self, version):
        """Validate `version` and store it together with its components.

        Raises ValueError if `version` is not a valid version string.
        """
        m = self.re_valid_version.match(version)
        if not m:
            raise ValueError("Invalid version string %r" % version)
        # If there no epoch ("1:..."), then the upstream version can not
        # contain a :.
        if (m.group("epoch") is None and
                ":" in m.group("upstream_version")):
            raise ValueError("Invalid version string %r" % version)

        self.__full_version = version
        self.__epoch = m.group("epoch")
        self.__upstream_version = m.group("upstream_version")
        self.__debian_revision = m.group("debian_revision")

    def __setattr__(self, attr, value):
        """Keep full_version and its components consistent on assignment.

        Raises ValueError if the assignment would produce an invalid
        version; the previous state is restored in that case.
        """
        if attr not in self.magic_attrs:
            super(BaseVersion, self).__setattr__(attr, value)
            return

        # For compatibility with the old changelog.Version class
        if attr == "debian_version":
            attr = "debian_revision"

        if attr == "full_version":
            self._set_full_version(str(value))
        else:
            if value is not None:
                value = str(value)
            # Components live in name-mangled private attributes.
            private = "_BaseVersion__%s" % attr
            old_value = getattr(self, private)
            setattr(self, private, value)
            try:
                self._update_full_version()
            except ValueError:
                # Don't leave it in an invalid state
                setattr(self, private, old_value)
                self._update_full_version()
                raise ValueError("Setting %s to %r results in invalid version"
                                 % (attr, value))

    def __getattr__(self, attr):
        """Serve the magic attributes from their private storage."""
        if attr not in self.magic_attrs:
            return super(BaseVersion, self).__getattribute__(attr)

        # For compatibility with the old changelog.Version class
        if attr == "debian_version":
            attr = "debian_revision"

        private = "_BaseVersion__%s" % attr
        return getattr(self, private)

    def _update_full_version(self):
        """Rebuild full_version from the stored components."""
        version = ""
        if self.__epoch is not None:
            version += self.__epoch + ":"
        version += self.__upstream_version
        if self.__debian_revision:
            version += "-" + self.__debian_revision
        # Re-validates the result via __setattr__.
        self.full_version = version

    def __str__(self):
        return self.full_version

    def __repr__(self):
        return "%s('%s')" % (self.__class__.__name__, self)

    def _compare(self, other):
        """Return <0, 0 or >0; implemented by subclasses."""
        raise NotImplementedError

    # TODO: Once we support only Python >= 2.7, we can simplify this using
    # @functools.total_ordering.

    def __lt__(self, other):
        return self._compare(other) < 0

    def __le__(self, other):
        return self._compare(other) <= 0

    def __eq__(self, other):
        return self._compare(other) == 0

    def __ne__(self, other):
        return self._compare(other) != 0

    def __ge__(self, other):
        return self._compare(other) >= 0

    def __gt__(self, other):
        return self._compare(other) > 0

    def __hash__(self):
        return hash(str(self))


# From python-debian (GPL2)
# Version based on the DpkgVersion class by Raphael Hertzog in
# svn://svn.debian.org/qa/trunk/pts/www/bin/common.py r2361
class Version(BaseVersion):
    """Represents a Debian package version, with native Python comparison"""

    re_all_digits_or_not = re.compile(r"\d+|\D+")
    re_digits = re.compile(r"\d+")
    re_digit = re.compile(r"\d")
    re_alpha = re.compile(r"[A-Za-z]")

    def _compare(self, other):
        """Compare with `other`; return -1, 0 or 1.

        Raises ValueError if `other` cannot be converted to a version.
        """
        # Convert other into an instance of BaseVersion if it's not already.
        # (All we need is epoch, upstream_version, and debian_revision
        # attributes, which BaseVersion gives us.) Requires other's string
        # representation to be the raw version.
        if not isinstance(other, BaseVersion):
            try:
                other = BaseVersion(str(other))
            except ValueError as e:
                raise ValueError("Couldn't convert %r to BaseVersion: %s"
                                 % (other, e))

        # A missing epoch compares as epoch 0.
        lepoch = int(self.epoch or "0")
        repoch = int(other.epoch or "0")
        if lepoch < repoch:
            return -1
        elif lepoch > repoch:
            return 1
        res = self._version_cmp_part(self.upstream_version,
                                     other.upstream_version)
        if res != 0:
            return res
        # A missing debian revision compares as revision "0".
        return self._version_cmp_part(self.debian_revision or "0",
                                      other.debian_revision or "0")

    @classmethod
    def _order(cls, x):
        """Return an integer value for character x"""
        if x == '~':
            return -1
        elif cls.re_digit.match(x):
            return int(x) + 1
        elif cls.re_alpha.match(x):
            return ord(x)
        else:
            return ord(x) + 256

    @classmethod
    def _version_cmp_string(cls, va, vb):
        """Compare two strings character by character using _order()."""
        la = [cls._order(x) for x in va]
        lb = [cls._order(x) for x in vb]
        # The shorter string is padded with 0, so '~' (-1) sorts before
        # the end of the string.
        for i in range(max(len(la), len(lb))):
            a = la[i] if i < len(la) else 0
            b = lb[i] if i < len(lb) else 0
            if a < b:
                return -1
            elif a > b:
                return 1
        return 0

    @classmethod
    def _version_cmp_part(cls, va, vb):
        """Compare two version components; return -1, 0 or 1."""
        # Split into alternating runs of digits and non-digits.
        la = cls.re_all_digits_or_not.findall(va)
        lb = cls.re_all_digits_or_not.findall(vb)
        # The shorter list is padded with "0".
        for i in range(max(len(la), len(lb))):
            a = la[i] if i < len(la) else "0"
            b = lb[i] if i < len(lb) else "0"
            if cls.re_digits.match(a) and cls.re_digits.match(b):
                # Digit runs compare numerically.
                a = int(a)
                b = int(b)
                if a < b:
                    return -1
                elif a > b:
                    return 1
            else:
                res = cls._version_cmp_string(a, b)
                if res != 0:
                    return res
        return 0