diff options
| author | Ira Lun <sammyrosajoe@gmail.com> | 2017-08-29 21:08:37 +0100 |
|---|---|---|
| committer | Ira Lun <sammyrosajoe@gmail.com> | 2017-08-29 21:08:37 +0100 |
| commit | a6d4f7dd33acfca818b572629d5cf6da3a65dd04 (patch) | |
| tree | 06197dc22fa45b5e89618d3feee2af70a77a2f26 /src/webob | |
| parent | bfac3ea69886c4ed893a9f4bc234dd937150cbe2 (diff) | |
| download | webob-a6d4f7dd33acfca818b572629d5cf6da3a65dd04.tar.gz | |
Rewrite Accept class, and add docs and tests.
Diffstat (limited to 'src/webob')
| -rw-r--r-- | src/webob/acceptparse.py | 425 |
1 files changed, 307 insertions, 118 deletions
diff --git a/src/webob/acceptparse.py b/src/webob/acceptparse.py index 2844d4a..ae714ce 100644 --- a/src/webob/acceptparse.py +++ b/src/webob/acceptparse.py @@ -87,148 +87,337 @@ def _list_1_or_more__compiled_re(element_re): class Accept(object): """ - Represents a generic ``Accept-*`` style header. + Represent an ``Accept`` header. - This object should not be modified. To add items you can use - ``accept_obj + 'accept_thing'`` to get a new object + Base class for :class:`AcceptValidHeader`, :class:`AcceptNoHeader`, and + :class:`AcceptInvalidHeader`. """ - def __init__(self, header_value): - self.header_value = header_value - self.parsed = list(self.parse(header_value)) - self._parsed_nonzero = [(m,q) for (m,q) in self.parsed if q] + # RFC 6838 describes syntax rules for media types that are different to + # (and stricter than) those in RFC 7231, but if RFC 7231 intended us to + # follow the rules in RFC 6838 for media ranges, it would not have + # specified its own syntax rules for media ranges, so it appears we should + # use the rules in RFC 7231 for now. + + # RFC 5234 Appendix B.1 "Core Rules": + # VCHAR = %x21-7E + # ; visible (printing) characters + vchar_re = '\x21-\x7e' + # RFC 7230 Section 3.2.6 "Field Value Components": + # quoted-string = DQUOTE *( qdtext / quoted-pair ) DQUOTE + # qdtext = HTAB / SP /%x21 / %x23-5B / %x5D-7E / obs-text + # obs-text = %x80-FF + # quoted-pair = "\" ( HTAB / SP / VCHAR / obs-text ) + obs_text_re = '\x80-\xff' + qdtext_re = '[\t \x21\x23-\x5b\\\x5d-\x7e' + obs_text_re + ']' + # The '\\' between \x5b and \x5d is needed to escape \x5d (']') + quoted_pair_re = r'\\' + '[\t ' + vchar_re + obs_text_re + ']' + quoted_string_re = \ + '"(?:(?:' + qdtext_re + ')|(?:' + quoted_pair_re + '))*"' + + # RFC 7231 Section 3.1.1.1 "Media Type": + # type = token + # subtype = token + # parameter = token "=" ( token / quoted-string ) + type_re = token_re + subtype_re = token_re + parameter_re = token_re + '=' + \ + '(?:(?:' + token_re + ')|(?:' + quoted_string_re + '))' + + # Section 5.3.2 "Accept": + # media-range = ( "*/*" + # / ( type "/" "*" ) + # / ( type "/" subtype ) + # ) *( OWS ";" OWS parameter ) + media_range_re = ( + '(' + + '(?:' + type_re + '/' + subtype_re + ')' + + # '*' is included through type_re and subtype_re, so this covers */* + # and type/* + ')' + + '(' + + '(?:' + OWS_re + ';' + OWS_re + + '(?![qQ]=)' + # media type parameter cannot be named "q" + parameter_re + ')*' + + ')' + ) + # accept-params = weight *( accept-ext ) + # accept-ext = OWS ";" OWS token [ "=" ( token / quoted-string ) ] + accept_ext_re = ( + OWS_re + ';' + OWS_re + token_re + '(?:' + + '=(?:' + + '(?:' + token_re + ')|(?:' + quoted_string_re + ')' + + ')' + + ')?' + ) + accept_params_re = weight_re + '((?:' + accept_ext_re + ')*)' - @staticmethod - def parse(value): - """ - Parse ``Accept-*`` style header. + media_range_n_accept_params_re = media_range_re + '(?:' + \ + accept_params_re + ')?' + media_range_n_accept_params_compiled_re = re.compile( + media_range_n_accept_params_re, + ) - Return iterator of ``(value, quality)`` pairs. - ``quality`` defaults to 1. - """ - for match in part_re.finditer(','+value): - name = match.group(1) - quality = match.group(2) or '' - if quality: - try: - quality = max(min(float(quality), 1), 0) - yield (name, quality) - continue - except ValueError: - pass - yield (name, 1) + accept_compiled_re = _list_0_or_more__compiled_re( + element_re=media_range_n_accept_params_re, + ) - def __repr__(self): - return '<%s(%r)>' % (self.__class__.__name__, str(self)) + # For parsing repeated groups within the media type parameters and + # extension parameters segments + parameters_compiled_re = re.compile( + OWS_re + ';' + OWS_re + '(' + token_re + ')=(' + token_re + '|' + + quoted_string_re + ')', + ) + accept_ext_compiled_re = re.compile( + OWS_re + ';' + OWS_re + '(' + token_re + ')' + + '(?:' + + '=(' + + '(?:' + + '(?:' + token_re + ')|(?:' + quoted_string_re + ')' + + ')' + + ')' + + ')?', + ) - def __iter__(self): - for m,q in sorted( - self._parsed_nonzero, - key=lambda i: i[1], - reverse=True - ): - yield m + # For parsing the media types in the `offers` argument to + # .acceptable_offers(), we re-use the media range regex for media types. + # This is not intended to be a validation of the offers; its main purpose + # is to extract the media type and any media type parameters. + media_type_re = media_range_re + media_type_compiled_re = re.compile('^' + media_type_re + '$') - def __str__(self): - result = [] - for mask, quality in self.parsed: - if quality != 1: - mask = '%s;q=%0.*f' % ( - mask, min(len(str(quality).split('.')[1]), 3), quality) - result.append(mask) - return ', '.join(result) - - def __add__(self, other, reversed=False): - if isinstance(other, Accept): - other = other.header_value - if hasattr(other, 'items'): - other = sorted(other.items(), key=lambda item: -item[1]) - if isinstance(other, (list, tuple)): - result = [] - for item in other: - if isinstance(item, (list, tuple)): - name, quality = item - result.append('%s; q=%s' % (name, quality)) - else: - result.append(item) - other = ', '.join(result) - other = str(other) - my_value = self.header_value - if reversed: - other, my_value = my_value, other - if not other: - new_value = my_value - elif not my_value: - new_value = other + @classmethod + def _escape_and_quote_parameter_value(cls, param_value): + """ + Escape and quote parameter value where necessary. + + For media type and extension parameter values. + """ + if param_value == '': + param_value = '""' else: - new_value = my_value + ', ' + other - return self.__class__(new_value) + param_value = param_value.replace('\\', '\\\\').replace( + '"', r'\"', + ) + if not token_compiled_re.match(param_value): + param_value = '"' + param_value + '"' + return param_value - def __radd__(self, other): - return self.__add__(other, True) + @classmethod + def _form_extension_params_segment(cls, extension_params): + """ + Convert iterable of extension parameters to str segment for header. - def __contains__(self, offer): + `extension_params` is an iterable where each item is either a parameter + string or a (name, value) tuple. """ - Returns true if the given object is listed in the accepted - types. + extension_params_segment = '' + for item in extension_params: + try: + extension_params_segment += (';' + item) + except TypeError: + param_name, param_value = item + param_value = cls._escape_and_quote_parameter_value( + param_value=param_value, + ) + extension_params_segment += ( + ';' + param_name + '=' + param_value + ) + return extension_params_segment + + @classmethod + def _form_media_range(cls, type_subtype, media_type_params): """ - for mask, quality in self._parsed_nonzero: - if self._match(mask, offer): - return True + Combine `type_subtype` and `media_type_params` to form a media range. - def quality(self, offer): + `type_subtype` is a ``str``, and `media_type_params` is an iterable of + (parameter name, parameter value) tuples. """ - Return the quality of the given offer. Returns None if there - is no match (not 0). + media_type_params_segment = '' + for param_name, param_value in media_type_params: + param_value = cls._escape_and_quote_parameter_value( + param_value=param_value, + ) + media_type_params_segment += (';' + param_name + '=' + param_value) + return type_subtype + media_type_params_segment + + @classmethod + def _iterable_to_header_element(cls, iterable): """ - bestq = 0 - for mask, q in self.parsed: - if self._match(mask, offer): - bestq = max(bestq, q) - return bestq or None + Convert iterable of tuples into header element ``str``. - def best_match(self, offers, default_match=None): + Each tuple is expected to be in one of two forms: (media_range, qvalue, + extension_params_segment), or (media_range, qvalue). """ - Returns the best match in the sequence of offered types. + try: + media_range, qvalue, extension_params_segment = iterable + except ValueError: + media_range, qvalue = iterable + extension_params_segment = '' - The sequence can be a simple sequence, or you can have - ``(match, server_quality)`` items in the sequence. If you - have these tuples then the client quality is multiplied by the - server_quality to get a total. If two matches have equal - weight, then the one that shows up first in the `offers` list - will be returned. + if qvalue == 1.0: + if extension_params_segment: + element = '{};q=1{}'.format( + media_range, extension_params_segment, + ) + else: + element = media_range + elif qvalue == 0.0: + element = '{};q=0{}'.format(media_range, extension_params_segment) + else: + element = '{};q={}{}'.format( + media_range, qvalue, extension_params_segment, + ) + return element + + @classmethod + def _parse_media_type_params(cls, media_type_params_segment): + """ + Parse media type parameters segment into list of (name, value) tuples. + """ + media_type_params = cls.parameters_compiled_re.findall( + media_type_params_segment, + ) + for index, (name, value) in enumerate(media_type_params): + if value.startswith('"') and value.endswith('"'): + value = cls._process_quoted_string_token(token=value) + media_type_params[index] = (name, value) + return media_type_params - But among matches with the same quality the match to a more specific - requested type will be chosen. For example a match to text/* trumps */*. + @classmethod + def _process_quoted_string_token(cls, token): + """ + Return unescaped and unquoted value from quoted token. + """ + # RFC 7230, section 3.2.6 "Field Value Components": "Recipients that + # process the value of a quoted-string MUST handle a quoted-pair as if + # it were replaced by the octet following the backslash." + return re.sub(r'\\(?![\\])', '', token[1:-1]).replace('\\\\', '\\') - default_match (default None) is returned if there is no intersection. + @classmethod + def _python_value_to_header_str(cls, value): """ - best_quality = -1 - best_offer = default_match - matched_by = '*/*' - for offer in offers: - if isinstance(offer, (tuple, list)): - offer, server_quality = offer + Convert Python value to header string for __add__/__radd__. + """ + if isinstance(value, str): + return value + if hasattr(value, 'items'): + if value == {}: + value = [] else: - server_quality = 1 - for mask, quality in self._parsed_nonzero: - possible_quality = server_quality * quality - if possible_quality < best_quality: - continue - elif possible_quality == best_quality: - # 'text/plain' overrides 'message/*' overrides '*/*' - # (if all match w/ the same q=) - if matched_by.count('*') <= mask.count('*'): - continue - if self._match(mask, offer): - best_quality = possible_quality - best_offer = offer - matched_by = mask - return best_offer + value_list = [] + for media_range, item in value.items(): + # item is either (media range, (qvalue, extension + # parameters segment)), or (media range, qvalue) (supported + # for backward compatibility) + if isinstance(item, (float, int)): + value_list.append((media_range, item, '')) + else: + value_list.append((media_range, item[0], item[1])) + value = sorted( + value_list, + key=lambda item: item[1], # qvalue + reverse=True, + ) + if isinstance(value, (tuple, list)): + header_elements = [] + for item in value: + if isinstance(item, (tuple, list)): + item = cls._iterable_to_header_element(iterable=item) + header_elements.append(item) + header_str = ', '.join(header_elements) + else: + header_str = str(value) + return header_str - def _match(self, mask, offer): - _check_offer(offer) - return mask == '*' or offer.lower() == mask.lower() + @classmethod + def parse(cls, value): + """ + Parse an ``Accept`` header. + + :param value: (``str``) header value + :return: If `value` is a valid ``Accept`` header, returns an iterator + of (*media_range*, *qvalue*, *media_type_params*, + *extension_params*) tuples, as parsed from the header from + left to right. + + | *media_range* is the media range, including any media type + parameters. The media range is returned in a canonicalised + form (except the case of the characters are unchanged): + unnecessary spaces around the semicolons before media type + parameters are removed; the parameter values are returned in + a form where only the '``\``' and '``"``' characters are + escaped, and the values are quoted with double quotes only + if they need to be quoted. + + | *qvalue* is the quality value of the media range. + + | *media_type_params* is the media type parameters, as a list + of (parameter name, value) tuples. + + | *extension_params* is the extension parameters, as a list + where each item is either a parameter string or a (parameter + name, value) tuple. + :raises ValueError: if `value` is an invalid header + """ + # Check if header is valid + # Using Python stdlib's `re` module, there is currently no way to check + # the match *and* get all the groups using the same regex, so we have + # to do this in steps using multiple regexes. + if cls.accept_compiled_re.match(value) is None: + raise ValueError('Invalid value for an Accept header.') + def generator(value): + for match in ( + cls.media_range_n_accept_params_compiled_re.finditer(value) + ): + groups = match.groups() + + type_subtype = groups[0] + + media_type_params = cls._parse_media_type_params( + media_type_params_segment=groups[1], + ) + + media_range = cls._form_media_range( + type_subtype=type_subtype, + media_type_params=media_type_params, + ) + + # qvalue (groups[2]) and extension_params (groups[3]) are both + # None if neither qvalue or extension parameters are found in + # the match. + + qvalue = groups[2] + qvalue = float(qvalue) if qvalue else 1.0 + + extension_params = groups[3] + if extension_params: + extension_params = cls.accept_ext_compiled_re.findall( + extension_params, + ) + for index, (token_key, token_value) in enumerate( + extension_params + ): + if token_value: + if ( + token_value.startswith('"') and + token_value.endswith('"') + ): + token_value = cls._process_quoted_string_token( + token=token_value, + ) + extension_params[index] = ( + token_key, token_value, + ) + else: + extension_params[index] = token_key + else: + extension_params = [] + + yield ( + media_range, qvalue, media_type_params, extension_params, + ) + return generator(value=value) class NilAccept(object): |
