Diffstat (limited to 'src/pip/_vendor/requests/utils.py')
-rw-r--r--  src/pip/_vendor/requests/utils.py | 439
1 file changed, 256 insertions(+), 183 deletions(-)
diff --git a/src/pip/_vendor/requests/utils.py b/src/pip/_vendor/requests/utils.py
index fcb996690..33f394d26 100644
--- a/src/pip/_vendor/requests/utils.py
+++ b/src/pip/_vendor/requests/utils.py
@@ -1,5 +1,3 @@
-# -*- coding: utf-8 -*-
-
"""
requests.utils
~~~~~~~~~~~~~~
@@ -20,27 +18,46 @@ import tempfile
import warnings
import zipfile
from collections import OrderedDict
-from pip._vendor.urllib3.util import make_headers
-from .__version__ import __version__
+from pip._vendor.urllib3.util import make_headers, parse_url
+
from . import certs
+from .__version__ import __version__
+
# to_native_string is unused here, but imported here for backwards compatibility
-from ._internal_utils import to_native_string
+from ._internal_utils import HEADER_VALIDATORS, to_native_string # noqa: F401
+from .compat import (
+ Mapping,
+ basestring,
+ bytes,
+ getproxies,
+ getproxies_environment,
+ integer_types,
+)
from .compat import parse_http_list as _parse_list_header
from .compat import (
- quote, urlparse, bytes, str, unquote, getproxies,
- proxy_bypass, urlunparse, basestring, integer_types, is_py3,
- proxy_bypass_environment, getproxies_environment, Mapping)
+ proxy_bypass,
+ proxy_bypass_environment,
+ quote,
+ str,
+ unquote,
+ urlparse,
+ urlunparse,
+)
from .cookies import cookiejar_from_dict
-from .structures import CaseInsensitiveDict
from .exceptions import (
- InvalidURL, InvalidHeader, FileModeWarning, UnrewindableBodyError)
+ FileModeWarning,
+ InvalidHeader,
+ InvalidURL,
+ UnrewindableBodyError,
+)
+from .structures import CaseInsensitiveDict
-NETRC_FILES = ('.netrc', '_netrc')
+NETRC_FILES = (".netrc", "_netrc")
DEFAULT_CA_BUNDLE_PATH = certs.where()
-DEFAULT_PORTS = {'http': 80, 'https': 443}
+DEFAULT_PORTS = {"http": 80, "https": 443}
# Ensure that ', ' is used to preserve previous delimiter behavior.
DEFAULT_ACCEPT_ENCODING = ", ".join(
@@ -48,28 +65,25 @@ DEFAULT_ACCEPT_ENCODING = ", ".join(
)
-if sys.platform == 'win32':
+if sys.platform == "win32":
# provide a proxy_bypass version on Windows without DNS lookups
def proxy_bypass_registry(host):
try:
- if is_py3:
- import winreg
- else:
- import _winreg as winreg
+ import winreg
except ImportError:
return False
try:
- internetSettings = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
- r'Software\Microsoft\Windows\CurrentVersion\Internet Settings')
+ internetSettings = winreg.OpenKey(
+ winreg.HKEY_CURRENT_USER,
+ r"Software\Microsoft\Windows\CurrentVersion\Internet Settings",
+ )
# ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
- proxyEnable = int(winreg.QueryValueEx(internetSettings,
- 'ProxyEnable')[0])
+ proxyEnable = int(winreg.QueryValueEx(internetSettings, "ProxyEnable")[0])
# ProxyOverride is almost always a string
- proxyOverride = winreg.QueryValueEx(internetSettings,
- 'ProxyOverride')[0]
- except OSError:
+ proxyOverride = winreg.QueryValueEx(internetSettings, "ProxyOverride")[0]
+ except (OSError, ValueError):
return False
if not proxyEnable or not proxyOverride:
return False
@@ -77,15 +91,15 @@ if sys.platform == 'win32':
# make a check value list from the registry entry: replace the
# '<local>' string by the localhost entry and the corresponding
# canonical entry.
- proxyOverride = proxyOverride.split(';')
+ proxyOverride = proxyOverride.split(";")
# now check if we match one of the registry values.
for test in proxyOverride:
- if test == '<local>':
- if '.' not in host:
+ if test == "<local>":
+ if "." not in host:
return True
- test = test.replace(".", r"\.") # mask dots
- test = test.replace("*", r".*") # change glob sequence
- test = test.replace("?", r".") # change glob char
+ test = test.replace(".", r"\.") # mask dots
+ test = test.replace("*", r".*") # change glob sequence
+ test = test.replace("?", r".") # change glob char
if re.match(test, host, re.I):
return True
return False
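Note on the hunk above: ProxyOverride entries are plain globs that get translated into regular expressions before matching. A minimal re-creation of that translation, for illustration only (the host names are made up):

    import re

    def matches_override(host, entry):
        # Same translation as proxy_bypass_registry: escape dots,
        # then map glob '*' -> '.*' and '?' -> '.'.
        pattern = entry.replace(".", r"\.").replace("*", r".*").replace("?", r".")
        return re.match(pattern, host, re.I) is not None

    assert matches_override("build.internal.example", "*.internal.example")
    assert not matches_override("example.com", "*.internal.example")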
@@ -105,7 +119,7 @@ if sys.platform == 'win32':
def dict_to_sequence(d):
"""Returns an internal sequence dictionary update."""
- if hasattr(d, 'items'):
+ if hasattr(d, "items"):
d = d.items()
return d
@@ -115,37 +129,42 @@ def super_len(o):
total_length = None
current_position = 0
- if hasattr(o, '__len__'):
+ if hasattr(o, "__len__"):
total_length = len(o)
- elif hasattr(o, 'len'):
+ elif hasattr(o, "len"):
total_length = o.len
- elif hasattr(o, 'fileno'):
+ elif hasattr(o, "fileno"):
try:
fileno = o.fileno()
- except io.UnsupportedOperation:
+ except (io.UnsupportedOperation, AttributeError):
+ # AttributeError is a surprising exception, seeing as how we've just checked
+ # that `hasattr(o, 'fileno')`. It happens for objects obtained via
+ # `TarFile.extractfile()`, per issue 5229.
pass
else:
total_length = os.fstat(fileno).st_size
# Having used fstat to determine the file length, we need to
# confirm that this file was opened up in binary mode.
- if 'b' not in o.mode:
- warnings.warn((
- "Requests has determined the content-length for this "
- "request using the binary size of the file: however, the "
- "file has been opened in text mode (i.e. without the 'b' "
- "flag in the mode). This may lead to an incorrect "
- "content-length. In Requests 3.0, support will be removed "
- "for files in text mode."),
- FileModeWarning
+ if "b" not in o.mode:
+ warnings.warn(
+ (
+ "Requests has determined the content-length for this "
+ "request using the binary size of the file: however, the "
+ "file has been opened in text mode (i.e. without the 'b' "
+ "flag in the mode). This may lead to an incorrect "
+ "content-length. In Requests 3.0, support will be removed "
+ "for files in text mode."
+ ),
+ FileModeWarning,
)
- if hasattr(o, 'tell'):
+ if hasattr(o, "tell"):
try:
current_position = o.tell()
- except (OSError, IOError):
+ except OSError:
# This can happen in some weird situations, such as when the file
# is actually a special file descriptor like stdin. In this
# instance, we don't know what the length is, so set it to zero and
@@ -153,8 +172,8 @@ def super_len(o):
if total_length is not None:
current_position = total_length
else:
- if hasattr(o, 'seek') and total_length is None:
- # StringIO and BytesIO have seek but no useable fileno
+ if hasattr(o, "seek") and total_length is None:
+ # StringIO and BytesIO have seek but no usable fileno
try:
# seek to end of file
o.seek(0, 2)
@@ -163,7 +182,7 @@ def super_len(o):
# seek back to current position to support
# partially read file-like objects
o.seek(current_position or 0)
- except (OSError, IOError):
+ except OSError:
total_length = 0
if total_length is None:
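The seek-to-end fallback above is easy to check with io.BytesIO, which has seek but no usable fileno; super_len reports the bytes remaining past the current position (a doctest-style sketch):

    >>> import io
    >>> buf = io.BytesIO(b"hello world")
    >>> _ = buf.read(5)        # partially consumed
    >>> pos = buf.tell()       # current position: 5
    >>> end = buf.seek(0, 2)   # seek to end gives the total length: 11
    >>> _ = buf.seek(pos)      # restore, as the code above does
    >>> end - pos              # what super_len would report as remaining
    6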
@@ -175,14 +194,14 @@ def super_len(o):
def get_netrc_auth(url, raise_errors=False):
"""Returns the Requests tuple auth for a given url from netrc."""
- netrc_file = os.environ.get('NETRC')
+ netrc_file = os.environ.get("NETRC")
if netrc_file is not None:
netrc_locations = (netrc_file,)
else:
- netrc_locations = ('~/{}'.format(f) for f in NETRC_FILES)
+ netrc_locations = (f"~/{f}" for f in NETRC_FILES)
try:
- from netrc import netrc, NetrcParseError
+ from netrc import NetrcParseError, netrc
netrc_path = None
@@ -207,18 +226,18 @@ def get_netrc_auth(url, raise_errors=False):
# Strip port numbers from netloc. This weird `if...encode` dance is
# used for Python 3.2, which doesn't support unicode literals.
- splitstr = b':'
+ splitstr = b":"
if isinstance(url, str):
- splitstr = splitstr.decode('ascii')
+ splitstr = splitstr.decode("ascii")
host = ri.netloc.split(splitstr)[0]
try:
_netrc = netrc(netrc_path).authenticators(host)
if _netrc:
# Return with login / password
- login_i = (0 if _netrc[0] else 1)
+ login_i = 0 if _netrc[0] else 1
return (_netrc[login_i], _netrc[2])
- except (NetrcParseError, IOError):
+ except (NetrcParseError, OSError):
# If there was a parsing error or a permissions issue reading the file,
# we'll just skip netrc auth unless explicitly asked to raise errors.
if raise_errors:
@@ -231,9 +250,8 @@ def get_netrc_auth(url, raise_errors=False):
def guess_filename(obj):
"""Tries to guess the filename of the given object."""
- name = getattr(obj, 'name', None)
- if (name and isinstance(name, basestring) and name[0] != '<' and
- name[-1] != '>'):
+ name = getattr(obj, "name", None)
+ if name and isinstance(name, basestring) and name[0] != "<" and name[-1] != ">":
return os.path.basename(name)
@@ -251,7 +269,11 @@ def extract_zipped_paths(path):
archive, member = os.path.split(path)
while archive and not os.path.exists(archive):
archive, prefix = os.path.split(archive)
- member = '/'.join([prefix, member])
+ if not prefix:
+ # An empty prefix means the split left `archive` unchanged (e.g. at a
+ # filesystem root), so continuing would loop forever on this rare corner case
+ break
+ member = "/".join([prefix, member])
if not zipfile.is_zipfile(archive):
return path
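Why the new break is needed: on a POSIX system, os.path.split of the filesystem root returns the root unchanged with an empty tail, so for a non-existent absolute path the while loop above would otherwise never make progress:

    >>> import os.path
    >>> os.path.split("/nonexistent")
    ('/', 'nonexistent')
    >>> os.path.split("/")   # archive stays "/", prefix is "" -- forever
    ('/', '')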
@@ -262,7 +284,7 @@ def extract_zipped_paths(path):
# we have a valid zip archive and a valid member of that archive
tmp = tempfile.gettempdir()
- extracted_path = os.path.join(tmp, member.split('/')[-1])
+ extracted_path = os.path.join(tmp, member.split("/")[-1])
if not os.path.exists(extracted_path):
# use read + write to avoid creating nested folders; we only want the file, and this avoids an mkdir race condition
with atomic_open(extracted_path) as file_handler:
@@ -273,12 +295,11 @@ def extract_zipped_paths(path):
@contextlib.contextmanager
def atomic_open(filename):
"""Write a file to the disk in an atomic fashion"""
- replacer = os.rename if sys.version_info[0] == 2 else os.replace
tmp_descriptor, tmp_name = tempfile.mkstemp(dir=os.path.dirname(filename))
try:
- with os.fdopen(tmp_descriptor, 'wb') as tmp_handler:
+ with os.fdopen(tmp_descriptor, "wb") as tmp_handler:
yield tmp_handler
- replacer(tmp_name, filename)
+ os.replace(tmp_name, filename)
except BaseException:
os.remove(tmp_name)
raise
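A hypothetical caller of atomic_open (the target path here is made up): because the temporary file is created in the destination directory, os.replace is a same-filesystem rename, so readers never observe a partially written file.

    import os
    import tempfile

    target = os.path.join(tempfile.gettempdir(), "extracted.bin")
    with atomic_open(target) as fh:
        fh.write(b"payload bytes")   # either fully visible or not at all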
@@ -306,7 +327,7 @@ def from_key_val_list(value):
return None
if isinstance(value, (str, bytes, bool, int)):
- raise ValueError('cannot encode objects that are not 2-tuples')
+ raise ValueError("cannot encode objects that are not 2-tuples")
return OrderedDict(value)
@@ -332,7 +353,7 @@ def to_key_val_list(value):
return None
if isinstance(value, (str, bytes, bool, int)):
- raise ValueError('cannot encode objects that are not 2-tuples')
+ raise ValueError("cannot encode objects that are not 2-tuples")
if isinstance(value, Mapping):
value = value.items()
@@ -397,10 +418,10 @@ def parse_dict_header(value):
"""
result = {}
for item in _parse_list_header(value):
- if '=' not in item:
+ if "=" not in item:
result[item] = None
continue
- name, value = item.split('=', 1)
+ name, value = item.split("=", 1)
if value[:1] == value[-1:] == '"':
value = unquote_header_value(value[1:-1])
result[name] = value
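Expected behavior of parse_dict_header, matching the examples in its docstring:

    >>> parse_dict_header('foo="is a fish", bar="as well"')
    {'foo': 'is a fish', 'bar': 'as well'}
    >>> parse_dict_header('key_without_value')
    {'key_without_value': None}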
@@ -428,8 +449,8 @@ def unquote_header_value(value, is_filename=False):
# replace sequence below on a UNC path has the effect of turning
# the leading double slash into a single slash and then
# _fix_ie_filename() doesn't work correctly. See #458.
- if not is_filename or value[:2] != '\\\\':
- return value.replace('\\\\', '\\').replace('\\"', '"')
+ if not is_filename or value[:2] != "\\\\":
+ return value.replace("\\\\", "\\").replace('\\"', '"')
return value
@@ -464,19 +485,24 @@ def get_encodings_from_content(content):
:param content: bytestring to extract encodings from.
"""
- warnings.warn((
- 'In requests 3.0, get_encodings_from_content will be removed. For '
- 'more information, please see the discussion on issue #2266. (This'
- ' warning should only appear once.)'),
- DeprecationWarning)
+ warnings.warn(
+ (
+ "In requests 3.0, get_encodings_from_content will be removed. For "
+ "more information, please see the discussion on issue #2266. (This"
+ " warning should only appear once.)"
+ ),
+ DeprecationWarning,
+ )
charset_re = re.compile(r'<meta.*?charset=["\']*(.+?)["\'>]', flags=re.I)
pragma_re = re.compile(r'<meta.*?content=["\']*;?charset=(.+?)["\'>]', flags=re.I)
xml_re = re.compile(r'^<\?xml.*?encoding=["\']*(.+?)["\'>]')
- return (charset_re.findall(content) +
- pragma_re.findall(content) +
- xml_re.findall(content))
+ return (
+ charset_re.findall(content)
+ + pragma_re.findall(content)
+ + xml_re.findall(content)
+ )
def _parse_content_type_header(header):
@@ -487,7 +513,7 @@ def _parse_content_type_header(header):
parameters
"""
- tokens = header.split(';')
+ tokens = header.split(";")
content_type, params = tokens[0].strip(), tokens[1:]
params_dict = {}
items_to_strip = "\"' "
@@ -499,7 +525,7 @@ def _parse_content_type_header(header):
index_of_equals = param.find("=")
if index_of_equals != -1:
key = param[:index_of_equals].strip(items_to_strip)
- value = param[index_of_equals + 1:].strip(items_to_strip)
+ value = param[index_of_equals + 1 :].strip(items_to_strip)
params_dict[key.lower()] = value
return content_type, params_dict
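A quick check of the parsing above (doctest-style; note the parameter key is lower-cased while the value keeps its case):

    >>> _parse_content_type_header("text/html; charset=UTF-8")
    ('text/html', {'charset': 'UTF-8'})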
@@ -511,38 +537,37 @@ def get_encoding_from_headers(headers):
:rtype: str
"""
- content_type = headers.get('content-type')
+ content_type = headers.get("content-type")
if not content_type:
return None
content_type, params = _parse_content_type_header(content_type)
- if 'charset' in params:
- return params['charset'].strip("'\"")
+ if "charset" in params:
+ return params["charset"].strip("'\"")
- if 'text' in content_type:
- return 'ISO-8859-1'
+ if "text" in content_type:
+ return "ISO-8859-1"
- if 'application/json' in content_type:
+ if "application/json" in content_type:
# Assume UTF-8 based on RFC 4627: https://www.ietf.org/rfc/rfc4627.txt since the charset was unset
- return 'utf-8'
+ return "utf-8"
def stream_decode_response_unicode(iterator, r):
- """Stream decodes a iterator."""
+ """Stream decodes an iterator."""
if r.encoding is None:
- for item in iterator:
- yield item
+ yield from iterator
return
- decoder = codecs.getincrementaldecoder(r.encoding)(errors='replace')
+ decoder = codecs.getincrementaldecoder(r.encoding)(errors="replace")
for chunk in iterator:
rv = decoder.decode(chunk)
if rv:
yield rv
- rv = decoder.decode(b'', final=True)
+ rv = decoder.decode(b"", final=True)
if rv:
yield rv
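The incremental decoder is what makes this safe for multibyte encodings: a character split across two chunks is buffered rather than mangled. A standalone sketch:

    >>> import codecs
    >>> decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    >>> decoder.decode(b"\xc3")   # first byte of 'é': held back
    ''
    >>> decoder.decode(b"\xa9")   # second byte completes the character
    'é'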
@@ -553,7 +578,7 @@ def iter_slices(string, slice_length):
if slice_length is None or slice_length <= 0:
slice_length = len(string)
while pos < len(string):
- yield string[pos:pos + slice_length]
+ yield string[pos : pos + slice_length]
pos += slice_length
@@ -569,11 +594,14 @@ def get_unicode_from_response(r):
:rtype: str
"""
- warnings.warn((
- 'In requests 3.0, get_unicode_from_response will be removed. For '
- 'more information, please see the discussion on issue #2266. (This'
- ' warning should only appear once.)'),
- DeprecationWarning)
+ warnings.warn(
+ (
+ "In requests 3.0, get_unicode_from_response will be removed. For "
+ "more information, please see the discussion on issue #2266. (This"
+ " warning should only appear once.)"
+ ),
+ DeprecationWarning,
+ )
tried_encodings = []
@@ -588,14 +616,15 @@ def get_unicode_from_response(r):
# Fall back:
try:
- return str(r.content, encoding, errors='replace')
+ return str(r.content, encoding, errors="replace")
except TypeError:
return r.content
# The unreserved URI characters (RFC 3986)
UNRESERVED_SET = frozenset(
- "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~")
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" + "0123456789-._~"
+)
def unquote_unreserved(uri):
@@ -604,22 +633,22 @@ def unquote_unreserved(uri):
:rtype: str
"""
- parts = uri.split('%')
+ parts = uri.split("%")
for i in range(1, len(parts)):
h = parts[i][0:2]
if len(h) == 2 and h.isalnum():
try:
c = chr(int(h, 16))
except ValueError:
- raise InvalidURL("Invalid percent-escape sequence: '%s'" % h)
+ raise InvalidURL(f"Invalid percent-escape sequence: '{h}'")
if c in UNRESERVED_SET:
parts[i] = c + parts[i][2:]
else:
- parts[i] = '%' + parts[i]
+ parts[i] = f"%{parts[i]}"
else:
- parts[i] = '%' + parts[i]
- return ''.join(parts)
+ parts[i] = f"%{parts[i]}"
+ return "".join(parts)
def requote_uri(uri):
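Behavior of unquote_unreserved above, shown doctest-style: escapes for unreserved characters are decoded, everything else stays percent-encoded.

    >>> unquote_unreserved("%41%2Fpath")   # %41 is 'A' (unreserved); %2F is '/' (reserved)
    'A%2Fpath'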
@@ -652,10 +681,10 @@ def address_in_network(ip, net):
:rtype: bool
"""
- ipaddr = struct.unpack('=L', socket.inet_aton(ip))[0]
- netaddr, bits = net.split('/')
- netmask = struct.unpack('=L', socket.inet_aton(dotted_netmask(int(bits))))[0]
- network = struct.unpack('=L', socket.inet_aton(netaddr))[0] & netmask
+ ipaddr = struct.unpack("=L", socket.inet_aton(ip))[0]
+ netaddr, bits = net.split("/")
+ netmask = struct.unpack("=L", socket.inet_aton(dotted_netmask(int(bits))))[0]
+ network = struct.unpack("=L", socket.inet_aton(netaddr))[0] & netmask
return (ipaddr & netmask) == (network & netmask)
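A quick check of the membership test above (addresses are examples):

    >>> address_in_network("192.168.1.1", "192.168.1.0/24")
    True
    >>> address_in_network("172.16.17.5", "192.168.1.0/24")
    False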
@@ -666,8 +695,8 @@ def dotted_netmask(mask):
:rtype: str
"""
- bits = 0xffffffff ^ (1 << 32 - mask) - 1
- return socket.inet_ntoa(struct.pack('>I', bits))
+ bits = 0xFFFFFFFF ^ (1 << 32 - mask) - 1
+ return socket.inet_ntoa(struct.pack(">I", bits))
def is_ipv4_address(string_ip):
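Worked example for dotted_netmask(24): precedence matters in the expression above, since 32 - mask evaluates before the shift and the trailing - 1 binds tighter than the xor:

    >>> mask = 24
    >>> hex(0xFFFFFFFF ^ (1 << 32 - mask) - 1)   # 0xFFFFFFFF ^ 0x000000FF
    '0xffffff00'
    >>> dotted_netmask(24)
    '255.255.255.0'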
@@ -676,7 +705,7 @@ def is_ipv4_address(string_ip):
"""
try:
socket.inet_aton(string_ip)
- except socket.error:
+ except OSError:
return False
return True
@@ -687,9 +716,9 @@ def is_valid_cidr(string_network):
:rtype: bool
"""
- if string_network.count('/') == 1:
+ if string_network.count("/") == 1:
try:
- mask = int(string_network.split('/')[1])
+ mask = int(string_network.split("/")[1])
except ValueError:
return False
@@ -697,8 +726,8 @@ def is_valid_cidr(string_network):
return False
try:
- socket.inet_aton(string_network.split('/')[0])
- except socket.error:
+ socket.inet_aton(string_network.split("/")[0])
+ except OSError:
return False
else:
return False
@@ -735,13 +764,14 @@ def should_bypass_proxies(url, no_proxy):
"""
# Prioritize lowercase environment variables over uppercase
# to keep a consistent behaviour with other http projects (curl, wget).
- get_proxy = lambda k: os.environ.get(k) or os.environ.get(k.upper())
+ def get_proxy(key):
+ return os.environ.get(key) or os.environ.get(key.upper())
# First check whether no_proxy is defined. If it is, check that the URL
# we're getting isn't in the no_proxy list.
no_proxy_arg = no_proxy
if no_proxy is None:
- no_proxy = get_proxy('no_proxy')
+ no_proxy = get_proxy("no_proxy")
parsed = urlparse(url)
if parsed.hostname is None:
@@ -751,9 +781,7 @@ def should_bypass_proxies(url, no_proxy):
if no_proxy:
# We need to check whether we match here. We need to see if we match
# the end of the hostname, both with and without the port.
- no_proxy = (
- host for host in no_proxy.replace(' ', '').split(',') if host
- )
+ no_proxy = (host for host in no_proxy.replace(" ", "").split(",") if host)
if is_ipv4_address(parsed.hostname):
for proxy_ip in no_proxy:
@@ -767,7 +795,7 @@ def should_bypass_proxies(url, no_proxy):
else:
host_with_port = parsed.hostname
if parsed.port:
- host_with_port += ':{}'.format(parsed.port)
+ host_with_port += f":{parsed.port}"
for host in no_proxy:
if parsed.hostname.endswith(host) or host_with_port.endswith(host):
@@ -775,7 +803,7 @@ def should_bypass_proxies(url, no_proxy):
# to apply the proxies on this URL.
return True
- with set_environ('no_proxy', no_proxy_arg):
+ with set_environ("no_proxy", no_proxy_arg):
# parsed.hostname can be `None` in cases such as a file URI.
try:
bypass = proxy_bypass(parsed.hostname)
@@ -809,13 +837,13 @@ def select_proxy(url, proxies):
proxies = proxies or {}
urlparts = urlparse(url)
if urlparts.hostname is None:
- return proxies.get(urlparts.scheme, proxies.get('all'))
+ return proxies.get(urlparts.scheme, proxies.get("all"))
proxy_keys = [
- urlparts.scheme + '://' + urlparts.hostname,
+ urlparts.scheme + "://" + urlparts.hostname,
urlparts.scheme,
- 'all://' + urlparts.hostname,
- 'all',
+ "all://" + urlparts.hostname,
+ "all",
]
proxy = None
for proxy_key in proxy_keys:
@@ -826,25 +854,54 @@ def select_proxy(url, proxies):
return proxy
+def resolve_proxies(request, proxies, trust_env=True):
+ """This method takes proxy information from a request and configuration
+ input to resolve a mapping of target proxies. This will consider settings
+ such as NO_PROXY to strip proxy configurations.
+
+ :param request: Request or PreparedRequest
+ :param proxies: A dictionary of schemes or schemes and hosts to proxy URLs
+ :param trust_env: Boolean declaring whether to trust environment configs
+
+ :rtype: dict
+ """
+ proxies = proxies if proxies is not None else {}
+ url = request.url
+ scheme = urlparse(url).scheme
+ no_proxy = proxies.get("no_proxy")
+ new_proxies = proxies.copy()
+
+ if trust_env and not should_bypass_proxies(url, no_proxy=no_proxy):
+ environ_proxies = get_environ_proxies(url, no_proxy=no_proxy)
+
+ proxy = environ_proxies.get(scheme, environ_proxies.get("all"))
+
+ if proxy:
+ new_proxies.setdefault(scheme, proxy)
+ return new_proxies
+
+
def default_user_agent(name="python-requests"):
"""
Return a string representing the default user agent.
:rtype: str
"""
- return '%s/%s' % (name, __version__)
+ return f"{name}/{__version__}"
def default_headers():
"""
:rtype: requests.structures.CaseInsensitiveDict
"""
- return CaseInsensitiveDict({
- 'User-Agent': default_user_agent(),
- 'Accept-Encoding': DEFAULT_ACCEPT_ENCODING,
- 'Accept': '*/*',
- 'Connection': 'keep-alive',
- })
+ return CaseInsensitiveDict(
+ {
+ "User-Agent": default_user_agent(),
+ "Accept-Encoding": DEFAULT_ACCEPT_ENCODING,
+ "Accept": "*/*",
+ "Connection": "keep-alive",
+ }
+ )
def parse_header_links(value):
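A sketch of the new resolve_proxies helper in use. The request object and URLs are hypothetical stand-ins; only .url is consulted:

    from types import SimpleNamespace

    req = SimpleNamespace(url="http://internal.example/status")

    # With trust_env=False the passed-in mapping is returned as-is (copied);
    # with trust_env=True, proxies from the environment (HTTP_PROXY and
    # friends) may be merged in unless NO_PROXY says to bypass.
    proxies = resolve_proxies(req, {"https": "http://proxy.example:3128"},
                              trust_env=False)
    assert proxies == {"https": "http://proxy.example:3128"}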
@@ -857,23 +914,23 @@ def parse_header_links(value):
links = []
- replace_chars = ' \'"'
+ replace_chars = " '\""
value = value.strip(replace_chars)
if not value:
return links
- for val in re.split(', *<', value):
+ for val in re.split(", *<", value):
try:
- url, params = val.split(';', 1)
+ url, params = val.split(";", 1)
except ValueError:
- url, params = val, ''
+ url, params = val, ""
- link = {'url': url.strip('<> \'"')}
+ link = {"url": url.strip("<> '\"")}
- for param in params.split(';'):
+ for param in params.split(";"):
try:
- key, value = param.split('=')
+ key, value = param.split("=")
except ValueError:
break
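A typical pagination Link header run through the function above (doctest-style):

    >>> parse_header_links('<http://example.com/?page=2>; rel="next"')
    [{'url': 'http://example.com/?page=2', 'rel': 'next'}]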
@@ -885,7 +942,7 @@ def parse_header_links(value):
# Null bytes; no need to recreate these on each call to guess_json_utf
-_null = '\x00'.encode('ascii') # encoding to ASCII for Python 3
+_null = "\x00".encode("ascii") # encoding to ASCII for Python 3
_null2 = _null * 2
_null3 = _null * 3
@@ -899,25 +956,25 @@ def guess_json_utf(data):
# determine the encoding. Also detect a BOM, if present.
sample = data[:4]
if sample in (codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
- return 'utf-32' # BOM included
+ return "utf-32" # BOM included
if sample[:3] == codecs.BOM_UTF8:
- return 'utf-8-sig' # BOM included, MS style (discouraged)
+ return "utf-8-sig" # BOM included, MS style (discouraged)
if sample[:2] in (codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE):
- return 'utf-16' # BOM included
+ return "utf-16" # BOM included
nullcount = sample.count(_null)
if nullcount == 0:
- return 'utf-8'
+ return "utf-8"
if nullcount == 2:
- if sample[::2] == _null2: # 1st and 3rd are null
- return 'utf-16-be'
+ if sample[::2] == _null2: # 1st and 3rd are null
+ return "utf-16-be"
if sample[1::2] == _null2: # 2nd and 4th are null
- return 'utf-16-le'
+ return "utf-16-le"
# Did not detect 2 valid UTF-16 ascii-range characters
if nullcount == 3:
if sample[:3] == _null3:
- return 'utf-32-be'
+ return "utf-32-be"
if sample[1:] == _null3:
- return 'utf-32-le'
+ return "utf-32-le"
# Did not detect a valid UTF-32 ascii-range character
return None
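Spot checks for the detection heuristic above (doctest-style; the utf-16 codec writes a BOM, while utf-16-le does not and is recognized by its NUL pattern):

    >>> guess_json_utf(b'{"k": 1}')
    'utf-8'
    >>> guess_json_utf('{"k": 1}'.encode("utf-16"))
    'utf-16'
    >>> guess_json_utf("{}".encode("utf-16-le"))
    'utf-16-le'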
@@ -928,15 +985,27 @@ def prepend_scheme_if_needed(url, new_scheme):
:rtype: str
"""
- scheme, netloc, path, params, query, fragment = urlparse(url, new_scheme)
-
- # urlparse is a finicky beast, and sometimes decides that there isn't a
- # netloc present. Assume that it's being over-cautious, and switch netloc
- # and path if urlparse decided there was no netloc.
+ parsed = parse_url(url)
+ scheme, auth, host, port, path, query, fragment = parsed
+
+ # A defect in urlparse determines that there isn't a netloc present in some
+ # urls. We previously assumed parsing was overly cautious, and swapped the
+ # netloc and path. Due to a lack of tests on the original defect, this is
+ # maintained with parse_url for backwards compatibility.
+ netloc = parsed.netloc
if not netloc:
netloc, path = path, netloc
- return urlunparse((scheme, netloc, path, params, query, fragment))
+ if auth:
+ # parse_url doesn't provide the netloc with auth
+ # so we'll add it ourselves.
+ netloc = "@".join([auth, netloc])
+ if scheme is None:
+ scheme = new_scheme
+ if path is None:
+ path = ""
+
+ return urlunparse((scheme, netloc, path, "", query, fragment))
def get_auth_from_url(url):
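Expected behavior of the rewritten prepend_scheme_if_needed (doctest-style; hosts are examples):

    >>> prepend_scheme_if_needed("example.com/path", "http")
    'http://example.com/path'
    >>> prepend_scheme_if_needed("https://example.com", "http")
    'https://example.com'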
@@ -950,35 +1019,36 @@ def get_auth_from_url(url):
try:
auth = (unquote(parsed.username), unquote(parsed.password))
except (AttributeError, TypeError):
- auth = ('', '')
+ auth = ("", "")
return auth
-# Moved outside of function to avoid recompile every call
-_CLEAN_HEADER_REGEX_BYTE = re.compile(b'^\\S[^\\r\\n]*$|^$')
-_CLEAN_HEADER_REGEX_STR = re.compile(r'^\S[^\r\n]*$|^$')
-
-
def check_header_validity(header):
- """Verifies that header value is a string which doesn't contain
- leading whitespace or return characters. This prevents unintended
- header injection.
+ """Verifies that header parts don't contain leading whitespace
+ reserved characters, or return characters.
:param header: tuple, in the format (name, value).
"""
name, value = header
- if isinstance(value, bytes):
- pat = _CLEAN_HEADER_REGEX_BYTE
- else:
- pat = _CLEAN_HEADER_REGEX_STR
- try:
- if not pat.match(value):
- raise InvalidHeader("Invalid return character or leading space in header: %s" % name)
- except TypeError:
- raise InvalidHeader("Value for header {%s: %s} must be of type str or "
- "bytes, not %s" % (name, value, type(value)))
+ for part in header:
+ if type(part) not in HEADER_VALIDATORS:
+ raise InvalidHeader(
+ f"Header part ({part!r}) from {{{name!r}: {value!r}}} must be "
+ f"of type str or bytes, not {type(part)}"
+ )
+
+ _validate_header_part(name, "name", HEADER_VALIDATORS[type(name)][0])
+ _validate_header_part(value, "value", HEADER_VALIDATORS[type(value)][1])
+
+
+def _validate_header_part(header_part, header_kind, validator):
+ if not validator.match(header_part):
+ raise InvalidHeader(
+ f"Invalid leading whitespace, reserved character(s), or return"
+ f"character(s) in header {header_kind}: {header_part!r}"
+ )
def urldefragauth(url):
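A quick sanity check of the new validation path above (a sketch; the header name is made up):

    from pip._vendor.requests.exceptions import InvalidHeader
    from pip._vendor.requests.utils import check_header_validity

    check_header_validity(("X-Request-Id", "abc123"))   # valid: returns None

    try:
        # Return characters in a value are rejected to prevent header injection.
        check_header_validity(("X-Request-Id", "abc\r\nInjected: 1"))
    except InvalidHeader as exc:
        print(exc)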
@@ -993,21 +1063,24 @@ def urldefragauth(url):
if not netloc:
netloc, path = path, netloc
- netloc = netloc.rsplit('@', 1)[-1]
+ netloc = netloc.rsplit("@", 1)[-1]
- return urlunparse((scheme, netloc, path, params, query, ''))
+ return urlunparse((scheme, netloc, path, params, query, ""))
def rewind_body(prepared_request):
"""Move file pointer back to its recorded starting position
so it can be read again on redirect.
"""
- body_seek = getattr(prepared_request.body, 'seek', None)
- if body_seek is not None and isinstance(prepared_request._body_position, integer_types):
+ body_seek = getattr(prepared_request.body, "seek", None)
+ if body_seek is not None and isinstance(
+ prepared_request._body_position, integer_types
+ ):
try:
body_seek(prepared_request._body_position)
- except (IOError, OSError):
- raise UnrewindableBodyError("An error occurred when rewinding request "
- "body for redirect.")
+ except OSError:
+ raise UnrewindableBodyError(
+ "An error occurred when rewinding request body for redirect."
+ )
else:
raise UnrewindableBodyError("Unable to rewind request body for redirect.")
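A sketch of rewind_body with a stand-in for a prepared request (only .body and ._body_position are consulted; the payload is made up):

    import io
    from types import SimpleNamespace

    body = io.BytesIO(b"form=data")
    prepared = SimpleNamespace(body=body, _body_position=0)

    body.read()             # a redirect leaves the stream exhausted
    rewind_body(prepared)   # seeks back to the recorded start
    assert body.read() == b"form=data"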