summaryrefslogtreecommitdiff
path: root/src/webob
diff options
context:
space:
mode:
author Ira Lun <sammyrosajoe@gmail.com> 2017-08-29 21:08:37 +0100
committer Ira Lun <sammyrosajoe@gmail.com> 2017-08-29 21:08:37 +0100
commit a6d4f7dd33acfca818b572629d5cf6da3a65dd04 (patch)
tree 06197dc22fa45b5e89618d3feee2af70a77a2f26 /src/webob
parent bfac3ea69886c4ed893a9f4bc234dd937150cbe2 (diff)
download webob-a6d4f7dd33acfca818b572629d5cf6da3a65dd04.tar.gz
Rewrite Accept class, and add docs and tests.
Diffstat (limited to 'src/webob')
-rw-r--r-- src/webob/acceptparse.py 425
1 files changed, 307 insertions, 118 deletions
diff --git a/src/webob/acceptparse.py b/src/webob/acceptparse.py
index 2844d4a..ae714ce 100644
--- a/src/webob/acceptparse.py
+++ b/src/webob/acceptparse.py
@@ -87,148 +87,337 @@ def _list_1_or_more__compiled_re(element_re):
class Accept(object):
"""
- Represents a generic ``Accept-*`` style header.
+ Represent an ``Accept`` header.
- This object should not be modified. To add items you can use
- ``accept_obj + 'accept_thing'`` to get a new object
+ Base class for :class:`AcceptValidHeader`, :class:`AcceptNoHeader`, and
+ :class:`AcceptInvalidHeader`.
"""
- def __init__(self, header_value):
- self.header_value = header_value
- self.parsed = list(self.parse(header_value))
- self._parsed_nonzero = [(m,q) for (m,q) in self.parsed if q]
+ # RFC 6838 describes syntax rules for media types that are different to
+ # (and stricter than) those in RFC 7231, but if RFC 7231 intended us to
+ # follow the rules in RFC 6838 for media ranges, it would not have
+ # specified its own syntax rules for media ranges, so it appears we should
+ # use the rules in RFC 7231 for now.
+
+ # RFC 5234 Appendix B.1 "Core Rules":
+ # VCHAR = %x21-7E
+ # ; visible (printing) characters
+ vchar_re = '\x21-\x7e'
+ # RFC 7230 Section 3.2.6 "Field Value Components":
+ # quoted-string = DQUOTE *( qdtext / quoted-pair ) DQUOTE
+ # qdtext = HTAB / SP /%x21 / %x23-5B / %x5D-7E / obs-text
+ # obs-text = %x80-FF
+ # quoted-pair = "\" ( HTAB / SP / VCHAR / obs-text )
+ obs_text_re = '\x80-\xff'
+ qdtext_re = '[\t \x21\x23-\x5b\\\x5d-\x7e' + obs_text_re + ']'
+ # The '\\' between \x5b and \x5d is needed to escape \x5d (']')
+ quoted_pair_re = r'\\' + '[\t ' + vchar_re + obs_text_re + ']'
+ quoted_string_re = \
+ '"(?:(?:' + qdtext_re + ')|(?:' + quoted_pair_re + '))*"'
+
+ # RFC 7231 Section 3.1.1.1 "Media Type":
+ # type = token
+ # subtype = token
+ # parameter = token "=" ( token / quoted-string )
+ type_re = token_re
+ subtype_re = token_re
+ parameter_re = token_re + '=' + \
+ '(?:(?:' + token_re + ')|(?:' + quoted_string_re + '))'
+
+ # Section 5.3.2 "Accept":
+ # media-range = ( "*/*"
+ # / ( type "/" "*" )
+ # / ( type "/" subtype )
+ # ) *( OWS ";" OWS parameter )
+ media_range_re = (
+ '(' +
+ '(?:' + type_re + '/' + subtype_re + ')' +
+ # '*' is included through type_re and subtype_re, so this covers */*
+ # and type/*
+ ')' +
+ '(' +
+ '(?:' + OWS_re + ';' + OWS_re +
+ '(?![qQ]=)' + # media type parameter cannot be named "q"
+ parameter_re + ')*' +
+ ')'
+ )
+ # accept-params = weight *( accept-ext )
+ # accept-ext = OWS ";" OWS token [ "=" ( token / quoted-string ) ]
+ accept_ext_re = (
+ OWS_re + ';' + OWS_re + token_re + '(?:' +
+ '=(?:' +
+ '(?:' + token_re + ')|(?:' + quoted_string_re + ')' +
+ ')' +
+ ')?'
+ )
+ accept_params_re = weight_re + '((?:' + accept_ext_re + ')*)'
- @staticmethod
- def parse(value):
- """
- Parse ``Accept-*`` style header.
+ media_range_n_accept_params_re = media_range_re + '(?:' + \
+ accept_params_re + ')?'
+ media_range_n_accept_params_compiled_re = re.compile(
+ media_range_n_accept_params_re,
+ )
- Return iterator of ``(value, quality)`` pairs.
- ``quality`` defaults to 1.
- """
- for match in part_re.finditer(','+value):
- name = match.group(1)
- quality = match.group(2) or ''
- if quality:
- try:
- quality = max(min(float(quality), 1), 0)
- yield (name, quality)
- continue
- except ValueError:
- pass
- yield (name, 1)
+ accept_compiled_re = _list_0_or_more__compiled_re(
+ element_re=media_range_n_accept_params_re,
+ )
- def __repr__(self):
- return '<%s(%r)>' % (self.__class__.__name__, str(self))
+ # For parsing repeated groups within the media type parameters and
+ # extension parameters segments
+ parameters_compiled_re = re.compile(
+ OWS_re + ';' + OWS_re + '(' + token_re + ')=(' + token_re + '|' +
+ quoted_string_re + ')',
+ )
+ accept_ext_compiled_re = re.compile(
+ OWS_re + ';' + OWS_re + '(' + token_re + ')' +
+ '(?:' +
+ '=(' +
+ '(?:' +
+ '(?:' + token_re + ')|(?:' + quoted_string_re + ')' +
+ ')' +
+ ')' +
+ ')?',
+ )
- def __iter__(self):
- for m,q in sorted(
- self._parsed_nonzero,
- key=lambda i: i[1],
- reverse=True
- ):
- yield m
+ # For parsing the media types in the `offers` argument to
+ # .acceptable_offers(), we re-use the media range regex for media types.
+ # This is not intended to be a validation of the offers; its main purpose
+ # is to extract the media type and any media type parameters.
+ media_type_re = media_range_re
+ media_type_compiled_re = re.compile('^' + media_type_re + '$')
- def __str__(self):
- result = []
- for mask, quality in self.parsed:
- if quality != 1:
- mask = '%s;q=%0.*f' % (
- mask, min(len(str(quality).split('.')[1]), 3), quality)
- result.append(mask)
- return ', '.join(result)
-
- def __add__(self, other, reversed=False):
- if isinstance(other, Accept):
- other = other.header_value
- if hasattr(other, 'items'):
- other = sorted(other.items(), key=lambda item: -item[1])
- if isinstance(other, (list, tuple)):
- result = []
- for item in other:
- if isinstance(item, (list, tuple)):
- name, quality = item
- result.append('%s; q=%s' % (name, quality))
- else:
- result.append(item)
- other = ', '.join(result)
- other = str(other)
- my_value = self.header_value
- if reversed:
- other, my_value = my_value, other
- if not other:
- new_value = my_value
- elif not my_value:
- new_value = other
+ @classmethod
+ def _escape_and_quote_parameter_value(cls, param_value):
+ """
+ Escape and quote parameter value where necessary.
+
+ For media type and extension parameter values.
+ """
+ if param_value == '':
+ param_value = '""'
else:
- new_value = my_value + ', ' + other
- return self.__class__(new_value)
+ param_value = param_value.replace('\\', '\\\\').replace(
+ '"', r'\"',
+ )
+ if not token_compiled_re.match(param_value):
+ param_value = '"' + param_value + '"'
+ return param_value
- def __radd__(self, other):
- return self.__add__(other, True)
+ @classmethod
+ def _form_extension_params_segment(cls, extension_params):
+ """
+ Convert iterable of extension parameters to str segment for header.
- def __contains__(self, offer):
+ `extension_params` is an iterable where each item is either a parameter
+ string or a (name, value) tuple.
"""
- Returns true if the given object is listed in the accepted
- types.
+ extension_params_segment = ''
+ for item in extension_params:
+ try:
+ extension_params_segment += (';' + item)
+ except TypeError:
+ param_name, param_value = item
+ param_value = cls._escape_and_quote_parameter_value(
+ param_value=param_value,
+ )
+ extension_params_segment += (
+ ';' + param_name + '=' + param_value
+ )
+ return extension_params_segment
+
+ @classmethod
+ def _form_media_range(cls, type_subtype, media_type_params):
"""
- for mask, quality in self._parsed_nonzero:
- if self._match(mask, offer):
- return True
+ Combine `type_subtype` and `media_type_params` to form a media range.
- def quality(self, offer):
+ `type_subtype` is a ``str``, and `media_type_params` is an iterable of
+ (parameter name, parameter value) tuples.
"""
- Return the quality of the given offer. Returns None if there
- is no match (not 0).
+ media_type_params_segment = ''
+ for param_name, param_value in media_type_params:
+ param_value = cls._escape_and_quote_parameter_value(
+ param_value=param_value,
+ )
+ media_type_params_segment += (';' + param_name + '=' + param_value)
+ return type_subtype + media_type_params_segment
+
+ @classmethod
+ def _iterable_to_header_element(cls, iterable):
"""
- bestq = 0
- for mask, q in self.parsed:
- if self._match(mask, offer):
- bestq = max(bestq, q)
- return bestq or None
+ Convert a 2- or 3-item iterable into a header element ``str``.
- def best_match(self, offers, default_match=None):
+ The iterable is expected to be in one of two forms: (media_range,
+ qvalue, extension_params_segment), or (media_range, qvalue).
"""
- Returns the best match in the sequence of offered types.
+ try:
+ media_range, qvalue, extension_params_segment = iterable
+ except ValueError:
+ media_range, qvalue = iterable
+ extension_params_segment = ''
- The sequence can be a simple sequence, or you can have
- ``(match, server_quality)`` items in the sequence. If you
- have these tuples then the client quality is multiplied by the
- server_quality to get a total. If two matches have equal
- weight, then the one that shows up first in the `offers` list
- will be returned.
+ if qvalue == 1.0:
+ if extension_params_segment:
+ element = '{};q=1{}'.format(
+ media_range, extension_params_segment,
+ )
+ else:
+ element = media_range
+ elif qvalue == 0.0:
+ element = '{};q=0{}'.format(media_range, extension_params_segment)
+ else:
+ element = '{};q={}{}'.format(
+ media_range, qvalue, extension_params_segment,
+ )
+ return element
+
+ @classmethod
+ def _parse_media_type_params(cls, media_type_params_segment):
+ """
+ Parse media type parameters segment into list of (name, value) tuples.
+ """
+ media_type_params = cls.parameters_compiled_re.findall(
+ media_type_params_segment,
+ )
+ for index, (name, value) in enumerate(media_type_params):
+ if value.startswith('"') and value.endswith('"'):
+ value = cls._process_quoted_string_token(token=value)
+ media_type_params[index] = (name, value)
+ return media_type_params
- But among matches with the same quality the match to a more specific
- requested type will be chosen. For example a match to text/* trumps */*.
+ @classmethod
+ def _process_quoted_string_token(cls, token):
+ """
+ Return unescaped and unquoted value from quoted token.
+ """
+ # RFC 7230, section 3.2.6 "Field Value Components": "Recipients that
+ # process the value of a quoted-string MUST handle a quoted-pair as if
+ # it were replaced by the octet following the backslash."
+ return re.sub(r'\\(?![\\])', '', token[1:-1]).replace('\\\\', '\\')
- default_match (default None) is returned if there is no intersection.
+ @classmethod
+ def _python_value_to_header_str(cls, value):
"""
- best_quality = -1
- best_offer = default_match
- matched_by = '*/*'
- for offer in offers:
- if isinstance(offer, (tuple, list)):
- offer, server_quality = offer
+ Convert Python value to header string for __add__/__radd__.
+ """
+ if isinstance(value, str):
+ return value
+ if hasattr(value, 'items'):
+ if value == {}:
+ value = []
else:
- server_quality = 1
- for mask, quality in self._parsed_nonzero:
- possible_quality = server_quality * quality
- if possible_quality < best_quality:
- continue
- elif possible_quality == best_quality:
- # 'text/plain' overrides 'message/*' overrides '*/*'
- # (if all match w/ the same q=)
- if matched_by.count('*') <= mask.count('*'):
- continue
- if self._match(mask, offer):
- best_quality = possible_quality
- best_offer = offer
- matched_by = mask
- return best_offer
+ value_list = []
+ for media_range, item in value.items():
+ # each pair is either (media range, (qvalue, extension
+ # parameters segment)), or (media range, qvalue) (supported
+ # for backward compatibility)
+ if isinstance(item, (float, int)):
+ value_list.append((media_range, item, ''))
+ else:
+ value_list.append((media_range, item[0], item[1]))
+ value = sorted(
+ value_list,
+ key=lambda item: item[1], # qvalue
+ reverse=True,
+ )
+ if isinstance(value, (tuple, list)):
+ header_elements = []
+ for item in value:
+ if isinstance(item, (tuple, list)):
+ item = cls._iterable_to_header_element(iterable=item)
+ header_elements.append(item)
+ header_str = ', '.join(header_elements)
+ else:
+ header_str = str(value)
+ return header_str
- def _match(self, mask, offer):
- _check_offer(offer)
- return mask == '*' or offer.lower() == mask.lower()
+ @classmethod
+ def parse(cls, value):
+ """
+ Parse an ``Accept`` header.
+
+ :param value: (``str``) header value
+ :return: If `value` is a valid ``Accept`` header, returns an iterator
+ of (*media_range*, *qvalue*, *media_type_params*,
+ *extension_params*) tuples, as parsed from the header from
+ left to right.
+
+ | *media_range* is the media range, including any media type
+ parameters. The media range is returned in a canonicalised
+ form (except that the case of the characters is unchanged):
+ unnecessary spaces around the semicolons before media type
+ parameters are removed; the parameter values are returned in
+ a form where only the '``\``' and '``"``' characters are
+ escaped, and the values are quoted with double quotes only
+ if they need to be quoted.
+
+ | *qvalue* is the quality value of the media range.
+
+ | *media_type_params* is the media type parameters, as a list
+ of (parameter name, value) tuples.
+
+ | *extension_params* is the extension parameters, as a list
+ where each item is either a parameter string or a (parameter
+ name, value) tuple.
+ :raises ValueError: if `value` is an invalid header
+ """
+ # Check if header is valid
+ # Using Python stdlib's `re` module, there is currently no way to check
+ # the match *and* get all the groups using the same regex, so we have
+ # to do this in steps using multiple regexes.
+ if cls.accept_compiled_re.match(value) is None:
+ raise ValueError('Invalid value for an Accept header.')
+ def generator(value):
+ for match in (
+ cls.media_range_n_accept_params_compiled_re.finditer(value)
+ ):
+ groups = match.groups()
+
+ type_subtype = groups[0]
+
+ media_type_params = cls._parse_media_type_params(
+ media_type_params_segment=groups[1],
+ )
+
+ media_range = cls._form_media_range(
+ type_subtype=type_subtype,
+ media_type_params=media_type_params,
+ )
+
+ # qvalue (groups[2]) and extension_params (groups[3]) are both
+ # None if neither qvalue nor extension parameters are found in
+ # the match.
+
+ qvalue = groups[2]
+ qvalue = float(qvalue) if qvalue else 1.0
+
+ extension_params = groups[3]
+ if extension_params:
+ extension_params = cls.accept_ext_compiled_re.findall(
+ extension_params,
+ )
+ for index, (token_key, token_value) in enumerate(
+ extension_params
+ ):
+ if token_value:
+ if (
+ token_value.startswith('"') and
+ token_value.endswith('"')
+ ):
+ token_value = cls._process_quoted_string_token(
+ token=token_value,
+ )
+ extension_params[index] = (
+ token_key, token_value,
+ )
+ else:
+ extension_params[index] = token_key
+ else:
+ extension_params = []
+
+ yield (
+ media_range, qvalue, media_type_params, extension_params,
+ )
+ return generator(value=value)
class NilAccept(object):