diff options
author | Benjamin Schubert <contact@benschubert.me> | 2020-07-09 19:02:20 +0100 |
---|---|---|
committer | Benjamin Schubert <contact@benschubert.me> | 2020-08-22 14:14:27 +0100 |
commit | d211abc07f5e56ad1f9e4a6a6b8804e67380e4fe (patch) | |
tree | c7e3aadc02164eb6c433607407c335fd56c67cf6 | |
parent | 5e118e25031ac132cfed6ad1fd951be5560d9a62 (diff) | |
download | buildstream-d211abc07f5e56ad1f9e4a6a6b8804e67380e4fe.tar.gz |
plugin.py: Add a helper to run blocking processes in subprocesses
This ensures that we can cleanly clean up processes and threads on
termination of BuildStream.
Plugins should use this helper whenever there is a risk of them being
blocked on a syscall for an indefinite amount of time.
-rw-r--r-- | src/buildstream/downloadablefilesource.py | 97 | ||||
-rw-r--r-- | src/buildstream/plugin.py | 28 |
2 files changed, 80 insertions, 45 deletions
diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py index b9ca91945..487544551 100644 --- a/src/buildstream/downloadablefilesource.py +++ b/src/buildstream/downloadablefilesource.py @@ -99,6 +99,34 @@ class _NetrcPasswordManager: return login, password +def _download_file(opener, url, etag, directory): + default_name = os.path.basename(url) + request = urllib.request.Request(url) + request.add_header("Accept", "*/*") + request.add_header("User-Agent", "BuildStream/2") + + if etag is not None: + request.add_header("If-None-Match", etag) + + with contextlib.closing(opener.open(request)) as response: + info = response.info() + + # some servers don't honor the 'If-None-Match' header + if etag and info["ETag"] == etag: + return None, None + + etag = info["ETag"] + + filename = info.get_filename(default_name) + filename = os.path.basename(filename) + local_file = os.path.join(directory, filename) + with open(local_file, "wb") as dest: + shutil.copyfileobj(response, dest) + + return local_file, etag + + + class DownloadableFileSource(Source): # pylint: disable=attribute-defined-outside-init @@ -137,19 +165,18 @@ class DownloadableFileSource(Source): # there is no 'track' field in the source to determine what/whether # or not to update refs, because tracking a ref is always a conscious # decision by the user. 
- with self.timed_activity("Tracking {}".format(self.url), silent_nested=True): - new_ref = self._ensure_mirror() + new_ref = self._ensure_mirror("Tracking {}".format(self.url)) - if self.ref and self.ref != new_ref: - detail = ( - "When tracking, new ref differs from current ref:\n" - + " Tracked URL: {}\n".format(self.url) - + " Current ref: {}\n".format(self.ref) - + " New ref: {}\n".format(new_ref) - ) - self.warn("Potential man-in-the-middle attack!", detail=detail) + if self.ref and self.ref != new_ref: + detail = ( + "When tracking, new ref differs from current ref:\n" + + " Tracked URL: {}\n".format(self.url) + + " Current ref: {}\n".format(self.ref) + + " New ref: {}\n".format(new_ref) + ) + self.warn("Potential man-in-the-middle attack!", detail=detail) - return new_ref + return new_ref def fetch(self): # pylint: disable=arguments-differ @@ -162,12 +189,11 @@ class DownloadableFileSource(Source): # Download the file, raise hell if the sha256sums don't match, # and mirror the file otherwise. - with self.timed_activity("Fetching {}".format(self.url), silent_nested=True): - sha256 = self._ensure_mirror() - if sha256 != self.ref: - raise SourceError( - "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref) - ) + sha256 = self._ensure_mirror("Fetching {}".format(self.url),) + if sha256 != self.ref: + raise SourceError( + "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref) + ) def _warn_deprecated_etag(self, node): etag = node.get_str("etag", None) @@ -188,40 +214,23 @@ class DownloadableFileSource(Source): with utils.save_file_atomic(etagfilename) as etagfile: etagfile.write(etag) - def _ensure_mirror(self): + def _ensure_mirror(self, activity_name: str): # Downloads from the url and caches it according to its sha256sum. 
try: with self.tempdir() as td: - default_name = os.path.basename(self.url) - request = urllib.request.Request(self.url) - request.add_header("Accept", "*/*") - request.add_header("User-Agent", "BuildStream/2") - # We do not use etag in case what we have in cache is # not matching ref in order to be able to recover from # corrupted download. - if self.ref: - etag = self._get_etag(self.ref) - + if self.ref and not self.is_cached(): # Do not re-download the file if the ETag matches. - if etag and self.is_cached(): - request.add_header("If-None-Match", etag) - - opener = self.__get_urlopener() - with contextlib.closing(opener.open(request)) as response: - info = response.info() - - # some servers don't honor the 'If-None-Match' header - if self.ref and etag and info["ETag"] == etag: - return self.ref + etag = self._get_etag(self.ref) + else: + etag = None - etag = info["ETag"] + local_file, new_etag = self.blocking_activity(_download_file, (self.__get_urlopener(), self.url, etag, td), activity_name) - filename = info.get_filename(default_name) - filename = os.path.basename(filename) - local_file = os.path.join(td, filename) - with open(local_file, "wb") as dest: - shutil.copyfileobj(response, dest) + if local_file is None: + return self.ref # Make sure url-specific mirror dir exists. if not os.path.isdir(self._mirror_dir): @@ -233,8 +242,8 @@ class DownloadableFileSource(Source): # In case the old file was corrupted somehow. 
os.rename(local_file, self._get_mirror_file(sha256)) - if etag: - self._store_etag(sha256, etag) + if new_etag: + self._store_etag(sha256, new_etag) return sha256 except urllib.error.HTTPError as e: diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py index deb105a3b..0ed6d7d6f 100644 --- a/src/buildstream/plugin.py +++ b/src/buildstream/plugin.py @@ -110,11 +110,12 @@ Class Reference """ import itertools +import multiprocessing import os import subprocess import sys from contextlib import contextmanager -from typing import Generator, Optional, Tuple, TYPE_CHECKING +from typing import Callable, Generator, Optional, Tuple, TypeVar, TYPE_CHECKING from weakref import WeakValueDictionary from . import utils @@ -131,6 +132,14 @@ if TYPE_CHECKING: # pylint: enable=cyclic-import +T1 = TypeVar("T1") +T2 = TypeVar("T2") + + +def _background_job_wrapper(queue: multiprocessing.Queue, target: Callable[[T1], T2], args: T1) -> None: + queue.put(target(*args)) + + class Plugin: """Plugin() @@ -212,6 +221,8 @@ class Plugin: # scheduling tasks. __TABLE = WeakValueDictionary() # type: WeakValueDictionary[int, Plugin] + __multiprocessing_context = multiprocessing.get_context("spawn") + def __init__( self, name: str, @@ -503,6 +514,21 @@ class Plugin: ): yield + def blocking_activity(self, target: Callable[[T1], T2], args: T1, activity_name: str, *, detail: Optional[str] = None, silent_nested: bool = False) -> T2: + with self.__context.messenger.timed_activity( + activity_name, element_name=self._get_full_name(), detail=detail, silent_nested=silent_nested + ): + queue = self.__multiprocessing_context.Queue() + + proc = self.__multiprocessing_context.Process(target=_background_job_wrapper, args=(queue, target, args)) + proc.start() + + result = queue.get() + proc.join() + + return result + + def call(self, *popenargs, fail: Optional[str] = None, fail_temporarily: bool = False, **kwargs) -> int: """A wrapper for subprocess.call() |