diff options
Diffstat (limited to 'src/pip/_vendor/requests/utils.py')
-rw-r--r-- | src/pip/_vendor/requests/utils.py | 439 |
1 files changed, 256 insertions, 183 deletions
diff --git a/src/pip/_vendor/requests/utils.py b/src/pip/_vendor/requests/utils.py index fcb996690..33f394d26 100644 --- a/src/pip/_vendor/requests/utils.py +++ b/src/pip/_vendor/requests/utils.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - """ requests.utils ~~~~~~~~~~~~~~ @@ -20,27 +18,46 @@ import tempfile import warnings import zipfile from collections import OrderedDict -from pip._vendor.urllib3.util import make_headers -from .__version__ import __version__ +from pip._vendor.urllib3.util import make_headers, parse_url + from . import certs +from .__version__ import __version__ + # to_native_string is unused here, but imported here for backwards compatibility -from ._internal_utils import to_native_string +from ._internal_utils import HEADER_VALIDATORS, to_native_string # noqa: F401 +from .compat import ( + Mapping, + basestring, + bytes, + getproxies, + getproxies_environment, + integer_types, +) from .compat import parse_http_list as _parse_list_header from .compat import ( - quote, urlparse, bytes, str, unquote, getproxies, - proxy_bypass, urlunparse, basestring, integer_types, is_py3, - proxy_bypass_environment, getproxies_environment, Mapping) + proxy_bypass, + proxy_bypass_environment, + quote, + str, + unquote, + urlparse, + urlunparse, +) from .cookies import cookiejar_from_dict -from .structures import CaseInsensitiveDict from .exceptions import ( - InvalidURL, InvalidHeader, FileModeWarning, UnrewindableBodyError) + FileModeWarning, + InvalidHeader, + InvalidURL, + UnrewindableBodyError, +) +from .structures import CaseInsensitiveDict -NETRC_FILES = ('.netrc', '_netrc') +NETRC_FILES = (".netrc", "_netrc") DEFAULT_CA_BUNDLE_PATH = certs.where() -DEFAULT_PORTS = {'http': 80, 'https': 443} +DEFAULT_PORTS = {"http": 80, "https": 443} # Ensure that ', ' is used to preserve previous delimiter behavior. DEFAULT_ACCEPT_ENCODING = ", ".join( @@ -48,28 +65,25 @@ DEFAULT_ACCEPT_ENCODING = ", ".join( ) -if sys.platform == 'win32': +if sys.platform == "win32": # provide a proxy_bypass version on Windows without DNS lookups def proxy_bypass_registry(host): try: - if is_py3: - import winreg - else: - import _winreg as winreg + import winreg except ImportError: return False try: - internetSettings = winreg.OpenKey(winreg.HKEY_CURRENT_USER, - r'Software\Microsoft\Windows\CurrentVersion\Internet Settings') + internetSettings = winreg.OpenKey( + winreg.HKEY_CURRENT_USER, + r"Software\Microsoft\Windows\CurrentVersion\Internet Settings", + ) # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it - proxyEnable = int(winreg.QueryValueEx(internetSettings, - 'ProxyEnable')[0]) + proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0]) # ProxyOverride is almost always a string - proxyOverride = winreg.QueryValueEx(internetSettings, - 'ProxyOverride')[0] - except OSError: + proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0] + except (OSError, ValueError): return False if not proxyEnable or not proxyOverride: return False @@ -77,15 +91,15 @@ if sys.platform == 'win32': # make a check value list from the registry entry: replace the # '<local>' string by the localhost entry and the corresponding # canonical entry. - proxyOverride = proxyOverride.split(';') + proxyOverride = proxyOverride.split(";") # now check if we match one of the registry values. for test in proxyOverride: - if test == '<local>': - if '.' not in host: + if test == "<local>": + if "." not in host: return True - test = test.replace(".", r"\.") # mask dots - test = test.replace("*", r".*") # change glob sequence - test = test.replace("?", r".") # change glob char + test = test.replace(".", r"\.") # mask dots + test = test.replace("*", r".*") # change glob sequence + test = test.replace("?", r".") # change glob char if re.match(test, host, re.I): return True return False @@ -105,7 +119,7 @@ if sys.platform == 'win32': def dict_to_sequence(d): """Returns an internal sequence dictionary update.""" - if hasattr(d, 'items'): + if hasattr(d, "items"): d = d.items() return d @@ -115,37 +129,42 @@ def super_len(o): total_length = None current_position = 0 - if hasattr(o, '__len__'): + if hasattr(o, "__len__"): total_length = len(o) - elif hasattr(o, 'len'): + elif hasattr(o, "len"): total_length = o.len - elif hasattr(o, 'fileno'): + elif hasattr(o, "fileno"): try: fileno = o.fileno() - except io.UnsupportedOperation: + except (io.UnsupportedOperation, AttributeError): + # AttributeError is a surprising exception, seeing as how we've just checked + # that `hasattr(o, 'fileno')`. It happens for objects obtained via + # `Tarfile.extractfile()`, per issue 5229. pass else: total_length = os.fstat(fileno).st_size # Having used fstat to determine the file length, we need to # confirm that this file was opened up in binary mode. - if 'b' not in o.mode: - warnings.warn(( - "Requests has determined the content-length for this " - "request using the binary size of the file: however, the " - "file has been opened in text mode (i.e. without the 'b' " - "flag in the mode). This may lead to an incorrect " - "content-length. In Requests 3.0, support will be removed " - "for files in text mode."), - FileModeWarning + if "b" not in o.mode: + warnings.warn( + ( + "Requests has determined the content-length for this " + "request using the binary size of the file: however, the " + "file has been opened in text mode (i.e. without the 'b' " + "flag in the mode). This may lead to an incorrect " + "content-length. In Requests 3.0, support will be removed " + "for files in text mode." + ), + FileModeWarning, ) - if hasattr(o, 'tell'): + if hasattr(o, "tell"): try: current_position = o.tell() - except (OSError, IOError): + except OSError: # This can happen in some weird situations, such as when the file # is actually a special file descriptor like stdin. In this # instance, we don't know what the length is, so set it to zero and @@ -153,8 +172,8 @@ def super_len(o): if total_length is not None: current_position = total_length else: - if hasattr(o, 'seek') and total_length is None: - # StringIO and BytesIO have seek but no useable fileno + if hasattr(o, "seek") and total_length is None: + # StringIO and BytesIO have seek but no usable fileno try: # seek to end of file o.seek(0, 2) @@ -163,7 +182,7 @@ def super_len(o): # seek back to current position to support # partially read file-like objects o.seek(current_position or 0) - except (OSError, IOError): + except OSError: total_length = 0 if total_length is None: @@ -175,14 +194,14 @@ def super_len(o): def get_netrc_auth(url, raise_errors=False): """Returns the Requests tuple auth for a given url from netrc.""" - netrc_file = os.environ.get('NETRC') + netrc_file = os.environ.get("NETRC") if netrc_file is not None: netrc_locations = (netrc_file,) else: - netrc_locations = ('~/{}'.format(f) for f in NETRC_FILES) + netrc_locations = (f"~/{f}" for f in NETRC_FILES) try: - from netrc import netrc, NetrcParseError + from netrc import NetrcParseError, netrc netrc_path = None @@ -207,18 +226,18 @@ def get_netrc_auth(url, raise_errors=False): # Strip port numbers from netloc. This weird `if...encode`` dance is # used for Python 3.2, which doesn't support unicode literals. - splitstr = b':' + splitstr = b":" if isinstance(url, str): - splitstr = splitstr.decode('ascii') + splitstr = splitstr.decode("ascii") host = ri.netloc.split(splitstr)[0] try: _netrc = netrc(netrc_path).authenticators(host) if _netrc: # Return with login / password - login_i = (0 if _netrc[0] else 1) + login_i = 0 if _netrc[0] else 1 return (_netrc[login_i], _netrc[2]) - except (NetrcParseError, IOError): + except (NetrcParseError, OSError): # If there was a parsing error or a permissions issue reading the file, # we'll just skip netrc auth unless explicitly asked to raise errors. if raise_errors: @@ -231,9 +250,8 @@ def get_netrc_auth(url, raise_errors=False): def guess_filename(obj): """Tries to guess the filename of the given object.""" - name = getattr(obj, 'name', None) - if (name and isinstance(name, basestring) and name[0] != '<' and - name[-1] != '>'): + name = getattr(obj, "name", None) + if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">": return os.path.basename(name) @@ -251,7 +269,11 @@ def extract_zipped_paths(path): archive, member = os.path.split(path) while archive and not os.path.exists(archive): archive, prefix = os.path.split(archive) - member = '/'.join([prefix, member]) + if not prefix: + # If we don't check for an empty prefix after the split (in other words, archive remains unchanged after the split), + # we _can_ end up in an infinite loop on a rare corner case affecting a small number of users + break + member = "/".join([prefix, member]) if not zipfile.is_zipfile(archive): return path @@ -262,7 +284,7 @@ def extract_zipped_paths(path): # we have a valid zip archive and a valid member of that archive tmp = tempfile.gettempdir() - extracted_path = os.path.join(tmp, member.split('/')[-1]) + extracted_path = os.path.join(tmp, member.split("/")[-1]) if not os.path.exists(extracted_path): # use read + write to avoid the creating nested folders, we only want the file, avoids mkdir racing condition with atomic_open(extracted_path) as file_handler: @@ -273,12 +295,11 @@ def extract_zipped_paths(path): @contextlib.contextmanager def atomic_open(filename): """Write a file to the disk in an atomic fashion""" - replacer = os.rename if sys.version_info[0] == 2 else os.replace tmp_descriptor, tmp_name = tempfile.mkstemp(dir=os.path.dirname(filename)) try: - with os.fdopen(tmp_descriptor, 'wb') as tmp_handler: + with os.fdopen(tmp_descriptor, "wb") as tmp_handler: yield tmp_handler - replacer(tmp_name, filename) + os.replace(tmp_name, filename) except BaseException: os.remove(tmp_name) raise @@ -306,7 +327,7 @@ def from_key_val_list(value): return None if isinstance(value, (str, bytes, bool, int)): - raise ValueError('cannot encode objects that are not 2-tuples') + raise ValueError("cannot encode objects that are not 2-tuples") return OrderedDict(value) @@ -332,7 +353,7 @@ def to_key_val_list(value): return None if isinstance(value, (str, bytes, bool, int)): - raise ValueError('cannot encode objects that are not 2-tuples') + raise ValueError("cannot encode objects that are not 2-tuples") if isinstance(value, Mapping): value = value.items() @@ -397,10 +418,10 @@ def parse_dict_header(value): """ result = {} for item in _parse_list_header(value): - if '=' not in item: + if "=" not in item: result[item] = None continue - name, value = item.split('=', 1) + name, value = item.split("=", 1) if value[:1] == value[-1:] == '"': value = unquote_header_value(value[1:-1]) result[name] = value @@ -428,8 +449,8 @@ def unquote_header_value(value, is_filename=False): # replace sequence below on a UNC path has the effect of turning # the leading double slash into a single slash and then # _fix_ie_filename() doesn't work correctly. See #458. - if not is_filename or value[:2] != '\\\\': - return value.replace('\\\\', '\\').replace('\\"', '"') + if not is_filename or value[:2] != "\\\\": + return value.replace("\\\\", "\\").replace('\\"', '"') return value @@ -464,19 +485,24 @@ def get_encodings_from_content(content): :param content: bytestring to extract encodings from. """ - warnings.warn(( - 'In requests 3.0, get_encodings_from_content will be removed. For ' - 'more information, please see the discussion on issue #2266. (This' - ' warning should only appear once.)'), - DeprecationWarning) + warnings.warn( + ( + "In requests 3.0, get_encodings_from_content will be removed. For " + "more information, please see the discussion on issue #2266. (This" + " warning should only appear once.)" + ), + DeprecationWarning, + ) charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I) pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I) xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]') - return (charset_re.findall(content) + - pragma_re.findall(content) + - xml_re.findall(content)) + return ( + charset_re.findall(content) + + pragma_re.findall(content) + + xml_re.findall(content) + ) def _parse_content_type_header(header): @@ -487,7 +513,7 @@ def _parse_content_type_header(header): parameters """ - tokens = header.split(';') + tokens = header.split(";") content_type, params = tokens[0].strip(), tokens[1:] params_dict = {} items_to_strip = "\"' " @@ -499,7 +525,7 @@ def _parse_content_type_header(header): index_of_equals = param.find("=") if index_of_equals != -1: key = param[:index_of_equals].strip(items_to_strip) - value = param[index_of_equals + 1:].strip(items_to_strip) + value = param[index_of_equals + 1 :].strip(items_to_strip) params_dict[key.lower()] = value return content_type, params_dict @@ -511,38 +537,37 @@ def get_encoding_from_headers(headers): :rtype: str """ - content_type = headers.get('content-type') + content_type = headers.get("content-type") if not content_type: return None content_type, params = _parse_content_type_header(content_type) - if 'charset' in params: - return params['charset'].strip("'\"") + if "charset" in params: + return params["charset"].strip("'\"") - if 'text' in content_type: - return 'ISO-8859-1' + if "text" in content_type: + return "ISO-8859-1" - if 'application/json' in content_type: + if "application/json" in content_type: # Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset - return 'utf-8' + return "utf-8" def stream_decode_response_unicode(iterator, r): - """Stream decodes a iterator.""" + """Stream decodes an iterator.""" if r.encoding is None: - for item in iterator: - yield item + yield from iterator return - decoder = codecs.getincrementaldecoder(r.encoding)(errors='replace') + decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace") for chunk in iterator: rv = decoder.decode(chunk) if rv: yield rv - rv = decoder.decode(b'', final=True) + rv = decoder.decode(b"", final=True) if rv: yield rv @@ -553,7 +578,7 @@ def iter_slices(string, slice_length): if slice_length is None or slice_length <= 0: slice_length = len(string) while pos < len(string): - yield string[pos:pos + slice_length] + yield string[pos : pos + slice_length] pos += slice_length @@ -569,11 +594,14 @@ def get_unicode_from_response(r): :rtype: str """ - warnings.warn(( - 'In requests 3.0, get_unicode_from_response will be removed. For ' - 'more information, please see the discussion on issue #2266. (This' - ' warning should only appear once.)'), - DeprecationWarning) + warnings.warn( + ( + "In requests 3.0, get_unicode_from_response will be removed. For " + "more information, please see the discussion on issue #2266. (This" + " warning should only appear once.)" + ), + DeprecationWarning, + ) tried_encodings = [] @@ -588,14 +616,15 @@ def get_unicode_from_response(r): # Fall back: try: - return str(r.content, encoding, errors='replace') + return str(r.content, encoding, errors="replace") except TypeError: return r.content # The unreserved URI characters (RFC 3986) UNRESERVED_SET = frozenset( - "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~") + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~" +) def unquote_unreserved(uri): @@ -604,22 +633,22 @@ def unquote_unreserved(uri): :rtype: str """ - parts = uri.split('%') + parts = uri.split("%") for i in range(1, len(parts)): h = parts[i][0:2] if len(h) == 2 and h.isalnum(): try: c = chr(int(h, 16)) except ValueError: - raise InvalidURL("Invalid percent-escape sequence: '%s'" % h) + raise InvalidURL(f"Invalid percent-escape sequence: '{h}'") if c in UNRESERVED_SET: parts[i] = c + parts[i][2:] else: - parts[i] = '%' + parts[i] + parts[i] = f"%{parts[i]}" else: - parts[i] = '%' + parts[i] - return ''.join(parts) + parts[i] = f"%{parts[i]}" + return "".join(parts) def requote_uri(uri): @@ -652,10 +681,10 @@ def address_in_network(ip, net): :rtype: bool """ - ipaddr = struct.unpack('=L', socket.inet_aton(ip))[0] - netaddr, bits = net.split('/') - netmask = struct.unpack('=L', socket.inet_aton(dotted_netmask(int(bits))))[0] - network = struct.unpack('=L', socket.inet_aton(netaddr))[0] & netmask + ipaddr = struct.unpack("=L", socket.inet_aton(ip))[0] + netaddr, bits = net.split("/") + netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0] + network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & netmask return (ipaddr & netmask) == (network & netmask) @@ -666,8 +695,8 @@ def dotted_netmask(mask): :rtype: str """ - bits = 0xffffffff ^ (1 << 32 - mask) - 1 - return socket.inet_ntoa(struct.pack('>I', bits)) + bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1 + return socket.inet_ntoa(struct.pack(">I", bits)) def is_ipv4_address(string_ip): @@ -676,7 +705,7 @@ def is_ipv4_address(string_ip): """ try: socket.inet_aton(string_ip) - except socket.error: + except OSError: return False return True @@ -687,9 +716,9 @@ def is_valid_cidr(string_network): :rtype: bool """ - if string_network.count('/') == 1: + if string_network.count("/") == 1: try: - mask = int(string_network.split('/')[1]) + mask = int(string_network.split("/")[1]) except ValueError: return False @@ -697,8 +726,8 @@ def is_valid_cidr(string_network): return False try: - socket.inet_aton(string_network.split('/')[0]) - except socket.error: + socket.inet_aton(string_network.split("/")[0]) + except OSError: return False else: return False @@ -735,13 +764,14 @@ def should_bypass_proxies(url, no_proxy): """ # Prioritize lowercase environment variables over uppercase # to keep a consistent behaviour with other http projects (curl, wget). - get_proxy = lambda k: os.environ.get(k) or os.environ.get(k.upper()) + def get_proxy(key): + return os.environ.get(key) or os.environ.get(key.upper()) # First check whether no_proxy is defined. If it is, check that the URL # we're getting isn't in the no_proxy list. no_proxy_arg = no_proxy if no_proxy is None: - no_proxy = get_proxy('no_proxy') + no_proxy = get_proxy("no_proxy") parsed = urlparse(url) if parsed.hostname is None: @@ -751,9 +781,7 @@ def should_bypass_proxies(url, no_proxy): if no_proxy: # We need to check whether we match here. We need to see if we match # the end of the hostname, both with and without the port. - no_proxy = ( - host for host in no_proxy.replace(' ', '').split(',') if host - ) + no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host) if is_ipv4_address(parsed.hostname): for proxy_ip in no_proxy: @@ -767,7 +795,7 @@ def should_bypass_proxies(url, no_proxy): else: host_with_port = parsed.hostname if parsed.port: - host_with_port += ':{}'.format(parsed.port) + host_with_port += f":{parsed.port}" for host in no_proxy: if parsed.hostname.endswith(host) or host_with_port.endswith(host): @@ -775,7 +803,7 @@ def should_bypass_proxies(url, no_proxy): # to apply the proxies on this URL. return True - with set_environ('no_proxy', no_proxy_arg): + with set_environ("no_proxy", no_proxy_arg): # parsed.hostname can be `None` in cases such as a file URI. try: bypass = proxy_bypass(parsed.hostname) @@ -809,13 +837,13 @@ def select_proxy(url, proxies): proxies = proxies or {} urlparts = urlparse(url) if urlparts.hostname is None: - return proxies.get(urlparts.scheme, proxies.get('all')) + return proxies.get(urlparts.scheme, proxies.get("all")) proxy_keys = [ - urlparts.scheme + '://' + urlparts.hostname, + urlparts.scheme + "://" + urlparts.hostname, urlparts.scheme, - 'all://' + urlparts.hostname, - 'all', + "all://" + urlparts.hostname, + "all", ] proxy = None for proxy_key in proxy_keys: @@ -826,25 +854,54 @@ def select_proxy(url, proxies): return proxy +def resolve_proxies(request, proxies, trust_env=True): + """This method takes proxy information from a request and configuration + input to resolve a mapping of target proxies. This will consider settings + such a NO_PROXY to strip proxy configurations. + + :param request: Request or PreparedRequest + :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs + :param trust_env: Boolean declaring whether to trust environment configs + + :rtype: dict + """ + proxies = proxies if proxies is not None else {} + url = request.url + scheme = urlparse(url).scheme + no_proxy = proxies.get("no_proxy") + new_proxies = proxies.copy() + + if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy): + environ_proxies = get_environ_proxies(url, no_proxy=no_proxy) + + proxy = environ_proxies.get(scheme, environ_proxies.get("all")) + + if proxy: + new_proxies.setdefault(scheme, proxy) + return new_proxies + + def default_user_agent(name="python-requests"): """ Return a string representing the default user agent. :rtype: str """ - return '%s/%s' % (name, __version__) + return f"{name}/{__version__}" def default_headers(): """ :rtype: requests.structures.CaseInsensitiveDict """ - return CaseInsensitiveDict({ - 'User-Agent': default_user_agent(), - 'Accept-Encoding': DEFAULT_ACCEPT_ENCODING, - 'Accept': '*/*', - 'Connection': 'keep-alive', - }) + return CaseInsensitiveDict( + { + "User-Agent": default_user_agent(), + "Accept-Encoding": DEFAULT_ACCEPT_ENCODING, + "Accept": "*/*", + "Connection": "keep-alive", + } + ) def parse_header_links(value): @@ -857,23 +914,23 @@ def parse_header_links(value): links = [] - replace_chars = ' \'"' + replace_chars = " '\"" value = value.strip(replace_chars) if not value: return links - for val in re.split(', *<', value): + for val in re.split(", *<", value): try: - url, params = val.split(';', 1) + url, params = val.split(";", 1) except ValueError: - url, params = val, '' + url, params = val, "" - link = {'url': url.strip('<> \'"')} + link = {"url": url.strip("<> '\"")} - for param in params.split(';'): + for param in params.split(";"): try: - key, value = param.split('=') + key, value = param.split("=") except ValueError: break @@ -885,7 +942,7 @@ def parse_header_links(value): # Null bytes; no need to recreate these on each call to guess_json_utf -_null = '\x00'.encode('ascii') # encoding to ASCII for Python 3 +_null = "\x00".encode("ascii") # encoding to ASCII for Python 3 _null2 = _null * 2 _null3 = _null * 3 @@ -899,25 +956,25 @@ def guess_json_utf(data): # determine the encoding. Also detect a BOM, if present. sample = data[:4] if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE): - return 'utf-32' # BOM included + return "utf-32" # BOM included if sample[:3] == codecs.BOM_UTF8: - return 'utf-8-sig' # BOM included, MS style (discouraged) + return "utf-8-sig" # BOM included, MS style (discouraged) if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE): - return 'utf-16' # BOM included + return "utf-16" # BOM included nullcount = sample.count(_null) if nullcount == 0: - return 'utf-8' + return "utf-8" if nullcount == 2: - if sample[::2] == _null2: # 1st and 3rd are null - return 'utf-16-be' + if sample[::2] == _null2: # 1st and 3rd are null + return "utf-16-be" if sample[1::2] == _null2: # 2nd and 4th are null - return 'utf-16-le' + return "utf-16-le" # Did not detect 2 valid UTF-16 ascii-range characters if nullcount == 3: if sample[:3] == _null3: - return 'utf-32-be' + return "utf-32-be" if sample[1:] == _null3: - return 'utf-32-le' + return "utf-32-le" # Did not detect a valid UTF-32 ascii-range character return None @@ -928,15 +985,27 @@ def prepend_scheme_if_needed(url, new_scheme): :rtype: str """ - scheme, netloc, path, params, query, fragment = urlparse(url, new_scheme) - - # urlparse is a finicky beast, and sometimes decides that there isn't a - # netloc present. Assume that it's being over-cautious, and switch netloc - # and path if urlparse decided there was no netloc. + parsed = parse_url(url) + scheme, auth, host, port, path, query, fragment = parsed + + # A defect in urlparse determines that there isn't a netloc present in some + # urls. We previously assumed parsing was overly cautious, and swapped the + # netloc and path. Due to a lack of tests on the original defect, this is + # maintained with parse_url for backwards compatibility. + netloc = parsed.netloc if not netloc: netloc, path = path, netloc - return urlunparse((scheme, netloc, path, params, query, fragment)) + if auth: + # parse_url doesn't provide the netloc with auth + # so we'll add it ourselves. + netloc = "@".join([auth, netloc]) + if scheme is None: + scheme = new_scheme + if path is None: + path = "" + + return urlunparse((scheme, netloc, path, "", query, fragment)) def get_auth_from_url(url): @@ -950,35 +1019,36 @@ def get_auth_from_url(url): try: auth = (unquote(parsed.username), unquote(parsed.password)) except (AttributeError, TypeError): - auth = ('', '') + auth = ("", "") return auth -# Moved outside of function to avoid recompile every call -_CLEAN_HEADER_REGEX_BYTE = re.compile(b'^\\S[^\\r\\n]*$|^$') -_CLEAN_HEADER_REGEX_STR = re.compile(r'^\S[^\r\n]*$|^$') - - def check_header_validity(header): - """Verifies that header value is a string which doesn't contain - leading whitespace or return characters. This prevents unintended - header injection. + """Verifies that header parts don't contain leading whitespace + reserved characters, or return characters. :param header: tuple, in the format (name, value). """ name, value = header - if isinstance(value, bytes): - pat = _CLEAN_HEADER_REGEX_BYTE - else: - pat = _CLEAN_HEADER_REGEX_STR - try: - if not pat.match(value): - raise InvalidHeader("Invalid return character or leading space in header: %s" % name) - except TypeError: - raise InvalidHeader("Value for header {%s: %s} must be of type str or " - "bytes, not %s" % (name, value, type(value))) + for part in header: + if type(part) not in HEADER_VALIDATORS: + raise InvalidHeader( + f"Header part ({part!r}) from {{{name!r}: {value!r}}} must be " + f"of type str or bytes, not {type(part)}" + ) + + _validate_header_part(name, "name", HEADER_VALIDATORS[type(name)][0]) + _validate_header_part(value, "value", HEADER_VALIDATORS[type(value)][1]) + + +def _validate_header_part(header_part, header_kind, validator): + if not validator.match(header_part): + raise InvalidHeader( + f"Invalid leading whitespace, reserved character(s), or return" + f"character(s) in header {header_kind}: {header_part!r}" + ) def urldefragauth(url): @@ -993,21 +1063,24 @@ def urldefragauth(url): if not netloc: netloc, path = path, netloc - netloc = netloc.rsplit('@', 1)[-1] + netloc = netloc.rsplit("@", 1)[-1] - return urlunparse((scheme, netloc, path, params, query, '')) + return urlunparse((scheme, netloc, path, params, query, "")) def rewind_body(prepared_request): """Move file pointer back to its recorded starting position so it can be read again on redirect. """ - body_seek = getattr(prepared_request.body, 'seek', None) - if body_seek is not None and isinstance(prepared_request._body_position, integer_types): + body_seek = getattr(prepared_request.body, "seek", None) + if body_seek is not None and isinstance( + prepared_request._body_position, integer_types + ): try: body_seek(prepared_request._body_position) - except (IOError, OSError): - raise UnrewindableBodyError("An error occurred when rewinding request " - "body for redirect.") + except OSError: + raise UnrewindableBodyError( + "An error occurred when rewinding request body for redirect." + ) else: raise UnrewindableBodyError("Unable to rewind request body for redirect.") |