1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
#!/usr/bin/env python3
#
# Copyright (C) 2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Jonathan Maw <jonathan.maw@codethink.co.uk>
from . import utils
from . import _yaml
from ._exceptions import ImplError, LoadError, LoadErrorReason
# MirrorKind()
#
# Names the kinds of mirror that BuildStream is capable of handling.
#
class MirrorKind:

    # A mirror generated by BuildStream
    BST_GENERATED = 'bst-generated'

    # The default kind of mirror, which works by replacing the alias
    DEFAULT = 'default'
# Mirror()
#
# Base class for mirrors. It records the mirror's location name and
# declares the get_mirror_uris() method which subclasses must implement.
#
# Args:
#    node: The configuration node describing the mirror; its
#          'location-name' entry is read as a string
#
class Mirror():

    def __init__(self, node):
        self.location = _yaml.node_get(node, str, "location-name")

    # get_mirror_uris()
    #
    # Produce the mirror URIs for a given URI. The base class has no
    # implementation and always raises.
    #
    # Args:
    #    uri: The URI to find mirrors for (unused here)
    #    source: The source the URI belongs to (unused here)
    #
    # Raises:
    #    ImplError: Always, as subclasses must override this method
    #
    def get_mirror_uris(self, uri, source):
        # The message names the method exactly as it is spelled, so that
        # the error can be traced back to the method which needs overriding.
        raise ImplError("Base mirror class does not implement get_mirror_uris")
# DefaultMirror()
#
# A mirror which works by replacing the alias at the start of a URI
# with each of the alternative URIs configured for that alias.
#
# Args:
#    node: The configuration node describing the mirror; it is validated
#          against the 'location-name', 'aliases' and 'mirror-kind' keys,
#          and 'aliases' is read as a mapping of alias to list of URIs
#
# Raises:
#    LoadError: If an alias is not mapped to a list
#
class DefaultMirror(Mirror):

    def __init__(self, node):
        super().__init__(node)

        allowed_fields = ['location-name', 'aliases', 'mirror-kind']
        _yaml.node_validate(node, allowed_fields)

        self.aliases = {}
        for alias_mapping, uris in _yaml.node_items(node['aliases']):
            # This is validation of user provided data, so report it as a
            # load error: an assert would be stripped when running with
            # `python -O` and would not tell the user where the problem is.
            if not isinstance(uris, list):
                provenance = _yaml.node_get_provenance(node, key='aliases')
                raise LoadError(LoadErrorReason.INVALID_DATA,
                                "{}: Alias '{}' should be mapped to a list of URIs"
                                .format(provenance, alias_mapping))

            # Store a copy, so that the mirror does not share the list
            # object held by the node.
            self.aliases[alias_mapping] = list(uris)

    # get_mirror_uris()
    #
    # Generate the mirror URIs for a URI, by substituting its alias.
    #
    # Args:
    #    uri: The URI to find mirrors for, in the form of an alias and a
    #         body joined by utils._ALIAS_SEPARATOR
    #    source: The source the URI belongs to (unused by this kind of mirror)
    #
    # Yields:
    #    A tuple of the mirror URI and the constant False, once for each
    #    URI configured for the alias. Nothing is yielded when the URI has
    #    no alias, or when the alias is not covered by this mirror.
    #
    def get_mirror_uris(self, uri, source):
        url_prefix, separator, url_body = uri.partition(utils._ALIAS_SEPARATOR)

        # Without a separator there is no alias to look up, so this mirror
        # has nothing to offer. Unpacking the result of split() here would
        # instead fail with a ValueError.
        if not separator:
            return

        for alias_uri in self.aliases.get(url_prefix, []):
            yield alias_uri + url_body, False
# BstGeneratedMirror()
#
# A mirror generated by BuildStream, for which mirror URIs are obtained
# by asking the source for a normalised mirror path prefixed by the
# mirror's site.
#
# Args:
#    node: The configuration node describing the mirror; it is validated
#          against the 'location-name', 'mirror-kind', 'site' and
#          'aliases-covered' keys
#
# Raises:
#    LoadError: If the 'site' contains '://'
#
class BstGeneratedMirror(Mirror):

    def __init__(self, node):
        super().__init__(node)

        _yaml.node_validate(node, [
            'location-name', 'mirror-kind', 'site', 'aliases-covered'
        ])

        site = _yaml.node_get(node, str, 'site')
        self.site = site

        # The site is used as a prefix for the normalised mirror path,
        # so it must not carry a URI scheme of its own.
        if '://' in site:
            where = _yaml.node_get_provenance(node, key='site')
            message = '{}: Site should not contain a URI prefix'.format(where)
            raise LoadError(LoadErrorReason.INVALID_DATA, message)

        self.aliases_covered = _yaml.node_get(node, list, 'aliases-covered')

    # get_mirror_uris()
    #
    # Generate the mirror URI for a URI.
    #
    # Args:
    #    uri: The URI to find a mirror for
    #    source: The source the URI belongs to, which is asked to compute
    #            the normalised mirror path
    #
    # Yields:
    #    A single tuple of the normalised mirror path and the constant True
    #
    def get_mirror_uris(self, uri, source):
        mirror_path = source.get_normalised_mirror_path(uri, prefix=self.site)
        yield mirror_path, True
|