summaryrefslogtreecommitdiff
path: root/requests_cache/cache_keys.py
diff options
context:
space:
mode:
Diffstat (limited to 'requests_cache/cache_keys.py')
-rw-r--r--requests_cache/cache_keys.py243
1 files changed, 124 insertions, 119 deletions
diff --git a/requests_cache/cache_keys.py b/requests_cache/cache_keys.py
index d8f787c..953d5c0 100644
--- a/requests_cache/cache_keys.py
+++ b/requests_cache/cache_keys.py
@@ -8,178 +8,184 @@ from __future__ import annotations
import json
from hashlib import blake2b
-from operator import itemgetter
-from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Tuple, Union
+from logging import getLogger
+from typing import TYPE_CHECKING, Dict, Iterable, List, Mapping, Optional, Union
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
from requests import Request, Session
from requests.models import CaseInsensitiveDict
-from requests.utils import default_headers
from url_normalize import url_normalize
from . import get_valid_kwargs
if TYPE_CHECKING:
- from .models import AnyRequest
+ from .models import AnyPreparedRequest, AnyRequest, CachedResponse
-DEFAULT_REQUEST_HEADERS = default_headers()
+__all__ = ['create_key', 'normalize_request']
+# Request headers that are always excluded from cache keys, but not redacted from cached responses
DEFAULT_EXCLUDE_HEADERS = {'Cache-Control', 'If-None-Match', 'If-Modified-Since'}
+
+ParamList = Optional[Iterable[str]]
RequestContent = Union[Mapping, str, bytes]
+logger = getLogger(__name__)
+
def create_key(
request: AnyRequest = None,
- ignored_parameters: Iterable[str] = None,
- match_headers: Union[Iterable[str], bool] = False,
- **kwargs,
+ ignored_parameters: ParamList = None,
+ match_headers: Union[ParamList, bool] = False,
+ **request_kwargs,
) -> str:
"""Create a normalized cache key from either a request object or :py:class:`~requests.Request`
arguments
+
+ Args:
+ request: Request object to generate a cache key from
+ ignored_parameters: Request parames, headers, and/or body params to not match against
+ match_headers: Match only the specified headers, or ``True`` to match all headers
+ request_kwargs: Request arguments to generate a cache key from
"""
- # Create a PreparedRequest, if needed
+ # Convert raw request arguments into a request object, if needed
if not request:
- request_kwargs = get_valid_kwargs(Request.__init__, kwargs)
- request = Session().prepare_request(Request(**request_kwargs))
- if TYPE_CHECKING:
- assert request is not None
-
- # Add method and relevant request settings
+ request = Request(**get_valid_kwargs(Request.__init__, request_kwargs))
+
+ # Normalize and gather all relevant request info to match against
+ request = normalize_request(request, ignored_parameters)
+ key_parts = [
+ request.method or '',
+ request.url,
+ request.body or '',
+ request_kwargs.get('verify', True),
+ *get_matched_headers(request.headers, match_headers),
+ ]
+
+ # Generate a hash based on this info
key = blake2b(digest_size=8)
- key.update(encode((request.method or '').upper()))
- key.update(encode(kwargs.get('verify', True)))
-
- # Add filtered/normalized URL + request params
- url = remove_ignored_url_params(request.url, ignored_parameters)
- key.update(encode(url_normalize(url)))
-
- # Add filtered request body
- body = remove_ignored_body_params(request, ignored_parameters)
- if body:
- key.update(body)
-
- # Add filtered/normalized headers
- headers = get_matched_headers(request.headers, ignored_parameters, match_headers)
- for k, v in headers.items():
- key.update(encode(f'{k}={v}'))
-
+ for part in key_parts:
+ key.update(encode(part))
return key.hexdigest()
def get_matched_headers(
- headers: CaseInsensitiveDict, ignored_parameters: Optional[Iterable[str]], match_headers
-) -> Dict:
- """Get only the headers we should match against, given an optional include list and/or exclude
- list. Also normalizes headers (sorted/lowercased keys).
+ headers: CaseInsensitiveDict, match_headers: Union[ParamList, bool]
+) -> List[str]:
+ """Get only the headers we should match against as a list of ``k=v`` strings, given an optional
+ include list.
"""
if not match_headers:
- return {}
+ return []
- included = set(match_headers if isinstance(match_headers, Iterable) else headers.keys())
- included -= set(ignored_parameters or [])
- included -= DEFAULT_EXCLUDE_HEADERS
- return {k.lower(): headers[k] for k in sorted(included) if k in headers}
+ if isinstance(match_headers, Iterable):
+ included = set(match_headers) - DEFAULT_EXCLUDE_HEADERS
+ else:
+ included = set(headers) - DEFAULT_EXCLUDE_HEADERS
+ return [f'{k.lower()}={headers[k]}' for k in included if k in headers]
-def remove_ignored_headers(
- headers: Mapping, ignored_parameters: Optional[Iterable[str]]
-) -> CaseInsensitiveDict:
- """Remove any ignored request headers"""
- if not ignored_parameters:
- return CaseInsensitiveDict(headers)
- headers = CaseInsensitiveDict(headers)
- for k in ignored_parameters:
- headers.pop(k, None)
- return headers
+def normalize_request(request: AnyRequest, ignored_parameters: ParamList) -> AnyPreparedRequest:
+ """Normalize and remove ignored parameters from request URL, body, and headers.
+ This is used for both:
+ * Increasing cache hits by generating more precise cache keys
+ * Redacting potentially sensitive info from cached requests
+
+ Args:
+ request: Request object to normalize
+ ignored_parameters: Request parames, headers, and/or body params to not match against and
+ to remove from the request
+ """
+ if isinstance(request, Request):
+ norm_request = Session().prepare_request(request)
+ else:
+ norm_request = request.copy()
+
+ norm_request.method = (norm_request.method or '').upper()
+ norm_request.url = normalize_url(norm_request.url, ignored_parameters)
+ norm_request.headers = normalize_headers(norm_request.headers, ignored_parameters)
+ norm_request.body = normalize_body(norm_request, ignored_parameters)
+ return norm_request
-def remove_ignored_params(
- request: AnyRequest, ignored_parameters: Optional[Iterable[str]]
-) -> AnyRequest:
- """Remove ignored parameters from request URL, body, and headers"""
- if not ignored_parameters:
- return request
- request.headers = remove_ignored_headers(request.headers, ignored_parameters)
- request.url = remove_ignored_url_params(request.url, ignored_parameters)
- request.body = remove_ignored_body_params(request, ignored_parameters)
- return request
+def normalize_headers(headers: Mapping[str, str], ignored_parameters: ParamList) -> CaseInsensitiveDict:
+ """Sort and filter request headers"""
+ if ignored_parameters:
+ headers = filter_sort_dict(headers, ignored_parameters)
+ return CaseInsensitiveDict(headers)
-def remove_ignored_url_params(url: Optional[str], ignored_parameters: Optional[Iterable[str]]) -> str:
- """Remove any ignored request parameters from the URL"""
- if not ignored_parameters or not url:
- return url or ''
+def normalize_url(url: str, ignored_parameters: ParamList) -> str:
+ """Normalize and filter a URL. This includes request parameters, IDN domains, scheme, host,
+ port, etc.
+ """
+ # Strip query params from URL, sort and filter, and reassemble into a complete URL
url_tokens = urlparse(url)
- query = _filter_params(parse_qsl(url_tokens.query), ignored_parameters)
- return urlunparse(
+ url = urlunparse(
(
url_tokens.scheme,
url_tokens.netloc,
url_tokens.path,
url_tokens.params,
- urlencode(query),
+ normalize_params(url_tokens.query, ignored_parameters),
url_tokens.fragment,
)
)
+ return url_normalize(url)
-def remove_ignored_body_params(
- request: AnyRequest, ignored_parameters: Optional[Iterable[str]]
-) -> bytes:
- """Remove any ignored parameters from the request body"""
- original_body = request.body
- filtered_body: Union[str, bytes] = b''
- content_type = request.headers.get('content-type')
- if not ignored_parameters or not original_body or not content_type:
- return encode(original_body)
-
- if content_type == 'application/x-www-form-urlencoded':
- body = _filter_params(parse_qsl(decode(original_body)), ignored_parameters)
- filtered_body = urlencode(body)
- elif content_type == 'application/json':
- body = json.loads(decode(original_body)).items()
- body = _filter_params(sorted(body), ignored_parameters)
- filtered_body = json.dumps(body)
- else:
- filtered_body = original_body
+
+def normalize_body(request: AnyPreparedRequest, ignored_parameters: ParamList) -> bytes:
+ """Normalize and filter a request body if possible, depending on Content-Type"""
+ original_body = request.body or b''
+ content_type = request.headers.get('Content-Type')
+
+ # Filter and sort params if possible
+ filtered_body: Union[str, bytes] = original_body
+ if content_type == 'application/json':
+ filtered_body = normalize_json_body(original_body, ignored_parameters)
+ elif content_type == 'application/x-www-form-urlencoded':
+ filtered_body = normalize_params(original_body, ignored_parameters)
return encode(filtered_body)
-def _filter_params(
- data: List[Tuple[str, str]], ignored_parameters: Iterable[str]
-) -> List[Tuple[str, str]]:
- return [(k, v) for k, v in data if k not in set(ignored_parameters)]
+# TODO: Skip this for a very large response body?
+def normalize_json_body(
+ original_body: Union[str, bytes], ignored_parameters: ParamList
+) -> Union[str, bytes]:
+ """Normalize and filter a request body with serialized JSON data"""
+ try:
+ body = json.loads(decode(original_body))
+ body = filter_sort_dict(body, ignored_parameters)
+ return json.dumps(body)
+ # If it's invalid JSON, then don't mess with it
+ except (AttributeError, TypeError, ValueError):
+ logger.warning('Invalid JSON body:', exc_info=True)
+ return original_body
-def normalize_dict(
- items: Optional[RequestContent], normalize_data: bool = True
-) -> Optional[RequestContent]:
- """Sort items in a dict
+# TODO: More thorough tests
+def normalize_params(value: Union[str, bytes], ignored_parameters: ParamList) -> str:
+ """Normalize and filter urlencoded params from either a URL or request body with form data"""
+ params = dict(parse_qsl(decode(value)))
+ params = filter_sort_dict(params, ignored_parameters)
+ return urlencode(params)
- Args:
- items: Request params, data, or json
- normalize_data: Also normalize stringified JSON
- """
- if not items:
- return None
- if isinstance(items, Mapping):
- return sort_dict(items)
- if normalize_data and isinstance(items, (bytes, str)):
- # Attempt to load body as JSON; not doing this by default as it could impact performance
- try:
- dict_items = json.loads(decode(items))
- dict_items = json.dumps(sort_dict(dict_items))
- return dict_items.encode('utf-8') if isinstance(items, bytes) else dict_items
- except Exception:
- pass
- return items
+def redact_response(response: CachedResponse, ignored_parameters: ParamList) -> CachedResponse:
+ """Redact any ignored parameters (potentially containing sensitive info) from a cached request"""
+ if ignored_parameters:
+ response.url = normalize_url(response.url, ignored_parameters)
+ response.request = normalize_request(response.request, ignored_parameters) # type: ignore
+ return response
-def sort_dict(d: Mapping) -> Dict:
- return dict(sorted(d.items(), key=itemgetter(0)))
+def decode(value, encoding='utf-8') -> str:
+ """Decode a value from bytes, if hasn't already been.
+ Note: ``PreparedRequest.body`` is always encoded in utf-8.
+ """
+ return value.decode(encoding) if isinstance(value, bytes) else value
def encode(value, encoding='utf-8') -> bytes:
@@ -187,8 +193,7 @@ def encode(value, encoding='utf-8') -> bytes:
return value if isinstance(value, bytes) else str(value).encode(encoding)
-def decode(value, encoding='utf-8') -> str:
- """Decode a value from bytes, if hasn't already been.
- Note: ``PreparedRequest.body`` is always encoded in utf-8.
- """
- return value.decode(encoding) if isinstance(value, bytes) else value
+def filter_sort_dict(data: Mapping[str, str], ignored_parameters: ParamList) -> Dict[str, str]:
+ if not ignored_parameters:
+ return dict(sorted(data.items()))
+ return {k: v for k, v in sorted(data.items()) if k not in set(ignored_parameters)}