summaryrefslogtreecommitdiff
path: root/buildstream/storage/_casbaseddirectory.py
blob: 1acde4e29b597bf49eb57670116c4e63704289f9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
#!/usr/bin/env python3
#
#  Copyright (C) 2018 Codethink Limited
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
#  Authors:
#        Jim MacArthur <jim.macarthur@codethink.co.uk>

"""
CasBasedDirectory
=========

Implementation of the Directory class which backs onto a Merkle-tree based content
addressable storage system.

See also: :ref:`sandboxing`.
"""

from typing import List
from collections import OrderedDict

import calendar
import os
import time
import hashlib
import tempfile
from .._exceptions import BstError, ErrorDomain
from .directory import Directory
from ._filebaseddirectory import VirtualDirectoryError, FileBasedDirectory # TODO: That shouldn't be in _filebaseddirectory
from ..utils import FileListResult, safe_copy

from google.devtools.remoteexecution.v1test import remote_execution_pb2, remote_execution_pb2_grpc

# CasBasedDirectory intentionally doesn't call its superclass constuctor,
# which is mean to be unimplemented.
# pylint: disable=super-init-not-called

class _FileObject():
    """A description of a file in a virtual directory. The contents of
    this class are never used, but there needs to be something present
    for files so is_empty() works correctly.

    """
    def __init__(self, virtual_directory: Directory, filename: str):
        self.directory = virtual_directory
        self.filename = filename

class IndexEntry():
    def __init__(self, thing, modified=False):
        self.thing = thing
        self.modified = modified

class CasBasedDirectory(Directory):
    def __init__(self, context, ref=None, parent=None, name="undefined"):
        # I need a context...
        self.context = context
        self.cas_directory = os.path.join(context.artifactdir, 'googlecas')
        self.name = name # For debugging
        self.directory = remote_execution_pb2.Directory()
        if name != "undefined" and ref is None:
            # Load from existing head
            path = os.path.join(self.cas_directory, 'refs', 'heads', name)
            if os.path.exists(path):
                ref = remote_execution_pb2.Digest()
                with open(path, 'rb') as f:
                    ref.hash = f.read()
                    print("Will use {} for existing object {}".format(ref.hash, name))
        if ref:
            with open(self._objpath(ref), 'rb') as f:
                self.directory.ParseFromString(f.read())
            if name == "undefined":
                print("Loading a previous CasBasedDirectory with ref {}".format(ref.hash))
            else:
                print("Loading a previous CasBasedDirectory with the name '{}' and ref {}".format(name, ref.hash))
                
        else:
            print("Creating a new CasBasedDirectory with the name {}".format(name))
        self.ref = ref
        self.index = OrderedDict()
        self.parent = parent
        self._directory_read = False
        self._populate_index()

    # Functions taken from googlecas.py

    def _objpath(self, digest):
        return os.path.join(self.cas_directory, 'objects', digest.hash[:2], digest.hash[2:])

    def _hash_object(self, *, digest=None, path=None, buffer=None):
        # Exactly one of the two parameters has to be specified
        assert (path is None) != (buffer is None)

        if digest is None:
            digest = remote_execution_pb2.Digest()

        try:
            h = hashlib.sha256()
            # Always write out new file to avoid corruption if input file is modified
            with tempfile.NamedTemporaryFile(dir=os.path.join(self.cas_directory, 'tmp')) as out:
                if path:
                    with open(path, 'rb') as f:
                        for chunk in iter(lambda: f.read(4096), b""):
                            h.update(chunk)
                            out.write(chunk)
                else:
                    h.update(buffer)
                    out.write(buffer)

                out.flush()

                digest.hash = h.hexdigest()
                digest.size_bytes = os.fstat(out.fileno()).st_size

                # Place file at final location
                _objpath = self._objpath(digest)
                os.makedirs(os.path.dirname(_objpath), exist_ok=True)
                os.link(out.name, _objpath)

        except FileExistsError as e:
            # We can ignore the failed link() if the object is already in the repo.
            pass

        except OSError as e:
            raise VirtualDirectoryError("Failed to hash object: {}".format(e)) from e

        return digest

    # End of functions from googlecas.py

    def _populate_index(self) -> None:
        if self._directory_read:
            raise VirtualDirectoryError("_populate_index called twice")
            return
        # Replace with tree read
        
        for entry in self.directory.directories:
            self.index[entry.name] = IndexEntry(CasBasedDirectory(self.context, ref=entry.digest,parent=self,name=entry.name))
        for entry in self.directory.files:
            self.index[entry.name] = IndexEntry(_FileObject(self, entry))

        # TODO: Symlinks?
        self._directory_read = True

    def recalculate_hash(self) -> None:
        self.ref = self._hash_object(buffer=self.directory.SerializeToString())
        if self.parent:
            self.parent.recalculate_hash()

    def _add_new_blank_directory(self, name):
        print("Creating new CasBasedDirectory '{}', blank".format(name))
        newdir = CasBasedDirectory(self.context, parent=self, name=name)
        dirnode = self.directory.directories.add()
        dirnode.name = name
        # Calculate the hash for an empty directory
        if name in self.index:
            raise VirtualDirectoryError("Creating directory {} would overwrite an existing item in {}".format(name, str(self)))
        directory = remote_execution_pb2.Directory()             
        self._hash_object(digest=dirnode.digest, buffer=directory.SerializeToString())
        self.index[name] = IndexEntry(newdir)
        self.recalculate_hash()
        return newdir

    def find_directory(self, name):
        """ Strange search function - this is probably implemented in the collection, but I can't find it right now. """
        for i in self.directory.directories:
            if i.name == name: return i
        return None

    
    def _add_directory(self, name):
        print("Creating new CasBasedDirectory '{}', blank".format(name))
        if name in self.index:
            newdir = self.index[name].thing
            if not isinstance(newdir,CasBasedDirectory):
                raise VirtualDirectoryError("New directory {} in {} would overwrite existing non-directory of type {}".format(name, str(self), type(newdir)))
            dirnode = self.find_directory(name)
        else:
            newdir = CasBasedDirectory(self.context, parent=self, name=name)
            dirnode = self.directory.directories.add()

        dirnode.name = name
        # Calculate the hash for an empty directory

        directory = remote_execution_pb2.Directory()           
        self._hash_object(digest=dirnode.digest, buffer=directory.SerializeToString())
        self.index[name] = IndexEntry(newdir)
        self.recalculate_hash()
        return newdir

    
    def _add_new_file(self, basename, filename):
        filenode = self.directory.files.add()
        filenode.name = filename
        self._hash_object(digest=filenode.digest, path=os.path.join(basename, filename))
        self.index[filename] = IndexEntry(_FileObject(basename, filename), modified=(filename in self.index))
        self.recalculate_hash()
        print("  Added file {} which now has hash {}".format(filename, filenode.digest.hash[0:7]))
        
    def descend(self, subdirectory_spec: List[str], create: bool = False) -> Directory:
        """ Descend one or more levels of directory hierarchy and return a new
        Directory object for that directory.

        Arguments:
        * subdirectory_spec (list of strings): A list of strings which are all directory
          names.
        * create (boolean): If this is true, the directories will be created if
          they don't already exist.
        """

        # It's very common to send a directory name instead of a list and this causes
        # bizarre errors, so check for it here
        if not isinstance(subdirectory_spec, list):
            subdirectory_spec = [subdirectory_spec]
        if not subdirectory_spec:
            return self
        # Because of the way split works, it's common to get a list which begins with
        # an empty string. Detect these and remove them, then start again.
        if subdirectory_spec[0] == "":
            return self.descend(subdirectory_spec[1:], create)

        print("Descending from {} into '{}'".format(self, subdirectory_spec))
        if subdirectory_spec[0] in self.index:
            entry = self.index[subdirectory_spec[0]].thing
            if isinstance(entry, CasBasedDirectory):
                return entry.descend(subdirectory_spec[1:], create)
            else:
                error = "Cannot descend into {}, which is a '{}' in the directory {}"
                raise VirtualDirectoryError(error.format(subdirectory_spec[0],
                                                         type(entry).__name__,
                                                         self.external_directory))
        else:
            if create:
                # Adding an entry to this node makes it a new node. What do we do then?
                newdir = self._add_new_blank_directory(subdirectory_spec[0])
                return newdir
            else:
                error = "No entry called '{}' found in the directory"
                raise VirtualDirectoryError(error.format(subdirectory_spec[0]))
        return None

    def _import_files_from_directory(self, source_directory):
        for name in sorted(os.listdir(source_directory)):
            pathname = os.path.join(source_directory, name)
            if os.path.isdir(pathname):
                print("Attempting to import DIRECTORY {} from {}".format(name, source_directory))
                newdir = self._add_directory(name)
                newdir._import_files_from_directory(pathname)
            elif os.path.islink(pathname):
                    raise NotImplementedError()
        for name in sorted(os.listdir(source_directory)):
            pathname = os.path.join(source_directory, name)
            if os.path.isfile(pathname):
                print("Attempting to import FILE {} from {}".format(name, source_directory))
                self._add_new_file(source_directory, name)

    def save(self, name):
        """Saves this directory into the content cache."""
        self.recalculate_hash()
        refdir = os.path.join(self.cas_directory, 'refs', 'heads')
        refname = os.path.join(refdir,name)
        if not os.path.exists(refdir):
            os.makedirs(refdir)
        with open(refname, "wt") as f:
            f.write(self.ref.hash)
        
    def import_files(self, external_pathspec: any, files: List[str] = None,
                     report_written: bool = True, update_utimes: bool = False,
                     can_link: bool = False) -> FileListResult:
        """Imports some or all files from external_path into this directory.

        Keyword arguments: external_pathspec: Either a string
        containing a pathname, or a Directory object, to use as the
        source.

        files (list of strings): A list of all the files relative to
        the external_pathspec to copy. If 'None' is supplied, all
        files are copied.

        report_written (bool): Return the full list of files
        written. Defaults to true. If false, only a list of
        overwritten files is returned.

        update_utimes (bool): Update the access and modification time
        of each file copied to the current time.

        can_link (bool): Whether it's OK to create a hard link to the
        original content, meaning the stored copy will change when the
        original files change. Setting this doesn't guarantee hard
        links will be made. can_link will never be used if
        update_utimes is set.
        """
        
        if isinstance(external_pathspec, FileBasedDirectory):
            source_directory = external_pathspec.get_underlying_directory()
        elif isinstance(external_pathspec, CasBasedDirectory):
            raise NotImplementedError()
            # Here we have an opportunity to do some faster import by just copying
            # hashes.
        else:
            source_directory = external_pathspec

        # TODO: No notice is taken of the `files` argument, report_written, update_utimes or can_link.
        self._import_files_from_directory(source_directory)
        if self.name != "undefined":   #TODO: Not good enough!
            self.save(self.name)
        
    def set_deterministic_mtime(self) -> None:
        """ Sets a static modification time for all regular files in this directory.
        Since we don't store any modification time, we don't need to do anything.
        """
        pass


    def set_deterministic_user(self) -> None:
        """ Sets all files in this directory to the current user's euid/egid.
        We also don't store user data, so this can be ignored.
        """
        pass

    def export_files(self, to_directory: str, can_link: bool = False, can_destroy: bool = False) -> None:
        """Copies everything from this into to_directory.

        Arguments:

        to_directory (string): a path outside this directory object
        where the contents will be copied to.

        can_link (bool): Whether we can create hard links in to_directory
        instead of copying.

        """

        if not os.path.exists(to_directory):
            os.mkdir(to_directory)

        print("Extraction of {}, containing directories {} and files {}".format(str(self),[entry.name for entry in self.directory.directories], [entry.name for entry in self.directory.files]))
        for entry in self.directory.directories:
            if entry.name not in self.index:
                raise VirtualDirectoryError("CasDir {} contained {} in directories but not in the index".format(str(self), entry.name))                
            if not self._directory_read:
                raise VirtualDirectoryError("CasDir {} has not been indexed yet".format(str(self)))
            dest_dir = os.path.join(to_directory, entry.name)
            if not os.path.exists(dest_dir):
                os.mkdir(dest_dir)
            print("About to descend into {}".format(entry.name))
            target = self.descend([entry.name])
            print("Successful descend into {}".format(str(target)))
            target.export_files(dest_dir)
        for entry in self.directory.files:
            # Extract the entry to a single file
            dest_name = os.path.join(to_directory, entry.name)
            src_name = self._objpath(entry.digest)
            print("Extraction of {}/{} to {}".format(str(self),entry.name, dest_name))
            safe_copy(src_name, dest_name)

    def is_empty(self) -> bool:
        """ Return true if this directory has no files, subdirectories or links in it.
        """
        return len(self.index) == 0

    def mark_unmodified(self) -> None:
        """ Marks all files in this directory (recursively) as unmodified.
        """
        self.modified = False
        for i in self.index.values():
            i.modified = False
            if isinstance(i.thing, CasBasedDirectory):
                i.thing.mark_unmodified()

    def list_modified_paths(self) -> List[str]:
        """Provide a list of relative paths which have been modified since the
        last call to mark_unmodified.

        Return value: List(str) - list of modified paths
        """

        filelist = []
        for (k,v) in self.index.items():            
            if isinstance(v.thing, CasBasedDirectory):
                filelist.extend([k + "/" + x for x in v.list_relative_paths])
            elif isinstance(v.thing, _FileObject) and v.modified:
                filelist.append(k)
        return filelist
    
    def list_relative_paths(self) -> List[str]:
        """Provide a list of all relative paths.

        Return value: List(str) - list of all paths
        """

        filelist = []
        for (k,v) in self.index.items():            
            if isinstance(v.thing, CasBasedDirectory):
                filelist.extend([k + "/" + x for x in v.list_relative_paths])
            elif isinstance(v.thing, _FileObject):
                filelist.append(k)
        return filelist
       

    def __str__(self) -> str:
        path = ""
        if self.parent:
            path = str(self.parent)
        path += "/" + self.name
        
        return path