diff options
Diffstat (limited to 'cloudinit/util.py')
-rw-r--r-- | cloudinit/util.py | 110 |
1 files changed, 96 insertions, 14 deletions
diff --git a/cloudinit/util.py b/cloudinit/util.py index 8ba3e2b6..fc777b82 100644 --- a/cloudinit/util.py +++ b/cloudinit/util.py @@ -11,6 +11,7 @@ import contextlib import copy as obj_copy import email +import functools import glob import grp import gzip @@ -33,16 +34,18 @@ import sys import time from base64 import b64decode, b64encode from collections import deque, namedtuple +from contextlib import suppress from errno import EACCES, ENOENT from functools import lru_cache, total_ordering from pathlib import Path -from typing import Callable, Deque, Dict, List, TypeVar +from typing import Callable, Deque, Dict, List, Optional, TypeVar from urllib import parse from cloudinit import features, importer from cloudinit import log as logging from cloudinit import ( mergers, + net, safeyaml, subp, temp_utils, @@ -1232,8 +1235,8 @@ def get_fqdn_from_hosts(hostname, filename="/etc/hosts"): return fqdn -def is_resolvable(name): - """determine if a url is resolvable, return a boolean +def is_resolvable(url) -> bool: + """determine if a url's network address is resolvable, return a boolean This also attempts to be resilent against dns redirection. Note, that normal nsswitch resolution is used here. So in order @@ -1245,6 +1248,8 @@ def is_resolvable(name): be resolved inside the search list. """ global _DNS_REDIRECT_IP + parsed_url = parse.urlparse(url) + name = parsed_url.hostname if _DNS_REDIRECT_IP is None: badips = set() badnames = ( @@ -1252,7 +1257,7 @@ def is_resolvable(name): "example.invalid.", "__cloud_init_expected_not_found__", ) - badresults = {} + badresults: dict = {} for iname in badnames: try: result = socket.getaddrinfo( @@ -1269,12 +1274,14 @@ def is_resolvable(name): LOG.debug("detected dns redirection: %s", badresults) try: + # ip addresses need no resolution + with suppress(ValueError): + if net.is_ip_address(parsed_url.netloc.strip("[]")): + return True result = socket.getaddrinfo(name, None) # check first result's sockaddr field addr = result[0][4][0] - if addr in _DNS_REDIRECT_IP: - return False - return True + return addr not in _DNS_REDIRECT_IP except (socket.gaierror, socket.error): return False @@ -1297,7 +1304,7 @@ def is_resolvable_url(url): logfunc=LOG.debug, msg="Resolving URL: " + url, func=is_resolvable, - args=(parse.urlparse(url).hostname,), + args=(url,), ) @@ -1516,12 +1523,6 @@ def blkid(devs=None, disable_cache=False): return ret -def peek_file(fname, max_bytes): - LOG.debug("Peeking at %s (max_bytes=%s)", fname, max_bytes) - with open(fname, "rb") as ifh: - return ifh.read(max_bytes) - - def uniq_list(in_list): out_list = [] for i in in_list: @@ -3097,3 +3098,84 @@ class Version(namedtuple("Version", ["major", "minor", "patch", "rev"])): if self.rev > other.rev: return 1 return -1 + + +def deprecate( + *, + deprecated: str, + deprecated_version: str, + extra_message: Optional[str] = None, + schedule: int = 5, +): + """Mark a "thing" as deprecated. Deduplicated deprecations are + logged. + + @param deprecated: Noun to be deprecated. Write this as the start + of a sentence, with no period. Version and extra message will + be appended. + @param deprecated_version: The version in which the thing was + deprecated + @param extra_message: A remedy for the user's problem. A good + message will be actionable and specific (i.e., don't use a + generic "Use updated key." if the user used a deprecated key). + End the string with a period. + @param schedule: Manually set the deprecation schedule. Defaults to + 5 years. Leave a comment explaining your reason for deviation if + setting this value. + + Note: uses keyword-only arguments to improve legibility + """ + if not hasattr(deprecate, "_log"): + deprecate._log = set() # type: ignore + message = extra_message or "" + dedup = hash(deprecated + message + deprecated_version + str(schedule)) + version = Version.from_str(deprecated_version) + version_removed = Version(version.major + schedule, version.minor) + if dedup not in deprecate._log: # type: ignore + deprecate._log.add(dedup) # type: ignore + deprecate_msg = ( + f"{deprecated} is deprecated in " + f"{deprecated_version} and scheduled to be removed in " + f"{version_removed}. {message}" + ).rstrip() + if hasattr(LOG, "deprecated"): + LOG.deprecated(deprecate_msg) + else: + LOG.warning(deprecate_msg) + + +def deprecate_call( + *, deprecated_version: str, extra_message: str, schedule: int = 5 +): + """Mark a "thing" as deprecated. Deduplicated deprecations are + logged. + + @param deprecated_version: The version in which the thing was + deprecated + @param extra_message: A remedy for the user's problem. A good + message will be actionable and specific (i.e., don't use a + generic "Use updated key." if the user used a deprecated key). + End the string with a period. + @param schedule: Manually set the deprecation schedule. Defaults to + 5 years. Leave a comment explaining your reason for deviation if + setting this value. + + Note: uses keyword-only arguments to improve legibility + """ + + def wrapper(func): + @functools.wraps(func) + def decorator(*args, **kwargs): + # don't log message multiple times + out = func(*args, **kwargs) + deprecate( + deprecated_version=deprecated_version, + deprecated=func.__name__, + extra_message=extra_message, + schedule=schedule, + ) + return out + + return decorator + + return wrapper |