1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
|
#
# Copyright (C) 2019 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
"""
:orphan:

workspace - stage an opened workspace directory
===============================================

**Usage:**

The workspace plugin must not be directly used. This plugin is used as the
kind for a synthetic node representing the sources of an element with an open
workspace. The node constructed would be specified as follows:

.. code:: yaml

   # Specify the workspace source kind
   kind: workspace

   # Specify the absolute path to the directory
   path: /path/to/workspace
"""
import os
from typing import Type

from buildstream import Source, SourceError, Directory, MappingNode
from buildstream.types import SourceRef
class WorkspaceSource(Source):
    """Source plugin which stages the content of an open workspace directory.

    The workspace directory is imported into a cached Directory the first
    time the unique key is requested; the hash of the resulting digest is
    the unique key, and staging later re-reads that same cached content.
    """

    # pylint: disable=attribute-defined-outside-init

    BST_MIN_VERSION = "2.0"
    BST_STAGE_VIRTUAL_DIRECTORY = True

    # Digest of the Directory obtained by importing the workspace; remains
    # None until get_unique_key() has performed the import.
    __digest = None

    # Cache key of the last workspace build, read from the "last_build"
    # configuration key.
    __last_build = None

    def configure(self, node: MappingNode) -> None:
        """Validate the configuration node and read "path" and "last_build"."""
        node.validate_keys(["path", "last_build", "kind"])

        self.path = node.get_str("path")
        self.__last_build = node.get_str("last_build")

    def preflight(self) -> None:
        """Nothing to check before use."""
        pass  # pragma: nocover

    def is_cached(self):
        """Always report the source as cached."""
        return True

    def is_resolved(self):
        """Report whether the workspace path exists on disk."""
        workspace_path = self._get_local_path()
        return os.path.exists(workspace_path)

    def get_unique_key(self):
        """Return the hash of the digest of the imported workspace content.

        As a core plugin, this uses some private API to optimize file
        hashing:

        * Source._cache_directory() prepares a Directory
        * the regular staging activity is done into that Directory
        * the hash of the cached digest is used as the unique key

        The digest is remembered on the instance, so the import is only
        performed the first time the key is requested.
        """
        # Fast path: the workspace was already imported by an earlier call.
        if self.__digest:
            return self.__digest.hash

        with self._cache_directory() as staging_directory:
            self.__do_stage(staging_directory)
            self.__digest = staging_directory._get_digest()

        return self.__digest.hash

    def get_ref(self) -> None:
        """This source has no ref; always return None."""
        return None

    def load_ref(self, node: MappingNode) -> None:
        """No ref to load; does nothing."""
        pass  # pragma: nocover

    def set_ref(self, ref: SourceRef, node: MappingNode) -> None:
        """No ref to store; does nothing."""
        pass  # pragma: nocover

    # init_workspace()
    #
    # Raises AssertionError: existing workspaces should not be reinitialized
    def init_workspace(self, directory: Directory) -> None:
        """Refuse to initialize a workspace from an already open workspace."""
        raise AssertionError("Attempting to re-open an existing workspace")

    def fetch(self) -> None:  # pylint: disable=arguments-differ
        """Nothing to fetch; does nothing."""
        pass  # pragma: nocover

    def stage(self, directory):
        """Import the previously cached workspace content into *directory*.

        The CAS was already prepared while resolving the cache key, which
        happens before staging, so the stored digest must be set by now and
        the cached content only needs to be retrieved.
        """
        assert isinstance(directory, Directory)
        assert self.__digest is not None

        workspace_digest = self.__digest
        with self._cache_directory(digest=workspace_digest) as cached_content:
            directory.import_files(cached_content)

    # As a core element, we speed up some scenarios when this is used for
    # a junction, by providing the local path to this content directly.
    #
    def _get_local_path(self) -> str:
        """Return the configured workspace path."""
        return self.path

    # Staging is implemented internally, we preemptively put it in the CAS
    # as a side effect of resolving the cache key, at stage time we just
    # do an internal CAS stage.
    #
    def __do_stage(self, directory: Directory) -> None:
        """Import the workspace files, with their mtime property, into *directory*.

        Raises:
            SourceError: if the import reports overwritten or ignored files.
        """
        assert isinstance(directory, Directory)

        with self.timed_activity("Staging local files"):
            import_result = directory.import_files(self.path, properties=["mtime"])

            # NOTE(review): the original indentation was lost; this check is
            # assumed to run inside the timed activity - confirm.
            clashed = import_result.overwritten or import_result.ignored
            if clashed:
                raise SourceError(
                    "Failed to stage source: files clash with existing directory", reason="ensure-stage-dir-fail"
                )
# Plugin entry point
def setup() -> Type[WorkspaceSource]:
    """Return the plugin class provided by this module.

    The class object itself is returned, not an instance, hence the
    ``Type[...]`` return annotation.
    """
    return WorkspaceSource
|