diff options
author | Jordan Borean <jborean93@gmail.com> | 2019-08-23 06:27:28 +1000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-08-23 06:27:28 +1000 |
commit | e04b2a9697512bc64d2613033274e67fbfbd26c7 (patch) | |
tree | 7eea76e5b4e5994015f804534fad717e0e4ffaa9 /lib/ansible/galaxy | |
parent | c81a1057e10ca433a99af31158ec5a0c597f6d2d (diff) | |
download | ansible-e04b2a9697512bc64d2613033274e67fbfbd26c7.tar.gz |
ansible-galaxy - Add timeout and progress indicator for publish and install (#60660)
* ansible-galaxy - Add timeout and progress indicator for publish
* add progress indicator to install phase as well
Diffstat (limited to 'lib/ansible/galaxy')
-rw-r--r-- | lib/ansible/galaxy/collection.py | 149 |
1 files changed, 114 insertions, 35 deletions
diff --git a/lib/ansible/galaxy/collection.py b/lib/ansible/galaxy/collection.py index b999fa303d..911441e38b 100644 --- a/lib/ansible/galaxy/collection.py +++ b/lib/ansible/galaxy/collection.py @@ -11,6 +11,7 @@ import os import shutil import tarfile import tempfile +import threading import time import uuid import yaml @@ -21,6 +22,11 @@ from hashlib import sha256 from io import BytesIO from yaml.error import YAMLError +try: + import queue +except ImportError: + import Queue as queue # Python 2 + import ansible.constants as C from ansible.errors import AnsibleError from ansible.galaxy import get_collections_galaxy_meta_info @@ -383,13 +389,14 @@ def build_collection(collection_path, output_path, force): _build_collection_tar(b_collection_path, b_collection_output, collection_manifest, file_manifest) -def publish_collection(collection_path, api, wait): +def publish_collection(collection_path, api, wait, timeout): """ Publish an Ansible collection tarball into an Ansible Galaxy server. :param collection_path: The path to the collection tarball to publish. :param api: A GalaxyAPI to publish the collection to. :param wait: Whether to wait until the import process is complete. + :param timeout: The time in seconds to wait for the import process to finish, 0 is indefinite. """ b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict') if not os.path.exists(b_collection_path): @@ -423,14 +430,16 @@ def publish_collection(collection_path, api, wait): raise AnsibleError("Error when publishing collection (HTTP Code: %d, Message: %s Code: %s)" % (err.code, message, code)) - display.vvv("Collection has been pushed to the Galaxy server %s %s" % (api.name, api.api_server)) import_uri = resp['task'] if wait: - _wait_import(import_uri, api) - display.display("Collection has been successfully published to the Galaxy server") + display.display("Collection has been published to the Galaxy server %s %s" % (api.name, api.api_server)) + _wait_import(import_uri, api, timeout) + display.display("Collection has been successfully published and imported to the Galaxy server %s %s" + % (api.name, api.api_server)) else: - display.display("Collection has been pushed to the Galaxy server, not waiting until import has completed " - "due to --no-wait being set. Import task results can be found at %s" % import_uri) + display.display("Collection has been pushed to the Galaxy server %s %s, not waiting until import has " + "completed due to --no-wait being set. Import task results can be found at %s" + % (api.name, api.api_server, import_uri)) def install_collections(collections, output_path, apis, validate_certs, ignore_errors, no_deps, force, force_deps): @@ -449,18 +458,22 @@ def install_collections(collections, output_path, apis, validate_certs, ignore_e existing_collections = _find_existing_collections(output_path) with _tempdir() as b_temp_path: - dependency_map = _build_dependency_map(collections, existing_collections, b_temp_path, apis, validate_certs, - force, force_deps, no_deps) - - for collection in dependency_map.values(): - try: - collection.install(output_path, b_temp_path) - except AnsibleError as err: - if ignore_errors: - display.warning("Failed to install collection %s but skipping due to --ignore-errors being set. " - "Error: %s" % (to_text(collection), to_text(err))) - else: - raise + display.display("Process install dependency map") + with _display_progress(): + dependency_map = _build_dependency_map(collections, existing_collections, b_temp_path, apis, + validate_certs, force, force_deps, no_deps) + + display.display("Starting collection install process") + with _display_progress(): + for collection in dependency_map.values(): + try: + collection.install(output_path, b_temp_path) + except AnsibleError as err: + if ignore_errors: + display.warning("Failed to install collection %s but skipping due to --ignore-errors being set. " + "Error: %s" % (to_text(collection), to_text(err))) + else: + raise def validate_collection_name(name): @@ -491,6 +504,64 @@ def _tarfile_extract(tar, member): tar_obj.close() +@contextmanager +def _display_progress(): + def progress(display_queue, actual_display): + actual_display.debug("Starting display_progress display thread") + t = threading.current_thread() + + while True: + for c in "|/-\\": + actual_display.display(c + "\b", newline=False) + time.sleep(0.1) + + # Display a message from the main thread + while True: + try: + method, args, kwargs = display_queue.get(block=False, timeout=0.1) + except queue.Empty: + break + else: + func = getattr(actual_display, method) + func(*args, **kwargs) + + if getattr(t, "finish", False): + actual_display.debug("Received end signal for display_progress display thread") + return + + class DisplayThread(object): + + def __init__(self, display_queue): + self.display_queue = display_queue + + def __getattr__(self, attr): + def call_display(*args, **kwargs): + self.display_queue.put((attr, args, kwargs)) + + return call_display + + # Temporary override the global display class with our own which add the calls to a queue for the thread to call. + global display + old_display = display + try: + display_queue = queue.Queue() + display = DisplayThread(display_queue) + t = threading.Thread(target=progress, args=(display_queue, old_display)) + t.daemon = True + t.start() + + try: + yield + finally: + t.finish = True + t.join() + except Exception: + # The exception is re-raised so we can sure the thread is finished and not using the display anymore + raise + finally: + display = old_display + + def _get_galaxy_yml(b_galaxy_yml_path): meta_info = get_collections_galaxy_meta_info() @@ -729,27 +800,35 @@ def _get_mime_data(b_collection_path): return b"\r\n".join(form), content_type -def _wait_import(task_url, api): +def _wait_import(task_url, api, timeout): headers = api._auth_header() - display.vvv('Waiting until galaxy import task %s has completed' % task_url) + state = 'waiting' + resp = None - wait = 2 - while True: - resp = json.load(open_url(to_native(task_url, errors='surrogate_or_strict'), headers=headers, method='GET', - validate_certs=api.validate_certs)) + display.display("Waiting until Galaxy import task %s has completed" % task_url) + with _display_progress(): + start = time.time() + wait = 2 - if resp.get('finished_at', None): - break - elif wait > 20: - # We try for a maximum of ~60 seconds before giving up in case something has gone wrong on the server end. - raise AnsibleError("Timeout while waiting for the Galaxy import process to finish, check progress at '%s'" - % to_native(task_url)) + while timeout == 0 or (time.time() - start) < timeout: + resp = json.load(open_url(to_native(task_url, errors='surrogate_or_strict'), headers=headers, + method='GET', validate_certs=api.validate_certs)) + state = resp.get('state', 'waiting') + + if resp.get('finished_at', None): + break + + display.vvv('Galaxy import process has a status of %s, wait %d seconds before trying again' + % (state, wait)) + time.sleep(wait) + + # poor man's exponential backoff algo so we don't flood the Galaxy API, cap at 30 seconds. + wait = min(30, wait * 1.5) - status = resp.get('status', 'waiting') - display.vvv('Galaxy import process has a status of %s, wait %d seconds before trying again' % (status, wait)) - time.sleep(wait) - wait *= 1.5 # poor man's exponential backoff algo so we don't flood the Galaxy API. + if state == 'waiting': + raise AnsibleError("Timeout while waiting for the Galaxy import process to finish, check progress at '%s'" + % to_native(task_url)) for message in resp.get('messages', []): level = message['level'] @@ -760,7 +839,7 @@ def _wait_import(task_url, api): else: display.vvv("Galaxy import message: %s - %s" % (level, message['message'])) - if resp['state'] == 'failed': + if state == 'failed': code = to_native(resp['error'].get('code', 'UNKNOWN')) description = to_native(resp['error'].get('description', "Unknown error, see %s for more details" % task_url)) raise AnsibleError("Galaxy import process failed: %s (Code: %s)" % (description, code)) |