summaryrefslogtreecommitdiff
path: root/cloudinit/util.py
diff options
context:
space:
mode:
Diffstat (limited to 'cloudinit/util.py')
-rw-r--r--cloudinit/util.py110
1 files changed, 96 insertions, 14 deletions
diff --git a/cloudinit/util.py b/cloudinit/util.py
index 8ba3e2b6..fc777b82 100644
--- a/cloudinit/util.py
+++ b/cloudinit/util.py
@@ -11,6 +11,7 @@
import contextlib
import copy as obj_copy
import email
+import functools
import glob
import grp
import gzip
@@ -33,16 +34,18 @@ import sys
import time
from base64 import b64decode, b64encode
from collections import deque, namedtuple
+from contextlib import suppress
from errno import EACCES, ENOENT
from functools import lru_cache, total_ordering
from pathlib import Path
-from typing import Callable, Deque, Dict, List, TypeVar
+from typing import Callable, Deque, Dict, List, Optional, TypeVar
from urllib import parse
from cloudinit import features, importer
from cloudinit import log as logging
from cloudinit import (
mergers,
+ net,
safeyaml,
subp,
temp_utils,
@@ -1232,8 +1235,8 @@ def get_fqdn_from_hosts(hostname, filename="/etc/hosts"):
return fqdn
-def is_resolvable(name):
- """determine if a url is resolvable, return a boolean
+def is_resolvable(url) -> bool:
+ """determine if a url's network address is resolvable, return a boolean
This also attempts to be resilent against dns redirection.
Note, that normal nsswitch resolution is used here. So in order
@@ -1245,6 +1248,8 @@ def is_resolvable(name):
be resolved inside the search list.
"""
global _DNS_REDIRECT_IP
+ parsed_url = parse.urlparse(url)
+ name = parsed_url.hostname
if _DNS_REDIRECT_IP is None:
badips = set()
badnames = (
@@ -1252,7 +1257,7 @@ def is_resolvable(name):
"example.invalid.",
"__cloud_init_expected_not_found__",
)
- badresults = {}
+ badresults: dict = {}
for iname in badnames:
try:
result = socket.getaddrinfo(
@@ -1269,12 +1274,14 @@ def is_resolvable(name):
LOG.debug("detected dns redirection: %s", badresults)
try:
+ # ip addresses need no resolution
+ with suppress(ValueError):
+ if net.is_ip_address(parsed_url.netloc.strip("[]")):
+ return True
result = socket.getaddrinfo(name, None)
# check first result's sockaddr field
addr = result[0][4][0]
- if addr in _DNS_REDIRECT_IP:
- return False
- return True
+ return addr not in _DNS_REDIRECT_IP
except (socket.gaierror, socket.error):
return False
@@ -1297,7 +1304,7 @@ def is_resolvable_url(url):
logfunc=LOG.debug,
msg="Resolving URL: " + url,
func=is_resolvable,
- args=(parse.urlparse(url).hostname,),
+ args=(url,),
)
@@ -1516,12 +1523,6 @@ def blkid(devs=None, disable_cache=False):
return ret
-def peek_file(fname, max_bytes):
- LOG.debug("Peeking at %s (max_bytes=%s)", fname, max_bytes)
- with open(fname, "rb") as ifh:
- return ifh.read(max_bytes)
-
-
def uniq_list(in_list):
out_list = []
for i in in_list:
@@ -3097,3 +3098,84 @@ class Version(namedtuple("Version", ["major", "minor", "patch", "rev"])):
if self.rev > other.rev:
return 1
return -1
+
+
+def deprecate(
+ *,
+ deprecated: str,
+ deprecated_version: str,
+ extra_message: Optional[str] = None,
+ schedule: int = 5,
+):
+ """Mark a "thing" as deprecated. Deduplicated deprecations are
+ logged.
+
+ @param deprecated: Noun to be deprecated. Write this as the start
+ of a sentence, with no period. Version and extra message will
+ be appended.
+ @param deprecated_version: The version in which the thing was
+ deprecated
+ @param extra_message: A remedy for the user's problem. A good
+ message will be actionable and specific (i.e., don't use a
+ generic "Use updated key." if the user used a deprecated key).
+ End the string with a period.
+ @param schedule: Manually set the deprecation schedule. Defaults to
+ 5 years. Leave a comment explaining your reason for deviation if
+ setting this value.
+
+ Note: uses keyword-only arguments to improve legibility
+ """
+ if not hasattr(deprecate, "_log"):
+ deprecate._log = set() # type: ignore
+ message = extra_message or ""
+ dedup = hash(deprecated + message + deprecated_version + str(schedule))
+ version = Version.from_str(deprecated_version)
+ version_removed = Version(version.major + schedule, version.minor)
+ if dedup not in deprecate._log: # type: ignore
+ deprecate._log.add(dedup) # type: ignore
+ deprecate_msg = (
+ f"{deprecated} is deprecated in "
+ f"{deprecated_version} and scheduled to be removed in "
+ f"{version_removed}. {message}"
+ ).rstrip()
+ if hasattr(LOG, "deprecated"):
+ LOG.deprecated(deprecate_msg)
+ else:
+ LOG.warning(deprecate_msg)
+
+
+def deprecate_call(
+ *, deprecated_version: str, extra_message: str, schedule: int = 5
+):
+ """Mark a "thing" as deprecated. Deduplicated deprecations are
+ logged.
+
+ @param deprecated_version: The version in which the thing was
+ deprecated
+ @param extra_message: A remedy for the user's problem. A good
+ message will be actionable and specific (i.e., don't use a
+ generic "Use updated key." if the user used a deprecated key).
+ End the string with a period.
+ @param schedule: Manually set the deprecation schedule. Defaults to
+ 5 years. Leave a comment explaining your reason for deviation if
+ setting this value.
+
+ Note: uses keyword-only arguments to improve legibility
+ """
+
+ def wrapper(func):
+ @functools.wraps(func)
+ def decorator(*args, **kwargs):
+ # don't log message multiple times
+ out = func(*args, **kwargs)
+ deprecate(
+ deprecated_version=deprecated_version,
+ deprecated=func.__name__,
+ extra_message=extra_message,
+ schedule=schedule,
+ )
+ return out
+
+ return decorator
+
+ return wrapper