1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
|
#
# Copyright (C) 2019 Bloomberg Finance LP
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
# Authors: Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
import os
import pytest
import signal
from urllib.parse import urlparse
from multiprocessing import Process, Queue
import grpc
from buildstream._protos.buildstream.v2.artifact_pb2 \
import Artifact, GetArtifactRequest, UpdateArtifactRequest
from buildstream._protos.buildstream.v2.artifact_pb2_grpc import ArtifactServiceStub
from buildstream._protos.build.bazel.remote.execution.v2 \
import remote_execution_pb2 as re_pb2
from buildstream._protos.build.bazel.remote.execution.v2 \
import remote_execution_pb2_grpc as re_pb2_grpc
from buildstream import utils
from buildstream import _signals
from tests.testutils.artifactshare import create_artifact_share
# Since parent processes wait for queue events, we need
# to put something on it if the called process raises an
# exception.
def _queue_wrapper(target, queue, *args):
    """Run ``target`` and report any exception it raises on ``queue``.

    Args:
        target: Callable invoked as ``target(*args, queue=queue)``.
        queue: Object with a ``put`` method; it receives a description of
            the exception if ``target`` raises.
        *args: Positional arguments forwarded to ``target``.

    Raises:
        Exception: Whatever ``target`` raised, re-raised after it has been
            reported on ``queue``.
    """
    try:
        target(*args, queue=queue)
    except Exception as e:
        # The tests in this file treat a falsy queue item as success, and
        # str() of an exception raised without arguments is "". Fall back
        # to repr() so a failure is never reported as an empty string.
        queue.put(str(e) or repr(e))
        raise
def test_artifact_get_not_found(tmpdir):
    """Check that asking an artifact share for a missing artifact fails cleanly.

    The gRPC request itself is made by ``_artifact_request`` in a child
    process; this function starts that process and checks what it reports
    back on the queue.
    """
    share_path = os.path.join(str(tmpdir), "share")

    with create_artifact_share(share_path) as share:
        results = Queue()
        child = Process(
            target=_queue_wrapper,
            args=(_artifact_request, results, urlparse(share.repo)),
        )

        try:
            # The child is started inside _signals.blocked() for SIGINT.
            with _signals.blocked([signal.SIGINT], ignore=False):
                child.start()

            # The child puts None on success; _queue_wrapper puts a
            # description of the exception on failure.
            outcome = results.get()
            child.join()
        except KeyboardInterrupt:
            # Do not leave the child running if the test run is interrupted.
            utils._kill_process_tree(child.pid)
            raise

        assert not outcome
def _artifact_request(url, queue):
    """Request an artifact that does not exist and check the error returned.

    Args:
        url: Parsed URL of the artifact share; only ``hostname`` and
            ``port`` are read.
        queue: Queue on which ``None`` is put once the expected error has
            been observed.

    Raises:
        AssertionError: If the request succeeds, or fails with a status
            code or details other than the expected ones.
    """
    channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
    artifact_stub = ArtifactServiceStub(channel)

    # Run GetArtifact and check it throws a not found error
    request = GetArtifactRequest()
    request.cache_key = "@artifact/something/not_there"

    try:
        artifact_stub.GetArtifact(request)
    except grpc.RpcError as e:
        assert e.code() == grpc.StatusCode.NOT_FOUND
        assert e.details() == "Artifact proto not found"
        queue.put(None)
    else:
        # Raise explicitly, with a message: a bare ``assert False`` is
        # removed under ``python -O`` and carries no text, while the parent
        # test treats an empty report on the queue as success.
        raise AssertionError("GetArtifact succeeded for a cache key that should not exist")
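# Updating an artifact: accepted when its files are present in the share's CAS,
# rejected when they are absent or are not a valid directory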
@pytest.mark.parametrize("files", ["present", "absent", "invalid"])
def test_update_artifact(tmpdir, files):
    """Exercise UpdateArtifact against a share for each state of the files digest.

    ``files`` selects what the artifact's files digest refers to:

    * "present": a serialized, empty ``Directory`` stored in the share's CAS.
    * "invalid": an object stored in the CAS whose content is arbitrary text.
    * "absent": a digest computed locally, with nothing added to the CAS.

    The requests are made by ``_get_artifact`` in a child process; this
    function prepares the digest and checks what the child reports back.
    """
    share_path = os.path.join(str(tmpdir), "share")

    with create_artifact_share(share_path) as share:
        # Prepare the digest the artifact will reference
        if files == "present":
            digest = share.cas.add_object(buffer=re_pb2.Directory().SerializeToString())
        elif files == "invalid":
            digest = share.cas.add_object(buffer="abcdefghijklmnop".encode("utf-8"))
        elif files == "absent":
            digest = utils._message_digest("abcdefghijklmnop".encode("utf-8"))

        results = Queue()
        child = Process(
            target=_queue_wrapper,
            args=(_get_artifact, results, urlparse(share.repo), files, digest),
        )

        try:
            # The child is started inside _signals.blocked() for SIGINT.
            with _signals.blocked([signal.SIGINT], ignore=False):
                child.start()

            # The child puts None on success; _queue_wrapper puts a
            # description of the exception on failure.
            outcome = results.get()
            child.join()
        except KeyboardInterrupt:
            # Do not leave the child running if the test run is interrupted.
            utils._kill_process_tree(child.pid)
            raise

        assert not outcome
def _get_artifact(url, files, digest, queue):
    """Send an UpdateArtifact request and check the outcome expected for ``files``.

    For "present" the update must succeed and a following GetArtifact must
    return the same artifact. For "absent" and "invalid" the update must be
    rejected with FAILED_PRECONDITION.

    Args:
        url: Parsed URL of the artifact share; only ``hostname`` and
            ``port`` are read.
        files: One of "present", "absent" or "invalid".
        digest: Digest message copied into the artifact's ``files`` field.
        queue: Queue on which ``None`` is put once all checks have passed.

    Raises:
        AssertionError: If any check fails, including the update being
            accepted when it should have been rejected.
    """
    channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
    artifact_stub = ArtifactServiceStub(channel)

    # initialise an artifact
    artifact = Artifact()
    artifact.version = 0
    artifact.build_success = True
    artifact.strong_key = "abcdefghijklmnop"
    # CopyFrom() replaces the whole sub-message, so there is no point in
    # setting individual fields of ``files`` beforehand.
    artifact.files.CopyFrom(digest)

    # Put it in the artifact share with an UpdateArtifactRequest
    request = UpdateArtifactRequest()
    request.artifact.CopyFrom(artifact)
    request.cache_key = "a-cache-key"

    if files != "present":
        # The server is expected to reject the artifact
        try:
            artifact_stub.UpdateArtifact(request)
        except grpc.RpcError as e:
            assert e.code() == grpc.StatusCode.FAILED_PRECONDITION
            if files == "absent":
                assert e.details() == "Artifact files specified but no files found"
            elif files == "invalid":
                assert e.details() == "Artifact files specified but directory not found"
            queue.put(None)
            return

        # Reaching this point means the update was accepted; without an
        # explicit failure the checks below would let the test pass.
        raise AssertionError(
            "UpdateArtifact unexpectedly accepted an artifact with {} files".format(files)
        )

    # should return the same artifact back
    response = artifact_stub.UpdateArtifact(request)
    assert response == artifact

    # If we uploaded the artifact check GetArtifact
    request = GetArtifactRequest()
    request.cache_key = "a-cache-key"

    response = artifact_stub.GetArtifact(request)
    assert response == artifact
    queue.put(None)
|