summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbst-marge-bot <marge-bot@buildstream.build>2019-07-08 11:17:25 +0000
committerbst-marge-bot <marge-bot@buildstream.build>2019-07-08 11:17:25 +0000
commitcf0516ba92fb4a220ae0086c411314cec4974df5 (patch)
tree2b9714484fe27645211efaa37065220465cf3b79
parent93816bea18d93495744263deaabc1d73a68029ac (diff)
parentbb2cf18be0aef7d6e394a0c6ff6d83eac737c60b (diff)
downloadbuildstream-cf0516ba92fb4a220ae0086c411314cec4974df5.tar.gz
Merge branch 'raoul/1038-source-cache-proto' into 'master'
Proto based source cache service Closes #1038 See merge request BuildStream/buildstream!1435
-rw-r--r--src/buildstream/_cas/casserver.py48
-rw-r--r--src/buildstream/_protos/buildstream/v2/buildstream.proto6
-rw-r--r--src/buildstream/_protos/buildstream/v2/buildstream_pb2.py61
-rw-r--r--src/buildstream/_protos/buildstream/v2/source.proto61
-rw-r--r--src/buildstream/_protos/buildstream/v2/source_pb2.py214
-rw-r--r--src/buildstream/_protos/buildstream/v2/source_pb2_grpc.py70
-rw-r--r--src/buildstream/_sourcecache.py213
-rw-r--r--src/buildstream/source.py3
-rw-r--r--tests/frontend/buildtrack.py4
-rw-r--r--tests/sourcecache/cache.py20
-rw-r--r--tests/sourcecache/fetch.py13
-rw-r--r--tests/sourcecache/push.py2
-rw-r--r--tests/sourcecache/staging.py13
13 files changed, 662 insertions, 66 deletions
diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index 774548216..9606c263b 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -35,7 +35,7 @@ from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2, remo
from .._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
from .._protos.google.rpc import code_pb2
from .._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
- artifact_pb2, artifact_pb2_grpc
+ artifact_pb2, artifact_pb2_grpc, source_pb2, source_pb2_grpc
from .. import utils
from .._exceptions import CASError
@@ -66,6 +66,7 @@ def create_server(repo, *, enable_push,
min_head_size=int(2e9)):
cas = CASCache(os.path.abspath(repo))
artifactdir = os.path.join(os.path.abspath(repo), 'artifacts', 'refs')
+ sourcedir = os.path.join(os.path.abspath(repo), 'source_protos')
# Use max_workers default from Python 3.5+
max_workers = (os.cpu_count() or 1) * 5
@@ -88,11 +89,16 @@ def create_server(repo, *, enable_push,
artifact_pb2_grpc.add_ArtifactServiceServicer_to_server(
_ArtifactServicer(cas, artifactdir), server)
+ source_pb2_grpc.add_SourceServiceServicer_to_server(
+ _SourceServicer(sourcedir), server)
+
# Create up reference storage and artifact capabilities
artifact_capabilities = buildstream_pb2.ArtifactCapabilities(
allow_updates=enable_push)
+ source_capabilities = buildstream_pb2.SourceCapabilities(
+ allow_updates=enable_push)
buildstream_pb2_grpc.add_CapabilitiesServicer_to_server(
- _BuildStreamCapabilitiesServicer(artifact_capabilities),
+ _BuildStreamCapabilitiesServicer(artifact_capabilities, source_capabilities),
server)
return server
@@ -509,15 +515,51 @@ class _ArtifactServicer(artifact_pb2_grpc.ArtifactServiceServicer):
class _BuildStreamCapabilitiesServicer(buildstream_pb2_grpc.CapabilitiesServicer):
- def __init__(self, artifact_capabilities):
+ def __init__(self, artifact_capabilities, source_capabilities):
self.artifact_capabilities = artifact_capabilities
+ self.source_capabilities = source_capabilities
def GetCapabilities(self, request, context):
response = buildstream_pb2.ServerCapabilities()
response.artifact_capabilities.CopyFrom(self.artifact_capabilities)
+ response.source_capabilities.CopyFrom(self.source_capabilities)
return response
+class _SourceServicer(source_pb2_grpc.SourceServiceServicer):
+ def __init__(self, sourcedir):
+ self.sourcedir = sourcedir
+
+ def GetSource(self, request, context):
+ try:
+ source_proto = self._get_source(request.cache_key)
+ except FileNotFoundError:
+ context.abort(grpc.StatusCode.NOT_FOUND, "Source not found")
+ except DecodeError:
+ context.abort(grpc.StatusCode.NOT_FOUND,
+ "Sources gives invalid directory")
+
+ return source_proto
+
+ def UpdateSource(self, request, context):
+ self._set_source(request.cache_key, request.source)
+ return request.source
+
+ def _get_source(self, cache_key):
+ path = os.path.join(self.sourcedir, cache_key)
+ source_proto = source_pb2.Source()
+ with open(path, 'r+b') as f:
+ source_proto.ParseFromString(f.read())
+ os.utime(path)
+ return source_proto
+
+ def _set_source(self, cache_key, source_proto):
+ path = os.path.join(self.sourcedir, cache_key)
+ os.makedirs(os.path.dirname(path), exist_ok=True)
+ with utils.save_file_atomic(path, 'w+b') as f:
+ f.write(source_proto.SerializeToString())
+
+
def _digest_from_download_resource_name(resource_name):
parts = resource_name.split('/')
diff --git a/src/buildstream/_protos/buildstream/v2/buildstream.proto b/src/buildstream/_protos/buildstream/v2/buildstream.proto
index e9cd1223b..d5cac2892 100644
--- a/src/buildstream/_protos/buildstream/v2/buildstream.proto
+++ b/src/buildstream/_protos/buildstream/v2/buildstream.proto
@@ -112,7 +112,13 @@ message ArtifactCapabilities {
bool allow_updates = 1;
}
+// Capabilities of the source service
+message SourceCapabilities {
+ bool allow_updates = 1;
+}
+
// All capabalities will be unset if the service isn't present
message ServerCapabilities {
ArtifactCapabilities artifact_capabilities = 1;
+ SourceCapabilities source_capabilities = 2;
} \ No newline at end of file
diff --git a/src/buildstream/_protos/buildstream/v2/buildstream_pb2.py b/src/buildstream/_protos/buildstream/v2/buildstream_pb2.py
index d723f4149..877cd3011 100644
--- a/src/buildstream/_protos/buildstream/v2/buildstream_pb2.py
+++ b/src/buildstream/_protos/buildstream/v2/buildstream_pb2.py
@@ -22,7 +22,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
package='buildstream.v2',
syntax='proto3',
serialized_options=None,
- serialized_pb=_b('\n buildstream/v2/buildstream.proto\x12\x0e\x62uildstream.v2\x1a\x36\x62uild/bazel/remote/execution/v2/remote_execution.proto\x1a\x1cgoogle/api/annotations.proto\"9\n\x13GetReferenceRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"O\n\x14GetReferenceResponse\x12\x37\n\x06\x64igest\x18\x01 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\"v\n\x16UpdateReferenceRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12\x0c\n\x04keys\x18\x02 \x03(\t\x12\x37\n\x06\x64igest\x18\x03 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\"\x19\n\x17UpdateReferenceResponse\"&\n\rStatusRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\"\'\n\x0eStatusResponse\x12\x15\n\rallow_updates\x18\x01 \x01(\x08\"/\n\x16GetCapabilitiesRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\"-\n\x14\x41rtifactCapabilities\x12\x15\n\rallow_updates\x18\x01 \x01(\x08\"Y\n\x12ServerCapabilities\x12\x43\n\x15\x61rtifact_capabilities\x18\x01 \x01(\x0b\x32$.buildstream.v2.ArtifactCapabilities2\xca\x03\n\x10ReferenceStorage\x12\x90\x01\n\x0cGetReference\x12#.buildstream.v2.GetReferenceRequest\x1a$.buildstream.v2.GetReferenceResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v2/{instance_name=**}/buildstream/refs/{key}\x12\xa1\x01\n\x0fUpdateReference\x12&.buildstream.v2.UpdateReferenceRequest\x1a\'.buildstream.v2.UpdateReferenceResponse\"=\x82\xd3\xe4\x93\x02\x37\x1a-/v2/{instance_name=**}/buildstream/refs/{key}:\x06\x64igest\x12\x7f\n\x06Status\x12\x1d.buildstream.v2.StatusRequest\x1a\x1e.buildstream.v2.StatusResponse\"6\x82\xd3\xe4\x93\x02\x30\x1a./v2/{instance_name=**}/buildstream/refs:status2\x9b\x01\n\x0c\x43\x61pabilities\x12\x8a\x01\n\x0fGetCapabilities\x12&.buildstream.v2.GetCapabilitiesRequest\x1a\".buildstream.v2.ServerCapabilities\"+\x82\xd3\xe4\x93\x02%\x12#/v2/{instance_name=**}/capabilitiesb\x06proto3')
+ serialized_pb=_b('\n buildstream/v2/buildstream.proto\x12\x0e\x62uildstream.v2\x1a\x36\x62uild/bazel/remote/execution/v2/remote_execution.proto\x1a\x1cgoogle/api/annotations.proto\"9\n\x13GetReferenceRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"O\n\x14GetReferenceResponse\x12\x37\n\x06\x64igest\x18\x01 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\"v\n\x16UpdateReferenceRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12\x0c\n\x04keys\x18\x02 \x03(\t\x12\x37\n\x06\x64igest\x18\x03 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\"\x19\n\x17UpdateReferenceResponse\"&\n\rStatusRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\"\'\n\x0eStatusResponse\x12\x15\n\rallow_updates\x18\x01 \x01(\x08\"/\n\x16GetCapabilitiesRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\"-\n\x14\x41rtifactCapabilities\x12\x15\n\rallow_updates\x18\x01 \x01(\x08\"+\n\x12SourceCapabilities\x12\x15\n\rallow_updates\x18\x01 \x01(\x08\"\x9a\x01\n\x12ServerCapabilities\x12\x43\n\x15\x61rtifact_capabilities\x18\x01 \x01(\x0b\x32$.buildstream.v2.ArtifactCapabilities\x12?\n\x13source_capabilities\x18\x02 
\x01(\x0b\x32\".buildstream.v2.SourceCapabilities2\xca\x03\n\x10ReferenceStorage\x12\x90\x01\n\x0cGetReference\x12#.buildstream.v2.GetReferenceRequest\x1a$.buildstream.v2.GetReferenceResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v2/{instance_name=**}/buildstream/refs/{key}\x12\xa1\x01\n\x0fUpdateReference\x12&.buildstream.v2.UpdateReferenceRequest\x1a\'.buildstream.v2.UpdateReferenceResponse\"=\x82\xd3\xe4\x93\x02\x37\x1a-/v2/{instance_name=**}/buildstream/refs/{key}:\x06\x64igest\x12\x7f\n\x06Status\x12\x1d.buildstream.v2.StatusRequest\x1a\x1e.buildstream.v2.StatusResponse\"6\x82\xd3\xe4\x93\x02\x30\x1a./v2/{instance_name=**}/buildstream/refs:status2\x9b\x01\n\x0c\x43\x61pabilities\x12\x8a\x01\n\x0fGetCapabilities\x12&.buildstream.v2.GetCapabilitiesRequest\x1a\".buildstream.v2.ServerCapabilities\"+\x82\xd3\xe4\x93\x02%\x12#/v2/{instance_name=**}/capabilitiesb\x06proto3')
,
dependencies=[build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2.DESCRIPTOR,google_dot_api_dot_annotations__pb2.DESCRIPTOR,])
@@ -291,6 +291,37 @@ _ARTIFACTCAPABILITIES = _descriptor.Descriptor(
)
+_SOURCECAPABILITIES = _descriptor.Descriptor(
+ name='SourceCapabilities',
+ full_name='buildstream.v2.SourceCapabilities',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='allow_updates', full_name='buildstream.v2.SourceCapabilities.allow_updates', index=0,
+ number=1, type=8, cpp_type=7, label=1,
+ has_default_value=False, default_value=False,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=602,
+ serialized_end=645,
+)
+
+
_SERVERCAPABILITIES = _descriptor.Descriptor(
name='ServerCapabilities',
full_name='buildstream.v2.ServerCapabilities',
@@ -305,6 +336,13 @@ _SERVERCAPABILITIES = _descriptor.Descriptor(
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='source_capabilities', full_name='buildstream.v2.ServerCapabilities.source_capabilities', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
],
extensions=[
],
@@ -317,13 +355,14 @@ _SERVERCAPABILITIES = _descriptor.Descriptor(
extension_ranges=[],
oneofs=[
],
- serialized_start=602,
- serialized_end=691,
+ serialized_start=648,
+ serialized_end=802,
)
_GETREFERENCERESPONSE.fields_by_name['digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST
_UPDATEREFERENCEREQUEST.fields_by_name['digest'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST
_SERVERCAPABILITIES.fields_by_name['artifact_capabilities'].message_type = _ARTIFACTCAPABILITIES
+_SERVERCAPABILITIES.fields_by_name['source_capabilities'].message_type = _SOURCECAPABILITIES
DESCRIPTOR.message_types_by_name['GetReferenceRequest'] = _GETREFERENCEREQUEST
DESCRIPTOR.message_types_by_name['GetReferenceResponse'] = _GETREFERENCERESPONSE
DESCRIPTOR.message_types_by_name['UpdateReferenceRequest'] = _UPDATEREFERENCEREQUEST
@@ -332,6 +371,7 @@ DESCRIPTOR.message_types_by_name['StatusRequest'] = _STATUSREQUEST
DESCRIPTOR.message_types_by_name['StatusResponse'] = _STATUSRESPONSE
DESCRIPTOR.message_types_by_name['GetCapabilitiesRequest'] = _GETCAPABILITIESREQUEST
DESCRIPTOR.message_types_by_name['ArtifactCapabilities'] = _ARTIFACTCAPABILITIES
+DESCRIPTOR.message_types_by_name['SourceCapabilities'] = _SOURCECAPABILITIES
DESCRIPTOR.message_types_by_name['ServerCapabilities'] = _SERVERCAPABILITIES
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
@@ -391,6 +431,13 @@ ArtifactCapabilities = _reflection.GeneratedProtocolMessageType('ArtifactCapabil
))
_sym_db.RegisterMessage(ArtifactCapabilities)
+SourceCapabilities = _reflection.GeneratedProtocolMessageType('SourceCapabilities', (_message.Message,), dict(
+ DESCRIPTOR = _SOURCECAPABILITIES,
+ __module__ = 'buildstream.v2.buildstream_pb2'
+ # @@protoc_insertion_point(class_scope:buildstream.v2.SourceCapabilities)
+ ))
+_sym_db.RegisterMessage(SourceCapabilities)
+
ServerCapabilities = _reflection.GeneratedProtocolMessageType('ServerCapabilities', (_message.Message,), dict(
DESCRIPTOR = _SERVERCAPABILITIES,
__module__ = 'buildstream.v2.buildstream_pb2'
@@ -406,8 +453,8 @@ _REFERENCESTORAGE = _descriptor.ServiceDescriptor(
file=DESCRIPTOR,
index=0,
serialized_options=None,
- serialized_start=694,
- serialized_end=1152,
+ serialized_start=805,
+ serialized_end=1263,
methods=[
_descriptor.MethodDescriptor(
name='GetReference',
@@ -448,8 +495,8 @@ _CAPABILITIES = _descriptor.ServiceDescriptor(
file=DESCRIPTOR,
index=1,
serialized_options=None,
- serialized_start=1155,
- serialized_end=1310,
+ serialized_start=1266,
+ serialized_end=1421,
methods=[
_descriptor.MethodDescriptor(
name='GetCapabilities',
diff --git a/src/buildstream/_protos/buildstream/v2/source.proto b/src/buildstream/_protos/buildstream/v2/source.proto
new file mode 100644
index 000000000..6fcb43725
--- /dev/null
+++ b/src/buildstream/_protos/buildstream/v2/source.proto
@@ -0,0 +1,61 @@
+// Copyright 2019 Bloomberg Finance LP
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto3";
+
+package buildstream.v2;
+
+import "build/bazel/remote/execution/v2/remote_execution.proto";
+import "google/api/annotations.proto";
+
+service SourceService {
+ // Retrieve a source message given a reference name from the service
+ //
+ // Errors:
+ // * `NOT_FOUND`: The requested reference is not in the cache.
+ rpc GetSource(GetSourceRequest) returns (Source) {}
+
+ // Sets a source message on the service
+ //
+ // Errors:
+ // * `RESOURCE_EXHAUSTED`: There is insufficient storage space to add the
+ // entry to the cache.
+ rpc UpdateSource(UpdateSourceRequest) returns (Source) {}
+}
+
+message Source {
+ // This version number must always be present and can be used to
+ // further indicate presence or absence of parts of the proto at a
+ // later date. It only needs incrementing if a change to what is
+ // *mandatory* changes.
+ int32 version = 1;
+ // root directory digest of the files
+ build.bazel.remote.execution.v2.Digest files = 2;
+}
+
+message GetSourceRequest {
+ // instance of the service we want to query
+ string instance_name = 1;
+ // reference key for the source
+ string cache_key = 2;
+}
+
+message UpdateSourceRequest {
+ // instance of the service we want to query
+ string instance_name = 1;
+ // reference key of the source
+ string cache_key = 2;
+ // Source that we want to upload to the service
+ Source source = 3;
+} \ No newline at end of file
diff --git a/src/buildstream/_protos/buildstream/v2/source_pb2.py b/src/buildstream/_protos/buildstream/v2/source_pb2.py
new file mode 100644
index 000000000..c91dee0e9
--- /dev/null
+++ b/src/buildstream/_protos/buildstream/v2/source_pb2.py
@@ -0,0 +1,214 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: buildstream/v2/source.proto
+
+import sys
+_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from google.protobuf import reflection as _reflection
+from google.protobuf import symbol_database as _symbol_database
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 as build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2
+from buildstream._protos.google.api import annotations_pb2 as google_dot_api_dot_annotations__pb2
+
+
+DESCRIPTOR = _descriptor.FileDescriptor(
+ name='buildstream/v2/source.proto',
+ package='buildstream.v2',
+ syntax='proto3',
+ serialized_options=None,
+ serialized_pb=_b('\n\x1b\x62uildstream/v2/source.proto\x12\x0e\x62uildstream.v2\x1a\x36\x62uild/bazel/remote/execution/v2/remote_execution.proto\x1a\x1cgoogle/api/annotations.proto\"Q\n\x06Source\x12\x0f\n\x07version\x18\x01 \x01(\x05\x12\x36\n\x05\x66iles\x18\x02 \x01(\x0b\x32\'.build.bazel.remote.execution.v2.Digest\"<\n\x10GetSourceRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12\x11\n\tcache_key\x18\x02 \x01(\t\"g\n\x13UpdateSourceRequest\x12\x15\n\rinstance_name\x18\x01 \x01(\t\x12\x11\n\tcache_key\x18\x02 \x01(\t\x12&\n\x06source\x18\x03 \x01(\x0b\x32\x16.buildstream.v2.Source2\xa7\x01\n\rSourceService\x12G\n\tGetSource\x12 .buildstream.v2.GetSourceRequest\x1a\x16.buildstream.v2.Source\"\x00\x12M\n\x0cUpdateSource\x12#.buildstream.v2.UpdateSourceRequest\x1a\x16.buildstream.v2.Source\"\x00\x62\x06proto3')
+ ,
+ dependencies=[build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2.DESCRIPTOR,google_dot_api_dot_annotations__pb2.DESCRIPTOR,])
+
+
+
+
+_SOURCE = _descriptor.Descriptor(
+ name='Source',
+ full_name='buildstream.v2.Source',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='version', full_name='buildstream.v2.Source.version', index=0,
+ number=1, type=5, cpp_type=1, label=1,
+ has_default_value=False, default_value=0,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='files', full_name='buildstream.v2.Source.files', index=1,
+ number=2, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=133,
+ serialized_end=214,
+)
+
+
+_GETSOURCEREQUEST = _descriptor.Descriptor(
+ name='GetSourceRequest',
+ full_name='buildstream.v2.GetSourceRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='instance_name', full_name='buildstream.v2.GetSourceRequest.instance_name', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='cache_key', full_name='buildstream.v2.GetSourceRequest.cache_key', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=216,
+ serialized_end=276,
+)
+
+
+_UPDATESOURCEREQUEST = _descriptor.Descriptor(
+ name='UpdateSourceRequest',
+ full_name='buildstream.v2.UpdateSourceRequest',
+ filename=None,
+ file=DESCRIPTOR,
+ containing_type=None,
+ fields=[
+ _descriptor.FieldDescriptor(
+ name='instance_name', full_name='buildstream.v2.UpdateSourceRequest.instance_name', index=0,
+ number=1, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='cache_key', full_name='buildstream.v2.UpdateSourceRequest.cache_key', index=1,
+ number=2, type=9, cpp_type=9, label=1,
+ has_default_value=False, default_value=_b("").decode('utf-8'),
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ _descriptor.FieldDescriptor(
+ name='source', full_name='buildstream.v2.UpdateSourceRequest.source', index=2,
+ number=3, type=11, cpp_type=10, label=1,
+ has_default_value=False, default_value=None,
+ message_type=None, enum_type=None, containing_type=None,
+ is_extension=False, extension_scope=None,
+ serialized_options=None, file=DESCRIPTOR),
+ ],
+ extensions=[
+ ],
+ nested_types=[],
+ enum_types=[
+ ],
+ serialized_options=None,
+ is_extendable=False,
+ syntax='proto3',
+ extension_ranges=[],
+ oneofs=[
+ ],
+ serialized_start=278,
+ serialized_end=381,
+)
+
+_SOURCE.fields_by_name['files'].message_type = build_dot_bazel_dot_remote_dot_execution_dot_v2_dot_remote__execution__pb2._DIGEST
+_UPDATESOURCEREQUEST.fields_by_name['source'].message_type = _SOURCE
+DESCRIPTOR.message_types_by_name['Source'] = _SOURCE
+DESCRIPTOR.message_types_by_name['GetSourceRequest'] = _GETSOURCEREQUEST
+DESCRIPTOR.message_types_by_name['UpdateSourceRequest'] = _UPDATESOURCEREQUEST
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+Source = _reflection.GeneratedProtocolMessageType('Source', (_message.Message,), dict(
+ DESCRIPTOR = _SOURCE,
+ __module__ = 'buildstream.v2.source_pb2'
+ # @@protoc_insertion_point(class_scope:buildstream.v2.Source)
+ ))
+_sym_db.RegisterMessage(Source)
+
+GetSourceRequest = _reflection.GeneratedProtocolMessageType('GetSourceRequest', (_message.Message,), dict(
+ DESCRIPTOR = _GETSOURCEREQUEST,
+ __module__ = 'buildstream.v2.source_pb2'
+ # @@protoc_insertion_point(class_scope:buildstream.v2.GetSourceRequest)
+ ))
+_sym_db.RegisterMessage(GetSourceRequest)
+
+UpdateSourceRequest = _reflection.GeneratedProtocolMessageType('UpdateSourceRequest', (_message.Message,), dict(
+ DESCRIPTOR = _UPDATESOURCEREQUEST,
+ __module__ = 'buildstream.v2.source_pb2'
+ # @@protoc_insertion_point(class_scope:buildstream.v2.UpdateSourceRequest)
+ ))
+_sym_db.RegisterMessage(UpdateSourceRequest)
+
+
+
+_SOURCESERVICE = _descriptor.ServiceDescriptor(
+ name='SourceService',
+ full_name='buildstream.v2.SourceService',
+ file=DESCRIPTOR,
+ index=0,
+ serialized_options=None,
+ serialized_start=384,
+ serialized_end=551,
+ methods=[
+ _descriptor.MethodDescriptor(
+ name='GetSource',
+ full_name='buildstream.v2.SourceService.GetSource',
+ index=0,
+ containing_service=None,
+ input_type=_GETSOURCEREQUEST,
+ output_type=_SOURCE,
+ serialized_options=None,
+ ),
+ _descriptor.MethodDescriptor(
+ name='UpdateSource',
+ full_name='buildstream.v2.SourceService.UpdateSource',
+ index=1,
+ containing_service=None,
+ input_type=_UPDATESOURCEREQUEST,
+ output_type=_SOURCE,
+ serialized_options=None,
+ ),
+])
+_sym_db.RegisterServiceDescriptor(_SOURCESERVICE)
+
+DESCRIPTOR.services_by_name['SourceService'] = _SOURCESERVICE
+
+# @@protoc_insertion_point(module_scope)
diff --git a/src/buildstream/_protos/buildstream/v2/source_pb2_grpc.py b/src/buildstream/_protos/buildstream/v2/source_pb2_grpc.py
new file mode 100644
index 000000000..ecf734afb
--- /dev/null
+++ b/src/buildstream/_protos/buildstream/v2/source_pb2_grpc.py
@@ -0,0 +1,70 @@
+# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
+import grpc
+
+from buildstream._protos.buildstream.v2 import source_pb2 as buildstream_dot_v2_dot_source__pb2
+
+
+class SourceServiceStub(object):
+ # missing associated documentation comment in .proto file
+ pass
+
+ def __init__(self, channel):
+ """Constructor.
+
+ Args:
+ channel: A grpc.Channel.
+ """
+ self.GetSource = channel.unary_unary(
+ '/buildstream.v2.SourceService/GetSource',
+ request_serializer=buildstream_dot_v2_dot_source__pb2.GetSourceRequest.SerializeToString,
+ response_deserializer=buildstream_dot_v2_dot_source__pb2.Source.FromString,
+ )
+ self.UpdateSource = channel.unary_unary(
+ '/buildstream.v2.SourceService/UpdateSource',
+ request_serializer=buildstream_dot_v2_dot_source__pb2.UpdateSourceRequest.SerializeToString,
+ response_deserializer=buildstream_dot_v2_dot_source__pb2.Source.FromString,
+ )
+
+
+class SourceServiceServicer(object):
+ # missing associated documentation comment in .proto file
+ pass
+
+ def GetSource(self, request, context):
+ """Retrieve a source message given a reference name from the service
+
+ Errors:
+ * `NOT_FOUND`: The requested reference is not in the cache.
+ """
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+ def UpdateSource(self, request, context):
+ """Sets a source message on the service
+
+ Errors:
+ * `RESOURCE_EXHAUSTED`: There is insufficient storage space to add the
+ entry to the cache.
+ """
+ context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+ context.set_details('Method not implemented!')
+ raise NotImplementedError('Method not implemented!')
+
+
+def add_SourceServiceServicer_to_server(servicer, server):
+ rpc_method_handlers = {
+ 'GetSource': grpc.unary_unary_rpc_method_handler(
+ servicer.GetSource,
+ request_deserializer=buildstream_dot_v2_dot_source__pb2.GetSourceRequest.FromString,
+ response_serializer=buildstream_dot_v2_dot_source__pb2.Source.SerializeToString,
+ ),
+ 'UpdateSource': grpc.unary_unary_rpc_method_handler(
+ servicer.UpdateSource,
+ request_deserializer=buildstream_dot_v2_dot_source__pb2.UpdateSourceRequest.FromString,
+ response_serializer=buildstream_dot_v2_dot_source__pb2.Source.SerializeToString,
+ ),
+ }
+ generic_handler = grpc.method_handlers_generic_handler(
+ 'buildstream.v2.SourceService', rpc_method_handlers)
+ server.add_generic_rpc_handlers((generic_handler,))
diff --git a/src/buildstream/_sourcecache.py b/src/buildstream/_sourcecache.py
index ce0694e08..fdfe00901 100644
--- a/src/buildstream/_sourcecache.py
+++ b/src/buildstream/_sourcecache.py
@@ -18,12 +18,15 @@
# Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
import os
+import grpc
-from ._cas import CASRemoteSpec
+from ._cas import CASRemote, CASRemoteSpec
from .storage._casbaseddirectory import CasBasedDirectory
from ._basecache import BaseCache
-from ._exceptions import CASError, CASCacheError, SourceCacheError
+from ._exceptions import CASError, CASRemoteError, SourceCacheError
from . import utils
+from ._protos.buildstream.v2 import buildstream_pb2, buildstream_pb2_grpc, \
+ source_pb2, source_pb2_grpc
# Holds configuration for a remote used for the source cache.
@@ -38,6 +41,43 @@ class SourceCacheSpec(CASRemoteSpec):
pass
+class SourceRemote(CASRemote):
+ def __init__(self, *args):
+ super().__init__(*args)
+ self.capabilities_service = None
+ self.source_service = None
+
+ def init(self):
+ if not self._initialized:
+ super().init()
+
+ self.capabilities_service = buildstream_pb2_grpc.CapabilitiesStub(self.channel)
+
+ # check that the service supports sources
+ try:
+ request = buildstream_pb2.GetCapabilitiesRequest()
+ if self.instance_name:
+ request.instance_name = self.instance_name
+
+ response = self.capabilities_service.GetCapabilities(request)
+ except grpc.RpcError as e:
+ # Check if this remote has the artifact service
+ if e.code() == grpc.StatusCode.UNIMPLEMENTED:
+ raise SourceCacheError(
+ "Configured remote does not have the BuildStream "
+ "capabilities service. Please check remote configuration.")
+ # Else raise exception with details
+ raise SourceCacheError(
+ "Remote initialisation failed: {}".format(e.details()))
+
+ if not response.source_capabilities:
+ raise SourceCacheError(
+ "Configured remote does not support source service")
+
+ # set up source service
+ self.source_service = source_pb2_grpc.SourceServiceStub(self.channel)
+
+
# Class that keeps config of remotes and deals with caching of sources.
#
# Args:
@@ -49,15 +89,20 @@ class SourceCache(BaseCache):
spec_name = "source_cache_specs"
spec_error = SourceCacheError
config_node_name = "source-caches"
+ remote_class = SourceRemote
def __init__(self, context):
super().__init__(context)
self._required_sources = set()
+ self.sourcerefdir = os.path.join(context.cachedir, 'source_protos')
+ os.makedirs(self.sourcerefdir, exist_ok=True)
- self.casquota.add_remove_callbacks(self.unrequired_sources, self.cas.remove)
+ self.casquota.add_remove_callbacks(self.unrequired_sources, self._remove_source)
self.casquota.add_list_refs_callback(self.list_sources)
+ self.cas.add_reachable_directories_callback(self._reachable_directories)
+
# mark_required_sources()
#
# Mark sources that are required by the current run.
@@ -77,8 +122,8 @@ class SourceCache(BaseCache):
for source in sources:
ref = source._get_source_name()
try:
- self.cas.update_mtime(ref)
- except CASCacheError:
+ self._update_mtime(ref)
+ except SourceCacheError:
pass
# required_sources()
@@ -103,23 +148,19 @@ class SourceCache(BaseCache):
def unrequired_sources(self):
required_source_names = set(map(
lambda x: x._get_source_name(), self._required_sources))
- for (mtime, source) in self._list_refs_mtimes(
- os.path.join(self.cas.casdir, 'refs', 'heads'),
- glob_expr="@sources/*"):
+ for (mtime, source) in self._list_refs_mtimes(self.sourcerefdir):
if source not in required_source_names:
yield (mtime, source)
# list_sources()
#
- # Get list of all sources in the `cas/refs/heads/@sources/` folder
+ # Get list of all sources in the `sources_protos/` folder
#
# Returns:
# ([str]): iterable over all source refs
#
def list_sources(self):
- return [ref for _, ref in self._list_refs_mtimes(
- os.path.join(self.cas.casdir, 'refs', 'heads'),
- glob_expr="@sources/*")]
+ return [ref for _, ref in self._list_refs_mtimes(self.sourcerefdir)]
# contains()
#
@@ -134,7 +175,14 @@ class SourceCache(BaseCache):
#
def contains(self, source):
ref = source._get_source_name()
- return self.cas.contains(ref)
+ path = self._source_path(ref)
+
+ if not os.path.exists(path):
+ return False
+
+ # check files
+ source_proto = self._get_source(ref)
+ return self.cas.contains_directory(source_proto.files, with_files=True)
# commit()
#
@@ -162,7 +210,7 @@ class SourceCache(BaseCache):
else:
source._stage(vdir)
- self.cas.set_ref(ref, vdir._get_digest())
+ self._store_source(ref, vdir._get_digest())
# export()
#
@@ -175,13 +223,8 @@ class SourceCache(BaseCache):
# CASBasedDirectory
def export(self, source):
ref = source._get_source_name()
-
- try:
- digest = self.cas.resolve_ref(ref)
- except CASCacheError as e:
- raise SourceCacheError("Error exporting source: {}".format(e))
-
- return CasBasedDirectory(self.cas, digest=digest)
+ source = self._get_source(ref)
+ return CasBasedDirectory(self.cas, digest=source.files)
# pull()
#
@@ -204,13 +247,27 @@ class SourceCache(BaseCache):
try:
source.status("Pulling source {} <- {}".format(display_key, remote.spec.url))
- if self.cas.pull(ref, remote):
- source.info("Pulled source {} <- {}".format(display_key, remote.spec.url))
- # no need to pull from additional remotes
- return True
- else:
- source.info("Remote ({}) does not have source {} cached".format(
+ # fetch source proto
+ response = self._pull_source(ref, remote)
+ if response is None:
+ source.info("Remote source service ({}) does not have source {} cached".format(
+ remote.spec.url, display_key))
+ continue
+
+ # Fetch source blobs
+ self.cas._fetch_directory(remote, response.files)
+ required_blobs = self.cas.required_blobs_for_directory(response.files)
+ missing_blobs = self.cas.local_missing_blobs(required_blobs)
+ missing_blobs = self.cas.fetch_blobs(remote, missing_blobs)
+
+ if missing_blobs:
+ source.info("Remote cas ({}) does not have source {} cached".format(
remote.spec.url, display_key))
+ continue
+
+ source.info("Pulled source {} <- {}".format(display_key, remote.spec.url))
+ return True
+
except CASError as e:
raise SourceCacheError("Failed to pull source {}: {}".format(
display_key, e)) from e
@@ -242,11 +299,105 @@ class SourceCache(BaseCache):
for remote in push_remotes:
remote.init()
source.status("Pushing source {} -> {}".format(display_key, remote.spec.url))
- if self.cas.push([ref], remote):
- source.info("Pushed source {} -> {}".format(display_key, remote.spec.url))
- pushed = True
- else:
+
+ # check whether cache has files already
+ if self._pull_source(ref, remote) is not None:
source.info("Remote ({}) already has source {} cached"
.format(remote.spec.url, display_key))
+ continue
+
+ # push files to storage
+ source_proto = self._get_source(ref)
+ try:
+ self.cas._send_directory(remote, source_proto.files)
+ except CASRemoteError:
+ source.info("Failed to push source files {} -> {}".format(display_key, remote.spec.url))
+ continue
+
+ if not self._push_source(ref, remote):
+ source.info("Failed to push source metadata {} -> {}".format(display_key, remote.spec.url))
+ continue
+
+ source.info("Pushed source {} -> {}".format(display_key, remote.spec.url))
+ pushed = True
return pushed
+
+ def _remove_source(self, ref, *, defer_prune=False):
+ return self.cas.remove(ref, basedir=self.sourcerefdir, defer_prune=defer_prune)
+
+ def _store_source(self, ref, digest):
+ source_proto = source_pb2.Source()
+ source_proto.files.CopyFrom(digest)
+
+ self._store_proto(source_proto, ref)
+
+ def _store_proto(self, proto, ref):
+ path = self._source_path(ref)
+ os.makedirs(os.path.dirname(path), exist_ok=True)
+ with utils.save_file_atomic(path, 'w+b') as f:
+ f.write(proto.SerializeToString())
+
+ def _get_source(self, ref):
+ path = self._source_path(ref)
+ source_proto = source_pb2.Source()
+ try:
+ with open(path, 'r+b') as f:
+ source_proto.ParseFromString(f.read())
+ return source_proto
+ except FileNotFoundError as e:
+ raise SourceCacheError("Attempted to access unavailable source: {}"
+ .format(e)) from e
+
+ def _source_path(self, ref):
+ return os.path.join(self.sourcerefdir, ref)
+
+ def _reachable_directories(self):
+ for root, _, files in os.walk(self.sourcerefdir):
+ for source_file in files:
+ source = source_pb2.Source()
+ with open(os.path.join(root, source_file), 'r+b') as f:
+ source.ParseFromString(f.read())
+
+ yield source.files
+
+ def _update_mtime(self, ref):
+ try:
+ os.utime(self._source_path(ref))
+ except FileNotFoundError as e:
+ raise SourceCacheError("Couldn't find source: {}".format(ref)) from e
+
+ def _pull_source(self, source_ref, remote):
+ try:
+ remote.init()
+
+ request = source_pb2.GetSourceRequest()
+ request.cache_key = source_ref
+
+ response = remote.source_service.GetSource(request)
+
+ self._store_proto(response, source_ref)
+
+ return response
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.NOT_FOUND:
+ raise SourceCacheError("Failed to pull source: {}".format(e.details()))
+ return None
+
+ def _push_source(self, source_ref, remote):
+ try:
+ remote.init()
+
+ source_proto = self._get_source(source_ref)
+
+ request = source_pb2.UpdateSourceRequest()
+ request.cache_key = source_ref
+ request.source.CopyFrom(source_proto)
+
+ return remote.source_service.UpdateSource(request)
+
+ except grpc.RpcError as e:
+ if e.code() != grpc.StatusCode.RESOURCE_EXHAUSTED:
+ raise SourceCacheError("Failed to push source: {}".format(e.details()))
+ return None
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index b5c8f9a63..76c56fd1d 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -1063,8 +1063,7 @@ class Source(Plugin):
# Gives a ref path that points to where sources are kept in the CAS
def _get_source_name(self):
# @ is used to prevent conflicts with project names
- return "{}/{}/{}".format(
- '@sources',
+ return "{}/{}".format(
self.get_kind(),
self._key)
diff --git a/tests/frontend/buildtrack.py b/tests/frontend/buildtrack.py
index d42b6d1ba..13e5ab96e 100644
--- a/tests/frontend/buildtrack.py
+++ b/tests/frontend/buildtrack.py
@@ -140,8 +140,8 @@ def test_build_track(cli, datafiles, tmpdir, ref_storage, strict,
# Delete element sources
source_dir = os.path.join(project, 'cache', 'sources')
shutil.rmtree(source_dir)
- source_refs = os.path.join(project, 'cache', 'cas', 'refs', 'heads', '@sources')
- shutil.rmtree(source_refs)
+ source_protos = os.path.join(project, 'cache', 'source_protos')
+ shutil.rmtree(source_protos)
# Delete artifacts one by one and assert element states
for target in set(tracked):
diff --git a/tests/sourcecache/cache.py b/tests/sourcecache/cache.py
index 20faaa64e..793344ef0 100644
--- a/tests/sourcecache/cache.py
+++ b/tests/sourcecache/cache.py
@@ -39,10 +39,10 @@ def test_patch_sources_cached_1(cli, datafiles):
# as we have a local, patch, local config, the first local and patch should
# be cached together, and the last local on it's own
- source_dir = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources')
+ source_protos = os.path.join(project_dir, 'cache', 'source_protos')
- assert len(os.listdir(os.path.join(source_dir, 'patch'))) == 1
- assert len(os.listdir(os.path.join(source_dir, 'local'))) == 2
+ assert len(os.listdir(os.path.join(source_protos, 'patch'))) == 1
+ assert len(os.listdir(os.path.join(source_protos, 'local'))) == 2
@pytest.mark.datafiles(DATA_DIR)
@@ -53,9 +53,9 @@ def test_patch_sources_cached_2(cli, datafiles):
res.assert_success()
# As everything is before the patch it should all be cached together
- source_dir = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources')
+ source_protos = os.path.join(project_dir, 'cache', 'source_protos')
- assert len(os.listdir(os.path.join(source_dir, 'patch'))) == 1
+ assert len(os.listdir(os.path.join(source_protos, 'patch'))) == 1
@pytest.mark.datafiles(DATA_DIR)
@@ -66,9 +66,9 @@ def test_sources_without_patch(cli, datafiles):
res.assert_success()
# No patches so everything should be cached seperately
- source_dir = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources')
+ source_protos = os.path.join(project_dir, 'cache', 'source_protos')
- assert len(os.listdir(os.path.join(source_dir, 'local'))) == 3
+ assert len(os.listdir(os.path.join(source_protos, 'local'))) == 3
@pytest.mark.datafiles(DATA_DIR)
@@ -105,8 +105,8 @@ def test_source_cache_key(cli, datafiles):
res.assert_success()
# Should have one source ref
- patch_refs = os.path.join(project_dir, 'cache', 'cas', 'refs', 'heads', '@sources', 'patch')
- assert len(os.listdir(patch_refs)) == 1
+ patch_protos = os.path.join(project_dir, 'cache', 'source_protos', 'patch')
+ assert len(os.listdir(patch_protos)) == 1
# modify hello-patch file and check tracking updates refs
with open(os.path.join(file_path, 'dev-files', 'usr', 'include', 'pony.h'), 'a') as f:
@@ -120,4 +120,4 @@ def test_source_cache_key(cli, datafiles):
res.assert_success()
# We should have a new source ref
- assert len(os.listdir(patch_refs)) == 2
+ assert len(os.listdir(patch_protos)) == 2
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 3fc9d96a6..de8587862 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -89,15 +89,22 @@ def test_source_fetch(cli, tmpdir, datafiles):
assert os.listdir(os.path.join(str(tmpdir), 'cache', 'sources', 'git')) != []
+ # get root digest of source
+ sourcecache = context.sourcecache
+ digest = sourcecache.export(source)._get_digest()
+
# Move source in local cas to repo
shutil.rmtree(os.path.join(str(tmpdir), 'sourceshare', 'repo', 'cas'))
shutil.move(
+ os.path.join(str(tmpdir), 'cache', 'source_protos'),
+ os.path.join(str(tmpdir), 'sourceshare', 'repo'))
+ shutil.move(
os.path.join(str(tmpdir), 'cache', 'cas'),
os.path.join(str(tmpdir), 'sourceshare', 'repo'))
shutil.rmtree(os.path.join(str(tmpdir), 'cache', 'sources'))
shutil.rmtree(os.path.join(str(tmpdir), 'cache', 'artifacts'))
- digest = share.cas.resolve_ref(source._get_source_name())
+ # check the share has the object
assert share.has_object(digest)
state = cli.get_element_state(project_dir, 'fetch.bst')
@@ -163,7 +170,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
res = cli.run(project=project_dir, args=['source', 'fetch', 'fetch.bst'])
res.assert_success()
brief_key = source._get_brief_display_key()
- assert ("Remote ({}) does not have source {} cached"
+ assert ("Remote source service ({}) does not have source {} cached"
.format(share.repo, brief_key)) in res.stderr
assert ("SUCCESS Fetching from {}"
.format(repo.source_config(ref=ref)['url'])) in res.stderr
@@ -219,5 +226,5 @@ def test_pull_fail(cli, tmpdir, datafiles):
res = cli.run(project=project_dir, args=['build', 'push.bst'])
res.assert_main_error(ErrorDomain.STREAM, None)
res.assert_task_error(ErrorDomain.PLUGIN, None)
- assert "Remote ({}) does not have source {} cached".format(
+ assert "Remote source service ({}) does not have source {} cached".format(
share.repo, source._get_brief_display_key()) in res.stderr
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index 6282b6e60..23f5f1ca1 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -95,7 +95,7 @@ def test_source_push(cli, tmpdir, datafiles):
assert sourcecache.contains(source)
# check that's the remote CAS now has it
- digest = share.cas.resolve_ref(source._get_source_name())
+ digest = sourcecache.export(source)._get_digest()
assert share.has_object(digest)
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index 9dc431bda..c15bed215 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -78,8 +78,7 @@ def test_source_staged(tmpdir, cli, datafiles):
assert sourcecache.contains(source)
# Extract the file and check it's the same as the one we imported
- ref = source._get_source_name()
- digest = cas.resolve_ref(ref)
+ digest = sourcecache.export(source)._get_digest()
extractdir = os.path.join(str(tmpdir), "extract")
cas.checkout(extractdir, digest)
dir1 = extractdir
@@ -108,6 +107,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
context.cachedir = cachedir
context.messenger.set_message_handler(dummy_message_handler)
cas = context.get_cascache()
+ sourcecache = context.sourcecache
res = cli.run(project=project_dir, args=["source", "fetch", "import-dev.bst"])
res.assert_success()
@@ -117,8 +117,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
assert element._source_cached()
# check that the directory structures are idetical
- ref = source._get_source_name()
- digest = cas.resolve_ref(ref)
+ digest = sourcecache.export(source)._get_digest()
extractdir = os.path.join(str(tmpdir), "extract")
cas.checkout(extractdir, digest)
dir1 = extractdir
@@ -133,11 +132,11 @@ def test_staged_source_build(tmpdir, datafiles, cli):
project_dir = os.path.join(datafiles.dirname, datafiles.basename, 'project')
cachedir = os.path.join(str(tmpdir), 'cache')
element_path = 'elements'
- source_refs = os.path.join(str(tmpdir), 'cache', 'cas', 'refs', 'heads', '@sources')
+ source_protos = os.path.join(str(tmpdir), 'cache', 'source_protos')
source_dir = os.path.join(str(tmpdir), 'cache', 'sources')
cli.configure({
- 'cachedir': os.path.join(str(tmpdir), 'cache')
+ 'cachedir': cachedir
})
create_element_size('target.bst', project_dir, element_path, [], 10000)
@@ -181,7 +180,7 @@ def test_staged_source_build(tmpdir, datafiles, cli):
assert files == []
# Now remove the source refs and check the state
- shutil.rmtree(source_refs)
+ shutil.rmtree(source_protos)
cli.remove_artifact_from_cache(project_dir, 'target.bst')
states = cli.get_element_states(project_dir, ['target.bst'])
assert states['target.bst'] == 'fetch needed'