#!/usr/bin/env python3 # Copyright (C) 2011-2021 Codethink Limited # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; version 2 of the License. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. import contextlib import email.message import email.utils import ftplib import functools import json import logging import os import re import shutil import stat import string import subprocess import sys import tempfile import traceback import urllib.parse import urllib.request import urllib.response from datetime import datetime import cliapp import yaml __version__ = "1.0.0" lorry_path = os.path.realpath(__file__) UPDATE_COUNT_NAME = "lorry-update-count" def file_missing_or_empty(filename): """A more comprehensive alternative to os.path.exists().""" return (not os.path.isfile(filename)) or (os.path.getsize(filename) <= 0) def quote_url(url): """Convert URIs to strings that only contain digits, letters, % and _. NOTE: When changing the code of this function, make sure to also apply the same to the quote_url() function of morph. Otherwise the git bundles generated by lorry may no longer be found by morph. """ valid_chars = string.digits + string.ascii_letters + "%_" transl = lambda x: x if x in valid_chars else "_" # noqa E731 return "".join([transl(x) for x in url]) def find_exec_in_path(name): for dir_name in os.environ["PATH"].split(os.pathsep): file_name = os.path.join(dir_name, name) try: file_stat = os.stat(file_name) except OSError: continue if stat.S_ISREG(file_stat.st_mode) and file_stat.st_mode & 0o111: return file_name return None def find_bazaar_command(): return find_exec_in_path("bzr") or find_exec_in_path("brz") # This is a simplified replacement for urllib.request.FTPHandler, with # one additional feature: it uses the MDTM extension specified in RFC # 3659, and sets the Last-Modified header based on the result. class SimpleFTPHandler(urllib.request.BaseHandler): # Priority needs to be higher (numerically lower) than the # standard FTPHandler handler_order = urllib.request.FTPHandler.handler_order - 1 # Format is YYYYMMDDhhmmss with optional fractional seconds (which # we ignore). The implicit time zone is UTC. _mdtm_response_re = re.compile(r"^213 (\d{14})(?:\.\d+)?$") def ftp_open(self, req): from urllib.request import URLError if getattr(req, "method", None) not in [None, "GET"]: raise URLError("SimpleFTPHandler: only supports GET method") url_parts = urllib.parse.urlparse(req.full_url) if url_parts.username or url_parts.password: raise URLError("SimpleFTPHandler: only supports anonymous FTP") if ";" in url_parts.path or url_parts.params or url_parts.query: raise URLError("SimpleFTPHandler: does not support parameters") path_parts = [] for part in url_parts.path.split("/"): if part == "": continue part = urllib.parse.unquote(part) if "\r\n" in part: raise URLError("SimpleFTPHandler: illegal characters in path") path_parts.append(part) ftp = ftplib.FTP() try: ftp.connect(url_parts.hostname, url_parts.port or 21) ftp.login() for part in path_parts[:-1]: ftp.cwd(part) # Try to get the mtime from the server, ignoring error # or invalid responses mtime = None try: mdtm_response = ftp.sendcmd("MDTM " + path_parts[-1]) except ftplib.error_reply: pass else: match = self._mdtm_response_re.match(mdtm_response) if match: mtime_s = match.group(1) try: mtime = datetime( int(mtime_s[0:4]), int(mtime_s[4:6]), int(mtime_s[6:8]), int(mtime_s[8:10]), int(mtime_s[10:12]), int(mtime_s[12:14]), ) except ValueError: pass # Start binary mode transfer ftp.voidcmd("TYPE I") data_sock, size = ftp.ntransfercmd("RETR " + path_parts[-1]) data_file = data_sock.makefile("rb") try: # Synthesise an HTTP-like response header header = email.message.EmailMessage() if size is not None: header["Content-Length"] = str(size) if mtime is not None: header["Last-Modified"] = mtime.strftime( "%a, %d %b %Y %H:%M:%S GMT" ) # Wrap up the file with a close hook to close the # control socket as well, and the extra metadata # expected in a response object response = urllib.response.addinfourl( urllib.response.addclosehook(data_file, self._ftp_close), header, req.full_url, ) self.ftp = ftp ftp = None data_file = None return response finally: # Close data socket on error if data_file: data_file.close() except ftplib.all_errors as e: # Re-raise as URLError raise URLError("SimpleFTPHandler: %r" % e).with_traceback(sys.exc_info()[2]) finally: # Close control socket on error if ftp: ftp.close() def _ftp_close(self): self.ftp.close() del self.ftp class Lorry(cliapp.Application): def add_settings(self): self.settings.string( ["working-area", "w"], "use DIR as the working area (for holding " "intermediate git repositories, etc)", metavar="DIR", default="workd", ) self.settings.string( ["mirror-base-url-push"], "base URL to use for pushing to the mirror server", metavar="URL", ) self.settings.string( ["mirror-base-url-fetch"], "base URL to use for bundle names and for " "pulling from the mirror server", metavar="URL", ) self.settings.boolean( ["pull-only"], "only pull from upstreams, do not push to the mirror server", ) self.settings.boolean(["verbose", "v"], "report what is going on to stdout") self.settings.boolean( ["repack"], "repack git repositories when an import has " "been updated (enabled by default)", default=True, ) self.settings.string( ["command-stdout"], "write the stdout of commands to this file", metavar="FILE", default=None, ) self.settings.string( ["command-stderr"], "write the stderr of commands to this file", metavar="FILE", default=None, ) self.settings.choice( ["bundle"], ["first", "never", "always"], "create bundles of git repositories." "first will only bundle if there is not already " "a bundle in BUNDLES (default: first)", ) self.settings.string( ["bundle-dest"], "put created bundles in BUNDLES", metavar="BUNDLES" ) self.settings.choice( ["tarball"], ["first", "never", "always"], "create tarballs of git repositories." "first will only tar if there is not already " "a tarball in TARBALLS (default: first)", ) self.settings.string( ["tarball-dest"], "put created tarballs in DIR", metavar="DIR", default="git-tars", ) self.settings.boolean( ["keep-multiple-backups"], "keep multiple (time-stamped) backups (disabled by default)", default=False, ) self.settings.string_list( ["push-option"], "option for 'git push' to pass to the remote server" ) self.settings.string( ["bazaar-command"], "command used to access Bazaar repositories", metavar="COMMAND", default=find_bazaar_command(), ) self.settings.boolean( ["check-certificates"], "validate SSL/TLS server certificates", default=True ) def process_args(self, args): status = 0 def ensure_path_exists(setting_name): full_path = os.path.abspath(self.settings[setting_name]) if not os.path.exists(full_path): os.makedirs(full_path) self.settings[setting_name] = full_path ensure_path_exists("working-area") if self.settings["tarball"] != "never": ensure_path_exists("tarball-dest") urllib.request.install_opener(urllib.request.build_opener(SimpleFTPHandler)) for arg in args: self.progress("Processing spec file %s" % arg) with open(arg) as f: try: specs = yaml.safe_load(f) except yaml.YAMLError: f.seek(0) specs = json.load(f) for name in sorted(specs.keys()): self.progress("Getting: %s" % name) try: self.gitify(name, specs[name]) except Exception: status += 1 sys.stderr.write("Error mirroring:\n%s" % traceback.format_exc()) logging.error(traceback.format_exc()) if status > 0: logging.debug("Total Mirrors failed: %d" % status) status = 1 self.progress("Done") # print 'total failed:',status sys.exit(status) def should_check_certificates(self, spec): return self.settings["check-certificates"] and spec.get( "check-certificates", True ) def bundle(self, name, gitdir): if self.settings["bundle"] == "never": return if len(self.settings["mirror-base-url-fetch"]) == 0: return bundlename = "%s/%s" % (self.settings["mirror-base-url-fetch"], name) path = ( os.path.join(self.settings["bundle-dest"], quote_url(bundlename)) + ".bndl" ) if not os.path.exists(path) or self.settings["bundle"] == "always": self.progress(".. building bundle %s" % bundlename) # create the bundle self.run_program( ["git", "bundle", "create", path, "--branches", "--tags"], cwd=gitdir ) # FIXME this is a hack to avoid unrecognized headers in bundles, # which happens with some repositories. See # # http://marc.info/?l=git&m=132992959317420&w=2 # # for more information. From the bundle's header section, the # expression below will remove all garbage lines that appear # between the first line (the bundle format meta comment) and # the list of refs. expr = r"1,/^[0-9a-f]\{40\}/{ /^[0-9a-f]\{40\}/!{/^[^#]/d}}" self.run_program(["sed", "-i", "-e", expr, path], cwd=gitdir) def make_tarball(self, name, gitdir): if self.settings["tarball"] == "never": return if len(self.settings["mirror-base-url-fetch"]) == 0: return tarballname = "%s/%s" % (self.settings["mirror-base-url-fetch"], name) path = ( os.path.join(self.settings["tarball-dest"], quote_url(tarballname)) + ".tar" ) if os.path.exists(os.path.join(gitdir, ".git")): gitdir = os.path.join(gitdir, ".git") if not os.path.exists(path) or self.settings["tarball"] == "always": self.progress(".. building tarball %s" % tarballname) args = ["tar", "cf", path] if os.path.exists(os.path.join(gitdir, "config")): os.rename( os.path.join(gitdir, "config"), os.path.join(gitdir, "config.lorrytmp"), ) with open(os.path.join(gitdir, "config"), "w") as fh: fh.write( """[core] repositoryformatversion = 0 filemode = true bare = true """ # noqa: 374 ) for entry in [ "HEAD", "objects", "refs", "info", "packed-refs", "config", "branches", "description", ]: if os.path.exists(os.path.join(gitdir, entry)): args += [entry] self.run_program(args, cwd=gitdir) if os.path.exists(os.path.join(gitdir, "config.lorrytmp")): os.rename( os.path.join(gitdir, "config.lorrytmp"), os.path.join(gitdir, "config"), ) def gitify(self, name, spec): self.progress("Getting %s" % name) table = { "bzr": self.gitify_bzr, "cvs": self.gitify_cvs, "git": self.mirror_git, "hg": self.gitify_hg, "svn": self.gitify_svn, "raw-file": self.gitify_raw_file, "tarball": functools.partial(self.gitify_archive, "tar"), "zip": functools.partial(self.gitify_archive, "zip"), "gzip": functools.partial(self.gitify_archive, "gzip"), } vcstype = spec["type"] if vcstype not in table: raise cliapp.AppException("Unknown VCS type %s" % vcstype) dirname = self.dirname(name) if not os.path.exists(dirname): os.mkdir(dirname) self.migrate_oldstyle_repos(dirname) temp_repo, active_repo, next_update_count = self.prepare_working_repos(dirname) time = datetime.now().strftime("%F-%T") post_fail_name = "git-post-fail" post_fail_backup_suffix = ( post_fail_name + "-" + time if self.settings["keep-multiple-backups"] else post_fail_name ) post_fail_backup_dir = os.path.join(dirname, post_fail_backup_suffix) if not self.settings["keep-multiple-backups"]: if os.path.exists(post_fail_backup_dir): shutil.rmtree(post_fail_backup_dir) try: self.needs_aggressive = False table[vcstype](name, dirname, temp_repo, spec) if self.settings["repack"]: self.progress(".. repacking %s git repository" % name) self.run_program( ["git", "config", "pack.windowMemory", "128M"], cwd=temp_repo ) args = ["git", "gc"] if self.needs_aggressive: args += ["--aggressive"] self.run_program(args, cwd=temp_repo) self.bundle(name, temp_repo) self.make_tarball(name, temp_repo) self.write_update_count(temp_repo, next_update_count) active_repo = temp_repo except: # noqa: E722 if active_repo is not None: os.rename(temp_repo, post_fail_backup_dir) self.output.write( "Mirror of %s failed, state before mirror " "is saved at %s and state after mirror is " "saved at %s\n" % (name, active_repo, post_fail_backup_dir) ) logging.debug( "Mirror of %s failed, state before mirror " "is saved at %s and state after mirror is " "saved at %s\n", name, active_repo, post_fail_backup_dir, ) raise if not self.settings["pull-only"]: if len(self.settings["mirror-base-url-push"]) > 0: if "refspecs" in spec: self.push_to_mirror_server(name, active_repo, spec["refspecs"]) else: self.push_to_mirror_server(name, active_repo) def migrate_oldstyle_repos(self, dirname): # Migrate old-style active repository old_repo = os.path.join(dirname, "git") if os.path.exists(old_repo): new_repo = os.path.join(dirname, "git-a") if os.path.exists(new_repo): msg = "Found both old %s and new %s directories; not migrating\n" % ( old_repo, new_repo, ) self.output.write(msg) logging.warning(msg) else: # If it has a .git subdirectory, use that old_gitdir = os.path.join(old_repo, ".git") if not os.path.exists(old_gitdir): old_gitdir = old_repo # Ensure that it's bare self.run_program(["git", "config", "core.bare", "true"], cwd=old_gitdir) self.run_program( ["git", "config", "core.logallrefupdates", "false"], cwd=old_gitdir ) self.write_update_count(old_gitdir, 1) # Move it to new name, and remove top-level directory if we # moved the .git subdirectory os.rename(old_gitdir, new_repo) if old_repo != old_gitdir: shutil.rmtree(old_repo) # Remove old-style backup repository old_repo = os.path.join(dirname, "git-pre-update") if os.path.exists(old_repo): shutil.rmtree(old_repo) def prepare_working_repos(self, dirname): # Determine which repository is active (has highest update # count) and which we will create or replace repos = [] for repo in [os.path.join(dirname, "git-a"), os.path.join(dirname, "git-b")]: tstamp = -1 count = 0 try: count_name = os.path.join(repo, UPDATE_COUNT_NAME) with open(count_name, "r") as count_file: tstamp = os.stat(count_file.fileno()).st_mtime count = int(count_file.readline()) except (FileNotFoundError, ValueError): pass repos.append((count, tstamp, repo)) repos.sort() temp_count, _, temp_repo = repos[0] active_count, active_tstamp, active_repo = repos[1] # Remove/rename temporary repository if os.path.exists(temp_repo): # If this was the result of a successful conversion, and # multiple backups are enabled, rename it. We name it # using the timestamp of the active repository, i.e. the # time that this repository became inactive. if temp_count > 0 and self.settings["keep-multiple-backups"]: time = datetime.fromtimestamp(active_tstamp).strftime("%F-%T") os.rename(temp_repo, os.path.join(dirname, "git-pre-update-" + time)) else: shutil.rmtree(temp_repo) if active_count == 0: # We can't create the repo here because "git cvsimport" # insists on doing so itself return temp_repo, None, 1 self.copy_gitdir(active_repo, temp_repo) return temp_repo, active_repo, active_count + 1 def write_update_count(self, gitdir, count): count_name = os.path.join(gitdir, UPDATE_COUNT_NAME) with open(count_name, "w") as count_file: count_file.write("%d\n" % count) def ensure_gitdir(self, gitdir): # Create git directory if it doesn't exist. Return flag for # whether we created it. exists = os.path.exists(gitdir) if not exists: self.progress(".. creating git repo") self.run_program(["git", "init", "--bare", gitdir]) else: self.progress(".. updating existing clone") return not exists def copy_gitdir(self, source, dest): if not os.path.exists(source): return None # copy everything except the objects dir and update count def ignore_filter(dirname, filenames): if dirname.endswith(source): return ["objects", UPDATE_COUNT_NAME] return [] shutil.copytree(source, dest, ignore=ignore_filter) # hardlink the objects sourceobjects = os.path.join(source, "objects") assert os.path.exists(sourceobjects), "No source objects" objectspath = os.path.join(dest, "objects") os.mkdir(objectspath) for dirpath, dirnames, filenames in os.walk(sourceobjects): assert dirpath.startswith(sourceobjects) # strip sourceobjects and / from relpath relpath = dirpath[len(sourceobjects) + 1 :] for dirname in dirnames: os.mkdir(os.path.join(objectspath, relpath, dirname)) for filename in filenames: assert os.path.exists(os.path.join(objectspath, relpath)) os.link( os.path.join(dirpath, filename), os.path.join(objectspath, relpath, filename), ) return dest def mirror_git(self, project_name, dirname, gitdir, spec): if self.should_check_certificates(spec): env = os.environ else: env = dict(os.environ) env["GIT_SSL_NO_VERIFY"] = "true" self.ensure_gitdir(gitdir) argv = [ "git", "-c", "gc.autodetach=false", "fetch", "--prune", spec["url"], "+refs/heads/*:refs/heads/*", "+refs/tags/*:refs/tags/*", ] self.run_program(argv, cwd=gitdir, env=env) def gitify_bzr(self, project_name, dirname, gitdir, spec): bzr = self.settings["bazaar-command"] if os.path.basename(bzr) == "brz": msg = "Using Breezy for Bazaar conversion, which may not work correctly" logging.warning(msg) self.output.write("%s\n" % msg) bzrdir = os.path.join(dirname, "bzr") # check if repo exists if not os.path.exists(bzrdir): self.progress(".. creating bzr repository") self.run_program([bzr, "init-repo", "--no-trees", bzrdir]) self.needs_aggressive = self.ensure_gitdir(gitdir) # branches are the listed branches, plus the branch specified in url if "branches" in spec: branches = spec["branches"] else: branches = {} if "url" in spec: branches["trunk"] = spec["url"] logging.debug("all branches: %s" % repr(branches)) cert_options = [] if not self.should_check_certificates(spec): cert_options.append("-Ossl.cert_reqs=none") for branch, address in branches.items(): branchdir = os.path.join(bzrdir, branch) if not os.path.exists(branchdir): self.progress(".. doing initial bzr branch") self.run_program( [bzr, "branch", "--quiet", *cert_options, address, branchdir] ) else: self.progress(".. updating bzr branch") self.run_program( [bzr, "pull", "--quiet", *cert_options, address], cwd=branchdir ) exports = {} bzrmarks = os.path.join(gitdir, "marks.bzr") for branch, address in branches.items(): branchdir = os.path.join(bzrdir, branch) self.progress(".. fast-exporting branch %s from bzr" % branch) exports[branch] = os.path.join(dirname, "fast-export" + branch) cmdline = [ bzr, "fast-export", "--git-branch=" + branch, branchdir, exports[branch], ] if os.path.exists(bzrmarks): cmdline.append("--marks=" + bzrmarks) else: cmdline.append("--export-marks=" + bzrmarks) self.run_program(cmdline) gitmarks = os.path.join(gitdir, "marks.git") for branch, address in branches.items(): self.progress(".. fast-importing branch %s into git" % branch) with open(exports[branch], "rb") as exportfile: cmdline = ["git", "fast-import", "--export-marks=" + gitmarks] if os.path.exists(gitmarks): cmdline.append("--import-marks=" + gitmarks) self.run_program(cmdline, stdin=exportfile, cwd=gitdir) for branch, address in branches.items(): branchdir = os.path.join(bzrdir, branch) self.progress(".. removing temporary fast-export file " + exports[branch]) os.remove(exports[branch]) def gitify_svn(self, project_name, dirname, gitdir, spec): layout = spec["layout"] # if standard layour specified, fill in the defaults if layout == "standard": layout = {"trunk": "trunk", "tags": "tags/*", "branches": "branches/*"} # We should not run "git svn init" which creates a non-bare # repository. Instead, create the directory and extra config # parameters that it would create. This also ensures that if # the URL in the spec changes, Lorry accepts the change rather # than using the original one. self.needs_aggressive = self.ensure_gitdir(gitdir) os.makedirs(os.path.join(gitdir, "svn/refs/remotes/git-svn"), exist_ok=True) self.run_program( ["git", "config", "svn-remote.svn.url", spec["url"]], cwd=gitdir ) self.run_program( ["git", "config", "svn-remote.svn.fetch", ":refs/remotes/git-svn"], cwd=gitdir, ) # manually set the refspecs to fetch into local # git-svn can apparently provide better history tracking by # fetching the root of the repository # git-svn will convert branch, trunk and tag paths to allow this, # but it is simpler to disable it and do it manually self.run_program( [ "git", "config", "svn-remote.svn.fetch", layout["trunk"] + ":refs/heads/master", ], cwd=gitdir, ) if "branches" in layout: self.run_program( [ "git", "config", "svn-remote.svn.branches", layout["branches"] + ":refs/heads/*", ], cwd=gitdir, ) else: # try removing old config try: self.run_program( ["git", "config", "--unset", "svn-remote.svn.branches"], cwd=gitdir ) except Exception as e: if "(exit code 5)" not in e.message: raise if "tags" in layout: self.run_program( [ "git", "config", "svn-remote.svn.tags", layout["tags"] + ":refs/tags/*", ], cwd=gitdir, ) else: # try removing old config try: self.run_program( ["git", "config", "--unset", "svn-remote.svn.tags"], cwd=gitdir ) except Exception as e: if "(exit code 5)" not in e.message: raise # update the remote tracking branches self.run_program(["git", "svn", "fetch"], cwd=gitdir) def gitify_cvs(self, project_name, dirname, gitdir, spec): # git cvsimport requires a working tree for some operations; # keep this separate from the repository worktree = os.path.join(dirname, "git-cvs-worktree") if os.path.exists(gitdir): if os.path.exists(worktree): shutil.rmtree(worktree) self.run_program(["git", "worktree", "prune"], cwd=gitdir) self.run_program(["git", "worktree", "add", worktree, "master"], cwd=gitdir) # git cvsimport insists on $GIT_DIR or .git being a # directory, but .git will be a file. Set $GIT_DIR to # the subdirectory of gitdir created for this worktree. cvsimp_gitdir = ( self.runcmd(["git", "rev-parse", "--git-dir"], cwd=worktree) .decode("utf-8") .rstrip("\n") ) # cvsps should find its cache under gitdir, not the # temporary worktree cvsps_home = gitdir else: # We must let git cvsimport create the repository cvsimp_gitdir = os.path.join(worktree, ".git") # cvsps should create its cache there, and it will be # moved to gitdir later cvsps_home = cvsimp_gitdir self.needs_aggressive = True env = dict(os.environ) env["CVS_RSH"] = "lorry-ssh-wrapper" env["GIT_DIR"] = cvsimp_gitdir env["HOME"] = cvsps_home self.run_program( [ "git", "cvsimport", "-a", "-d", spec["url"], "-C", worktree, spec["module"], ], env=env, ) if not os.path.exists(gitdir): # git cvsimport created a non-bare repository; convert it # to bare os.rename(cvsimp_gitdir, gitdir) self.run_program(["git", "config", "core.bare", "true"], cwd=gitdir) self.run_program( ["git", "config", "core.logallrefupdates", "false"], cwd=gitdir ) try: os.remove(os.path.join(gitdir, "index")) except FileNotFoundError: pass try: shutil.rmtree(os.path.join(gitdir, "logs")) except FileNotFoundError: pass shutil.rmtree(worktree) self.run_program(["git", "worktree", "prune"], cwd=gitdir) def gitify_hg(self, project_name, dirname, gitdir, spec): cert_options = [] if not self.should_check_certificates(spec): cert_options.append("--insecure") hgdir = os.path.join(dirname, "hg") if os.path.exists(hgdir): self.progress(".. updating hg branch") # Note that we always specify the URL from the spec, so # that if the spec changes, we pick up the new URL. self.run_program( ["hg", "pull", "--quiet", *cert_options, spec["url"]], cwd=hgdir ) else: self.progress(".. doing initial hg branch") self.run_program( ["hg", "clone", "--quiet", *cert_options, spec["url"], hgdir] ) self.needs_aggressive = self.ensure_gitdir(gitdir) # Since there are marks files in existing deployments that # have broken references, fix up the marks file before rather # than after running hg-fast-export self.prune_unreachable_marks(gitdir, os.path.join(gitdir, "hg2git-marks")) # Enable the fudge_user_ids plugin if possible plugin_options = [] exit, out, _ = self.runcmd_unchecked(["hg-fast-export", "--help"]) if exit == 0 and b"--plugin" in out: for plugin_path in [ # Check under same directory as us, in case we are # not yet installed os.path.join(os.path.dirname(__file__), "hg-fast-export/plugins"), # Try walking from /bin/lorry to # /share/lorry/... os.path.join( os.path.dirname(__file__), "../share/lorry/hg-fast-export/plugins" ), ]: if os.path.exists(plugin_path): plugin_options += [ "--plugin-path", plugin_path, "--plugin", "fudge_user_ids", ] break self.progress(".. fast-exporting into git") self.run_program( ["hg-fast-export", "-r", "../hg", "--quiet", "--force", *plugin_options], cwd=gitdir, ) def gitify_raw_file(self, project_name, dirname, gitdir, spec): raw_file_branch = "master" raw_file_refspecs = "refs/heads/{branch}:refs/heads/{branch}".format( branch=raw_file_branch ) self.ensure_gitdir(gitdir) # Ensure the repo is up-to-date pullurl = "%s/%s.git" % (self.settings["mirror-base-url-push"], project_name) try: self.run_program(["git", "fetch", pullurl, raw_file_refspecs], cwd=gitdir) except Exception: # TODO: Be more specific about which exceptions are fine self.progress("Failed to fetch from URL: %s" % pullurl) # Ensure the repo supports git LFS self.run_program(["git", "lfs", "install", "--local"], cwd=gitdir) try: # List of all files in preexisting downstream repo old_files = ( self.run_program( ["git", "ls-tree", "-r", "HEAD", "--name-only"], cwd=gitdir ) .strip() .splitlines() ) except Exception: old_files = [] # Fetch the files specified in .lorry file new_files = [] desired_files = [".gitattributes"] for src in spec["urls"]: url = src["url"] url_path = urllib.parse.urlparse(url)[2] basename = os.path.basename(url_path) file_dest = os.path.join(dirname, basename) self.progress(".. checking if we need to fetch %s" % basename) repo_subdir = src.get("destination", ".") repo_dest = os.path.relpath(os.path.join(repo_subdir, basename)) desired_files.append(repo_dest) if file_missing_or_empty(file_dest): new_files.append((repo_subdir, file_dest)) self.progress(".. attempting to fetch %s" % basename) try: with open(file_dest, "wb") as raw_file, self.urlopen( url ) as urlfile: raw_file.write(urlfile.read()) try: # HTTP dates use (one of) the email date formats url_date = email.utils.mktime_tz( email.utils.parsedate_tz( urlfile.info()["Last-Modified"] ) ) except (KeyError, ValueError, TypeError): url_date = None if url_date: os.utime(file_dest, (url_date, url_date)) except Exception: if os.path.exists(file_dest): os.unlink(file_dest) raise elif repo_dest not in old_files: new_files.append((repo_subdir, file_dest)) self.progress("..path has changed for %s" % basename) else: self.progress("no need to import %s" % basename) if len(new_files) == 0: self.progress(".. no need to run importer") # Import files to bare local git repo for subpath, raw_file in new_files: self.run_program( ["%s.raw-file-importer" % lorry_path, raw_file, subpath], cwd=gitdir ) # Set user info for commits self.run_program( ["git", "config", "user.name", '"Lorry Raw File Importer"'], cwd=gitdir ) self.run_program( ["git", "config", "user.email", '"lorry-raw-file-importer@lorry"'], cwd=gitdir, ) # Remove repo file paths of worktree that aren't # included included in .lorry file # If old worktree exists, delete it gitdir_prefix = os.path.dirname(gitdir) worktree = os.path.join(gitdir_prefix, "raw-file-worktree") if os.path.exists(worktree): shutil.rmtree(worktree) self.run_program(["git", "worktree", "prune"], cwd=gitdir) # Create worktree from preexisting downstream repo self.run_program( ["git", "worktree", "add", worktree, "--checkout", raw_file_branch], cwd=gitdir, ) unexpected_files = [ old_file for old_file in old_files if old_file not in desired_files ] for file_path in unexpected_files: self.progress("Found unexpected file: %s" % file_path) # Delete file path from git repo self.run_program( ["git", "rm", "-f", file_path], cwd=worktree, ) # Delete file locally if not needed whatsoever file_name = os.path.basename(file_path) if file_name not in [ os.path.basename(file_path) for file_path in desired_files ]: file_abs_path_local = os.path.join(gitdir_prefix, file_name) os.remove(file_abs_path_local) self.progress("Deleted unwanted file locally: %s" % file_name) # Commit changes try: self.progress(".. commiting deletions of unwanted files") self.run_program( ["git", "commit", "-m", "Remove redundant file paths."], cwd=worktree ) except Exception: self.progress("Couldn't commit deletions. Perhaps there weren't any?") # Remove worktree (in case files are large) shutil.rmtree(worktree) self.run_program(["git", "worktree", "prune"], cwd=gitdir) def gitify_archive(self, archive_type, project_name, dirname, gitdir, spec): assert archive_type in ["zip", "gzip", "tar"] url = spec["url"] url_path = urllib.parse.urlparse(url)[2] basename = os.path.basename(url_path) archive_dest = os.path.join(dirname, basename) self.ensure_gitdir(gitdir) self.progress(".. checking if we need to fetch %s" % basename) if file_missing_or_empty(archive_dest): self.progress(".. attempting to fetch.") try: with open(archive_dest, "wb") as archive_file, self.urlopen( spec["url"] ) as urlfile: archive_file.write(urlfile.read()) try: # HTTP dates use (one of) the email date formats url_date = email.utils.mktime_tz( email.utils.parsedate_tz(urlfile.info()["Last-Modified"]) ) except (KeyError, ValueError, TypeError): url_date = None if url_date: os.utime(archive_dest, (url_date, url_date)) except Exception: if os.path.exists(archive_dest): os.unlink(archive_dest) raise else: self.progress(".. no need to run, nothing to do") return cmdline = ["%s.%s-importer" % (lorry_path, archive_type), archive_dest] self.run_program(cmdline, cwd=gitdir) self.needs_aggressive = True def push_to_mirror_server( self, name, gitdir, pushrefspecs=["refs/heads/*:refs/heads/*", "refs/tags/*:refs/tags/*"], ): pushurl = "%s/%s.git" % (self.settings["mirror-base-url-push"], name) # If pushing to local filesystem, check if repo exists if pushurl.startswith("/"): if not os.path.isdir(pushurl): self.progress(".. creating local repo for %s in %s" % (name, pushurl)) self.run_program(["git", "init", "--bare", pushurl], cwd=gitdir) self.progress(".. pushing %s to mirror server %s" % (name, pushurl)) self.run_program( ["git", "push"] + ["--push-option=%s" % option for option in self.settings["push-option"]] + [pushurl] + pushrefspecs, cwd=gitdir, ) def run_program(self, argv, **kwargs): if self.settings["command-stdout"]: kwargs["stdout"] = open(self.settings["command-stdout"], "a") if self.settings["command-stderr"]: kwargs["stderr"] = open(self.settings["command-stderr"], "a") if "stdin" not in kwargs: kwargs["stdin"] = open("/dev/null", "r") logging.debug("Running: argv=%s kwargs=%s" % (repr(argv), repr(kwargs))) exit, out, err = self.runcmd_unchecked(argv, **kwargs) if isinstance(out, bytes): out = out.decode("utf-8", errors="replace") if isinstance(err, bytes): err = err.decode("utf-8", errors="replace") logging.debug( "Command: %s\nExit: %s\nStdout:\n%sStderr:\n%s" % (argv, exit, self.indent(out or ""), self.indent(err or "")) ) if exit != 0: raise Exception( "%s failed (exit code %s):\n%s" % (" ".join(argv), exit, self.indent(err or "")) ) return out def indent(self, string): return "".join(" %s\n" % line for line in string.splitlines()) def dirname(self, project_name): assert "\0" not in project_name # We escape slashes as underscores. project_name = "_".join(project_name.split("/")) return os.path.join(self.settings["working-area"], project_name) def progress(self, msg): logging.debug(msg) if self.settings["verbose"]: self.output.write("%s\n" % msg) def prune_unreachable_marks(self, gitdir, marks_name): if not os.path.exists(marks_name): return # Find reachable commits reachable = set() with subprocess.Popen( ["git", "rev-list", "--all"], cwd=gitdir, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, universal_newlines=True, ) as rev_list_proc: for line in rev_list_proc.stdout: reachable.add(line.rstrip("\n")) # Filter marks file to temporary file mark_re = re.compile(r":(\S+) ([0-9a-f]{40,})\n") marks_temp_fd, marks_temp_name = tempfile.mkstemp( dir=os.path.dirname(marks_name) ) n_pruned = 0 try: with open(marks_temp_fd, "w") as marks_out, open( marks_name, "r" ) as marks_in: for line in marks_in: match = mark_re.match(line) if not match: msg = '%s: failed to parse line "%s"' % ( marks_name, line.rstrip("\n"), ) logging.warning(msg) self.output.write("%s\n" % msg) # We don't know whether it should be kept; err # on the side of caution marks_out.write(line) elif match.group(2) in reachable: marks_out.write(line) else: n_pruned += 1 # On success, replace marks file with temporary file os.rename(marks_temp_name, marks_name) if n_pruned: self.progress( "%s: pruned %d unreachable commit(s)" % (marks_name, n_pruned) ) except: # noqa: E722 # On failure, delete temporary file os.unlink(marks_temp_name) raise @contextlib.contextmanager def urlopen(self, url): try: req = urllib.request.Request(url) with urllib.request.urlopen(req) as urlfile: yield urlfile except urllib.error.HTTPError as e: if e.getcode() == 403: newreq = urllib.request.Request(url) newreq.add_header( "User-Agent", "Lorry/%s (https://gitlab.com/CodethinkLabs/lorry/lorry)" % __version__, ) with urllib.request.urlopen(newreq) as newurlfile: yield newurlfile else: raise if __name__ == "__main__": Lorry(version=__version__).run()