1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
|
#!/usr/bin/env python3
#
# Copyright (C) 2018 Codethink Limited
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# Jim MacArthur <jim.macarthur@codethink.co.uk>
"""
CasBasedDirectory
=================
Implementation of the Directory class which backs onto a Merkle-tree based content
addressable storage system.
See also: :ref:`sandboxing`.
"""
from typing import List
from collections import OrderedDict
import calendar
import os
import time
import hashlib
import tempfile
from .._exceptions import BstError, ErrorDomain
from .directory import Directory
from ._filebaseddirectory import VirtualDirectoryError, FileBasedDirectory # TODO: That shouldn't be in _filebaseddirectory
from ..utils import FileListResult, safe_copy
from google.devtools.remoteexecution.v1test import remote_execution_pb2, remote_execution_pb2_grpc
# CasBasedDirectory intentionally doesn't call its superclass constructor,
# which is meant to be unimplemented.
# pylint: disable=super-init-not-called
class _FileObject:
    """Placeholder recorded in a directory index for a regular file.

    Nothing ever reads the attributes stored here. An instance only has to
    exist so that a file occupies a slot in the index, which is what lets
    is_empty() give the right answer.
    """

    def __init__(self, virtual_directory: Directory, filename: str):
        # Keep the entry's name and the directory it was recorded against.
        self.filename = filename
        self.directory = virtual_directory
class IndexEntry:
    """A single slot in a CasBasedDirectory's name -> entry index.

    Attributes:
        thing: The object held in this slot (in this file, either a child
            CasBasedDirectory or a _FileObject placeholder).
        modified: Flag telling whether the entry has been marked as changed.
    """

    def __init__(self, thing, modified=False):
        self.modified = modified
        self.thing = thing
class CasBasedDirectory(Directory):
    """A Directory implementation backed by a content-addressable store.

    Each instance wraps one ``remote_execution_pb2.Directory`` message.
    File contents and serialized directory messages are stored as objects
    under ``<artifactdir>/googlecas/objects``, addressed by the SHA-256 hash
    of their contents. ``self.index`` maps every entry name to an
    ``IndexEntry`` whose ``thing`` is either a child ``CasBasedDirectory``
    or a ``_FileObject`` placeholder.
    """

    def __init__(self, context, ref=None, parent=None, name="undefined"):
        """Create a new directory or load an existing one from the CAS.

        Args:
            context: Object providing ``artifactdir``; the CAS lives in
                ``<artifactdir>/googlecas``.
            ref: Optional ``Digest`` of a serialized Directory message to load.
            parent: The CasBasedDirectory containing this one, or None for a
                root directory.
            name: This directory's name inside its parent. For a root
                directory with no ``ref``, it is also the head
                (``refs/heads/<name>``) to load if one has been saved.
        """
        self.context = context
        self.cas_directory = os.path.join(context.artifactdir, 'googlecas')
        self.name = name  # For debugging and str()
        self.directory = remote_execution_pb2.Directory()

        # Only a root directory corresponds to a named head. A child that
        # merely shares its name with a saved head must not pick up that
        # head's contents.
        if parent is None and name != "undefined" and ref is None:
            path = os.path.join(self.cas_directory, 'refs', 'heads', name)
            if os.path.exists(path):
                ref = remote_execution_pb2.Digest()
                # save() writes the head as text, so read it back as text.
                with open(path, 'r') as f:
                    ref.hash = f.read().strip()
                print("Will use {} for existing object {}".format(ref.hash, name))

        if ref is not None:
            with open(self._objpath(ref), 'rb') as f:
                self.directory.ParseFromString(f.read())
            if name == "undefined":
                print("Loading a previous CasBasedDirectory with ref {}".format(ref.hash))
            else:
                print("Loading a previous CasBasedDirectory with the name '{}' and ref {}".format(name, ref.hash))
        else:
            print("Creating a new CasBasedDirectory with the name {}".format(name))

        self.ref = ref
        self.index = OrderedDict()
        self.parent = parent
        self._directory_read = False
        self._populate_index()

    # Functions taken from googlecas.py

    def _objpath(self, digest):
        """Return the CAS object path for ``digest``, fanned out by the first
        two hash characters."""
        return os.path.join(self.cas_directory, 'objects', digest.hash[:2], digest.hash[2:])

    def _hash_object(self, *, digest=None, path=None, buffer=None):
        """Store a file or an in-memory buffer in the CAS and return its digest.

        Exactly one of ``path`` and ``buffer`` must be given.

        Args:
            digest: Optional ``Digest`` message to fill in; a new one is
                created when omitted.
            path: Path of a file whose contents are stored.
            buffer: Bytes to store.

        Returns:
            The ``Digest`` with ``hash`` and ``size_bytes`` filled in.

        Raises:
            ValueError: If not exactly one of ``path``/``buffer`` is given.
            VirtualDirectoryError: If the object could not be written.
        """
        if (path is None) == (buffer is None):
            raise ValueError("Exactly one of 'path' or 'buffer' must be specified")
        if digest is None:
            digest = remote_execution_pb2.Digest()

        tmpdir = os.path.join(self.cas_directory, 'tmp')
        try:
            os.makedirs(tmpdir, exist_ok=True)
            h = hashlib.sha256()
            # Always write out a new file to avoid corruption if the input
            # file is modified while (or after) it is being hashed.
            with tempfile.NamedTemporaryFile(dir=tmpdir) as out:
                if path is not None:
                    with open(path, 'rb') as f:
                        for chunk in iter(lambda: f.read(4096), b""):
                            h.update(chunk)
                            out.write(chunk)
                else:
                    h.update(buffer)
                    out.write(buffer)
                out.flush()
                digest.hash = h.hexdigest()
                digest.size_bytes = os.fstat(out.fileno()).st_size

                # Place the file at its final location while the temporary
                # file still exists.
                objpath = self._objpath(digest)
                os.makedirs(os.path.dirname(objpath), exist_ok=True)
                try:
                    os.link(out.name, objpath)
                except FileExistsError:
                    # Objects are addressed by content hash, so an existing
                    # object already holds identical data.
                    pass
        except OSError as e:
            raise VirtualDirectoryError("Failed to hash object: {}".format(e)) from e
        return digest

    # End of functions from googlecas.py

    def _populate_index(self) -> None:
        """Fill ``self.index`` from the entries of ``self.directory``.

        Child directories are loaded (recursively) from their digests.

        Raises:
            VirtualDirectoryError: If the index has already been populated.
        """
        if self._directory_read:
            raise VirtualDirectoryError("_populate_index called twice")
        # Replace with tree read
        for entry in self.directory.directories:
            child = CasBasedDirectory(self.context, ref=entry.digest, parent=self, name=entry.name)
            self.index[entry.name] = IndexEntry(child)
        for entry in self.directory.files:
            # _FileObject takes the file's name, not the protobuf FileNode.
            self.index[entry.name] = IndexEntry(_FileObject(self, entry.name))
        # TODO: Symlinks?
        self._directory_read = True

    def recalculate_hash(self) -> None:
        """Re-store this directory's message and propagate the change upwards.

        The parent's DirectoryNode for this directory is updated with the new
        digest before the parent rehashes itself. Without that step, ancestor
        hashes would never reflect changes made in this subtree.
        """
        self.ref = self._hash_object(buffer=self.directory.SerializeToString())
        if self.parent is not None:
            dirnode = self.parent.find_directory(self.name)
            if dirnode is not None:
                dirnode.digest.CopyFrom(self.ref)
            self.parent.recalculate_hash()

    def _create_child_directory(self, name):
        """Create an empty child directory ``name`` in the message and index.

        The caller must ensure that ``name`` is not already present.
        """
        newdir = CasBasedDirectory(self.context, parent=self, name=name)
        dirnode = self.directory.directories.add()
        dirnode.name = name
        # A new child starts out empty. Store the empty Directory object so
        # the digest recorded here refers to an object that exists.
        empty = remote_execution_pb2.Directory()
        self._hash_object(digest=dirnode.digest, buffer=empty.SerializeToString())
        self.index[name] = IndexEntry(newdir)
        self.recalculate_hash()
        return newdir

    def _add_new_blank_directory(self, name):
        """Create a new, empty child directory.

        Raises:
            VirtualDirectoryError: If an entry called ``name`` already exists.
        """
        print("Creating new CasBasedDirectory '{}', blank".format(name))
        # Check before touching the protobuf message, so that a failure
        # leaves this directory unchanged.
        if name in self.index:
            raise VirtualDirectoryError("Creating directory {} would overwrite an existing item in {}".format(name, str(self)))
        return self._create_child_directory(name)

    def find_directory(self, name):
        """Return the DirectoryNode called ``name`` in this directory's
        message, or None if there is none."""
        for dirnode in self.directory.directories:
            if dirnode.name == name:
                return dirnode
        return None

    def _add_directory(self, name):
        """Return the child directory ``name``, creating it if necessary.

        An existing child directory is reused as-is, along with its contents
        and digest.

        Raises:
            VirtualDirectoryError: If ``name`` exists but is not a directory.
        """
        print("Creating new CasBasedDirectory '{}', blank".format(name))
        if name not in self.index:
            return self._create_child_directory(name)
        existing = self.index[name].thing
        if not isinstance(existing, CasBasedDirectory):
            raise VirtualDirectoryError("New directory {} in {} would overwrite existing non-directory of type {}".format(name, str(self), type(existing)))
        return existing

    def _add_new_file(self, basename, filename):
        """Store ``basename/filename`` in the CAS and add it to this directory.

        An existing file entry with the same name is replaced rather than
        duplicated. The index entry is flagged modified when it overwrites
        an existing entry.

        Raises:
            VirtualDirectoryError: If ``filename`` names an existing
                subdirectory, or the file could not be stored.
        """
        overwriting = filename in self.index
        if overwriting and isinstance(self.index[filename].thing, CasBasedDirectory):
            raise VirtualDirectoryError("New file {} in {} would overwrite existing directory".format(filename, str(self)))

        # Hash first, so a failure leaves the protobuf message untouched.
        digest = self._hash_object(path=os.path.join(basename, filename))

        if overwriting:
            for i, node in enumerate(self.directory.files):
                if node.name == filename:
                    del self.directory.files[i]
                    break
        filenode = self.directory.files.add()
        filenode.name = filename
        filenode.digest.CopyFrom(digest)

        self.index[filename] = IndexEntry(_FileObject(self, filename), modified=overwriting)
        self.recalculate_hash()
        print(" Added file {} which now has hash {}".format(filename, filenode.digest.hash[0:7]))

    def descend(self, subdirectory_spec: List[str], create: bool = False) -> Directory:
        """ Descend one or more levels of directory hierarchy and return a new
        Directory object for that directory.

        Arguments:
        * subdirectory_spec (list of strings): A list of strings which are all directory
          names. A single string is treated as a one-element list.
        * create (boolean): If this is true, any missing directories along the
          whole path will be created.

        Raises:
            VirtualDirectoryError: If a path component is not a directory, or
                does not exist and ``create`` is false.
        """
        # It's very common to send a directory name instead of a list and this causes
        # bizarre errors, so check for it here
        if not isinstance(subdirectory_spec, list):
            subdirectory_spec = [subdirectory_spec]
        if not subdirectory_spec:
            return self

        head, rest = subdirectory_spec[0], subdirectory_spec[1:]
        # Because of the way split works, it's common to get a list which begins with
        # an empty string. Skip these.
        if head == "":
            return self.descend(rest, create)

        print("Descending from {} into '{}'".format(self, subdirectory_spec))
        if head in self.index:
            entry = self.index[head].thing
            if isinstance(entry, CasBasedDirectory):
                return entry.descend(rest, create)
            error = "Cannot descend into {}, which is a '{}' in the directory {}"
            raise VirtualDirectoryError(error.format(head, type(entry).__name__, str(self)))

        if create:
            # Continue into the new directory so that the remaining
            # components are created as well.
            newdir = self._add_new_blank_directory(head)
            return newdir.descend(rest, create)

        error = "No entry called '{}' found in the directory"
        raise VirtualDirectoryError(error.format(head))

    def _import_files_from_directory(self, source_directory):
        """Recursively import everything under ``source_directory``.

        All subdirectories are imported before any regular files.

        Raises:
            NotImplementedError: If a symbolic link is encountered.
        """
        names = sorted(os.listdir(source_directory))
        for name in names:
            pathname = os.path.join(source_directory, name)
            # Test for links first. os.path.isdir() follows symlinks, so a
            # link to a directory would otherwise be imported as a real
            # directory: possibly a host tree, possibly an endless cycle.
            if os.path.islink(pathname):
                raise NotImplementedError("Importing symbolic link {} is not supported".format(pathname))
            if os.path.isdir(pathname):
                print("Attempting to import DIRECTORY {} from {}".format(name, source_directory))
                newdir = self._add_directory(name)
                newdir._import_files_from_directory(pathname)
        for name in names:
            pathname = os.path.join(source_directory, name)
            if os.path.isfile(pathname):
                print("Attempting to import FILE {} from {}".format(name, source_directory))
                self._add_new_file(source_directory, name)

    def save(self, name):
        """Save this directory into the content cache as head ``name``.

        The current digest's hash is written as text to
        ``<cas>/refs/heads/<name>``.
        """
        self.recalculate_hash()
        refdir = os.path.join(self.cas_directory, 'refs', 'heads')
        os.makedirs(refdir, exist_ok=True)
        with open(os.path.join(refdir, name), "wt") as f:
            f.write(self.ref.hash)

    def import_files(self, external_pathspec: any, files: List[str] = None,
                     report_written: bool = True, update_utimes: bool = False,
                     can_link: bool = False) -> FileListResult:
        """Imports some or all files from external_path into this directory.

        Keyword arguments: external_pathspec: Either a string
        containing a pathname, or a Directory object, to use as the
        source.

        files (list of strings): A list of all the files relative to
        the external_pathspec to copy. If 'None' is supplied, all
        files are copied.

        report_written (bool): Return the full list of files
        written. Defaults to true. If false, only a list of
        overwritten files is returned.

        update_utimes (bool): Update the access and modification time
        of each file copied to the current time.

        can_link (bool): Whether it's OK to create a hard link to the
        original content, meaning the stored copy will change when the
        original files change. Setting this doesn't guarantee hard
        links will be made. can_link will never be used if
        update_utimes is set.

        Returns:
            A FileListResult. Its lists are currently left empty, because
            the per-file reporting is not implemented yet.
        """
        if isinstance(external_pathspec, FileBasedDirectory):
            source_directory = external_pathspec.get_underlying_directory()
        elif isinstance(external_pathspec, CasBasedDirectory):
            # Here we have an opportunity to do some faster import by just
            # copying hashes.
            raise NotImplementedError()
        else:
            source_directory = external_pathspec

        # TODO: No notice is taken of the `files` argument, report_written, update_utimes or can_link.
        self._import_files_from_directory(source_directory)

        # NOTE(review): a named subdirectory also saves a head under its own
        # name here; saving only at the root may be what is intended.
        if self.name != "undefined":  # TODO: Not good enough!
            self.save(self.name)
        return FileListResult()

    def set_deterministic_mtime(self) -> None:
        """ Sets a static modification time for all regular files in this directory.
        Since we don't store any modification time, we don't need to do anything.
        """
        pass

    def set_deterministic_user(self) -> None:
        """ Sets all files in this directory to the current user's euid/egid.
        We also don't store user data, so this can be ignored.
        """
        pass

    def export_files(self, to_directory: str, can_link: bool = False, can_destroy: bool = False) -> None:
        """Copies everything from this into to_directory.

        Arguments:
        to_directory (string): a path outside this directory object
            where the contents will be copied to.
        can_link (bool): Whether we can create hard links in to_directory
            instead of copying. Currently unused; files are always copied.
        can_destroy (bool): Currently unused.

        Raises:
            VirtualDirectoryError: If the message and index disagree, or the
                index has not been populated.
        """
        if not os.path.exists(to_directory):
            os.mkdir(to_directory)
        print("Extraction of {}, containing directories {} and files {}".format(str(self), [entry.name for entry in self.directory.directories], [entry.name for entry in self.directory.files]))
        if not self._directory_read:
            raise VirtualDirectoryError("CasDir {} has not been indexed yet".format(str(self)))
        for entry in self.directory.directories:
            if entry.name not in self.index:
                raise VirtualDirectoryError("CasDir {} contained {} in directories but not in the index".format(str(self), entry.name))
            dest_dir = os.path.join(to_directory, entry.name)
            if not os.path.exists(dest_dir):
                os.mkdir(dest_dir)
            print("About to descend into {}".format(entry.name))
            target = self.descend([entry.name])
            print("Successful descend into {}".format(str(target)))
            target.export_files(dest_dir, can_link=can_link, can_destroy=can_destroy)
        for entry in self.directory.files:
            # Extract the entry to a single file
            dest_name = os.path.join(to_directory, entry.name)
            src_name = self._objpath(entry.digest)
            print("Extraction of {}/{} to {}".format(str(self), entry.name, dest_name))
            safe_copy(src_name, dest_name)

    def is_empty(self) -> bool:
        """ Return true if this directory has no files, subdirectories or links in it.
        """
        return len(self.index) == 0

    def mark_unmodified(self) -> None:
        """ Marks all files in this directory (recursively) as unmodified.
        """
        self.modified = False
        for i in self.index.values():
            i.modified = False
            if isinstance(i.thing, CasBasedDirectory):
                i.thing.mark_unmodified()

    def list_modified_paths(self) -> List[str]:
        """Provide a list of relative paths which have been modified since the
        last call to mark_unmodified.

        Return value: List(str) - list of modified paths
        """
        filelist = []
        for (k, v) in self.index.items():
            if isinstance(v.thing, CasBasedDirectory):
                # Recurse into the child itself (not the IndexEntry) and
                # collect only its modified paths.
                filelist.extend([k + "/" + x for x in v.thing.list_modified_paths()])
            elif isinstance(v.thing, _FileObject) and v.modified:
                filelist.append(k)
        return filelist

    def list_relative_paths(self) -> List[str]:
        """Provide a list of all relative paths.

        Return value: List(str) - list of all paths
        """
        filelist = []
        for (k, v) in self.index.items():
            if isinstance(v.thing, CasBasedDirectory):
                filelist.extend([k + "/" + x for x in v.thing.list_relative_paths()])
            elif isinstance(v.thing, _FileObject):
                filelist.append(k)
        return filelist

    def __str__(self) -> str:
        """Return a slash-separated path built from the names of this
        directory and its ancestors."""
        path = ""
        if self.parent:
            path = str(self.parent)
        path += "/" + self.name
        return path
|