diff options
Diffstat (limited to 'src/pip/_vendor/requests/auth.py')
-rw-r--r-- | src/pip/_vendor/requests/auth.py | 156 |
1 files changed, 83 insertions, 73 deletions
diff --git a/src/pip/_vendor/requests/auth.py b/src/pip/_vendor/requests/auth.py index eeface39a..9733686dd 100644 --- a/src/pip/_vendor/requests/auth.py +++ b/src/pip/_vendor/requests/auth.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- - """ requests.auth ~~~~~~~~~~~~~ @@ -7,22 +5,21 @@ requests.auth This module contains the authentication handlers for Requests. """ +import hashlib import os import re -import time -import hashlib import threading +import time import warnings - from base64 import b64encode -from .compat import urlparse, str, basestring -from .cookies import extract_cookies_to_jar from ._internal_utils import to_native_string +from .compat import basestring, str, urlparse +from .cookies import extract_cookies_to_jar from .utils import parse_dict_header -CONTENT_TYPE_FORM_URLENCODED = 'application/x-www-form-urlencoded' -CONTENT_TYPE_MULTI_PART = 'multipart/form-data' +CONTENT_TYPE_FORM_URLENCODED = "application/x-www-form-urlencoded" +CONTENT_TYPE_MULTI_PART = "multipart/form-data" def _basic_auth_str(username, password): @@ -57,23 +54,23 @@ def _basic_auth_str(username, password): # -- End Removal -- if isinstance(username, str): - username = username.encode('latin1') + username = username.encode("latin1") if isinstance(password, str): - password = password.encode('latin1') + password = password.encode("latin1") - authstr = 'Basic ' + to_native_string( - b64encode(b':'.join((username, password))).strip() + authstr = "Basic " + to_native_string( + b64encode(b":".join((username, password))).strip() ) return authstr -class AuthBase(object): +class AuthBase: """Base class that all auth implementations derive from""" def __call__(self, r): - raise NotImplementedError('Auth hooks must be callable.') + raise NotImplementedError("Auth hooks must be callable.") class HTTPBasicAuth(AuthBase): @@ -84,16 +81,18 @@ class HTTPBasicAuth(AuthBase): self.password = password def __eq__(self, other): - return all([ - self.username == getattr(other, 'username', None), - self.password == getattr(other, 'password', None) - ]) + return all( + [ + self.username == getattr(other, "username", None), + self.password == getattr(other, "password", None), + ] + ) def __ne__(self, other): return not self == other def __call__(self, r): - r.headers['Authorization'] = _basic_auth_str(self.username, self.password) + r.headers["Authorization"] = _basic_auth_str(self.username, self.password) return r @@ -101,7 +100,7 @@ class HTTPProxyAuth(HTTPBasicAuth): """Attaches HTTP Proxy Authentication to a given Request object.""" def __call__(self, r): - r.headers['Proxy-Authorization'] = _basic_auth_str(self.username, self.password) + r.headers["Proxy-Authorization"] = _basic_auth_str(self.username, self.password) return r @@ -116,9 +115,9 @@ class HTTPDigestAuth(AuthBase): def init_per_thread_state(self): # Ensure state is initialized just once per-thread - if not hasattr(self._thread_local, 'init'): + if not hasattr(self._thread_local, "init"): self._thread_local.init = True - self._thread_local.last_nonce = '' + self._thread_local.last_nonce = "" self._thread_local.nonce_count = 0 self._thread_local.chal = {} self._thread_local.pos = None @@ -129,44 +128,52 @@ class HTTPDigestAuth(AuthBase): :rtype: str """ - realm = self._thread_local.chal['realm'] - nonce = self._thread_local.chal['nonce'] - qop = self._thread_local.chal.get('qop') - algorithm = self._thread_local.chal.get('algorithm') - opaque = self._thread_local.chal.get('opaque') + realm = self._thread_local.chal["realm"] + nonce = self._thread_local.chal["nonce"] + qop = self._thread_local.chal.get("qop") + algorithm = self._thread_local.chal.get("algorithm") + opaque = self._thread_local.chal.get("opaque") hash_utf8 = None if algorithm is None: - _algorithm = 'MD5' + _algorithm = "MD5" else: _algorithm = algorithm.upper() # lambdas assume digest modules are imported at the top level - if _algorithm == 'MD5' or _algorithm == 'MD5-SESS': + if _algorithm == "MD5" or _algorithm == "MD5-SESS": + def md5_utf8(x): if isinstance(x, str): - x = x.encode('utf-8') + x = x.encode("utf-8") return hashlib.md5(x).hexdigest() + hash_utf8 = md5_utf8 - elif _algorithm == 'SHA': + elif _algorithm == "SHA": + def sha_utf8(x): if isinstance(x, str): - x = x.encode('utf-8') + x = x.encode("utf-8") return hashlib.sha1(x).hexdigest() + hash_utf8 = sha_utf8 - elif _algorithm == 'SHA-256': + elif _algorithm == "SHA-256": + def sha256_utf8(x): if isinstance(x, str): - x = x.encode('utf-8') + x = x.encode("utf-8") return hashlib.sha256(x).hexdigest() + hash_utf8 = sha256_utf8 - elif _algorithm == 'SHA-512': + elif _algorithm == "SHA-512": + def sha512_utf8(x): if isinstance(x, str): - x = x.encode('utf-8') + x = x.encode("utf-8") return hashlib.sha512(x).hexdigest() + hash_utf8 = sha512_utf8 - KD = lambda s, d: hash_utf8("%s:%s" % (s, d)) + KD = lambda s, d: hash_utf8(f"{s}:{d}") # noqa:E731 if hash_utf8 is None: return None @@ -177,10 +184,10 @@ class HTTPDigestAuth(AuthBase): #: path is request-uri defined in RFC 2616 which should not be empty path = p_parsed.path or "/" if p_parsed.query: - path += '?' + p_parsed.query + path += f"?{p_parsed.query}" - A1 = '%s:%s:%s' % (self.username, realm, self.password) - A2 = '%s:%s' % (method, path) + A1 = f"{self.username}:{realm}:{self.password}" + A2 = f"{method}:{path}" HA1 = hash_utf8(A1) HA2 = hash_utf8(A2) @@ -189,22 +196,20 @@ class HTTPDigestAuth(AuthBase): self._thread_local.nonce_count += 1 else: self._thread_local.nonce_count = 1 - ncvalue = '%08x' % self._thread_local.nonce_count - s = str(self._thread_local.nonce_count).encode('utf-8') - s += nonce.encode('utf-8') - s += time.ctime().encode('utf-8') + ncvalue = f"{self._thread_local.nonce_count:08x}" + s = str(self._thread_local.nonce_count).encode("utf-8") + s += nonce.encode("utf-8") + s += time.ctime().encode("utf-8") s += os.urandom(8) - cnonce = (hashlib.sha1(s).hexdigest()[:16]) - if _algorithm == 'MD5-SESS': - HA1 = hash_utf8('%s:%s:%s' % (HA1, nonce, cnonce)) + cnonce = hashlib.sha1(s).hexdigest()[:16] + if _algorithm == "MD5-SESS": + HA1 = hash_utf8(f"{HA1}:{nonce}:{cnonce}") if not qop: - respdig = KD(HA1, "%s:%s" % (nonce, HA2)) - elif qop == 'auth' or 'auth' in qop.split(','): - noncebit = "%s:%s:%s:%s:%s" % ( - nonce, ncvalue, cnonce, 'auth', HA2 - ) + respdig = KD(HA1, f"{nonce}:{HA2}") + elif qop == "auth" or "auth" in qop.split(","): + noncebit = f"{nonce}:{ncvalue}:{cnonce}:auth:{HA2}" respdig = KD(HA1, noncebit) else: # XXX handle auth-int. @@ -213,18 +218,20 @@ class HTTPDigestAuth(AuthBase): self._thread_local.last_nonce = nonce # XXX should the partial digests be encoded too? - base = 'username="%s", realm="%s", nonce="%s", uri="%s", ' \ - 'response="%s"' % (self.username, realm, nonce, path, respdig) + base = ( + f'username="{self.username}", realm="{realm}", nonce="{nonce}", ' + f'uri="{path}", response="{respdig}"' + ) if opaque: - base += ', opaque="%s"' % opaque + base += f', opaque="{opaque}"' if algorithm: - base += ', algorithm="%s"' % algorithm + base += f', algorithm="{algorithm}"' if entdig: - base += ', digest="%s"' % entdig + base += f', digest="{entdig}"' if qop: - base += ', qop="auth", nc=%s, cnonce="%s"' % (ncvalue, cnonce) + base += f', qop="auth", nc={ncvalue}, cnonce="{cnonce}"' - return 'Digest %s' % (base) + return f"Digest {base}" def handle_redirect(self, r, **kwargs): """Reset num_401_calls counter on redirects.""" @@ -248,13 +255,13 @@ class HTTPDigestAuth(AuthBase): # Rewind the file position indicator of the body to where # it was to resend the request. r.request.body.seek(self._thread_local.pos) - s_auth = r.headers.get('www-authenticate', '') + s_auth = r.headers.get("www-authenticate", "") - if 'digest' in s_auth.lower() and self._thread_local.num_401_calls < 2: + if "digest" in s_auth.lower() and self._thread_local.num_401_calls < 2: self._thread_local.num_401_calls += 1 - pat = re.compile(r'digest ', flags=re.IGNORECASE) - self._thread_local.chal = parse_dict_header(pat.sub('', s_auth, count=1)) + pat = re.compile(r"digest ", flags=re.IGNORECASE) + self._thread_local.chal = parse_dict_header(pat.sub("", s_auth, count=1)) # Consume content and release the original connection # to allow our new request to reuse the same one. @@ -264,8 +271,9 @@ class HTTPDigestAuth(AuthBase): extract_cookies_to_jar(prep._cookies, r.request, r.raw) prep.prepare_cookies(prep._cookies) - prep.headers['Authorization'] = self.build_digest_header( - prep.method, prep.url) + prep.headers["Authorization"] = self.build_digest_header( + prep.method, prep.url + ) _r = r.connection.send(prep, **kwargs) _r.history.append(r) _r.request = prep @@ -280,7 +288,7 @@ class HTTPDigestAuth(AuthBase): self.init_per_thread_state() # If we have a saved nonce, skip the 401 if self._thread_local.last_nonce: - r.headers['Authorization'] = self.build_digest_header(r.method, r.url) + r.headers["Authorization"] = self.build_digest_header(r.method, r.url) try: self._thread_local.pos = r.body.tell() except AttributeError: @@ -289,17 +297,19 @@ class HTTPDigestAuth(AuthBase): # file position of the previous body. Ensure it's set to # None. self._thread_local.pos = None - r.register_hook('response', self.handle_401) - r.register_hook('response', self.handle_redirect) + r.register_hook("response", self.handle_401) + r.register_hook("response", self.handle_redirect) self._thread_local.num_401_calls = 1 return r def __eq__(self, other): - return all([ - self.username == getattr(other, 'username', None), - self.password == getattr(other, 'password', None) - ]) + return all( + [ + self.username == getattr(other, "username", None), + self.password == getattr(other, "password", None), + ] + ) def __ne__(self, other): return not self == other |