# Copyright (C) 2014-2015 Codethink Limited
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

# This plugin is used as part of the Baserock automated release process.
#
# See: for more information.

import re

import cliapp
import yaml

import morphlib


class CVECheckPlugin(cliapp.Plugin):

    '''Plugin providing the `cve-check` subcommand.

    For every chunk of each given system morphology, the plugin guesses
    the chunk's upstream version from its cached git repository and
    passes the chunk name and version to the CVE database for reporting.

    '''

    def enable(self):
        '''Register the `cve-check` subcommand with the application.'''
        self.app.add_subcommand(
            'cve-check', self.cve_check,
            arg_synopsis='REPO REF MORPH [MORPH]...')

    def disable(self):
        '''Nothing to undo when the plugin is disabled.'''
        pass

    def cve_check(self, args):
        '''Check the chunks of the given systems against known CVEs.

        Command line arguments:

        * `REPO` is a git repository URL.
        * `REF` is a branch or other commit reference in that repository.
        * `MORPH` is a system morphology name at that ref.

        '''

        if len(args) < 3:
            raise cliapp.AppException(
                'Wrong number of arguments to cve-check command '
                '(see help)')

        repo, ref = args[0], args[1]
        system_filenames = [morphlib.util.sanitise_morphology_path(name)
                            for name in args[2:]]

        # Shared state read by certify_system() for every system checked.
        self.lrc, self.rrc = morphlib.util.new_repo_caches(self.app)
        self.resolver = morphlib.artifactresolver.ArtifactResolver()
        self.cve_db = CVEDataBase()
        self.version_guesser = VersionGuesser()

        for system_filename in system_filenames:
            self.certify_system(repo, ref, system_filename)

    def certify_system(self, repo, ref, system_filename):
        '''Report CVE information for every chunk of one system.

        Relies on `self.lrc`, `self.rrc`, `self.resolver`, `self.cve_db`
        and `self.version_guesser`, which `cve_check` sets up.

        Raises ValueError if no root artifact of the resolved source pool
        comes from `system_filename`.

        '''

        self.app.status(
            msg='Creating source pool for %s' % system_filename, chatty=True)
        source_pool = morphlib.sourceresolver.create_source_pool(
            self.lrc, self.rrc, repo, ref, system_filename,
            cachedir=self.app.settings['cachedir'],
            update_repos=not self.app.settings['no-git-update'],
            status_cb=self.app.status)

        self.app.status(
            msg='Resolving artifacts for %s' % system_filename, chatty=True)
        root_artifacts = self.resolver.resolve_root_artifacts(source_pool)

        system_artifact = self._find_artifact_by_filename(
            root_artifacts, system_filename)

        # Several artifacts can share one source, so deduplicate by source
        # to check each chunk only once.
        for source in set(a.source for a in system_artifact.walk()):
            if source.morphology['kind'] != 'chunk':
                continue

            name = source.morphology['name']
            chunk_ref = source.original_ref

            print(' Checking chunk: {}'.format(name))

            # Ensure we have a cache of the repo
            if not self.lrc.has_repo(source.repo_name):
                self.lrc.cache_repo(source.repo_name)
            cached = self.lrc.get_repo(source.repo_name)

            version = self.version_guesser.guess_version(cached, chunk_ref)
            self.cve_db.check_vulnerability(name, version)

    @staticmethod
    def _find_artifact_by_filename(artifacts, filename):
        '''Return the first artifact whose source file is `filename`.

        Raises ValueError if none of `artifacts` matches.

        '''
        for artifact in artifacts:
            if artifact.source.filename == filename:
                return artifact
        raise ValueError('No artifact found for %s' % filename)


class CVEDetail(object):

    """A single CVE: its identifier and the version ranges it lists."""

    def __init__(self, id, ranges):
        self.id = id
        # Each range is indexed as (first version, last version).
        self.ranges = ranges

    def check_vulnerability(self, version):
        """Print this CVE's id and each of its ranges next to `version`.

        No comparison is made between `version` and the ranges; the
        information is only printed for a human to inspect.

        """
        print(' {}:'.format(self.id))
        for r in self.ranges:
            print(' version is {}; vulnerable range is: {} to {}'.
                  format(version, r[0], r[1]))


class CVESoftware(object):

    """A piece of software we track CVEs for."""

    def __init__(self, name):
        self.name = name
        self.cves = []

    def add_cve(self, id, ranges):
        """Record a CVE with the given id and version ranges."""
        self.cves.append(CVEDetail(id, ranges))

    def check_vulnerability(self, version):
        """Report every recorded CVE against `version`."""
        for cve in self.cves:
            cve.check_vulnerability(version)


class CVEDataBase(object):

    """Provides CVE checking functionality.

    The data is read from a multi-document YAML file: the first document
    is a header that must contain `stream` and `version` keys, and each
    later document describes one piece of software through its `software`
    and `vulnerabilities` keys.

    """

    def __init__(self, path='cve.yaml'):
        """Load the CVE data from `path`.

        Raises cliapp.AppException if the header document lacks the
        `stream` or `version` key.

        """
        # TODO: In the future this will be loaded from a remote server
        # For now, we have a local YAML file, containing CVE data
        self.db = []

        with open(path) as f:
            # safe_load_all only builds plain Python data, so a crafted
            # YAML file cannot construct arbitrary objects; this matters
            # once the data comes from a remote server (see TODO above).
            for i, doc in enumerate(yaml.safe_load_all(f)):
                if not doc:
                    continue
                if i == 0:
                    self._check_header(doc)
                else:
                    self._load_software(doc)

    @staticmethod
    def _check_header(doc):
        """Validate the header document of the CVE file."""
        if 'stream' not in doc or 'version' not in doc:
            raise cliapp.AppException('Bad header')

    def _load_software(self, doc):
        """Add the software described by one YAML document to the db."""
        cves = [(vuln['id'], vuln['ranges'])
                for vuln in doc.get('vulnerabilities') or []]
        self._add_software(doc.get('software'), cves)

    def _add_software(self, name, cves):
        """Append a CVESoftware called `name` holding (id, ranges) pairs."""
        sw = CVESoftware(name)
        for v in cves:
            sw.add_cve(v[0], v[1])
        self.db.append(sw)

    def check_vulnerability(self, name, version):
        """Report the CVEs of the first software entry called `name`.

        Nothing is reported when no entry has that name.

        """
        for s in self.db:
            if s.name == name:
                s.check_vulnerability(version)
                break


class ProjectVersionGuesser(object):

    """Base class for guessers that inspect a fixed set of files."""

    def __init__(self, interesting_files):
        self.interesting_files = interesting_files

    def file_contents(self, cached, ref, tree):
        """Yield (filename, contents) for each interesting file in `tree`.

        Contents are read from the `cached` repository at `ref`, in the
        order the names appear in `interesting_files`.

        """
        for filename in self.interesting_files:
            if filename in tree:
                yield filename, cached.read_file(filename, ref)


class AutotoolsVersionGuesser(ProjectVersionGuesser):

    """Guess a version from the autoconf input files of a project."""

    def __init__(self):
        ProjectVersionGuesser.__init__(self, [
            'configure.ac',
            'configure.in',
            'configure.ac.in',
            'configure.in.in',
        ])

    def guess_version(self, cached, ref, tree):
        """Return the first version found in the autoconf files, or None."""
        for _filename, data in self.file_contents(cached, ref, tree):
            # First, try to grep for AC_INIT()
            version = self._check_ac_init(data)
            if version:
                return version
        return None

    def _check_ac_init(self, data):
        """Extract a version from AC_INIT() or AM_INIT_AUTOMAKE() in `data`.

        The version is taken from the second comma-separated argument of
        the macro call, or from the only argument of a one-argument
        AC_INIT(). Square brackets are removed and only the first
        whitespace-separated word is kept. The word is returned only if
        it starts with a digit; otherwise the next macro is tried and
        None is returned when neither yields a version.

        """
        # Macro calls may span several lines; join them so `.` matches
        # across what used to be line breaks.
        data = data.replace('\n', ' ')

        for macro in ('AC_INIT', 'AM_INIT_AUTOMAKE'):
            # The leading greedy `.*` selects the last call of the macro;
            # the lazy group stops at the first closing parenthesis.
            match = re.match(r'.*%s\((.*?)\).*' % macro, data)
            if not match or not match.group(1):
                continue

            fields = match.group(1).split(',')
            if len(fields) == 1:
                if macro == 'AM_INIT_AUTOMAKE':
                    # A lone AM_INIT_AUTOMAKE argument is not used as a
                    # version.
                    continue
                candidate = fields[0]
            else:
                candidate = fields[1]

            # An empty version field (e.g. `[]`) leaves no words at all;
            # guard against it instead of indexing blindly.
            words = re.sub(r'[\[\]]', '', candidate).split()
            if words and words[0][0].isdigit():
                return words[0]

        return None


class VersionGuesser(object):

    """Guess the version of a chunk from its cached git repository."""

    def __init__(self):
        self.guessers = [
            AutotoolsVersionGuesser()
        ]

    def guess_version(self, cached, ref):
        """Return the first guesser's answer, else the repo's own guess."""
        tree = cached.list_files(ref, recurse=False)
        for guesser in self.guessers:
            version = guesser.guess_version(cached, ref, tree)
            if version:
                return version
        # Fall back to `git describe` which always gives something
        return cached.version_guess(ref)