summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Benjamin Schubert <contact@benschubert.me> 2020-07-09 19:02:20 +0100
committer Benjamin Schubert <contact@benschubert.me> 2020-08-22 14:14:27 +0100
commit d211abc07f5e56ad1f9e4a6a6b8804e67380e4fe (patch)
tree c7e3aadc02164eb6c433607407c335fd56c67cf6
parent 5e118e25031ac132cfed6ad1fd951be5560d9a62 (diff)
download buildstream-d211abc07f5e56ad1f9e4a6a6b8804e67380e4fe.tar.gz
plugin.py: Add a helper to run blocking processes in subprocesses
This ensures that we can cleanly clean up processes and threads when BuildStream terminates. Plugins should use this helper whenever there is a risk of them being blocked on a syscall for an indefinite amount of time.
-rw-r--r-- src/buildstream/downloadablefilesource.py 97
-rw-r--r-- src/buildstream/plugin.py 28
2 files changed, 80 insertions, 45 deletions
diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py
index b9ca91945..487544551 100644
--- a/src/buildstream/downloadablefilesource.py
+++ b/src/buildstream/downloadablefilesource.py
@@ -99,6 +99,34 @@ class _NetrcPasswordManager:
return login, password
+def _download_file(opener, url, etag, directory):
+ default_name = os.path.basename(url)
+ request = urllib.request.Request(url)
+ request.add_header("Accept", "*/*")
+ request.add_header("User-Agent", "BuildStream/2")
+
+ if etag is not None:
+ request.add_header("If-None-Match", etag)
+
+ with contextlib.closing(opener.open(request)) as response:
+ info = response.info()
+
+ # some servers don't honor the 'If-None-Match' header
+ if etag and info["ETag"] == etag:
+ return None, None
+
+ etag = info["ETag"]
+
+ filename = info.get_filename(default_name)
+ filename = os.path.basename(filename)
+ local_file = os.path.join(directory, filename)
+ with open(local_file, "wb") as dest:
+ shutil.copyfileobj(response, dest)
+
+ return local_file, etag
+
+
+
class DownloadableFileSource(Source):
# pylint: disable=attribute-defined-outside-init
@@ -137,19 +165,18 @@ class DownloadableFileSource(Source):
# there is no 'track' field in the source to determine what/whether
# or not to update refs, because tracking a ref is always a conscious
# decision by the user.
- with self.timed_activity("Tracking {}".format(self.url), silent_nested=True):
- new_ref = self._ensure_mirror()
+ new_ref = self._ensure_mirror("Tracking {}".format(self.url))
- if self.ref and self.ref != new_ref:
- detail = (
- "When tracking, new ref differs from current ref:\n"
- + " Tracked URL: {}\n".format(self.url)
- + " Current ref: {}\n".format(self.ref)
- + " New ref: {}\n".format(new_ref)
- )
- self.warn("Potential man-in-the-middle attack!", detail=detail)
+ if self.ref and self.ref != new_ref:
+ detail = (
+ "When tracking, new ref differs from current ref:\n"
+ + " Tracked URL: {}\n".format(self.url)
+ + " Current ref: {}\n".format(self.ref)
+ + " New ref: {}\n".format(new_ref)
+ )
+ self.warn("Potential man-in-the-middle attack!", detail=detail)
- return new_ref
+ return new_ref
def fetch(self): # pylint: disable=arguments-differ
@@ -162,12 +189,11 @@ class DownloadableFileSource(Source):
# Download the file, raise hell if the sha256sums don't match,
# and mirror the file otherwise.
- with self.timed_activity("Fetching {}".format(self.url), silent_nested=True):
- sha256 = self._ensure_mirror()
- if sha256 != self.ref:
- raise SourceError(
- "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref)
- )
+ sha256 = self._ensure_mirror("Fetching {}".format(self.url),)
+ if sha256 != self.ref:
+ raise SourceError(
+ "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref)
+ )
def _warn_deprecated_etag(self, node):
etag = node.get_str("etag", None)
@@ -188,40 +214,23 @@ class DownloadableFileSource(Source):
with utils.save_file_atomic(etagfilename) as etagfile:
etagfile.write(etag)
- def _ensure_mirror(self):
+ def _ensure_mirror(self, activity_name: str):
# Downloads from the url and caches it according to its sha256sum.
try:
with self.tempdir() as td:
- default_name = os.path.basename(self.url)
- request = urllib.request.Request(self.url)
- request.add_header("Accept", "*/*")
- request.add_header("User-Agent", "BuildStream/2")
-
# We do not use etag in case what we have in cache is
# not matching ref in order to be able to recover from
# corrupted download.
- if self.ref:
- etag = self._get_etag(self.ref)
-
+ if self.ref and not self.is_cached():
# Do not re-download the file if the ETag matches.
- if etag and self.is_cached():
- request.add_header("If-None-Match", etag)
-
- opener = self.__get_urlopener()
- with contextlib.closing(opener.open(request)) as response:
- info = response.info()
-
- # some servers don't honor the 'If-None-Match' header
- if self.ref and etag and info["ETag"] == etag:
- return self.ref
+ etag = self._get_etag(self.ref)
+ else:
+ etag = None
- etag = info["ETag"]
+ local_file, new_etag = self.blocking_activity(_download_file, (self.__get_urlopener(), self.url, etag, td), activity_name)
- filename = info.get_filename(default_name)
- filename = os.path.basename(filename)
- local_file = os.path.join(td, filename)
- with open(local_file, "wb") as dest:
- shutil.copyfileobj(response, dest)
+ if local_file is None:
+ return self.ref
# Make sure url-specific mirror dir exists.
if not os.path.isdir(self._mirror_dir):
@@ -233,8 +242,8 @@ class DownloadableFileSource(Source):
# In case the old file was corrupted somehow.
os.rename(local_file, self._get_mirror_file(sha256))
- if etag:
- self._store_etag(sha256, etag)
+ if new_etag:
+ self._store_etag(sha256, new_etag)
return sha256
except urllib.error.HTTPError as e:
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index deb105a3b..0ed6d7d6f 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -110,11 +110,12 @@ Class Reference
"""
import itertools
+import multiprocessing
import os
import subprocess
import sys
from contextlib import contextmanager
-from typing import Generator, Optional, Tuple, TYPE_CHECKING
+from typing import Callable, Generator, Optional, Tuple, TypeVar, TYPE_CHECKING
from weakref import WeakValueDictionary
from . import utils
@@ -131,6 +132,14 @@ if TYPE_CHECKING:
# pylint: enable=cyclic-import
+T1 = TypeVar("T1")
+T2 = TypeVar("T2")
+
+
+def _background_job_wrapper(queue: multiprocessing.Queue, target: Callable[[T1], T2], args: T1) -> None:
+ queue.put(target(*args))
+
+
class Plugin:
"""Plugin()
@@ -212,6 +221,8 @@ class Plugin:
# scheduling tasks.
__TABLE = WeakValueDictionary() # type: WeakValueDictionary[int, Plugin]
+ __multiprocessing_context = multiprocessing.get_context("spawn")
+
def __init__(
self,
name: str,
@@ -503,6 +514,21 @@ class Plugin:
):
yield
+ def blocking_activity(self, target: Callable[[T1], T2], args: T1, activity_name: str, *, detail: Optional[str] = None, silent_nested: bool = False) -> T2:
+ with self.__context.messenger.timed_activity(
+ activity_name, element_name=self._get_full_name(), detail=detail, silent_nested=silent_nested
+ ):
+ queue = self.__multiprocessing_context.Queue()
+
+ proc = self.__multiprocessing_context.Process(target=_background_job_wrapper, args=(queue, target, args))
+ proc.start()
+
+ result = queue.get()
+ proc.join()
+
+ return result
+
+
def call(self, *popenargs, fail: Optional[str] = None, fail_temporarily: bool = False, **kwargs) -> int:
"""A wrapper for subprocess.call()