summaryrefslogtreecommitdiff
path: root/Lib/email
diff options
context:
space:
mode:
authorR. David Murray <rdmurray@bitdance.com>2017-12-03 18:51:41 -0500
committerGitHub <noreply@github.com>2017-12-03 18:51:41 -0500
commit85d5c18c9d83a1d54eecc4c2ad4dce63194107c6 (patch)
tree39edcca989e60b95a64e317440057ee45aea8fc0 /Lib/email
parent29ba688034fc4eef0693b86002cf7bee55d692af (diff)
downloadcpython-git-85d5c18c9d83a1d54eecc4c2ad4dce63194107c6.tar.gz
bpo-27240 Rewrite the email header folding algorithm. (#3488)
The original algorithm tried to delegate the folding to the tokens so that those tokens whose folding rules differed could specify the differences. However, this resulted in a lot of duplicated code because most of the rules were the same. The new algorithm moves all folding logic into a set of functions external to the token classes, but puts the information about which tokens can be folded in which ways on the tokens...with the exception of mime-parameters, which are a special case (which was not even implemented in the old folder). This algorithm can still probably be improved and hopefully simplified somewhat. Note that some of the test expectations are changed. I believe the changes are toward more desirable and consistent behavior: in general when (re) folding a line the canonical version of the tokens is generated, rather than preserving errors or extra whitespace.
Diffstat (limited to 'Lib/email')
-rw-r--r--Lib/email/_header_value_parser.py729
-rw-r--r--Lib/email/headerregistry.py9
2 files changed, 292 insertions, 446 deletions
diff --git a/Lib/email/_header_value_parser.py b/Lib/email/_header_value_parser.py
index b4737c806e..b34c58bf85 100644
--- a/Lib/email/_header_value_parser.py
+++ b/Lib/email/_header_value_parser.py
@@ -97,96 +97,14 @@ def quote_string(value):
return '"'+str(value).replace('\\', '\\\\').replace('"', r'\"')+'"'
#
-# Accumulator for header folding
-#
-
-class _Folded:
-
- def __init__(self, maxlen, policy):
- self.maxlen = maxlen
- self.policy = policy
- self.lastlen = 0
- self.stickyspace = None
- self.firstline = True
- self.done = []
- self.current = []
-
- def newline(self):
- self.done.extend(self.current)
- self.done.append(self.policy.linesep)
- self.current.clear()
- self.lastlen = 0
-
- def finalize(self):
- if self.current:
- self.newline()
-
- def __str__(self):
- return ''.join(self.done)
-
- def append(self, stoken):
- self.current.append(stoken)
-
- def append_if_fits(self, token, stoken=None):
- if stoken is None:
- stoken = str(token)
- l = len(stoken)
- if self.stickyspace is not None:
- stickyspace_len = len(self.stickyspace)
- if self.lastlen + stickyspace_len + l <= self.maxlen:
- self.current.append(self.stickyspace)
- self.lastlen += stickyspace_len
- self.current.append(stoken)
- self.lastlen += l
- self.stickyspace = None
- self.firstline = False
- return True
- if token.has_fws:
- ws = token.pop_leading_fws()
- if ws is not None:
- self.stickyspace += str(ws)
- stickyspace_len += len(ws)
- token._fold(self)
- return True
- if stickyspace_len and l + 1 <= self.maxlen:
- margin = self.maxlen - l
- if 0 < margin < stickyspace_len:
- trim = stickyspace_len - margin
- self.current.append(self.stickyspace[:trim])
- self.stickyspace = self.stickyspace[trim:]
- stickyspace_len = trim
- self.newline()
- self.current.append(self.stickyspace)
- self.current.append(stoken)
- self.lastlen = l + stickyspace_len
- self.stickyspace = None
- self.firstline = False
- return True
- if not self.firstline:
- self.newline()
- self.current.append(self.stickyspace)
- self.current.append(stoken)
- self.stickyspace = None
- self.firstline = False
- return True
- if self.lastlen + l <= self.maxlen:
- self.current.append(stoken)
- self.lastlen += l
- return True
- if l < self.maxlen:
- self.newline()
- self.current.append(stoken)
- self.lastlen = l
- return True
- return False
-
-#
# TokenList and its subclasses
#
class TokenList(list):
token_type = None
+ syntactic_break = True
+ ew_combine_allowed = True
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
@@ -207,84 +125,13 @@ class TokenList(list):
def all_defects(self):
return sum((x.all_defects for x in self), self.defects)
- #
- # Folding API
- #
- # parts():
- #
- # return a list of objects that constitute the "higher level syntactic
- # objects" specified by the RFC as the best places to fold a header line.
- # The returned objects must include leading folding white space, even if
- # this means mutating the underlying parse tree of the object. Each object
- # is only responsible for returning *its* parts, and should not drill down
- # to any lower level except as required to meet the leading folding white
- # space constraint.
- #
- # _fold(folded):
- #
- # folded: the result accumulator. This is an instance of _Folded.
- # (XXX: I haven't finished factoring this out yet, the folding code
- # pretty much uses this as a state object.) When the folded.current
- # contains as much text as will fit, the _fold method should call
- # folded.newline.
- # folded.lastlen: the current length of the test stored in folded.current.
- # folded.maxlen: The maximum number of characters that may appear on a
- # folded line. Differs from the policy setting in that "no limit" is
- # represented by +inf, which means it can be used in the trivially
- # logical fashion in comparisons.
- #
- # Currently no subclasses implement parts, and I think this will remain
- # true. A subclass only needs to implement _fold when the generic version
- # isn't sufficient. _fold will need to be implemented primarily when it is
- # possible for encoded words to appear in the specialized token-list, since
- # there is no generic algorithm that can know where exactly the encoded
- # words are allowed. A _fold implementation is responsible for filling
- # lines in the same general way that the top level _fold does. It may, and
- # should, call the _fold method of sub-objects in a similar fashion to that
- # of the top level _fold.
- #
- # XXX: I'm hoping it will be possible to factor the existing code further
- # to reduce redundancy and make the logic clearer.
-
- @property
- def parts(self):
- klass = self.__class__
- this = []
- for token in self:
- if token.startswith_fws():
- if this:
- yield this[0] if len(this)==1 else klass(this)
- this.clear()
- end_ws = token.pop_trailing_ws()
- this.append(token)
- if end_ws:
- yield klass(this)
- this = [end_ws]
- if this:
- yield this[0] if len(this)==1 else klass(this)
-
def startswith_fws(self):
return self[0].startswith_fws()
- def pop_leading_fws(self):
- if self[0].token_type == 'fws':
- return self.pop(0)
- return self[0].pop_leading_fws()
-
- def pop_trailing_ws(self):
- if self[-1].token_type == 'cfws':
- return self.pop(-1)
- return self[-1].pop_trailing_ws()
-
@property
- def has_fws(self):
- for part in self:
- if part.has_fws:
- return True
- return False
-
- def has_leading_comment(self):
- return self[0].has_leading_comment()
+ def as_ew_allowed(self):
+ """True if all top level tokens of this part may be RFC2047 encoded."""
+ return all(part.as_ew_allowed for part in self)
@property
def comments(self):
@@ -294,69 +141,13 @@ class TokenList(list):
return comments
def fold(self, *, policy):
- # max_line_length 0/None means no limit, ie: infinitely long.
- maxlen = policy.max_line_length or float("+inf")
- folded = _Folded(maxlen, policy)
- self._fold(folded)
- folded.finalize()
- return str(folded)
-
- def as_encoded_word(self, charset):
- # This works only for things returned by 'parts', which include
- # the leading fws, if any, that should be used.
- res = []
- ws = self.pop_leading_fws()
- if ws:
- res.append(ws)
- trailer = self.pop(-1) if self[-1].token_type=='fws' else ''
- res.append(_ew.encode(str(self), charset))
- res.append(trailer)
- return ''.join(res)
-
- def cte_encode(self, charset, policy):
- res = []
- for part in self:
- res.append(part.cte_encode(charset, policy))
- return ''.join(res)
-
- def _fold(self, folded):
- encoding = 'utf-8' if folded.policy.utf8 else 'ascii'
- for part in self.parts:
- tstr = str(part)
- tlen = len(tstr)
- try:
- str(part).encode(encoding)
- except UnicodeEncodeError:
- if any(isinstance(x, errors.UndecodableBytesDefect)
- for x in part.all_defects):
- charset = 'unknown-8bit'
- else:
- # XXX: this should be a policy setting when utf8 is False.
- charset = 'utf-8'
- tstr = part.cte_encode(charset, folded.policy)
- tlen = len(tstr)
- if folded.append_if_fits(part, tstr):
- continue
- # Peel off the leading whitespace if any and make it sticky, to
- # avoid infinite recursion.
- ws = part.pop_leading_fws()
- if ws is not None:
- folded.stickyspace = str(ws)
- if folded.append_if_fits(part):
- continue
- if part.has_fws:
- part._fold(folded)
- continue
- # There are no fold points in this one; it is too long for a single
- # line and can't be split...we just have to put it on its own line.
- folded.append(tstr)
- folded.newline()
+ return _refold_parse_tree(self, policy=policy)
def pprint(self, indent=''):
- print('\n'.join(self._pp(indent='')))
+ print(self.ppstr(indent=indent))
def ppstr(self, indent=''):
- return '\n'.join(self._pp(indent=''))
+ return '\n'.join(self._pp(indent=indent))
def _pp(self, indent=''):
yield '{}{}/{}('.format(
@@ -391,173 +182,11 @@ class UnstructuredTokenList(TokenList):
token_type = 'unstructured'
- def _fold(self, folded):
- last_ew = None
- encoding = 'utf-8' if folded.policy.utf8 else 'ascii'
- for part in self.parts:
- tstr = str(part)
- is_ew = False
- try:
- str(part).encode(encoding)
- except UnicodeEncodeError:
- if any(isinstance(x, errors.UndecodableBytesDefect)
- for x in part.all_defects):
- charset = 'unknown-8bit'
- else:
- charset = 'utf-8'
- if last_ew is not None:
- # We've already done an EW, combine this one with it
- # if there's room.
- chunk = get_unstructured(
- ''.join(folded.current[last_ew:]+[tstr])).as_encoded_word(charset)
- oldlastlen = sum(len(x) for x in folded.current[:last_ew])
- schunk = str(chunk)
- lchunk = len(schunk)
- if oldlastlen + lchunk <= folded.maxlen:
- del folded.current[last_ew:]
- folded.append(schunk)
- folded.lastlen = oldlastlen + lchunk
- continue
- tstr = part.as_encoded_word(charset)
- is_ew = True
- if folded.append_if_fits(part, tstr):
- if is_ew:
- last_ew = len(folded.current) - 1
- continue
- if is_ew or last_ew:
- # It's too big to fit on the line, but since we've
- # got encoded words we can use encoded word folding.
- part._fold_as_ew(folded)
- continue
- # Peel off the leading whitespace if any and make it sticky, to
- # avoid infinite recursion.
- ws = part.pop_leading_fws()
- if ws is not None:
- folded.stickyspace = str(ws)
- if folded.append_if_fits(part):
- continue
- if part.has_fws:
- part._fold(folded)
- continue
- # It can't be split...we just have to put it on its own line.
- folded.append(tstr)
- folded.newline()
- last_ew = None
-
- def cte_encode(self, charset, policy):
- res = []
- last_ew = None
- for part in self:
- spart = str(part)
- try:
- spart.encode('us-ascii')
- res.append(spart)
- except UnicodeEncodeError:
- if last_ew is None:
- res.append(part.cte_encode(charset, policy))
- last_ew = len(res)
- else:
- tl = get_unstructured(''.join(res[last_ew:] + [spart]))
- res.append(tl.as_encoded_word(charset))
- return ''.join(res)
-
class Phrase(TokenList):
token_type = 'phrase'
- def _fold(self, folded):
- # As with Unstructured, we can have pure ASCII with or without
- # surrogateescape encoded bytes, or we could have unicode. But this
- # case is more complicated, since we have to deal with the various
- # sub-token types and how they can be composed in the face of
- # unicode-that-needs-CTE-encoding, and the fact that if a token a
- # comment that becomes a barrier across which we can't compose encoded
- # words.
- last_ew = None
- encoding = 'utf-8' if folded.policy.utf8 else 'ascii'
- for part in self.parts:
- tstr = str(part)
- tlen = len(tstr)
- has_ew = False
- try:
- str(part).encode(encoding)
- except UnicodeEncodeError:
- if any(isinstance(x, errors.UndecodableBytesDefect)
- for x in part.all_defects):
- charset = 'unknown-8bit'
- else:
- charset = 'utf-8'
- if last_ew is not None and not part.has_leading_comment():
- # We've already done an EW, let's see if we can combine
- # this one with it. The last_ew logic ensures that all we
- # have at this point is atoms, no comments or quoted
- # strings. So we can treat the text between the last
- # encoded word and the content of this token as
- # unstructured text, and things will work correctly. But
- # we have to strip off any trailing comment on this token
- # first, and if it is a quoted string we have to pull out
- # the content (we're encoding it, so it no longer needs to
- # be quoted).
- if part[-1].token_type == 'cfws' and part.comments:
- remainder = part.pop(-1)
- else:
- remainder = ''
- for i, token in enumerate(part):
- if token.token_type == 'bare-quoted-string':
- part[i] = UnstructuredTokenList(token[:])
- chunk = get_unstructured(
- ''.join(folded.current[last_ew:]+[tstr])).as_encoded_word(charset)
- schunk = str(chunk)
- lchunk = len(schunk)
- if last_ew + lchunk <= folded.maxlen:
- del folded.current[last_ew:]
- folded.append(schunk)
- folded.lastlen = sum(len(x) for x in folded.current)
- continue
- tstr = part.as_encoded_word(charset)
- tlen = len(tstr)
- has_ew = True
- if folded.append_if_fits(part, tstr):
- if has_ew and not part.comments:
- last_ew = len(folded.current) - 1
- elif part.comments or part.token_type == 'quoted-string':
- # If a comment is involved we can't combine EWs. And if a
- # quoted string is involved, it's not worth the effort to
- # try to combine them.
- last_ew = None
- continue
- part._fold(folded)
-
- def cte_encode(self, charset, policy):
- res = []
- last_ew = None
- is_ew = False
- for part in self:
- spart = str(part)
- try:
- spart.encode('us-ascii')
- res.append(spart)
- except UnicodeEncodeError:
- is_ew = True
- if last_ew is None:
- if not part.comments:
- last_ew = len(res)
- res.append(part.cte_encode(charset, policy))
- elif not part.has_leading_comment():
- if part[-1].token_type == 'cfws' and part.comments:
- remainder = part.pop(-1)
- else:
- remainder = ''
- for i, token in enumerate(part):
- if token.token_type == 'bare-quoted-string':
- part[i] = UnstructuredTokenList(token[:])
- tl = get_unstructured(''.join(res[last_ew:] + [spart]))
- res[last_ew:] = [tl.as_encoded_word(charset)]
- if part.comments or (not is_ew and part.token_type == 'quoted-string'):
- last_ew = None
- return ''.join(res)
-
class Word(TokenList):
token_type = 'word'
@@ -567,9 +196,6 @@ class CFWSList(WhiteSpaceTokenList):
token_type = 'cfws'
- def has_leading_comment(self):
- return bool(self.comments)
-
class Atom(TokenList):
@@ -579,6 +205,7 @@ class Atom(TokenList):
class Token(TokenList):
token_type = 'token'
+ encode_as_ew = False
class EncodedWord(TokenList):
@@ -588,13 +215,6 @@ class EncodedWord(TokenList):
charset = None
lang = None
- @property
- def encoded(self):
- if self.cte is not None:
- return self.cte
- _ew.encode(str(self), self.charset)
-
-
class QuotedString(TokenList):
@@ -865,6 +485,7 @@ class InvalidMailbox(TokenList):
class Domain(TokenList):
token_type = 'domain'
+ as_ew_allowed = False
@property
def domain(self):
@@ -879,11 +500,13 @@ class DotAtom(TokenList):
class DotAtomText(TokenList):
token_type = 'dot-atom-text'
+ as_ew_allowed = True
class AddrSpec(TokenList):
token_type = 'addr-spec'
+ as_ew_allowed = False
@property
def local_part(self):
@@ -916,11 +539,13 @@ class AddrSpec(TokenList):
class ObsLocalPart(TokenList):
token_type = 'obs-local-part'
+ as_ew_allowed = False
class DisplayName(Phrase):
token_type = 'display-name'
+ ew_combine_allowed = False
@property
def display_name(self):
@@ -960,6 +585,7 @@ class DisplayName(Phrase):
class LocalPart(TokenList):
token_type = 'local-part'
+ as_ew_allowed = False
@property
def value(self):
@@ -995,6 +621,7 @@ class LocalPart(TokenList):
class DomainLiteral(TokenList):
token_type = 'domain-literal'
+ as_ew_allowed = False
@property
def domain(self):
@@ -1081,6 +708,7 @@ class Value(TokenList):
class MimeParameters(TokenList):
token_type = 'mime-parameters'
+ syntactic_break = False
@property
def params(self):
@@ -1165,6 +793,10 @@ class MimeParameters(TokenList):
class ParameterizedHeaderValue(TokenList):
+ # Set this false so that the value doesn't wind up on a new line even
+ # if it and the parameters would fit there but not on the first line.
+ syntactic_break = False
+
@property
def params(self):
for token in reversed(self):
@@ -1172,18 +804,11 @@ class ParameterizedHeaderValue(TokenList):
return token.params
return {}
- @property
- def parts(self):
- if self and self[-1].token_type == 'mime-parameters':
- # We don't want to start a new line if all of the params don't fit
- # after the value, so unwrap the parameter list.
- return TokenList(self[:-1] + self[-1])
- return TokenList(self).parts
-
class ContentType(ParameterizedHeaderValue):
token_type = 'content-type'
+ as_ew_allowed = False
maintype = 'text'
subtype = 'plain'
@@ -1191,40 +816,27 @@ class ContentType(ParameterizedHeaderValue):
class ContentDisposition(ParameterizedHeaderValue):
token_type = 'content-disposition'
+ as_ew_allowed = False
content_disposition = None
class ContentTransferEncoding(TokenList):
token_type = 'content-transfer-encoding'
+ as_ew_allowed = False
cte = '7bit'
class HeaderLabel(TokenList):
token_type = 'header-label'
+ as_ew_allowed = False
class Header(TokenList):
token_type = 'header'
- def _fold(self, folded):
- folded.append(str(self.pop(0)))
- folded.lastlen = len(folded.current[0])
- # The first line of the header is different from all others: we don't
- # want to start a new object on a new line if it has any fold points in
- # it that would allow part of it to be on the first header line.
- # Further, if the first fold point would fit on the new line, we want
- # to do that, but if it doesn't we want to put it on the first line.
- # Folded supports this via the stickyspace attribute. If this
- # attribute is not None, it does the special handling.
- folded.stickyspace = str(self.pop(0)) if self[0].token_type == 'cfws' else ''
- rest = self.pop(0)
- if self:
- raise ValueError("Malformed Header token list")
- rest._fold(folded)
-
#
# Terminal classes and instances
@@ -1232,6 +844,10 @@ class Header(TokenList):
class Terminal(str):
+ as_ew_allowed = True
+ ew_combine_allowed = True
+ syntactic_break = True
+
def __new__(cls, value, token_type):
self = super().__new__(cls, value)
self.token_type = token_type
@@ -1241,6 +857,9 @@ class Terminal(str):
def __repr__(self):
return "{}({})".format(self.__class__.__name__, super().__repr__())
+ def pprint(self):
+ print(self.__class__.__name__ + '/' + self.token_type)
+
@property
def all_defects(self):
return list(self.defects)
@@ -1254,29 +873,14 @@ class Terminal(str):
'' if not self.defects else ' {}'.format(self.defects),
)]
- def cte_encode(self, charset, policy):
- value = str(self)
- try:
- value.encode('us-ascii')
- return value
- except UnicodeEncodeError:
- return _ew.encode(value, charset)
-
def pop_trailing_ws(self):
# This terminates the recursion.
return None
- def pop_leading_fws(self):
- # This terminates the recursion.
- return None
-
@property
def comments(self):
return []
- def has_leading_comment(self):
- return False
-
def __getnewargs__(self):
return(str(self), self.token_type)
@@ -1290,8 +894,6 @@ class WhiteSpaceTerminal(Terminal):
def startswith_fws(self):
return True
- has_fws = True
-
class ValueTerminal(Terminal):
@@ -1302,11 +904,6 @@ class ValueTerminal(Terminal):
def startswith_fws(self):
return False
- has_fws = False
-
- def as_encoded_word(self, charset):
- return _ew.encode(str(self), charset)
-
class EWWhiteSpaceTerminal(WhiteSpaceTerminal):
@@ -1314,15 +911,9 @@ class EWWhiteSpaceTerminal(WhiteSpaceTerminal):
def value(self):
return ''
- @property
- def encoded(self):
- return self[:]
-
def __str__(self):
return ''
- has_fws = True
-
# XXX these need to become classes and used as instances so
# that a program can't change them in a parse tree and screw
@@ -2751,7 +2342,7 @@ def get_parameter(value):
if value[0] != "'":
raise errors.HeaderParseError("Expected RFC2231 char/lang encoding "
"delimiter, but found {!r}".format(value))
- appendto.append(ValueTerminal("'", 'RFC2231 delimiter'))
+ appendto.append(ValueTerminal("'", 'RFC2231-delimiter'))
value = value[1:]
if value and value[0] != "'":
token, value = get_attrtext(value)
@@ -2760,7 +2351,7 @@ def get_parameter(value):
if not value or value[0] != "'":
raise errors.HeaderParseError("Expected RFC2231 char/lang encoding "
"delimiter, but found {}".format(value))
- appendto.append(ValueTerminal("'", 'RFC2231 delimiter'))
+ appendto.append(ValueTerminal("'", 'RFC2231-delimiter'))
value = value[1:]
if remainder is not None:
# Treat the rest of value as bare quoted string content.
@@ -2965,3 +2556,255 @@ def parse_content_transfer_encoding_header(value):
token, value = get_phrase(value)
cte_header.append(token)
return cte_header
+
+
+#
+# Header folding
+#
+# Header folding is complex, with lots of rules and corner cases. The
+# following code does its best to obey the rules and handle the corner
+# cases, but you can be sure there are few bugs:)
+#
+# This folder generally canonicalizes as it goes, preferring the stringified
+# version of each token. The tokens contain information that supports the
+# folder, including which tokens can be encoded in which ways.
+#
+# Folded text is accumulated in a simple list of strings ('lines'), each
+# one of which should be less than policy.max_line_length ('maxlen').
+#
+
+def _steal_trailing_WSP_if_exists(lines):
+ wsp = ''
+ if lines and lines[-1] and lines[-1][-1] in WSP:
+ wsp = lines[-1][-1]
+ lines[-1] = lines[-1][:-1]
+ return wsp
+
+def _refold_parse_tree(parse_tree, *, policy):
+ """Return string of contents of parse_tree folded according to RFC rules.
+
+ """
+ # max_line_length 0/None means no limit, ie: infinitely long.
+ maxlen = policy.max_line_length or float("+inf")
+ encoding = 'utf-8' if policy.utf8 else 'us-ascii'
+ lines = ['']
+ last_ew = None
+ wrap_as_ew_blocked = 0
+ want_encoding = False
+ end_ew_not_allowed = Terminal('', 'wrap_as_ew_blocked')
+ parts = list(parse_tree)
+ while parts:
+ part = parts.pop(0)
+ if part is end_ew_not_allowed:
+ wrap_as_ew_blocked -= 1
+ continue
+ tstr = str(part)
+ try:
+ tstr.encode(encoding)
+ charset = encoding
+ except UnicodeEncodeError:
+ if any(isinstance(x, errors.UndecodableBytesDefect)
+ for x in part.all_defects):
+ charset = 'unknown-8bit'
+ else:
+ # If policy.utf8 is false this should really be taken from a
+ # 'charset' property on the policy.
+ charset = 'utf-8'
+ want_encoding = True
+ if part.token_type == 'mime-parameters':
+ # Mime parameter folding (using RFC2231) is extra special.
+ _fold_mime_parameters(part, lines, maxlen, encoding)
+ continue
+ if want_encoding and not wrap_as_ew_blocked:
+ if not part.as_ew_allowed:
+ want_encoding = False
+ last_ew = None
+ if part.syntactic_break:
+ encoded_part = part.fold(policy=policy)[:-1] # strip nl
+ if policy.linesep not in encoded_part:
+ # It fits on a single line
+ if len(encoded_part) > maxlen - len(lines[-1]):
+ # But not on this one, so start a new one.
+ newline = _steal_trailing_WSP_if_exists(lines)
+ # XXX what if encoded_part has no leading FWS?
+ lines.append(newline)
+ lines[-1] += encoded_part
+ continue
+ # Either this is not a major syntactic break, so we don't
+ # want it on a line by itself even if it fits, or it
+ # doesn't fit on a line by itself. Either way, fall through
+ # to unpacking the subparts and wrapping them.
+ if not hasattr(part, 'encode'):
+ # It's not a Terminal, do each piece individually.
+ parts = list(part) + parts
+ else:
+ # It's a terminal, wrap it as an encoded word, possibly
+ # combining it with previously encoded words if allowed.
+ last_ew = _fold_as_ew(tstr, lines, maxlen, last_ew,
+ part.ew_combine_allowed, charset)
+ want_encoding = False
+ continue
+ if len(tstr) <= maxlen - len(lines[-1]):
+ lines[-1] += tstr
+ continue
+ # This part is too long to fit. The RFC wants us to break at
+ # "major syntactic breaks", so unless we don't consider this
+ # to be one, check if it will fit on the next line by itself.
+ if (part.syntactic_break and
+ len(tstr) + 1 <= maxlen):
+ newline = _steal_trailing_WSP_if_exists(lines)
+ if newline or part.startswith_fws():
+ lines.append(newline + tstr)
+ continue
+ if not hasattr(part, 'encode'):
+ # It's not a terminal, try folding the subparts.
+ newparts = list(part)
+ if not part.as_ew_allowed:
+ wrap_as_ew_blocked += 1
+ newparts.append(end_ew_not_allowed)
+ parts = newparts + parts
+ continue
+ if part.as_ew_allowed and not wrap_as_ew_blocked:
+ # It doesn't need CTE encoding, but encode it anyway so we can
+ # wrap it.
+ parts.insert(0, part)
+ want_encoding = True
+ continue
+ # We can't figure out how to wrap, it, so give up.
+ newline = _steal_trailing_WSP_if_exists(lines)
+ if newline or part.startswith_fws():
+ lines.append(newline + tstr)
+ else:
+ # We can't fold it onto the next line either...
+ lines[-1] += tstr
+ return policy.linesep.join(lines) + policy.linesep
+
+def _fold_as_ew(to_encode, lines, maxlen, last_ew, ew_combine_allowed, charset):
+ """Fold string to_encode into lines as encoded word, combining if allowed.
+ Return the new value for last_ew, or None if ew_combine_allowed is False.
+
+ If there is already an encoded word in the last line of lines (indicated by
+ a non-None value for last_ew) and ew_combine_allowed is true, decode the
+ existing ew, combine it with to_encode, and re-encode. Otherwise, encode
+ to_encode. In either case, split to_encode as necessary so that the
+ encoded segments fit within maxlen.
+
+ """
+ if last_ew is not None and ew_combine_allowed:
+ to_encode = str(
+ get_unstructured(lines[-1][last_ew:] + to_encode))
+ lines[-1] = lines[-1][:last_ew]
+ if to_encode[0] in WSP:
+ # We're joining this to non-encoded text, so don't encode
+ # the leading blank.
+ leading_wsp = to_encode[0]
+ to_encode = to_encode[1:]
+ if (len(lines[-1]) == maxlen):
+ lines.append(_steal_trailing_WSP_if_exists(lines))
+ lines[-1] += leading_wsp
+ trailing_wsp = ''
+ if to_encode[-1] in WSP:
+ # Likewise for the trailing space.
+ trailing_wsp = to_encode[-1]
+ to_encode = to_encode[:-1]
+ new_last_ew = len(lines[-1]) if last_ew is None else last_ew
+ while to_encode:
+ remaining_space = maxlen - len(lines[-1])
+ # The RFC2047 chrome takes up 7 characters plus the length
+ # of the charset name.
+ encode_as = 'utf-8' if charset == 'us-ascii' else charset
+ text_space = remaining_space - len(encode_as) - 7
+ if text_space <= 0:
+ lines.append(' ')
+ # XXX We'll get an infinite loop here if maxlen is <= 7
+ continue
+ first_part = to_encode[:text_space]
+ ew = _ew.encode(first_part, charset=encode_as)
+ excess = len(ew) - remaining_space
+ if excess > 0:
+ # encode always chooses the shortest encoding, so this
+ # is guaranteed to fit at this point.
+ first_part = first_part[:-excess]
+ ew = _ew.encode(first_part)
+ lines[-1] += ew
+ to_encode = to_encode[len(first_part):]
+ if to_encode:
+ lines.append(' ')
+ new_last_ew = len(lines[-1])
+ lines[-1] += trailing_wsp
+ return new_last_ew if ew_combine_allowed else None
+
+def _fold_mime_parameters(part, lines, maxlen, encoding):
+ """Fold TokenList 'part' into the 'lines' list as mime parameters.
+
+ Using the decoded list of parameters and values, format them according to
+ the RFC rules, including using RFC2231 encoding if the value cannot be
+ expressed in 'encoding' and/or the paramter+value is too long to fit within
+ 'maxlen'.
+
+ """
+ # Special case for RFC2231 encoding: start from decoded values and use
+ # RFC2231 encoding iff needed.
+ #
+ # Note that the 1 and 2s being added to the length calculations are
+ # accounting for the possibly-needed spaces and semicolons we'll be adding.
+ #
+ for name, value in part.params:
+ # XXX What if this ';' puts us over maxlen the first time through the
+ # loop? We should split the header value onto a newline in that case,
+ # but to do that we need to recognize the need earlier or reparse the
+ # header, so I'm going to ignore that bug for now. It'll only put us
+ # one character over.
+ if not lines[-1].rstrip().endswith(';'):
+ lines[-1] += ';'
+ charset = encoding
+ error_handler = 'strict'
+ try:
+ value.encode(encoding)
+ encoding_required = False
+ except UnicodeEncodeError:
+ encoding_required = True
+ if utils._has_surrogates(value):
+ charset = 'unknown-8bit'
+ error_handler = 'surrogateescape'
+ else:
+ charset = 'utf-8'
+ if encoding_required:
+ encoded_value = urllib.parse.quote(
+ value, safe='', errors=error_handler)
+ tstr = "{}*={}''{}".format(name, charset, encoded_value)
+ else:
+ tstr = '{}={}'.format(name, quote_string(value))
+ if len(lines[-1]) + len(tstr) + 1 < maxlen:
+ lines[-1] = lines[-1] + ' ' + tstr
+ continue
+ elif len(tstr) + 2 <= maxlen:
+ lines.append(' ' + tstr)
+ continue
+ # We need multiple sections. We are allowed to mix encoded and
+ # non-encoded sections, but we aren't going to. We'll encode them all.
+ section = 0
+ extra_chrome = charset + "''"
+ while value:
+ chrome_len = len(name) + len(str(section)) + 3 + len(extra_chrome)
+ if maxlen <= chrome_len + 3:
+ # We need room for the leading blank, the trailing semicolon,
+ # and at least one character of the value. If we don't
+ # have that, we'd be stuck, so in that case fall back to
+ # the RFC standard width.
+ maxlen = 78
+ splitpoint = maxchars = maxlen - chrome_len - 2
+ while True:
+ partial = value[:splitpoint]
+ encoded_value = urllib.parse.quote(
+ partial, safe='', errors=error_handler)
+ if len(encoded_value) <= maxchars:
+ break
+ splitpoint -= 1
+ lines.append(" {}*{}*={}{}".format(
+ name, section, extra_chrome, encoded_value))
+ extra_chrome = ''
+ section += 1
+ value = value[splitpoint:]
+ if value:
+ lines[-1] += ';'
diff --git a/Lib/email/headerregistry.py b/Lib/email/headerregistry.py
index 81fee146dc..00652049f2 100644
--- a/Lib/email/headerregistry.py
+++ b/Lib/email/headerregistry.py
@@ -245,13 +245,16 @@ class BaseHeader(str):
the header name and the ': ' separator.
"""
- # At some point we need to only put fws here if it was in the source.
+ # At some point we need to put fws here iif it was in the source.
header = parser.Header([
parser.HeaderLabel([
parser.ValueTerminal(self.name, 'header-name'),
parser.ValueTerminal(':', 'header-sep')]),
- parser.CFWSList([parser.WhiteSpaceTerminal(' ', 'fws')]),
- self._parse_tree])
+ ])
+ if self._parse_tree:
+ header.append(
+ parser.CFWSList([parser.WhiteSpaceTerminal(' ', 'fws')]))
+ header.append(self._parse_tree)
return header.fold(policy=policy)