summaryrefslogtreecommitdiff
path: root/tests/artifactcache/artifactservice.py
blob: 791083b17900d3ff95ecf03ec16371476f4e1150 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#
#  Copyright (C) 2019 Bloomberg Finance LP
#
#  This program is free software; you can redistribute it and/or
#  modify it under the terms of the GNU Lesser General Public
#  License as published by the Free Software Foundation; either
#  version 2 of the License, or (at your option) any later version.
#
#  This library is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#  Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public
#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
#
#  Authors: Raoul Hidalgo Charman <raoul.hidalgocharman@codethink.co.uk>
#
import os
import pytest
import signal
from urllib.parse import urlparse
from multiprocessing import Process, Queue

import grpc

from buildstream._protos.buildstream.v2.artifact_pb2 \
    import Artifact, GetArtifactRequest, UpdateArtifactRequest
from buildstream._protos.buildstream.v2.artifact_pb2_grpc import ArtifactServiceStub
from buildstream._protos.build.bazel.remote.execution.v2 \
    import remote_execution_pb2 as re_pb2
from buildstream._protos.build.bazel.remote.execution.v2 \
    import remote_execution_pb2_grpc as re_pb2_grpc
from buildstream import utils
from buildstream import _signals

from tests.testutils.artifactshare import create_artifact_share


# Since parent processes wait for queue events, we need
# to put something on it if the called process raises an
# exception.
def _queue_wrapper(target, queue, *args):
    try:
        target(*args, queue=queue)
    except Exception as e:
        queue.put(str(e))
        raise


def test_artifact_get_not_found(tmpdir):
    sharedir = os.path.join(str(tmpdir), "share")
    with create_artifact_share(sharedir) as share:
        # set up artifact service stub
        url = urlparse(share.repo)
        queue = Queue()
        process = Process(target=_queue_wrapper, args=(_artifact_request, queue, url))
        try:
            with _signals.blocked([signal.SIGINT], ignore=False):
                process.start()
            error = queue.get()
            process.join()
        except KeyboardInterrupt:
            utils._kill_process_tree(process.pid)
            raise

        assert not error


def _artifact_request(url, queue):
    channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
    artifact_stub = ArtifactServiceStub(channel)

    # Run GetArtifact and check it throws a not found error
    request = GetArtifactRequest()
    request.cache_key = "@artifact/something/not_there"
    try:
        artifact_stub.GetArtifact(request)
    except grpc.RpcError as e:
        assert e.code() == grpc.StatusCode.NOT_FOUND
        assert e.details() == "Artifact proto not found"
        queue.put(None)
    else:
        assert False


# Successfully getting the artifact
@pytest.mark.parametrize("files", ["present", "absent", "invalid"])
def test_update_artifact(tmpdir, files):
    sharedir = os.path.join(str(tmpdir), "share")
    with create_artifact_share(sharedir) as share:
        # put files object
        if files == "present":
            directory = re_pb2.Directory()
            digest = share.cas.add_object(buffer=directory.SerializeToString())
        elif files == "invalid":
            digest = share.cas.add_object(buffer="abcdefghijklmnop".encode("utf-8"))
        elif files == "absent":
            digest = utils._message_digest("abcdefghijklmnop".encode("utf-8"))

        url = urlparse(share.repo)
        queue = Queue()
        process = Process(target=_queue_wrapper, args=(_get_artifact, queue, url, files, digest))

        try:
            with _signals.blocked([signal.SIGINT], ignore=False):
                process.start()
            error = queue.get()
            process.join()
        except KeyboardInterrupt:
            utils._kill_process_tree(process.pid)
            raise

        assert not error


def _get_artifact(url, files, digest, queue):
    channel = grpc.insecure_channel("{}:{}".format(url.hostname, url.port))
    artifact_stub = ArtifactServiceStub(channel)

    # initialise an artifact
    artifact = Artifact()
    artifact.version = 0
    artifact.build_success = True
    artifact.strong_key = "abcdefghijklmnop"
    artifact.files.hash = "hashymchashash"
    artifact.files.size_bytes = 10

    artifact.files.CopyFrom(digest)

    # Put it in the artifact share with an UpdateArtifactRequest
    request = UpdateArtifactRequest()
    request.artifact.CopyFrom(artifact)
    request.cache_key = "a-cache-key"

    # should return the same artifact back
    if files == "present":
        response = artifact_stub.UpdateArtifact(request)
        assert response == artifact
    else:
        try:
            artifact_stub.UpdateArtifact(request)
        except grpc.RpcError as e:
            assert e.code() == grpc.StatusCode.FAILED_PRECONDITION
            if files == "absent":
                assert e.details() == "Artifact files specified but no files found"
            elif files == "invalid":
                assert e.details() == "Artifact files specified but directory not found"
            queue.put(None)
            return

    # If we uploaded the artifact check GetArtifact
    request = GetArtifactRequest()
    request.cache_key = "a-cache-key"

    response = artifact_stub.GetArtifact(request)
    assert response == artifact
    queue.put(None)