1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
# Copyright (C) 2012 Codethink Limited
#
import json
import logging
import re
import glob
import os
default_values = [
( u'serial', 0 ),
( u'create', u'never' ),
( u'destroy', u'never' ),
( u'interval', u'1m' ),
( u'stagger', False ),
( u'type', u'invalid_type' ),
]
valid_whens = set(["always", "never", "unchanged"])
valid_interval = re.compile(r"^([1-9][0-9]*)([mhd])?$")
interval_mults = {
None: 1,
'm': 60,
'h': 60 * 60,
'd': 60 * 60 * 24,
}
class LorryControllerConfig(object):
'''This encapsulates the configuration for lorry-controller.'''
def __init__(self, settings, confpath):
self.settings = settings
confpath = os.path.join(settings['work-area'], confpath)
logging.debug("Parsing configuration: %s" % confpath)
with open(confpath, "r") as fh:
self._raw_conf = json.load(fh)
logging.debug("Validating configuration semantics")
self._validate__raw_conf()
logging.debug("Configuration loaded")
def _validate__raw_conf(self):
'''Validate the entire raw config.'''
if type(self._raw_conf) != list:
self._give_up("Configuration was not a list.")
for entry in self._raw_conf:
if type(entry) != dict:
self._give_up("Configuration entry was not a dict.")
if type(entry.get('type', None)) != unicode:
self._give_up("Configuration entry lacked a suitable 'type' "
"field.")
# Set the defaults
for key, defval in default_values:
entry[key] = entry.get(key, defval)
# And validate the generic values such as serial
self._validate__generics(entry)
# Now validate the rest
validator = getattr(self, '_validate_' + entry['type'], None)
if validator is None:
self._give_up("Configuration entry had unknown type: %s" %
entry['type'])
validator(entry)
def _validate__generics(self, entry):
'''Validate the generic entries such as 'serial'.'''
for key, defval in default_values:
if type(defval) != type(entry[key]):
self._give_up("Invalid type for '%s': %r" % (key, entry[key]))
self._validate__when(entry, 'create')
if entry['create'] == "unchanged":
self._give_up("Invalid value for create: unchanged")
self._validate__when(entry, 'destroy')
entry['interval-parsed'] = self._parse_interval(entry['interval'])
def _validate__when(self, entry, key):
if entry[key] not in valid_whens:
self._give_up("Invalid value for %s: %s" % (key, entry[key]))
def _parse_interval(self, interval):
m = valid_interval.match(interval.lower())
if m is None:
self._give_up("Unable to parse '%s' as an interval" % interval)
num, mult = m.groups()
num = int(num)
mult = interval_mults.get(mult, None)
if mult is None:
self._give_up("Somehow, '%s' managed to appear as a multiplier!" %
m.group(2))
logging.debug("Converted interval %r to %r", interval, (num * mult))
return num * mult
def _validate_lorries(self, entry):
'''Validate a 'lorries' stanza.'''
if type(entry.get('globs', None)) != list:
self._give_up("Lorries stanzas need lists for their 'globs'")
all_lorries = set()
git_base = os.path.join(self.settings['work-area'], 'git')
for glob_entry in entry['globs']:
if type(glob_entry) != unicode:
self._give_up("Lorries globs should be strings")
fullglob = os.path.join(git_base, glob_entry)
all_lorries = all_lorries.union(set(glob.iglob(fullglob)))
for lorry in all_lorries:
if not lorry.startswith(git_base):
self._give_up("Glob found %s which is outside the git base")
logging.debug("Expanded globs in entry to %d lorries" %
len(all_lorries))
entry['lorries'] = all_lorries
def _give_up(self, *args, **kwargs):
logging.error(*args, **kwargs)
raise SystemExit(5)
|