# Copyright (C) 2012 Codethink Limited
#

'''Parsing and validation of the lorry-controller configuration.

The configuration is a JSON list of stanza dicts.  Each stanza has a
'type' field ('lorries' or 'trove') which selects a _validate_<type>
method; generic fields (uuid, create, destroy, interval, stagger,
tarball) are defaulted and checked for every stanza.
'''

import fnmatch
import glob
import json
import logging
import os
import re
import time

# Defaults merged into every configuration stanza that omits the key.
# The defval also fixes the required type for the key (checked later).
default_values = [
    ('create', 'never'),
    ('destroy', 'never'),
    ('interval', '1m'),
    ('stagger', False),
    ('tarball', 'never'),
    ('type', 'invalid_type'),
]

# Intervals look like "30" (seconds), "5m", "2h" or "1d".
valid_interval = re.compile(r"^([1-9][0-9]*)([mhd])?$")

# Seconds per interval suffix; a bare number (no suffix) means seconds.
interval_mults = {
    None: 1,
    'm': 60,
    'h': 60 * 60,
    'd': 60 * 60 * 24,
}


class LorryControllerConfig(object):
    '''This encapsulates the configuration for lorry-controller.'''

    def __init__(self, app, confpath):
        self.app = app
        self.confpath = confpath
        self.lorries = {}    # full lorry name -> lorry content dict
        self.configs = {}    # stanza uuid -> stanza dict
        self.duetimes = {}   # full lorry name -> timestamp it is next due
        self.troves = []     # validated 'trove' stanzas, in config order

    def parse_config(self):
        '''Load the JSON configuration file and validate it.

        Raises SystemExit (via _give_up) on semantic errors, and
        re-raises whatever json.load raises on syntax errors.
        '''
        confpath = os.path.join(self.app.settings['work-area'],
                                self.confpath)
        logging.info("Parsing configuration: %s" % confpath)
        try:
            with open(confpath, "r") as fh:
                self._raw_conf = json.load(fh)
        except Exception as e:
            logging.error("Unable to parse: %r" % e)
            raise
        logging.debug("Validating configuration semantics")
        self._validate__raw_conf()
        logging.info("Configuration loaded")

    def _validate__raw_conf(self):
        '''Validate the entire raw config.'''
        if type(self._raw_conf) != list:
            self._give_up("Configuration was not a list.")
        for entry in self._raw_conf:
            if type(entry) != dict:
                self._give_up("Configuration entry was not a dict.")
            if type(entry.get('type', None)) != str:
                self._give_up("Configuration entry lacked a suitable 'type' "
                              "field.")
            # Set the defaults
            for key, defval in default_values:
                entry[key] = entry.get(key, defval)
            # And validate the generic values
            self._validate__generics(entry)
            # Now validate the rest, dispatching on the stanza's type.
            validator = getattr(self, '_validate_' + entry['type'], None)
            if validator is None:
                self._give_up("Configuration entry had unknown type: %s"
                              % entry['type'])
            validator(entry)

    def _validate__generics(self, entry):
        '''Validate the generic entries such as 'uuid'.'''
        if type(entry.get('uuid', None)) != str:
            self._give_up("UUID missing, cannot reconcile without it!")
        if entry['uuid'] in self.configs:
            self._give_up("UUID is not unique")
        self.configs[entry['uuid']] = entry
        # Every defaulted key must have the same type as its default.
        for key, defval in default_values:
            if type(defval) != type(entry[key]):
                self._give_up("Invalid type for '%s': %r"
                              % (key, entry[key]))
        self._validate__when(entry, 'create', ["always", "never"])
        self._validate__when(entry, 'destroy',
                             ["always", "never", "unchanged"])
        self._validate__when(entry, 'tarball',
                             ["always", "never", "first"])
        entry['interval-parsed'] = self._parse_interval(entry['interval'])
        if 'ls-interval' in entry:
            entry['ls-interval-parsed'] = \
                self._parse_interval(entry['ls-interval'])

    def _validate__when(self, entry, key, valid_whens):
        '''Check that entry[key] is one of the permitted 'when' values.'''
        if entry[key] not in valid_whens:
            self._give_up("Invalid value for %s: %s" % (key, entry[key]))

    def _parse_interval(self, interval):
        '''Convert an interval string such as "5m" into seconds.'''
        m = valid_interval.match(interval.lower())
        if m is None:
            self._give_up("Unable to parse '%s' as an interval" % interval)
        num, mult = m.groups()
        num = int(num)
        mult = interval_mults.get(mult, None)
        if mult is None:
            # Unreachable if the regex and interval_mults stay in sync.
            self._give_up("Somehow, '%s' managed to appear as a multiplier!"
                          % m.group(2))
        logging.debug("Converted interval %r to %r", interval, (num * mult))
        return num * mult

    def _validate_lorries(self, entry):
        '''Validate a 'lorries' stanza.

        Expands the stanza's globs relative to the work-area git base,
        loads every matching .lorry file into self.lorries, and
        schedules a due time for each loaded lorry.
        '''
        if type(entry.get('globs', None)) != list:
            self._give_up("Lorries stanzas need lists for their 'globs'")
        if entry.get('prefix', None) is None:
            entry['prefix'] = ""
        if type(entry['prefix']) != str:
            self._give_up("Lorry prefixes should be strings.")
        my_lorries = set()
        git_base = os.path.join(self.app.settings['work-area'], 'git')
        for glob_entry in entry['globs']:
            if type(glob_entry) != str:
                self._give_up("Lorries globs should be strings")
            fullglob = os.path.join(git_base, glob_entry)
            my_lorries = my_lorries.union(set(glob.iglob(fullglob)))
        for lorry in my_lorries:
            if not lorry.startswith(git_base):
                # Fixed: the original never supplied the %s argument.
                self._give_up("Glob found %s which is outside the git base"
                              % lorry)
        logging.debug("Expanded globs in entry to %d lorry files"
                      % len(my_lorries))
        logging.debug("Loading lorries into memory, please wait...")
        my_lorry_names = set()
        for lorry in my_lorries:
            try:
                with open(lorry, "r") as fh:
                    lorry_json = json.load(fh)
                for name, content in lorry_json.items():
                    fullname = os.path.join(entry['prefix'], name)
                    if self.lorries.get(fullname, None) is not None:
                        self._give_up("Lorry repeated: %s" % fullname)
                    content['controller-uuid'] = entry['uuid']
                    if 'source-HEAD' not in content:
                        content['source-HEAD'] = 'refs/heads/master'
                    my_lorry_names.add(fullname)
                    self.lorries[fullname] = content
            except Exception as e:
                # Best-effort: a broken .lorry file is logged and skipped.
                logging.warning("Unable to parse %s, because of %s. "
                                "Moving on" % (lorry, e))
        # Now calculate the 'next due' time for every lorry we just parsed
        starttime = time.time() - 1
        endtime = starttime + entry['interval-parsed']
        step = 0
        if entry['stagger']:
            # Spread the due times evenly over one interval.
            step = (endtime - starttime) / (len(my_lorry_names) + 1)
        for lorry_name in my_lorry_names:
            self.duetimes[lorry_name] = starttime
            starttime += step
        logging.debug("Now loaded %d lorries" % len(self.lorries.keys()))

    def _validate_trove(self, entry):
        '''Validate a 'trove' stanza and remember it in self.troves.'''
        # Validate top levels
        if type(entry.get('trovehost', None)) != str:
            self._give_up("Trove host %r is not a string"
                          % entry.get('trovehost', None))
        if 'ls-interval-parsed' not in entry:
            self._give_up("No ls-interval specified for %s"
                          % entry['trovehost'])
        if type(entry.get('prefixmap', None)) != dict:
            self._give_up("Prefixmap not a dict for %s"
                          % entry['trovehost'])
        if type(entry.get('ignore', [])) != list:
            self._give_up("Ignore is not a list for %s"
                          % entry['trovehost'])
        # Validate prefixmap
        for local, remote in entry['prefixmap'].items():
            if type(local) != str:
                self._give_up("Local part of prefixmap is not a string: %r"
                              % local)
            if type(remote) != str:
                self._give_up("Remote part of prefixmap is not a string: %r"
                              % remote)
        # Validate ignore
        for ign in entry.get('ignore', []):
            if type(ign) != str:
                self._give_up("Part of ignore list is not a string: %r"
                              % ign)
        self.troves.append(entry)

    def update_trove(self, trove, state):
        '''Refresh the lorries generated from one trove stanza.

        Runs 'ssh git@host ls --verbose' when the stanza's ls-interval
        has elapsed, turns the listing into lorry entries via the
        prefixmap/ignore rules, and schedules due times for new ones.
        'state' is the trove's persistent state dict and is mutated.
        '''
        logging.info("Processing trove %s (%s)" % (trove['trovehost'],
                                                   trove['uuid']))
        # 1. Ensure that if we need to 'ls' the trove, we do it
        now = time.time()
        listcmdargs = ["ssh", "-oStrictHostKeyChecking=no",
                       "-oBatchMode=yes", "git@" + trove['trovehost'],
                       "ls", "--verbose"]
        state['next-vls'] = state.get('next-vls', now - 1)
        if state['next-vls'] < now:
            exit_code, out, err = self.app.maybe_runcmd(listcmdargs,
                                                        dry=True)
            if exit_code == 0:
                repo_info = {}
                for entry in [x for x in out.split("\n") if x != ""]:
                    # Collapse runs of spaces so split() yields clean
                    # columns: perm, name, head, description...
                    # (Fixed: the mangled original replaced " " with " ",
                    # which never terminates.)
                    while entry.find("  ") > -1:
                        entry = entry.replace("  ", " ")
                    elems = entry.split(" ")
                    this_repo = {
                        "perm": elems[0],
                        "name": elems[1],
                        "head": elems[2],
                        "desc": " ".join(elems[3:]),
                    }
                    repo_info[elems[1]] = this_repo
                state['last-ls-output'] = repo_info
                logging.info("ls interval %d"
                             % trove['ls-interval-parsed'])
                logging.info("next-vls was %s"
                             % time.asctime(time.gmtime(state['next-vls'])))
                # Step next-vls forward in whole ls-intervals until it
                # is in the future again.
                while state['next-vls'] < now:
                    state['next-vls'] += trove['ls-interval-parsed']
                logging.info("next-vls now %s"
                             % time.asctime(time.gmtime(state['next-vls'])))
        else:
            # Pass through unchanged
            state['last-ls-output'] = state.get('last-ls-output', {})

        def ignored(reponame):
            # True if any pattern in the stanza's ignore list matches.
            for pattern in trove['ignore']:
                if fnmatch.fnmatch(reponame, pattern):
                    return True
            return False

        # 2. For every entry in last-ls-output, construct a lorry if we
        # want it
        lorries_made = set()
        for remotereponame, info in state['last-ls-output'].items():
            localreponame = None
            for local, remote in trove['prefixmap'].items():
                if remotereponame.startswith(remote + "/"):
                    localreponame = "%s/%s" % (
                        local, remotereponame[len(remote) + 1:])
            if ((not ignored(remotereponame)) and
                    (localreponame is not None)):
                # Construct a lorry for this one.
                lorry = {
                    "type": "git",
                    "url": "ssh://git@%s/%s.git" % (trove['trovehost'],
                                                    remotereponame),
                    "controller-uuid": trove['uuid'],
                    "source-HEAD": info["head"],
                    "refspecs": [
                        "+refs/heads/*:refs/heads/*",
                        "+refs/tags/*:refs/tags/*"
                    ]
                }
                if localreponame in self.lorries:
                    logging.warning(
                        "Skipping %s (%s from %s) because we already "
                        "have something for that."
                        % (localreponame, remotereponame,
                           trove['trovehost']))
                else:
                    self.lorries[localreponame] = lorry
                    lorries_made.add(localreponame)
        # 3. Now schedule all those lorries in case they're new
        starttime = time.time() - 1
        endtime = starttime + trove['interval-parsed']
        step = 0
        if trove['stagger']:
            step = (endtime - starttime) / (len(lorries_made) + 1)
        for lorry_name in lorries_made:
            self.duetimes[lorry_name] = starttime
            starttime += step
        logging.debug("Generated %d lorries from that trove"
                      % len(lorries_made))

    def update_troves(self, statemgr):
        '''Run update_trove for every validated trove stanza.'''
        # Now that we have a state manager we can look at the trove data.
        for trove in self.troves:
            self.app.html.set_processing(trove['uuid'])
            trove_state = statemgr.get_trove(trove['uuid'])
            self.update_trove(trove, trove_state)

    def _give_up(self, *args, **kwargs):
        '''Log a fatal configuration error and abort with exit code 5.'''
        logging.error(*args, **kwargs)
        raise SystemExit(5)