summaryrefslogtreecommitdiff
path: root/lib/ansible/galaxy
diff options
context:
space:
mode:
authorJordan Borean <jborean93@gmail.com>2019-08-23 06:27:28 +1000
committerGitHub <noreply@github.com>2019-08-23 06:27:28 +1000
commite04b2a9697512bc64d2613033274e67fbfbd26c7 (patch)
tree7eea76e5b4e5994015f804534fad717e0e4ffaa9 /lib/ansible/galaxy
parentc81a1057e10ca433a99af31158ec5a0c597f6d2d (diff)
downloadansible-e04b2a9697512bc64d2613033274e67fbfbd26c7.tar.gz
ansible-galaxy - Add timeout and progress indicator for publish and install (#60660)
* ansible-galaxy - Add timeout and progress indicator for publish * add progress indicator to install phase as well
Diffstat (limited to 'lib/ansible/galaxy')
-rw-r--r--lib/ansible/galaxy/collection.py149
1 files changed, 114 insertions, 35 deletions
diff --git a/lib/ansible/galaxy/collection.py b/lib/ansible/galaxy/collection.py
index b999fa303d..911441e38b 100644
--- a/lib/ansible/galaxy/collection.py
+++ b/lib/ansible/galaxy/collection.py
@@ -11,6 +11,7 @@ import os
import shutil
import tarfile
import tempfile
+import threading
import time
import uuid
import yaml
@@ -21,6 +22,11 @@ from hashlib import sha256
from io import BytesIO
from yaml.error import YAMLError
+try:
+ import queue
+except ImportError:
+ import Queue as queue # Python 2
+
import ansible.constants as C
from ansible.errors import AnsibleError
from ansible.galaxy import get_collections_galaxy_meta_info
@@ -383,13 +389,14 @@ def build_collection(collection_path, output_path, force):
_build_collection_tar(b_collection_path, b_collection_output, collection_manifest, file_manifest)
-def publish_collection(collection_path, api, wait):
+def publish_collection(collection_path, api, wait, timeout):
"""
Publish an Ansible collection tarball into an Ansible Galaxy server.
:param collection_path: The path to the collection tarball to publish.
:param api: A GalaxyAPI to publish the collection to.
:param wait: Whether to wait until the import process is complete.
+ :param timeout: The time in seconds to wait for the import process to finish, 0 is indefinite.
"""
b_collection_path = to_bytes(collection_path, errors='surrogate_or_strict')
if not os.path.exists(b_collection_path):
@@ -423,14 +430,16 @@ def publish_collection(collection_path, api, wait):
raise AnsibleError("Error when publishing collection (HTTP Code: %d, Message: %s Code: %s)"
% (err.code, message, code))
- display.vvv("Collection has been pushed to the Galaxy server %s %s" % (api.name, api.api_server))
import_uri = resp['task']
if wait:
- _wait_import(import_uri, api)
- display.display("Collection has been successfully published to the Galaxy server")
+ display.display("Collection has been published to the Galaxy server %s %s" % (api.name, api.api_server))
+ _wait_import(import_uri, api, timeout)
+ display.display("Collection has been successfully published and imported to the Galaxy server %s %s"
+ % (api.name, api.api_server))
else:
- display.display("Collection has been pushed to the Galaxy server, not waiting until import has completed "
- "due to --no-wait being set. Import task results can be found at %s" % import_uri)
+ display.display("Collection has been pushed to the Galaxy server %s %s, not waiting until import has "
+ "completed due to --no-wait being set. Import task results can be found at %s"
+ % (api.name, api.api_server, import_uri))
def install_collections(collections, output_path, apis, validate_certs, ignore_errors, no_deps, force, force_deps):
@@ -449,18 +458,22 @@ def install_collections(collections, output_path, apis, validate_certs, ignore_e
existing_collections = _find_existing_collections(output_path)
with _tempdir() as b_temp_path:
- dependency_map = _build_dependency_map(collections, existing_collections, b_temp_path, apis, validate_certs,
- force, force_deps, no_deps)
-
- for collection in dependency_map.values():
- try:
- collection.install(output_path, b_temp_path)
- except AnsibleError as err:
- if ignore_errors:
- display.warning("Failed to install collection %s but skipping due to --ignore-errors being set. "
- "Error: %s" % (to_text(collection), to_text(err)))
- else:
- raise
+ display.display("Process install dependency map")
+ with _display_progress():
+ dependency_map = _build_dependency_map(collections, existing_collections, b_temp_path, apis,
+ validate_certs, force, force_deps, no_deps)
+
+ display.display("Starting collection install process")
+ with _display_progress():
+ for collection in dependency_map.values():
+ try:
+ collection.install(output_path, b_temp_path)
+ except AnsibleError as err:
+ if ignore_errors:
+ display.warning("Failed to install collection %s but skipping due to --ignore-errors being set. "
+ "Error: %s" % (to_text(collection), to_text(err)))
+ else:
+ raise
def validate_collection_name(name):
@@ -491,6 +504,64 @@ def _tarfile_extract(tar, member):
tar_obj.close()
+@contextmanager
+def _display_progress():
+ def progress(display_queue, actual_display):
+ actual_display.debug("Starting display_progress display thread")
+ t = threading.current_thread()
+
+ while True:
+ for c in "|/-\\":
+ actual_display.display(c + "\b", newline=False)
+ time.sleep(0.1)
+
+ # Display a message from the main thread
+ while True:
+ try:
+ method, args, kwargs = display_queue.get(block=False, timeout=0.1)
+ except queue.Empty:
+ break
+ else:
+ func = getattr(actual_display, method)
+ func(*args, **kwargs)
+
+ if getattr(t, "finish", False):
+ actual_display.debug("Received end signal for display_progress display thread")
+ return
+
+ class DisplayThread(object):
+
+ def __init__(self, display_queue):
+ self.display_queue = display_queue
+
+ def __getattr__(self, attr):
+ def call_display(*args, **kwargs):
+ self.display_queue.put((attr, args, kwargs))
+
+ return call_display
+
+ # Temporary override the global display class with our own which add the calls to a queue for the thread to call.
+ global display
+ old_display = display
+ try:
+ display_queue = queue.Queue()
+ display = DisplayThread(display_queue)
+ t = threading.Thread(target=progress, args=(display_queue, old_display))
+ t.daemon = True
+ t.start()
+
+ try:
+ yield
+ finally:
+ t.finish = True
+ t.join()
+ except Exception:
+ # The exception is re-raised so we can sure the thread is finished and not using the display anymore
+ raise
+ finally:
+ display = old_display
+
+
def _get_galaxy_yml(b_galaxy_yml_path):
meta_info = get_collections_galaxy_meta_info()
@@ -729,27 +800,35 @@ def _get_mime_data(b_collection_path):
return b"\r\n".join(form), content_type
-def _wait_import(task_url, api):
+def _wait_import(task_url, api, timeout):
headers = api._auth_header()
- display.vvv('Waiting until galaxy import task %s has completed' % task_url)
+ state = 'waiting'
+ resp = None
- wait = 2
- while True:
- resp = json.load(open_url(to_native(task_url, errors='surrogate_or_strict'), headers=headers, method='GET',
- validate_certs=api.validate_certs))
+ display.display("Waiting until Galaxy import task %s has completed" % task_url)
+ with _display_progress():
+ start = time.time()
+ wait = 2
- if resp.get('finished_at', None):
- break
- elif wait > 20:
- # We try for a maximum of ~60 seconds before giving up in case something has gone wrong on the server end.
- raise AnsibleError("Timeout while waiting for the Galaxy import process to finish, check progress at '%s'"
- % to_native(task_url))
+ while timeout == 0 or (time.time() - start) < timeout:
+ resp = json.load(open_url(to_native(task_url, errors='surrogate_or_strict'), headers=headers,
+ method='GET', validate_certs=api.validate_certs))
+ state = resp.get('state', 'waiting')
+
+ if resp.get('finished_at', None):
+ break
+
+ display.vvv('Galaxy import process has a status of %s, wait %d seconds before trying again'
+ % (state, wait))
+ time.sleep(wait)
+
+ # poor man's exponential backoff algo so we don't flood the Galaxy API, cap at 30 seconds.
+ wait = min(30, wait * 1.5)
- status = resp.get('status', 'waiting')
- display.vvv('Galaxy import process has a status of %s, wait %d seconds before trying again' % (status, wait))
- time.sleep(wait)
- wait *= 1.5 # poor man's exponential backoff algo so we don't flood the Galaxy API.
+ if state == 'waiting':
+ raise AnsibleError("Timeout while waiting for the Galaxy import process to finish, check progress at '%s'"
+ % to_native(task_url))
for message in resp.get('messages', []):
level = message['level']
@@ -760,7 +839,7 @@ def _wait_import(task_url, api):
else:
display.vvv("Galaxy import message: %s - %s" % (level, message['message']))
- if resp['state'] == 'failed':
+ if state == 'failed':
code = to_native(resp['error'].get('code', 'UNKNOWN'))
description = to_native(resp['error'].get('description', "Unknown error, see %s for more details" % task_url))
raise AnsibleError("Galaxy import process failed: %s (Code: %s)" % (description, code))